You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/06/04 03:50:18 UTC

samza git commit: SAMZA-401: added utilization metrics for the event loop

Repository: samza
Updated Branches:
  refs/heads/master f320a4306 -> 69b1f2e56


SAMZA-401: added utilization metrics for the event loop


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/69b1f2e5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/69b1f2e5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/69b1f2e5

Branch: refs/heads/master
Commit: 69b1f2e56d14276849be5168953e4fd638b42830
Parents: f320a43
Author: Luis De Pombo <lf...@gmail.com>
Authored: Wed Jun 3 18:50:36 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Wed Jun 3 18:50:36 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/samza/container/RunLoop.scala    | 11 ++++++++---
 .../samza/container/SamzaContainerMetrics.scala       |  1 +
 .../main/scala/org/apache/samza/util/TimerUtils.scala | 14 ++++++++++++++
 .../org/apache/samza/container/TestRunLoop.scala      |  4 ++--
 4 files changed, 25 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/69b1f2e5/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index 4c0faf6..e0c3e7e 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -44,6 +44,7 @@ class RunLoop(
 
   private var lastWindowMs = 0L
   private var lastCommitMs = 0L
+  private var activeMs = 0L
   private var taskShutdownRequests: Set[TaskName] = Set()
   private var taskCommitRequests: Set[TaskName] = Set()
   @volatile private var shutdownNow = false
@@ -66,9 +67,13 @@ class RunLoop(
     addShutdownHook(Thread.currentThread())
 
     while (!shutdownNow) {
+      val loopStartTime = clock();
       process
       window
       commit
+      val totalMs = clock() - loopStartTime
+      metrics.utilization.set(activeMs.toFloat/totalMs)
+      activeMs = 0L
     }
   }
 
@@ -95,7 +100,7 @@ class RunLoop(
     trace("Attempting to choose a message to process.")
     metrics.processes.inc
 
-    updateTimer(metrics.processMs) {
+    activeMs += updateTimerAndGetDuration(metrics.processMs) {
       val envelope = updateTimer(metrics.chooseMs) {
         consumerMultiplexer.choose
       }
@@ -122,7 +127,7 @@ class RunLoop(
    * Invokes WindowableTask.window on all tasks if it's time to do so.
    */
   private def window {
-    updateTimer(metrics.windowMs) {
+    activeMs += updateTimerAndGetDuration(metrics.windowMs) {
       if (windowMs >= 0 && lastWindowMs + windowMs < clock()) {
         trace("Windowing stream tasks.")
         lastWindowMs = clock()
@@ -142,7 +147,7 @@ class RunLoop(
    * Commits task state as a a checkpoint, if necessary.
    */
   private def commit {
-    updateTimer(metrics.commitMs) {
+    activeMs += updateTimerAndGetDuration(metrics.commitMs) {
       if (commitMs >= 0 && lastCommitMs + commitMs < clock()) {
         trace("Committing task instances because the commit interval has elapsed.")
         lastCommitMs = clock()

http://git-wip-us.apache.org/repos/asf/samza/blob/69b1f2e5/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index 7d9ff00..aa7a9bc 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -37,4 +37,5 @@ class SamzaContainerMetrics(
   val windowMs = newTimer("window-ms")
   val processMs = newTimer("process-ms")
   val commitMs = newTimer("commit-ms")
+  val utilization = newGauge("event-loop-utilization", 0.0F);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/69b1f2e5/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala b/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
index 6fedb62..1643070 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
@@ -38,4 +38,18 @@ trait TimerUtils {
     timer.update(clock() - startingTime)
     returnValue
   }
+
+  /**
+   * Runs a code block that returns no value and measures how long it took as the
+   * difference between two clock() readings taken before and after the block.
+   * The {@link org.apache.samza.metrics.Timer} is updated with that duration and
+   * the same duration is returned. If the block throws, the timer is not updated.
+   */
+  def updateTimerAndGetDuration(timer: Timer)(runCodeBlock: => Unit): Long = {
+    val startingTime = clock()
+    runCodeBlock
+    val duration = clock() - startingTime
+    timer.update(duration)
+    duration
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/69b1f2e5/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
index 73ec2b5..cec6477 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -191,9 +191,9 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat
       commitMs = 1L,
       clock = () => {
         now += 1L
-        // clock() is called 13 times totally in RunLoop
+        // clock() is called 15 times in total in RunLoop
         // stop the runLoop after one run
-        if (now == 13L) throw new StopRunLoop
+        if (now == 15L) throw new StopRunLoop
         now
       })
     intercept[StopRunLoop] { runLoop.run }