You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/08/18 09:45:51 UTC
samza git commit: SAMZA-738: Samza timer-based metrics do not have enough precision
Repository: samza
Updated Branches:
refs/heads/master 6c4ecb532 -> 3a4886a65
SAMZA-738: Samza timer-based metrics do not have enough precision
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3a4886a6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3a4886a6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3a4886a6
Branch: refs/heads/master
Commit: 3a4886a6531530de67b7cab6ed004653e9e6aa38
Parents: 6c4ecb5
Author: Aleksandar Pejakovic <a....@levi9.com>
Authored: Tue Aug 18 00:45:34 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Committed: Tue Aug 18 00:45:34 2015 -0700
----------------------------------------------------------------------
.../org/apache/samza/container/RunLoop.scala | 44 ++++++++++----------
.../samza/container/SamzaContainerMetrics.scala | 8 ++--
.../org/apache/samza/util/TimerUtils.scala | 9 ++--
.../apache/samza/container/TestRunLoop.scala | 28 ++++++-------
.../system/kafka/KafkaSystemProducer.scala | 4 +-
.../kafka/KafkaSystemProducerMetrics.scala | 10 +----
6 files changed, 49 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/3a4886a6/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index c292ae4..24da35f 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -19,10 +19,9 @@
package org.apache.samza.container
-import org.apache.samza.util.Logging
-import org.apache.samza.system.{ SystemStreamPartition, SystemConsumers }
+import org.apache.samza.system.{SystemConsumers, SystemStreamPartition}
import org.apache.samza.task.ReadableCoordinator
-import org.apache.samza.util.TimerUtils
+import org.apache.samza.util.{Logging, TimerUtils}
/**
* Each {@link SamzaContainer} uses a single-threaded execution model: activities for
@@ -39,12 +38,13 @@ class RunLoop(
val metrics: SamzaContainerMetrics,
val windowMs: Long = -1,
val commitMs: Long = 60000,
- val clock: () => Long = { System.currentTimeMillis },
+ val clock: () => Long = { System.nanoTime },
val shutdownMs: Long = 5000) extends Runnable with TimerUtils with Logging {
- private var lastWindowMs = clock()
- private var lastCommitMs = clock()
- private var activeMs = 0L
+ private val metricsMsOffset = 1000000L
+ private var lastWindowNs = clock()
+ private var lastCommitNs = clock()
+ private var activeNs = 0L
private var taskShutdownRequests: Set[TaskName] = Set()
private var taskCommitRequests: Set[TaskName] = Set()
@volatile private var shutdownNow = false
@@ -67,13 +67,13 @@ class RunLoop(
addShutdownHook(Thread.currentThread())
while (!shutdownNow) {
- val loopStartTime = clock();
+ val loopStartTime = clock()
process
window
commit
- val totalMs = clock() - loopStartTime
- metrics.utilization.set(activeMs.toFloat/totalMs)
- activeMs = 0L
+ val totalNs = clock() - loopStartTime
+ metrics.utilization.set(activeNs.toFloat/totalNs)
+ activeNs = 0L
}
}
@@ -100,8 +100,8 @@ class RunLoop(
trace("Attempting to choose a message to process.")
metrics.processes.inc
- activeMs += updateTimerAndGetDuration(metrics.processMs) {
- val envelope = updateTimer(metrics.chooseMs) {
+ activeNs += updateTimerAndGetDuration(metrics.processNs) ((currentTimeNs: Long) => {
+ val envelope = updateTimer(metrics.chooseNs) {
consumerMultiplexer.choose
}
@@ -120,17 +120,17 @@ class RunLoop(
trace("No incoming message envelope was available.")
metrics.nullEnvelopes.inc
}
- }
+ })
}
/**
* Invokes WindowableTask.window on all tasks if it's time to do so.
*/
private def window {
- activeMs += updateTimerAndGetDuration(metrics.windowMs) {
- if (windowMs >= 0 && lastWindowMs + windowMs < clock()) {
+ activeNs += updateTimerAndGetDuration(metrics.windowNs) ((currentTimeNs: Long) => {
+ if (windowMs >= 0 && lastWindowNs + windowMs * metricsMsOffset < currentTimeNs) {
trace("Windowing stream tasks.")
- lastWindowMs = clock()
+ lastWindowNs = currentTimeNs
metrics.windows.inc
taskInstances.foreach {
@@ -140,17 +140,17 @@ class RunLoop(
checkCoordinator(coordinator)
}
}
- }
+ })
}
/**
* Commits task state as a a checkpoint, if necessary.
*/
private def commit {
- activeMs += updateTimerAndGetDuration(metrics.commitMs) {
- if (commitMs >= 0 && lastCommitMs + commitMs < clock()) {
+ activeNs += updateTimerAndGetDuration(metrics.commitNs) ((currentTimeNs: Long) => {
+ if (commitMs >= 0 && lastCommitNs + commitMs * metricsMsOffset < currentTimeNs) {
trace("Committing task instances because the commit interval has elapsed.")
- lastCommitMs = clock()
+ lastCommitNs = currentTimeNs
metrics.commits.inc
taskInstances.values.foreach(_.commit)
} else if (!taskCommitRequests.isEmpty) {
@@ -162,7 +162,7 @@ class RunLoop(
}
taskCommitRequests = Set()
- }
+ })
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/3a4886a6/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index aa7a9bc..127f3a1 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -33,9 +33,9 @@ class SamzaContainerMetrics(
val sends = newCounter("send-calls")
val envelopes = newCounter("process-envelopes")
val nullEnvelopes = newCounter("process-null-envelopes")
- val chooseMs = newTimer("choose-ms")
- val windowMs = newTimer("window-ms")
- val processMs = newTimer("process-ms")
- val commitMs = newTimer("commit-ms")
+ val chooseNs = newTimer("choose-ns")
+ val windowNs = newTimer("window-ns")
+ val processNs = newTimer("process-ns")
+ val commitNs = newTimer("commit-ns")
val utilization = newGauge("event-loop-utilization", 0.0F);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/3a4886a6/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala b/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
index 1643070..63935a7 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
@@ -40,14 +40,15 @@ trait TimerUtils {
}
/**
- * A helper method to update the {@link org.apache.samza.metrics.Timer} metric.
+ * A helper method to update the {@link org.apache.samza.metrics.Timer} metrics.
* It accepts a {@link org.apache.samza.metrics.Timer} instance and a code block
- * with no return value. It updates the Timer instance with the duration of
+ * with no return value. It passes one Long parameter to code block that contains
+ * current time in nanoseconds. It updates the Timer instance with the duration of
* running code block and returns the same duration.
*/
- def updateTimerAndGetDuration(timer: Timer)(runCodeBlock: => Unit): Long = {
+ def updateTimerAndGetDuration(timer: Timer)(runCodeBlock: Long => Unit): Long = {
val startingTime = clock()
- runCodeBlock
+ runCodeBlock(startingTime)
val duration = clock() - startingTime
timer.update(duration)
duration
http://git-wip-us.apache.org/repos/asf/samza/blob/3a4886a6/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
index 64a5844..b9d9e73 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -94,8 +94,8 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat
windowMs = 60000, // call window once per minute
commitMs = 30000, // call commit twice per minute
clock = () => {
- now += 100 // clock advances by 100 ms every time we look at it
- if (now == 1400000290000L) throw new StopRunLoop // stop after 4 minutes 50 seconds
+ now += 100000000L // clock advances by 100 ms every time we look at it
+ if (now == 1690000000000L) throw new StopRunLoop // stop after 4 minutes 50 seconds
now
})
@@ -190,26 +190,26 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat
windowMs = 1L,
commitMs = 1L,
clock = () => {
- now += 1L
+ now += 1000000L
// clock() is called 15 times totally in RunLoop
// stop the runLoop after one run
- if (now == 15L) throw new StopRunLoop
+ if (now == 15000000L) throw new StopRunLoop
now
})
intercept[StopRunLoop] { runLoop.run }
- testMetrics.chooseMs.getSnapshot.getAverage should equal(1L)
- testMetrics.windowMs.getSnapshot.getAverage should equal(3L)
- testMetrics.processMs.getSnapshot.getAverage should equal(3L)
- testMetrics.commitMs.getSnapshot.getAverage should equal(0L)
+ testMetrics.chooseNs.getSnapshot.getAverage should equal(1000000L)
+ testMetrics.windowNs.getSnapshot.getAverage should equal(1000000L)
+ testMetrics.processNs.getSnapshot.getAverage should equal(3000000L)
+ testMetrics.commitNs.getSnapshot.getAverage should equal(1000000L)
now = 0L
intercept[StopRunLoop] { runLoop.run }
// after two loops
- testMetrics.chooseMs.getSnapshot.getSize should equal(2)
- testMetrics.windowMs.getSnapshot.getSize should equal(2)
- testMetrics.processMs.getSnapshot.getSize should equal(2)
- testMetrics.commitMs.getSnapshot.getSize should equal(1)
+ testMetrics.chooseNs.getSnapshot.getSize should equal(3)
+ testMetrics.windowNs.getSnapshot.getSize should equal(2)
+ testMetrics.processNs.getSnapshot.getSize should equal(2)
+ testMetrics.commitNs.getSnapshot.getSize should equal(2)
}
@Test
@@ -224,8 +224,8 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat
commitMs = 1L,
windowMs = 1L,
clock = () => {
- now += 1L
- if (now == 13L) throw new StopRunLoop
+ now += 1000000L
+ if (now == 13000000L) throw new StopRunLoop
now
}
)
http://git-wip-us.apache.org/repos/asf/samza/blob/3a4886a6/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index 39c54aa..c0c34bf 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -40,7 +40,7 @@ class KafkaSystemProducer(systemName: String,
retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
getProducer: () => Producer[Array[Byte], Array[Byte]],
metrics: KafkaSystemProducerMetrics,
- val clock: () => Long = () => System.currentTimeMillis) extends SystemProducer with Logging with TimerUtils
+ val clock: () => Long = () => System.nanoTime) extends SystemProducer with Logging with TimerUtils
{
var producer: Producer[Array[Byte], Array[Byte]] = null
val latestFuture: javaMap[String, Future[RecordMetadata]] = new util.HashMap[String, Future[RecordMetadata]]()
@@ -132,7 +132,7 @@ class KafkaSystemProducer(systemName: String,
}
def flush(source: String) {
- updateTimer(metrics.flushMs) {
+ updateTimer(metrics.flushNs) {
metrics.flushes.inc
//if latestFuture is null, it probably means that there has been no calls to "send" messages
//Hence, nothing to do in flush
http://git-wip-us.apache.org/repos/asf/samza/blob/3a4886a6/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
index 8aa73ce..d579e7b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
@@ -19,13 +19,7 @@
package org.apache.samza.system.kafka
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.metrics.ReadableMetricsRegistry
-import org.apache.samza.system.SystemStream
-import org.apache.samza.metrics.MetricsHelper
-import org.apache.samza.Partition
-import org.apache.samza.metrics.Gauge
-import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.metrics.{MetricsHelper, MetricsRegistry, MetricsRegistryMap}
class KafkaSystemProducerMetrics(val systemName: String = "unknown", val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
/* Tracks the number of calls made to send in KafkaSystemProducer */
@@ -33,7 +27,7 @@ class KafkaSystemProducerMetrics(val systemName: String = "unknown", val registr
/* Tracks the number of calls made to flush in KafkaSystemProducer */
val flushes = newCounter("flushes")
/* Tracks how long the flush call takes to complete */
- val flushMs = newTimer("flush-ms")
+ val flushNs = newTimer("flush-ns")
/* Tracks the number of times the system producer retries a send request (due to RetriableException) */
val retries = newCounter("producer-retries")
/* Tracks the number of times flush operation failed */