You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2018/03/29 22:26:22 UTC

samza git commit: SD-1599: Improve the efficiency of the AsyncRunLoop when some partitions are empty.

Repository: samza
Updated Branches:
  refs/heads/master 13ff09024 -> 57fea260a


SD-1599: Improve the efficiency of the AsyncRunLoop when some partitions are empty.

Author: James Lent <jl...@nc.rr.com>

Reviewers: Yi Pan <ni...@gmail.com>, Xinyu Liu <xi...@linkedin.com>

Closes #436 from jwlent55/SD-1599-improve-async-run-loop-efficiency and squashes the following commits:

ef0e53bb [James Lent] SD-1599: Remove incorrect call to containerMetrics left by previous update.
3899216e [James Lent] SD-1599: Combine the blockIfBusy and blockIfNoWork logic inside one method and a common latch.
e3837809 [James Lent] SD-1599: Mark runLoopResumedSinceLastChecked as volatile.
a7c0ac4c [James Lent] SD-1599: Explicitly set the timeout value to either 'noNewMessagesTimeout' or 0.
b2dd61e2 [James Lent] SD-1599: Address the first set of code inspection comments.
cc34518a [James Lent] SD-1599: Improve the efficiency of the AsyncRunLoop when some partitions are empty.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/57fea260
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/57fea260
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/57fea260

Branch: refs/heads/master
Commit: 57fea260a5dad24b287dad113c78bf23aa3ba8f9
Parents: 13ff090
Author: James Lent <jl...@nc.rr.com>
Authored: Thu Mar 29 15:26:28 2018 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@yipan-mn1.linkedin.biz>
Committed: Thu Mar 29 15:26:28 2018 -0700

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     | 12 ++++++
 .../apache/samza/container/RunLoopFactory.java  |  5 +++
 .../org/apache/samza/task/AsyncRunLoop.java     | 42 +++++++++++++++----
 .../org/apache/samza/config/TaskConfig.scala    |  7 ++++
 .../apache/samza/system/SystemConsumers.scala   |  7 +++-
 .../org/apache/samza/task/TestAsyncRunLoop.java | 44 +++++++++++++-------
 6 files changed, 92 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/57fea260/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 49886ce..5c41596 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -766,6 +766,18 @@
                 </tr>
 
                 <tr>
+                    <td class="property" id="task-max-idle-ms">task.max.idle.ms</td>
+                    <td class="default">10</td>
+                    <td class="description">
+                        The maximum time to wait for a task worker to complete when there are no new messages to handle before resuming the main
+                        loop and potentially polling for more messages. See <a href="#task-poll-interval-ms" class="property">task.poll.interval.ms</a>.
+                        This timeout value prevents the main loop from spinning when there is nothing for it to do. Increasing this value will reduce
+                        the background load of the thread, but may also increase message latency. It should not be set greater than the
+                        <a href="#task-poll-interval-ms" class="property">task.poll.interval.ms</a>.
+                    </td>
+                </tr>
+
+                <tr>
                     <td class="property" id="task-ignored-exceptions">task.ignored.exceptions</td>
                     <td class="default"></td>
                     <td class="description">

http://git-wip-us.apache.org/repos/asf/samza/blob/57fea260/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
index d399fd0..c9ec6b5 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
@@ -92,6 +92,10 @@ public class RunLoopFactory {
 
       log.info("Got callbackTimeout: {}.", callbackTimeout);
 
+      Long maxIdleMs = config.getMaxIdleMs();
+
+      log.info("Got maxIdleMs: {}.", maxIdleMs);
+
       log.info("Run loop in asynchronous mode.");
 
       return new AsyncRunLoop(
@@ -103,6 +107,7 @@ public class RunLoopFactory {
         taskCommitMs,
         callbackTimeout,
         maxThrottlingDelayMs,
+        maxIdleMs,
         containerMetrics,
         clock,
         isAsyncCommitEnabled);

http://git-wip-us.apache.org/repos/asf/samza/blob/57fea260/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index 589fbb8..f3c4655 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -67,6 +67,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
   private final long windowMs;
   private final long commitMs;
   private final long callbackTimeoutMs;
+  private final long maxIdleMs;
   private final SamzaContainerMetrics containerMetrics;
   private final ScheduledExecutorService workerTimer;
   private final ScheduledExecutorService callbackTimer;
@@ -75,6 +76,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
   private volatile Throwable throwable = null;
   private final HighResolutionClock clock;
   private final boolean isAsyncCommitEnabled;
+  private volatile boolean runLoopResumedSinceLastChecked;
 
   public AsyncRunLoop(Map<TaskName, TaskInstance> taskInstances,
       ExecutorService threadPool,
@@ -84,6 +86,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
       long commitMs,
       long callbackTimeoutMs,
       long maxThrottlingDelayMs,
+      long maxIdleMs,
       SamzaContainerMetrics containerMetrics,
       HighResolutionClock clock,
       boolean isAsyncCommitEnabled) {
@@ -95,6 +98,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
     this.commitMs = commitMs;
     this.maxConcurrency = maxConcurrency;
     this.callbackTimeoutMs = callbackTimeoutMs;
+    this.maxIdleMs = maxIdleMs;
     this.callbackTimer = (callbackTimeoutMs > 0) ? Executors.newSingleThreadScheduledExecutor() : null;
     this.callbackExecutor = new ThrottlingScheduler(maxThrottlingDelayMs);
     this.coordinatorRequests = new CoordinatorRequests(taskInstances.keySet());
@@ -150,21 +154,21 @@ public class AsyncRunLoop implements Runnable, Throttleable {
         long startNs = clock.nanoTime();
 
         IncomingMessageEnvelope envelope = chooseEnvelope();
-        long chooseNs = clock.nanoTime();
 
+        long chooseNs = clock.nanoTime();
         containerMetrics.chooseNs().update(chooseNs - startNs);
 
-        runTasks(envelope);
+        blockIfBusyOrNoNewWork(envelope);
 
         long blockNs = clock.nanoTime();
+        containerMetrics.blockNs().update(blockNs - chooseNs);
 
-        blockIfBusy(envelope);
+        runTasks(envelope);
 
         long currentNs = clock.nanoTime();
-        long activeNs = blockNs - chooseNs;
+        long activeNs = currentNs - blockNs;
         long totalNs = currentNs - prevNs;
         prevNs = currentNs;
-        containerMetrics.blockNs().update(currentNs - blockNs);
 
         if (totalNs != 0) {
           // totalNs is not 0 if timer metrics are enabled
@@ -233,14 +237,35 @@ public class AsyncRunLoop implements Runnable, Throttleable {
   /**
    * Block the runloop thread if all tasks are busy. When a task worker finishes or window/commit completes,
    * it will resume the runloop.
+   *
+   * In addition, delay the AsyncRunLoop thread for a short time if there are no new messages to process and the run loop
+   * has not been resumed since the last time this code was run. This will prevent the main thread from spinning when it
+   * has no work to distribute. If a task worker finishes or window/commit completes before the timeout then resume
+   * the AsyncRunLoop thread immediately. That event may allow a task worker to start processing a message that has already
+   * been chosen.  In any event it should only delay for a short time.  It needs to periodically check for new messages.
    */
-  private void blockIfBusy(IncomingMessageEnvelope envelope) {
+  private void blockIfBusyOrNoNewWork(IncomingMessageEnvelope envelope) {
     synchronized (latch) {
+
+      // First check to see if we should delay the run loop for a short time. The runLoopResumedSinceLastChecked boolean
+      // is used to ensure we don't delay if a task may already be ready to dequeue a previously chosen/pending
+      // message. It is better to occasionally make one additional loop when there is no work to do than to delay the
+      // runloop when there is work that could be started immediately.
+      if ((envelope == null) && !runLoopResumedSinceLastChecked) {
+        try {
+          log.trace("Start no work wait");
+          latch.wait(maxIdleMs);
+          log.trace("End no work wait");
+        } catch (InterruptedException e) {
+          throw new SamzaException("Run loop is interrupted", e);
+        }
+      }
+      runLoopResumedSinceLastChecked = false;
+
+      // Next check to see if we should block if all the tasks are busy.
       while (!shutdownNow && throwable == null) {
         for (AsyncTaskWorker worker : taskWorkers) {
           if (worker.state.isReady()) {
-            // should continue running if any worker state is ready
-            // consumerMultiplexer will block on polling for empty partitions so it won't cause busy loop
             return;
           }
         }
@@ -265,6 +290,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
     }
     synchronized (latch) {
       latch.notifyAll();
+      runLoopResumedSinceLastChecked = true;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/57fea260/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index fe03a52..206eb8f 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -44,11 +44,13 @@ object TaskConfig {
   val MAX_CONCURRENCY = "task.max.concurrency" // max number of concurrent process for a AsyncStreamTask
   val CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms"  // timeout period for triggering a callback
   val ASYNC_COMMIT = "task.async.commit" // to enable async commit in a AsyncStreamTask
+  val MAX_IDLE_MS = "task.max.idle.ms"  // maximum time to wait for a task worker to complete when there are no new messages to handle
 
   val DEFAULT_WINDOW_MS: Long = -1L
   val DEFAULT_COMMIT_MS = 60000L
   val DEFAULT_CALLBACK_TIMEOUT_MS: Long = -1L
   val DEFAULT_MAX_CONCURRENCY: Int = 1
+  val DEFAULT_MAX_IDLE_MS: Long = 10
 
   /**
    * Samza's container polls for more messages under two conditions. The first
@@ -155,4 +157,9 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     case Some(commitMs) => commitMs.toInt > 0
     case _ => TaskConfig.DEFAULT_COMMIT_MS > 0
   }
+
+  def getMaxIdleMs: Long = getOption(TaskConfig.MAX_IDLE_MS) match {
+    case Some(ms) => ms.toLong
+    case _ => TaskConfig.DEFAULT_MAX_IDLE_MS
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/57fea260/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 3964ea3..49ab52a 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -215,8 +215,11 @@ class SystemConsumers (
 
         metrics.choseNull.inc
 
-        // Sleep for a while so we don't poll in a tight loop.
-        timeout = noNewMessagesTimeout
+        // Sleep for a while so we don't poll in a tight loop, but don't do this when called from the AsyncRunLoop
+        // code, because in that case the chooser will not get updated with a new message for an SSP until after a
+        // message is processed. See how the updateChooser variable is used below. The AsyncRunLoop has its own way to
+        // block when there is no work to process.
+        timeout = if (updateChooser) noNewMessagesTimeout else 0
       } else {
         val systemStreamPartition = envelopeFromChooser.getSystemStreamPartition
 

http://git-wip-us.apache.org/repos/asf/samza/blob/57fea260/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index 7f54614..d7132f3 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -69,6 +69,7 @@ public class TestAsyncRunLoop {
   private final long commitMs = -1;
   private final long callbackTimeoutMs = 0;
   private final long maxThrottlingDelayMs = 0;
+  private final long maxIdleMs = 10;
   private final Partition p0 = new Partition(0);
   private final Partition p1 = new Partition(1);
   private final TaskName taskName0 = new TaskName(p0.toString());
@@ -219,7 +220,7 @@ public class TestAsyncRunLoop {
 
     int maxMessagesInFlight = 1;
     AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
-        callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+        callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false);
 
     when(consumerMultiplexer.choose(false))
         .thenReturn(envelope0)
@@ -257,7 +258,8 @@ public class TestAsyncRunLoop {
 
     int maxMessagesInFlight = 1;
     AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
-                                            callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+                                            () -> 0L, false);
     when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(envelope1).thenReturn(null);
     runLoop.run();
 
@@ -288,7 +290,8 @@ public class TestAsyncRunLoop {
 
     int maxMessagesInFlight = 1;
     AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
-                                            callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+                                            () -> 0L, false);
     when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(envelope3).thenReturn(envelope1).thenReturn(null);
     runLoop.run();
 
@@ -345,7 +348,8 @@ public class TestAsyncRunLoop {
     task0.callbackHandler = buildOutofOrderCallback(task0);
 
     AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
-                                            callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+                                            () -> 0L, false);
     when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(envelope3).thenReturn(envelope1).thenReturn(null);
     runLoop.run();
 
@@ -374,7 +378,8 @@ public class TestAsyncRunLoop {
     long windowMs = 1;
     int maxMessagesInFlight = 1;
     AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
-                                            callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+                                            () -> 0L, false);
     when(consumerMultiplexer.choose(false)).thenReturn(null);
     runLoop.run();
 
@@ -400,7 +405,8 @@ public class TestAsyncRunLoop {
 
     int maxMessagesInFlight = 1;
     AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
-                                            callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+                                            () -> 0L, false);
     //have a null message in between to make sure task0 finishes processing and invoke the commit
     when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(null).thenReturn(envelope1).thenReturn(null);
     runLoop.run();
@@ -432,7 +438,8 @@ public class TestAsyncRunLoop {
     int maxMessagesInFlight = 1;
 
     AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
-                                            callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+                                            () -> 0L, false);
     //have a null message in between to make sure task0 finishes processing and invoke the commit
     when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(null).thenReturn(envelope1).thenReturn(null);
     runLoop.run();
@@ -467,7 +474,8 @@ public class TestAsyncRunLoop {
     int maxMessagesInFlight = 1;
 
     AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
-                                            callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+                                            () -> 0L, false);
     // consensus is reached after envelope1 is processed.
     when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(envelope1).thenReturn(null);
     runLoop.run();
@@ -500,7 +508,8 @@ public class TestAsyncRunLoop {
 
     int maxMessagesInFlight = 1;
     AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
-                                            callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+                                            () -> 0L, false);
     when(consumerMultiplexer.choose(false))
       .thenReturn(envelope0)
       .thenReturn(envelope1)
@@ -540,7 +549,8 @@ public class TestAsyncRunLoop {
 
     task0.callbackHandler = buildOutofOrderCallback(task0);
     AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
-                                            callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+                                            () -> 0L, false);
     when(consumerMultiplexer.choose(false))
         .thenReturn(envelope0)
         .thenReturn(envelope3)
@@ -581,8 +591,9 @@ public class TestAsyncRunLoop {
     tasks.put(taskName1, t1);
     int maxMessagesInFlight = 1;
 
-    AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight , windowMs, commitMs,
-                                            callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+    AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
+                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+                                            () -> 0L, false);
 
     when(consumerMultiplexer.choose(false)).thenReturn(envelope0)
         .thenReturn(envelope1)
@@ -656,7 +667,8 @@ public class TestAsyncRunLoop {
 
     int maxMessagesInFlight = 1;
     AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumers, maxMessagesInFlight, windowMs, commitMs,
-                                            callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false);
+                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+                                            () -> 0L, false);
 
     runLoop.run();
   }
@@ -706,7 +718,8 @@ public class TestAsyncRunLoop {
                                            .thenReturn(null);
 
     AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
-                                            callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, true);
+                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+                                            () -> 0L, false);
 
     runLoop.run();
 
@@ -755,7 +768,8 @@ public class TestAsyncRunLoop {
                                            .thenReturn(envelope1)
                                            .thenReturn(null);
     AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
-                                            callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, true);
+                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+                                            () -> 0L, true);
 
     runLoop.run();