You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/06/04 03:50:18 UTC
samza git commit: SAMZA-401: added utilization metrics for the event
loop
Repository: samza
Updated Branches:
refs/heads/master f320a4306 -> 69b1f2e56
SAMZA-401: added utilization metrics for the event loop
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/69b1f2e5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/69b1f2e5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/69b1f2e5
Branch: refs/heads/master
Commit: 69b1f2e56d14276849be5168953e4fd638b42830
Parents: f320a43
Author: Luis De Pombo <lf...@gmail.com>
Authored: Wed Jun 3 18:50:36 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Wed Jun 3 18:50:36 2015 -0700
----------------------------------------------------------------------
.../scala/org/apache/samza/container/RunLoop.scala | 11 ++++++++---
.../samza/container/SamzaContainerMetrics.scala | 1 +
.../main/scala/org/apache/samza/util/TimerUtils.scala | 14 ++++++++++++++
.../org/apache/samza/container/TestRunLoop.scala | 4 ++--
4 files changed, 25 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/69b1f2e5/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index 4c0faf6..e0c3e7e 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -44,6 +44,7 @@ class RunLoop(
private var lastWindowMs = 0L
private var lastCommitMs = 0L
+ private var activeMs = 0L
private var taskShutdownRequests: Set[TaskName] = Set()
private var taskCommitRequests: Set[TaskName] = Set()
@volatile private var shutdownNow = false
@@ -66,9 +67,13 @@ class RunLoop(
addShutdownHook(Thread.currentThread())
while (!shutdownNow) {
+ val loopStartTime = clock();
process
window
commit
+ val totalMs = clock() - loopStartTime
+ metrics.utilization.set(activeMs.toFloat/totalMs)
+ activeMs = 0L
}
}
@@ -95,7 +100,7 @@ class RunLoop(
trace("Attempting to choose a message to process.")
metrics.processes.inc
- updateTimer(metrics.processMs) {
+ activeMs += updateTimerAndGetDuration(metrics.processMs) {
val envelope = updateTimer(metrics.chooseMs) {
consumerMultiplexer.choose
}
@@ -122,7 +127,7 @@ class RunLoop(
* Invokes WindowableTask.window on all tasks if it's time to do so.
*/
private def window {
- updateTimer(metrics.windowMs) {
+ activeMs += updateTimerAndGetDuration(metrics.windowMs) {
if (windowMs >= 0 && lastWindowMs + windowMs < clock()) {
trace("Windowing stream tasks.")
lastWindowMs = clock()
@@ -142,7 +147,7 @@ class RunLoop(
* Commits task state as a a checkpoint, if necessary.
*/
private def commit {
- updateTimer(metrics.commitMs) {
+ activeMs += updateTimerAndGetDuration(metrics.commitMs) {
if (commitMs >= 0 && lastCommitMs + commitMs < clock()) {
trace("Committing task instances because the commit interval has elapsed.")
lastCommitMs = clock()
http://git-wip-us.apache.org/repos/asf/samza/blob/69b1f2e5/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index 7d9ff00..aa7a9bc 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -37,4 +37,5 @@ class SamzaContainerMetrics(
val windowMs = newTimer("window-ms")
val processMs = newTimer("process-ms")
val commitMs = newTimer("commit-ms")
+ val utilization = newGauge("event-loop-utilization", 0.0F);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/69b1f2e5/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala b/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
index 6fedb62..1643070 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
@@ -38,4 +38,18 @@ trait TimerUtils {
timer.update(clock() - startingTime)
returnValue
}
+
+ /**
+ * A helper method to update the {@link org.apache.samza.metrics.Timer} metric.
+ * It accepts a {@link org.apache.samza.metrics.Timer} instance and a code block
+ * with no return value. It updates the Timer instance with the duration of
+ * running code block and returns the same duration.
+ */
+ def updateTimerAndGetDuration(timer: Timer)(runCodeBlock: => Unit): Long = {
+ val startingTime = clock()
+ runCodeBlock
+ val duration = clock() - startingTime
+ timer.update(duration)
+ duration
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/69b1f2e5/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
index 73ec2b5..cec6477 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -191,9 +191,9 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat
commitMs = 1L,
clock = () => {
now += 1L
- // clock() is called 13 times totally in RunLoop
+ // clock() is called 15 times totally in RunLoop
// stop the runLoop after one run
- if (now == 13L) throw new StopRunLoop
+ if (now == 15L) throw new StopRunLoop
now
})
intercept[StopRunLoop] { runLoop.run }