You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2018/03/29 22:26:22 UTC
samza git commit: SD-1599: Improve the efficiency of the AsyncRunLoop when some partitions are empty.
Repository: samza
Updated Branches:
refs/heads/master 13ff09024 -> 57fea260a
SD-1599: Improve the efficiency of the AsyncRunLoop when some partitions are empty.
Author: James Lent <jl...@nc.rr.com>
Reviewers: Yi Pan <ni...@gmail.com>, Xinyu Liu <xi...@linkedin.com>
Closes #436 from jwlent55/SD-1599-improve-async-run-loop-efficiency and squashes the following commits:
ef0e53bb [James Lent] SD-1599: Remove incorrect call to containerMetrics left by previous update.
3899216e [James Lent] SD-1599: Combine the blockIfBusy and blockIfNoWork logic inside one method and a common latch.
e3837809 [James Lent] SD-1599: Mark runLoopResumedSinceLastChecked as volatile.
a7c0ac4c [James Lent] SD-1599: Explicitly set the timeout value to either 'noNewMessagesTimeout' or 0.
b2dd61e2 [James Lent] SD-1599: Address the first set of code inspection comments.
cc34518a [James Lent] SD-1599: Improve the efficiency of the AsyncRunLoop when some partitions are empty.
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/57fea260
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/57fea260
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/57fea260
Branch: refs/heads/master
Commit: 57fea260a5dad24b287dad113c78bf23aa3ba8f9
Parents: 13ff090
Author: James Lent <jl...@nc.rr.com>
Authored: Thu Mar 29 15:26:28 2018 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@yipan-mn1.linkedin.biz>
Committed: Thu Mar 29 15:26:28 2018 -0700
----------------------------------------------------------------------
.../versioned/jobs/configuration-table.html | 12 ++++++
.../apache/samza/container/RunLoopFactory.java | 5 +++
.../org/apache/samza/task/AsyncRunLoop.java | 42 +++++++++++++++----
.../org/apache/samza/config/TaskConfig.scala | 7 ++++
.../apache/samza/system/SystemConsumers.scala | 7 +++-
.../org/apache/samza/task/TestAsyncRunLoop.java | 44 +++++++++++++-------
6 files changed, 92 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/57fea260/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 49886ce..5c41596 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -766,6 +766,18 @@
</tr>
<tr>
+ <td class="property" id="task-max-idle-ms">task.max.idle.ms</td>
+ <td class="default">10</td>
+ <td class="description">
+ The maximum time to wait for a task worker to complete when there are no new messages to handle before resuming the main
+ loop and potentially polling for more messages. See <a href="#task-poll-interval-ms" class="property">task.poll.interval.ms</a>.
+ This timeout value prevents the main loop from spinning when there is nothing for it to do. Increasing this value will reduce
+ the background load of the thread, but may also increase message latency. It should not be set greater than
+ <a href="#task-poll-interval-ms" class="property">task.poll.interval.ms</a>.
+ </td>
+ </tr>
+
+ <tr>
<td class="property" id="task-ignored-exceptions">task.ignored.exceptions</td>
<td class="default"></td>
<td class="description">
http://git-wip-us.apache.org/repos/asf/samza/blob/57fea260/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
index d399fd0..c9ec6b5 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
@@ -92,6 +92,10 @@ public class RunLoopFactory {
log.info("Got callbackTimeout: {}.", callbackTimeout);
+ Long maxIdleMs = config.getMaxIdleMs();
+
+ log.info("Got maxIdleMs: {}.", maxIdleMs);
+
log.info("Run loop in asynchronous mode.");
return new AsyncRunLoop(
@@ -103,6 +107,7 @@ public class RunLoopFactory {
taskCommitMs,
callbackTimeout,
maxThrottlingDelayMs,
+ maxIdleMs,
containerMetrics,
clock,
isAsyncCommitEnabled);
http://git-wip-us.apache.org/repos/asf/samza/blob/57fea260/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index 589fbb8..f3c4655 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -67,6 +67,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
private final long windowMs;
private final long commitMs;
private final long callbackTimeoutMs;
+ private final long maxIdleMs;
private final SamzaContainerMetrics containerMetrics;
private final ScheduledExecutorService workerTimer;
private final ScheduledExecutorService callbackTimer;
@@ -75,6 +76,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
private volatile Throwable throwable = null;
private final HighResolutionClock clock;
private final boolean isAsyncCommitEnabled;
+ private volatile boolean runLoopResumedSinceLastChecked;
public AsyncRunLoop(Map<TaskName, TaskInstance> taskInstances,
ExecutorService threadPool,
@@ -84,6 +86,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
long commitMs,
long callbackTimeoutMs,
long maxThrottlingDelayMs,
+ long maxIdleMs,
SamzaContainerMetrics containerMetrics,
HighResolutionClock clock,
boolean isAsyncCommitEnabled) {
@@ -95,6 +98,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
this.commitMs = commitMs;
this.maxConcurrency = maxConcurrency;
this.callbackTimeoutMs = callbackTimeoutMs;
+ this.maxIdleMs = maxIdleMs;
this.callbackTimer = (callbackTimeoutMs > 0) ? Executors.newSingleThreadScheduledExecutor() : null;
this.callbackExecutor = new ThrottlingScheduler(maxThrottlingDelayMs);
this.coordinatorRequests = new CoordinatorRequests(taskInstances.keySet());
@@ -150,21 +154,21 @@ public class AsyncRunLoop implements Runnable, Throttleable {
long startNs = clock.nanoTime();
IncomingMessageEnvelope envelope = chooseEnvelope();
- long chooseNs = clock.nanoTime();
+ long chooseNs = clock.nanoTime();
containerMetrics.chooseNs().update(chooseNs - startNs);
- runTasks(envelope);
+ blockIfBusyOrNoNewWork(envelope);
long blockNs = clock.nanoTime();
+ containerMetrics.blockNs().update(blockNs - chooseNs);
- blockIfBusy(envelope);
+ runTasks(envelope);
long currentNs = clock.nanoTime();
- long activeNs = blockNs - chooseNs;
+ long activeNs = currentNs - blockNs;
long totalNs = currentNs - prevNs;
prevNs = currentNs;
- containerMetrics.blockNs().update(currentNs - blockNs);
if (totalNs != 0) {
// totalNs is not 0 if timer metrics are enabled
@@ -233,14 +237,35 @@ public class AsyncRunLoop implements Runnable, Throttleable {
/**
* Block the runloop thread if all tasks are busy. When a task worker finishes or window/commit completes,
* it will resume the runloop.
+ *
+ * In addition, delay the AsyncRunLoop thread for a short time if there are no new messages to process and the run loop
+ * has not been resumed since the last time this code was run. This will prevent the main thread from spinning when it
+ * has no work to distribute. If a task worker finishes or window/commit completes before the timeout then resume
+ * the AsyncRunLoop thread immediately. That event may allow a task worker to start processing a message that has already
+ * been chosen. In any event it should only delay for a short time. It needs to periodically check for new messages.
*/
- private void blockIfBusy(IncomingMessageEnvelope envelope) {
+ private void blockIfBusyOrNoNewWork(IncomingMessageEnvelope envelope) {
synchronized (latch) {
+
+ // First check to see if we should delay the run loop for a short time. The runLoopResumedSinceLastChecked boolean
+ // is used to ensure we don't delay if there may already be a task ready to dequeue a previously chosen/pending
+ // message. It is better to occasionally make one additional loop when there is no work to do than to delay the
+ // runloop when there is work that could be started immediately.
+ if ((envelope == null) && !runLoopResumedSinceLastChecked) {
+ try {
+ log.trace("Start no work wait");
+ latch.wait(maxIdleMs);
+ log.trace("End no work wait");
+ } catch (InterruptedException e) {
+ throw new SamzaException("Run loop is interrupted", e);
+ }
+ }
+ runLoopResumedSinceLastChecked = false;
+
+ // Next check to see if we should block if all the tasks are busy.
while (!shutdownNow && throwable == null) {
for (AsyncTaskWorker worker : taskWorkers) {
if (worker.state.isReady()) {
- // should continue running if any worker state is ready
- // consumerMultiplexer will block on polling for empty partitions so it won't cause busy loop
return;
}
}
@@ -265,6 +290,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
}
synchronized (latch) {
latch.notifyAll();
+ runLoopResumedSinceLastChecked = true;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/57fea260/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index fe03a52..206eb8f 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -44,11 +44,13 @@ object TaskConfig {
val MAX_CONCURRENCY = "task.max.concurrency" // max number of concurrent process for a AsyncStreamTask
val CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms" // timeout period for triggering a callback
val ASYNC_COMMIT = "task.async.commit" // to enable async commit in a AsyncStreamTask
+ val MAX_IDLE_MS = "task.max.idle.ms" // maximum time to wait for a task worker to complete when there are no new messages to handle
val DEFAULT_WINDOW_MS: Long = -1L
val DEFAULT_COMMIT_MS = 60000L
val DEFAULT_CALLBACK_TIMEOUT_MS: Long = -1L
val DEFAULT_MAX_CONCURRENCY: Int = 1
+ val DEFAULT_MAX_IDLE_MS: Long = 10
/**
* Samza's container polls for more messages under two conditions. The first
@@ -155,4 +157,9 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
case Some(commitMs) => commitMs.toInt > 0
case _ => TaskConfig.DEFAULT_COMMIT_MS > 0
}
+
+ def getMaxIdleMs: Long = getOption(TaskConfig.MAX_IDLE_MS) match {
+ case Some(ms) => ms.toLong
+ case _ => TaskConfig.DEFAULT_MAX_IDLE_MS
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/57fea260/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 3964ea3..49ab52a 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -215,8 +215,11 @@ class SystemConsumers (
metrics.choseNull.inc
- // Sleep for a while so we don't poll in a tight loop.
- timeout = noNewMessagesTimeout
+ // Sleep for a while so we don't poll in a tight loop, but don't do this when called from the AsyncRunLoop
+ // code because in that case the chooser will not get updated with a new message for an SSP until after a
+ // message is processed. See how the updateChooser variable is used below. The AsyncRunLoop has its own way to
+ // block when there is no work to process.
+ timeout = if (updateChooser) noNewMessagesTimeout else 0
} else {
val systemStreamPartition = envelopeFromChooser.getSystemStreamPartition
http://git-wip-us.apache.org/repos/asf/samza/blob/57fea260/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index 7f54614..d7132f3 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -69,6 +69,7 @@ public class TestAsyncRunLoop {
private final long commitMs = -1;
private final long callbackTimeoutMs = 0;
private final long maxThrottlingDelayMs = 0;
+ private final long maxIdleMs = 10;
private final Partition p0 = new Partition(0);
private final Partition p1 = new Partition(1);
private final TaskName taskName0 = new TaskName(p0.toString());
@@ -219,7 +220,7 @@ public class TestAsyncRunLoop {
int maxMessagesInFlight = 1;
AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+ callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false);
when(consumerMultiplexer.choose(false))
.thenReturn(envelope0)
@@ -257,7 +258,8 @@ public class TestAsyncRunLoop {
int maxMessagesInFlight = 1;
AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+ callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+ () -> 0L, false);
when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(envelope1).thenReturn(null);
runLoop.run();
@@ -288,7 +290,8 @@ public class TestAsyncRunLoop {
int maxMessagesInFlight = 1;
AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+ callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+ () -> 0L, false);
when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(envelope3).thenReturn(envelope1).thenReturn(null);
runLoop.run();
@@ -345,7 +348,8 @@ public class TestAsyncRunLoop {
task0.callbackHandler = buildOutofOrderCallback(task0);
AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+ callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+ () -> 0L, false);
when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(envelope3).thenReturn(envelope1).thenReturn(null);
runLoop.run();
@@ -374,7 +378,8 @@ public class TestAsyncRunLoop {
long windowMs = 1;
int maxMessagesInFlight = 1;
AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+ callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+ () -> 0L, false);
when(consumerMultiplexer.choose(false)).thenReturn(null);
runLoop.run();
@@ -400,7 +405,8 @@ public class TestAsyncRunLoop {
int maxMessagesInFlight = 1;
AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+ callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+ () -> 0L, false);
//have a null message in between to make sure task0 finishes processing and invoke the commit
when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(null).thenReturn(envelope1).thenReturn(null);
runLoop.run();
@@ -432,7 +438,8 @@ public class TestAsyncRunLoop {
int maxMessagesInFlight = 1;
AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+ callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+ () -> 0L, false);
//have a null message in between to make sure task0 finishes processing and invoke the commit
when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(null).thenReturn(envelope1).thenReturn(null);
runLoop.run();
@@ -467,7 +474,8 @@ public class TestAsyncRunLoop {
int maxMessagesInFlight = 1;
AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+ callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+ () -> 0L, false);
// consensus is reached after envelope1 is processed.
when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(envelope1).thenReturn(null);
runLoop.run();
@@ -500,7 +508,8 @@ public class TestAsyncRunLoop {
int maxMessagesInFlight = 1;
AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+ callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+ () -> 0L, false);
when(consumerMultiplexer.choose(false))
.thenReturn(envelope0)
.thenReturn(envelope1)
@@ -540,7 +549,8 @@ public class TestAsyncRunLoop {
task0.callbackHandler = buildOutofOrderCallback(task0);
AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+ callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+ () -> 0L, false);
when(consumerMultiplexer.choose(false))
.thenReturn(envelope0)
.thenReturn(envelope3)
@@ -581,8 +591,9 @@ public class TestAsyncRunLoop {
tasks.put(taskName1, t1);
int maxMessagesInFlight = 1;
- AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight , windowMs, commitMs,
- callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+ AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
+ callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+ () -> 0L, false);
when(consumerMultiplexer.choose(false)).thenReturn(envelope0)
.thenReturn(envelope1)
@@ -656,7 +667,8 @@ public class TestAsyncRunLoop {
int maxMessagesInFlight = 1;
AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumers, maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+ callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+ () -> 0L, false);
runLoop.run();
}
@@ -706,7 +718,8 @@ public class TestAsyncRunLoop {
.thenReturn(null);
AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, true);
+ callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+ () -> 0L, false);
runLoop.run();
@@ -755,7 +768,8 @@ public class TestAsyncRunLoop {
.thenReturn(envelope1)
.thenReturn(null);
AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, true);
+ callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+ () -> 0L, true);
runLoop.run();