You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/08/18 09:45:51 UTC

samza git commit: SAMZA-738: Samza timer based metrics does not have enough precision

Repository: samza
Updated Branches:
  refs/heads/master 6c4ecb532 -> 3a4886a65


 SAMZA-738: Samza timer based metrics does not have enough precision


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3a4886a6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3a4886a6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3a4886a6

Branch: refs/heads/master
Commit: 3a4886a6531530de67b7cab6ed004653e9e6aa38
Parents: 6c4ecb5
Author: Aleksandar Pejakovic <a....@levi9.com>
Authored: Tue Aug 18 00:45:34 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Committed: Tue Aug 18 00:45:34 2015 -0700

----------------------------------------------------------------------
 .../org/apache/samza/container/RunLoop.scala    | 44 ++++++++++----------
 .../samza/container/SamzaContainerMetrics.scala |  8 ++--
 .../org/apache/samza/util/TimerUtils.scala      |  9 ++--
 .../apache/samza/container/TestRunLoop.scala    | 28 ++++++-------
 .../system/kafka/KafkaSystemProducer.scala      |  4 +-
 .../kafka/KafkaSystemProducerMetrics.scala      | 10 +----
 6 files changed, 49 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/3a4886a6/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index c292ae4..24da35f 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -19,10 +19,9 @@
 
 package org.apache.samza.container
 
-import org.apache.samza.util.Logging
-import org.apache.samza.system.{ SystemStreamPartition, SystemConsumers }
+import org.apache.samza.system.{SystemConsumers, SystemStreamPartition}
 import org.apache.samza.task.ReadableCoordinator
-import org.apache.samza.util.TimerUtils
+import org.apache.samza.util.{Logging, TimerUtils}
 
 /**
  * Each {@link SamzaContainer} uses a single-threaded execution model: activities for
@@ -39,12 +38,13 @@ class RunLoop(
   val metrics: SamzaContainerMetrics,
   val windowMs: Long = -1,
   val commitMs: Long = 60000,
-  val clock: () => Long = { System.currentTimeMillis },
+  val clock: () => Long = { System.nanoTime },
   val shutdownMs: Long = 5000) extends Runnable with TimerUtils with Logging {
 
-  private var lastWindowMs = clock()
-  private var lastCommitMs = clock()
-  private var activeMs = 0L
+  private val metricsMsOffset = 1000000L
+  private var lastWindowNs = clock()
+  private var lastCommitNs = clock()
+  private var activeNs = 0L
   private var taskShutdownRequests: Set[TaskName] = Set()
   private var taskCommitRequests: Set[TaskName] = Set()
   @volatile private var shutdownNow = false
@@ -67,13 +67,13 @@ class RunLoop(
     addShutdownHook(Thread.currentThread())
 
     while (!shutdownNow) {
-      val loopStartTime = clock();
+      val loopStartTime = clock()
       process
       window
       commit
-      val totalMs = clock() - loopStartTime
-      metrics.utilization.set(activeMs.toFloat/totalMs)
-      activeMs = 0L
+      val totalNs = clock() - loopStartTime
+      metrics.utilization.set(activeNs.toFloat/totalNs)
+      activeNs = 0L
     }
   }
 
@@ -100,8 +100,8 @@ class RunLoop(
     trace("Attempting to choose a message to process.")
     metrics.processes.inc
 
-    activeMs += updateTimerAndGetDuration(metrics.processMs) {
-      val envelope = updateTimer(metrics.chooseMs) {
+    activeNs += updateTimerAndGetDuration(metrics.processNs) ((currentTimeNs: Long) => {
+      val envelope = updateTimer(metrics.chooseNs) {
         consumerMultiplexer.choose
       }
 
@@ -120,17 +120,17 @@ class RunLoop(
         trace("No incoming message envelope was available.")
         metrics.nullEnvelopes.inc
       }
-    }
+    })
   }
 
   /**
    * Invokes WindowableTask.window on all tasks if it's time to do so.
    */
   private def window {
-    activeMs += updateTimerAndGetDuration(metrics.windowMs) {
-      if (windowMs >= 0 && lastWindowMs + windowMs < clock()) {
+    activeNs += updateTimerAndGetDuration(metrics.windowNs) ((currentTimeNs: Long) => {
+      if (windowMs >= 0 && lastWindowNs + windowMs * metricsMsOffset < currentTimeNs) {
         trace("Windowing stream tasks.")
-        lastWindowMs = clock()
+        lastWindowNs = currentTimeNs
         metrics.windows.inc
 
         taskInstances.foreach {
@@ -140,17 +140,17 @@ class RunLoop(
             checkCoordinator(coordinator)
         }
       }
-    }
+    })
   }
 
   /**
    * Commits task state as a a checkpoint, if necessary.
    */
   private def commit {
-    activeMs += updateTimerAndGetDuration(metrics.commitMs) {
-      if (commitMs >= 0 && lastCommitMs + commitMs < clock()) {
+    activeNs += updateTimerAndGetDuration(metrics.commitNs) ((currentTimeNs: Long) => {
+      if (commitMs >= 0 && lastCommitNs + commitMs * metricsMsOffset < currentTimeNs) {
         trace("Committing task instances because the commit interval has elapsed.")
-        lastCommitMs = clock()
+        lastCommitNs = currentTimeNs
         metrics.commits.inc
         taskInstances.values.foreach(_.commit)
       } else if (!taskCommitRequests.isEmpty) {
@@ -162,7 +162,7 @@ class RunLoop(
       }
 
       taskCommitRequests = Set()
-    }
+    })
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/3a4886a6/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index aa7a9bc..127f3a1 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -33,9 +33,9 @@ class SamzaContainerMetrics(
   val sends = newCounter("send-calls")
   val envelopes = newCounter("process-envelopes")
   val nullEnvelopes = newCounter("process-null-envelopes")
-  val chooseMs = newTimer("choose-ms")
-  val windowMs = newTimer("window-ms")
-  val processMs = newTimer("process-ms")
-  val commitMs = newTimer("commit-ms")
+  val chooseNs = newTimer("choose-ns")
+  val windowNs = newTimer("window-ns")
+  val processNs = newTimer("process-ns")
+  val commitNs = newTimer("commit-ns")
   val utilization = newGauge("event-loop-utilization", 0.0F);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/3a4886a6/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala b/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
index 1643070..63935a7 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
@@ -40,14 +40,15 @@ trait TimerUtils {
   }
 
   /**
-   * A helper method to update the {@link org.apache.samza.metrics.Timer} metric.
+   * A helper method to update the {@link org.apache.samza.metrics.Timer} metrics.
    * It accepts a {@link org.apache.samza.metrics.Timer} instance and a code block
-   * with no return value. It updates the Timer instance with the duration of
+   * with no return value. It passes one Long parameter to the code block, holding the
+   * current time in nanoseconds. It updates the Timer instance with the duration of
    * running code block and returns the same duration.
    */
-  def updateTimerAndGetDuration(timer: Timer)(runCodeBlock: => Unit): Long = {
+  def updateTimerAndGetDuration(timer: Timer)(runCodeBlock: Long => Unit): Long = {
     val startingTime = clock()
-    runCodeBlock
+    runCodeBlock(startingTime)
     val duration = clock() - startingTime
     timer.update(duration)
     duration

http://git-wip-us.apache.org/repos/asf/samza/blob/3a4886a6/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
index 64a5844..b9d9e73 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -94,8 +94,8 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat
       windowMs = 60000, // call window once per minute
       commitMs = 30000, // call commit twice per minute
       clock = () => {
-        now += 100 // clock advances by 100 ms every time we look at it
-        if (now == 1400000290000L) throw new StopRunLoop // stop after 4 minutes 50 seconds
+        now += 100000000L // clock advances by 100 ms every time we look at it
+        if (now == 1690000000000L) throw new StopRunLoop // stop after 4 minutes 50 seconds
         now
       })
 
@@ -190,26 +190,26 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat
       windowMs = 1L,
       commitMs = 1L,
       clock = () => {
-        now += 1L
+        now += 1000000L
         // clock() is called 15 times totally in RunLoop
         // stop the runLoop after one run
-        if (now == 15L) throw new StopRunLoop
+        if (now == 15000000L) throw new StopRunLoop
         now
       })
     intercept[StopRunLoop] { runLoop.run }
 
-    testMetrics.chooseMs.getSnapshot.getAverage should equal(1L)
-    testMetrics.windowMs.getSnapshot.getAverage should equal(3L)
-    testMetrics.processMs.getSnapshot.getAverage should equal(3L)
-    testMetrics.commitMs.getSnapshot.getAverage should equal(0L)
+    testMetrics.chooseNs.getSnapshot.getAverage should equal(1000000L)
+    testMetrics.windowNs.getSnapshot.getAverage should equal(1000000L)
+    testMetrics.processNs.getSnapshot.getAverage should equal(3000000L)
+    testMetrics.commitNs.getSnapshot.getAverage should equal(1000000L)
 
     now = 0L
     intercept[StopRunLoop] { runLoop.run }
     // after two loops
-    testMetrics.chooseMs.getSnapshot.getSize should equal(2)
-    testMetrics.windowMs.getSnapshot.getSize should equal(2)
-    testMetrics.processMs.getSnapshot.getSize should equal(2)
-    testMetrics.commitMs.getSnapshot.getSize should equal(1)
+    testMetrics.chooseNs.getSnapshot.getSize should equal(3)
+    testMetrics.windowNs.getSnapshot.getSize should equal(2)
+    testMetrics.processNs.getSnapshot.getSize should equal(2)
+    testMetrics.commitNs.getSnapshot.getSize should equal(2)
   }
 
   @Test
@@ -224,8 +224,8 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat
       commitMs = 1L,
       windowMs = 1L,
       clock = () => {
-        now += 1L
-        if (now == 13L) throw new StopRunLoop
+        now += 1000000L
+        if (now == 13000000L) throw new StopRunLoop
         now
       }
     )

http://git-wip-us.apache.org/repos/asf/samza/blob/3a4886a6/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index 39c54aa..c0c34bf 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -40,7 +40,7 @@ class KafkaSystemProducer(systemName: String,
                           retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
                           getProducer: () => Producer[Array[Byte], Array[Byte]],
                           metrics: KafkaSystemProducerMetrics,
-                          val clock: () => Long = () => System.currentTimeMillis) extends SystemProducer with Logging with TimerUtils
+                          val clock: () => Long = () => System.nanoTime) extends SystemProducer with Logging with TimerUtils
 {
   var producer: Producer[Array[Byte], Array[Byte]] = null
   val latestFuture: javaMap[String, Future[RecordMetadata]] = new util.HashMap[String, Future[RecordMetadata]]()
@@ -132,7 +132,7 @@ class KafkaSystemProducer(systemName: String,
   }
 
   def flush(source: String) {
-    updateTimer(metrics.flushMs) {
+    updateTimer(metrics.flushNs) {
       metrics.flushes.inc
       //if latestFuture is null, it probably means that there has been no calls to "send" messages
       //Hence, nothing to do in flush

http://git-wip-us.apache.org/repos/asf/samza/blob/3a4886a6/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
index 8aa73ce..d579e7b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
@@ -19,13 +19,7 @@
 
 package org.apache.samza.system.kafka
 
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.metrics.ReadableMetricsRegistry
-import org.apache.samza.system.SystemStream
-import org.apache.samza.metrics.MetricsHelper
-import org.apache.samza.Partition
-import org.apache.samza.metrics.Gauge
-import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.metrics.{MetricsHelper, MetricsRegistry, MetricsRegistryMap}
 
 class KafkaSystemProducerMetrics(val systemName: String = "unknown", val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
   /* Tracks the number of calls made to send in KafkaSystemProducer */
@@ -33,7 +27,7 @@ class KafkaSystemProducerMetrics(val systemName: String = "unknown", val registr
   /* Tracks the number of calls made to flush in KafkaSystemProducer */
   val flushes = newCounter("flushes")
   /* Tracks how long the flush call takes to complete */
-  val flushMs = newTimer("flush-ms")
+  val flushNs = newTimer("flush-ns")
   /* Tracks the number of times the system producer retries a send request (due to RetriableException) */
   val retries = newCounter("producer-retries")
   /* Tracks the number of times flush operation failed */