You are viewing a plain-text version of this content. The canonical version is linked from the original archive page; the hyperlink was not preserved in this plain-text copy.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/05/24 02:52:16 UTC
samza git commit: SAMZA-951 - Improve event loop timing metrics
Repository: samza
Updated Branches:
refs/heads/master 6ae7784a5 -> 9f7abf535
SAMZA-951 - Improve event loop timing metrics
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9f7abf53
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9f7abf53
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9f7abf53
Branch: refs/heads/master
Commit: 9f7abf535822a1de4d6ac6ee73cb3b879800e4a3
Parents: 6ae7784
Author: Jacob Maes <ja...@gmail.com>
Authored: Mon May 23 19:51:17 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Mon May 23 19:51:17 2016 -0700
----------------------------------------------------------------------
.../org/apache/samza/container/RunLoop.scala | 10 +--
.../apache/samza/system/SystemConsumers.scala | 73 +++++++++++++-------
.../samza/system/SystemConsumersMetrics.scala | 2 +
.../apache/samza/container/TestRunLoop.scala | 2 +-
.../samza/system/TestSystemConsumers.scala | 12 +++-
5 files changed, 67 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/9f7abf53/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index 6916c5c..3f25eca 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -103,11 +103,13 @@ class RunLoop(
trace("Attempting to choose a message to process.")
metrics.processes.inc
- activeNs += updateTimerAndGetDuration(metrics.processNs) ((currentTimeNs: Long) => {
- val envelope = updateTimer(metrics.chooseNs) {
- consumerMultiplexer.choose
- }
+ // Exclude choose time from activeNs. Although it includes deserialization time,
+ // it most closely captures idle time.
+ val envelope = updateTimer(metrics.chooseNs) {
+ consumerMultiplexer.choose
+ }
+ activeNs += updateTimerAndGetDuration(metrics.processNs) ((currentTimeNs: Long) => {
if (envelope != null) {
val ssp = envelope.getSystemStreamPartition
http://git-wip-us.apache.org/repos/asf/samza/blob/9f7abf53/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 32fc771..2efe836 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -19,9 +19,12 @@
package org.apache.samza.system
+
+import java.util.concurrent.TimeUnit
+
import scala.collection.JavaConversions._
import org.apache.samza.serializers.SerdeManager
-import org.apache.samza.util.Logging
+import org.apache.samza.util.{Logging, TimerUtils}
import org.apache.samza.system.chooser.MessageChooser
import org.apache.samza.SamzaException
import java.util.HashMap
@@ -44,7 +47,7 @@ object SystemConsumers {
* messages, poll the MessageChooser for the next message to process, and
* return that message to the SamzaContainer.
*/
-class SystemConsumers(
+class SystemConsumers (
/**
* The class that determines the order to process incoming messages.
@@ -59,12 +62,12 @@ class SystemConsumers(
/**
* The class that handles deserialization of incoming messages.
*/
- serdeManager: SerdeManager = new SerdeManager,
+ serdeManager: SerdeManager,
/**
* A helper class to hold all of SystemConsumers' metrics.
*/
- metrics: SystemConsumersMetrics = new SystemConsumersMetrics,
+ metrics: SystemConsumersMetrics,
/**
* If MessageChooser returns null when it's polled, SystemConsumers will
@@ -73,14 +76,14 @@ class SystemConsumers(
* thread will sit in a tight loop polling every SystemConsumer over and
* over again if no new messages are available.
*/
- noNewMessagesTimeout: Int = SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
+ noNewMessagesTimeout: Int,
/**
* This parameter is to define how to deal with deserialization failure. If
* set to true, the task will skip the messages when deserialization fails.
* If set to false, the task will throw SamzaException and fail the container.
*/
- dropDeserializationError: Boolean = SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
+ dropDeserializationError: Boolean,
/**
* <p>Defines an upper bound for how long the SystemConsumers will wait
@@ -96,13 +99,29 @@ class SystemConsumers(
* with no remaining unprocessed messages, the SystemConsumers will poll for
* it within 50ms of its availability in the stream system.</p>
*/
- pollIntervalMs: Int = SystemConsumers.DEFAULT_POLL_INTERVAL_MS,
+ pollIntervalMs: Int,
/**
* Clock can be used to inject a custom clock when mocking this class in
* tests. The default implementation returns the current system clock time.
*/
- clock: () => Long = () => System.currentTimeMillis) extends Logging {
+ val clock: () => Long) extends Logging with TimerUtils {
+
+ def this(chooser: MessageChooser,
+ consumers: Map[String, SystemConsumer],
+ serdeManager: SerdeManager = new SerdeManager,
+ metrics: SystemConsumersMetrics = new SystemConsumersMetrics,
+ noNewMessagesTimeout: Int = SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
+ dropDeserializationError: Boolean = SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
+ pollIntervalMs: Int = SystemConsumers.DEFAULT_POLL_INTERVAL_MS) =
+ this(chooser,
+ consumers,
+ serdeManager,
+ metrics,
+ noNewMessagesTimeout,
+ dropDeserializationError,
+ pollIntervalMs,
+ () => System.nanoTime())
/**
* A buffer of incoming messages grouped by SystemStreamPartition. These
@@ -128,7 +147,7 @@ class SystemConsumers(
/**
* The last time that systems were polled for new messages.
*/
- var lastPollMs = 0L
+ var lastPollNs = 0L
/**
* Total number of unprocessed messages in unprocessedMessagesBySSP.
@@ -187,28 +206,32 @@ class SystemConsumers(
def choose: IncomingMessageEnvelope = {
val envelopeFromChooser = chooser.choose
- if (envelopeFromChooser == null) {
- trace("Chooser returned null.")
+ updateTimer(metrics.deserializationNs) {
+ if (envelopeFromChooser == null) {
+ trace("Chooser returned null.")
- metrics.choseNull.inc
+ metrics.choseNull.inc
- // Sleep for a while so we don't poll in a tight loop.
- timeout = noNewMessagesTimeout
- } else {
- val systemStreamPartition = envelopeFromChooser.getSystemStreamPartition
+ // Sleep for a while so we don't poll in a tight loop.
+ timeout = noNewMessagesTimeout
+ } else {
+ val systemStreamPartition = envelopeFromChooser.getSystemStreamPartition
- trace("Chooser returned an incoming message envelope: %s" format envelopeFromChooser)
+ trace("Chooser returned an incoming message envelope: %s" format envelopeFromChooser)
- // Ok to give the chooser a new message from this stream.
- timeout = 0
- metrics.choseObject.inc
- metrics.systemStreamMessagesChosen(envelopeFromChooser.getSystemStreamPartition).inc
+ // Ok to give the chooser a new message from this stream.
+ timeout = 0
+ metrics.choseObject.inc
+ metrics.systemStreamMessagesChosen(envelopeFromChooser.getSystemStreamPartition).inc
- tryUpdate(systemStreamPartition)
+ tryUpdate(systemStreamPartition)
+ }
}
- if (envelopeFromChooser == null || lastPollMs < clock() - pollIntervalMs) {
- refresh
+ updateTimer(metrics.pollNs) {
+ if (envelopeFromChooser == null || TimeUnit.NANOSECONDS.toMillis(clock() - lastPollNs) > pollIntervalMs) {
+ refresh
+ }
}
envelopeFromChooser
@@ -280,7 +303,7 @@ class SystemConsumers(
trace("Refreshing chooser with new messages.")
// Update last poll time so we don't poll too frequently.
- lastPollMs = clock()
+ lastPollNs = clock()
// Poll every system for new messages.
consumers.keys.map(poll(_))
http://git-wip-us.apache.org/repos/asf/samza/blob/9f7abf53/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
index e7f012f..43d381b 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
@@ -32,6 +32,8 @@ class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistry
val systemStreamPartitionFetchesPerPoll = scala.collection.mutable.Map[String, Counter]()
val systemMessagesPerPoll = scala.collection.mutable.Map[String, Counter]()
val systemStreamMessagesChosen = scala.collection.mutable.Map[SystemStreamPartition, Counter]()
+ val pollNs = newTimer("poll-ns")
+ val deserializationNs = newTimer("deserialization-ns")
def setNeededByChooser(getValue: () => Int) {
newGauge("ssps-needed-by-chooser", getValue)
http://git-wip-us.apache.org/repos/asf/samza/blob/9f7abf53/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
index ad37447..e280daa 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -217,7 +217,7 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat
testMetrics.chooseNs.getSnapshot.getAverage should equal(1000000L)
testMetrics.windowNs.getSnapshot.getAverage should equal(1000000L)
- testMetrics.processNs.getSnapshot.getAverage should equal(3000000L)
+ testMetrics.processNs.getSnapshot.getAverage should equal(1000000L)
testMetrics.commitNs.getSnapshot.getAverage should equal(1000000L)
now = 0L
http://git-wip-us.apache.org/repos/asf/samza/blob/9f7abf53/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index fbaa8ee..09da62e 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -39,7 +39,11 @@ class TestSystemConsumers {
val envelope = new IncomingMessageEnvelope(systemStreamPartition0, "1", "k", "v")
val consumer = new CustomPollResponseSystemConsumer(envelope)
var now = 0L
- val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), clock = () => now)
+ val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer),
+ new SerdeManager, new SystemConsumersMetrics,
+ SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
+ SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
+ SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => now)
consumers.register(systemStreamPartition0, "0")
consumers.register(systemStreamPartition1, "1234")
@@ -97,7 +101,11 @@ class TestSystemConsumers {
val envelope = new IncomingMessageEnvelope(systemStreamPartition, "1", "k", "v")
val consumer = new CustomPollResponseSystemConsumer(envelope)
var now = 0
- val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), clock = () => now)
+ val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer),
+ new SerdeManager, new SystemConsumersMetrics,
+ SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
+ SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
+ SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => now)
consumers.register(systemStreamPartition, "0")
consumers.start