You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/03/17 18:46:46 UTC

samza git commit: SAMZA-1140; Non blocking commit in Async Runloop

Repository: samza
Updated Branches:
  refs/heads/master 779c3d80e -> a080ca21b


SAMZA-1140; Non blocking commit in Async Runloop

Adds non-blocking commit to AsyncRunLoop. Clients can enable it by setting the opt-in config task.async.commit, which is disabled by default. This commit contains the following changes for tasks of type AsyncStreamTask:

a) Allow a task to commit while it still has uncompleted callbacks. Callbacks that have already finished are committed progressively, even when other messages are still in flight for the task.
b) Allow a task to process new messages while a commit is in progress for that task.

Documentation for this change will be done in a separate PR.

Author: Shanthoosh Venkataraman <sv...@linkedin.com>

Reviewers: Yi Pan <ni...@gmail.com>

Closes #85 from shanthoosh/asyncCommitSupport


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a080ca21
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a080ca21
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a080ca21

Branch: refs/heads/master
Commit: a080ca21ba24c476fe2136134db93eadc5748a90
Parents: 779c3d8
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Fri Mar 17 11:46:34 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Fri Mar 17 11:46:34 2017 -0700

----------------------------------------------------------------------
 .../apache/samza/container/RunLoopFactory.java  |   5 +-
 .../org/apache/samza/task/AsyncRunLoop.java     |  53 +++++--
 .../org/apache/samza/config/TaskConfig.scala    |   6 +
 .../org/apache/samza/task/TestAsyncRunLoop.java | 137 +++++++++++++++++--
 4 files changed, 178 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a080ca21/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
index 32ab47a..05e9979 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
@@ -87,6 +87,8 @@ public class RunLoopFactory {
     } else {
       Integer taskMaxConcurrency = config.getMaxConcurrency().getOrElse(defaultValue(1));
 
+      boolean isAsyncCommitEnabled = config.getAsyncCommit().getOrElse(defaultValue(false));
+
       log.info("Got max messages in flight: " + taskMaxConcurrency);
 
       Long callbackTimeout = config.getCallbackTimeoutMs().getOrElse(defaultValue(DEFAULT_CALLBACK_TIMEOUT_MS));
@@ -105,7 +107,8 @@ public class RunLoopFactory {
         callbackTimeout,
         maxThrottlingDelayMs,
         containerMetrics,
-        clock);
+        clock,
+        isAsyncCommitEnabled);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a080ca21/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index 1561bcf..8e375f1 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -74,6 +74,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
   private volatile boolean shutdownNow = false;
   private volatile Throwable throwable = null;
   private final HighResolutionClock clock;
+  private final boolean isAsyncCommitEnabled;
 
   public AsyncRunLoop(Map<TaskName, TaskInstance> taskInstances,
       ExecutorService threadPool,
@@ -84,7 +85,8 @@ public class AsyncRunLoop implements Runnable, Throttleable {
       long callbackTimeoutMs,
       long maxThrottlingDelayMs,
       SamzaContainerMetrics containerMetrics,
-      HighResolutionClock clock) {
+      HighResolutionClock clock,
+      boolean isAsyncCommitEnabled) {
 
     this.threadPool = threadPool;
     this.consumerMultiplexer = consumerMultiplexer;
@@ -106,6 +108,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
     // Partions and tasks assigned to the container will not change during the run loop life time
     this.sspToTaskWorkerMapping = Collections.unmodifiableMap(getSspToAsyncTaskWorkerMap(taskInstances, workers));
     this.taskWorkers = Collections.unmodifiableList(new ArrayList<>(workers.values()));
+    this.isAsyncCommitEnabled = isAsyncCommitEnabled;
   }
 
   /**
@@ -442,7 +445,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
             containerMetrics.windowNs().update(clock.nanoTime() - startTime);
             coordinatorRequests.update(coordinator);
 
-            state.doneWindowOrCommit();
+            state.doneWindow();
           } catch (Throwable t) {
             log.error("Task {} window failed", task.taskName(), t);
             abort(t);
@@ -477,7 +480,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
             task.commit();
             containerMetrics.commitNs().update(clock.nanoTime() - startTime);
 
-            state.doneWindowOrCommit();
+            state.doneCommit();
           } catch (Throwable t) {
             log.error("Task {} commit failed", task.taskName(), t);
             abort(t);
@@ -569,7 +572,8 @@ public class AsyncRunLoop implements Runnable, Throttleable {
     private volatile boolean needCommit = false;
     private volatile boolean complete = false;
     private volatile boolean endOfStream = false;
-    private volatile boolean windowOrCommitInFlight = false;
+    private volatile boolean windowInFlight = false;
+    private volatile boolean commitInFlight = false;
     private final AtomicInteger messagesInFlight = new AtomicInteger(0);
     private final ArrayDeque<PendingEnvelope> pendingEnvelopeQueue;
 
@@ -601,6 +605,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
 
     /**
      * Returns whether the task is ready to do process/window/commit.
+     *
      */
     private boolean isReady() {
       if (checkEndOfStream()) {
@@ -609,14 +614,30 @@ public class AsyncRunLoop implements Runnable, Throttleable {
       if (coordinatorRequests.commitRequests().remove(taskName)) {
         needCommit = true;
       }
-      if (needWindow || needCommit || endOfStream) {
-        // ready for window or commit only when no messages are in progress and
-        // no window/commit in flight
+
+      boolean windowOrCommitInFlight = windowInFlight || commitInFlight;
+      /*
+       * A task is ready to commit, when task.commit(needCommit) is requested either by user or commit thread
+       * and either of the following conditions are true.
+       * a) When process, window, commit are not in progress.
+       * b) When task.async.commit is true and window, commit are not in progress.
+       */
+      if (needCommit) {
+        return (messagesInFlight.get() == 0 || isAsyncCommitEnabled) && !windowOrCommitInFlight;
+      } else if (needWindow || endOfStream) {
+        /*
+         * A task is ready for window operation, when task.window(needWindow) is requested by either user or window thread
+         * and window, commit are not in progress.
+         */
         return messagesInFlight.get() == 0 && !windowOrCommitInFlight;
       } else {
-        // ready for process only when the inflight message count does not exceed threshold
-        // and no window/commit in flight
-        return messagesInFlight.get() < maxConcurrency && !windowOrCommitInFlight;
+        /*
+         * A task is ready to process new message, when number of task.process calls in progress < task.max.concurrency
+         * and either of the following conditions are true.
+         * a) When window, commit are not in progress.
+         * b) When task.async.commit is true and window is not in progress.
+         */
+        return messagesInFlight.get() < maxConcurrency && !windowInFlight && (isAsyncCommitEnabled || !commitInFlight);
       }
     }
 
@@ -646,12 +667,12 @@ public class AsyncRunLoop implements Runnable, Throttleable {
 
     private void startWindow() {
       needWindow = false;
-      windowOrCommitInFlight = true;
+      windowInFlight = true;
     }
 
     private void startCommit() {
       needCommit = false;
-      windowOrCommitInFlight = true;
+      commitInFlight = true;
     }
 
     private void startProcess() {
@@ -659,8 +680,12 @@ public class AsyncRunLoop implements Runnable, Throttleable {
       taskMetrics.messagesInFlight().set(count);
     }
 
-    private void doneWindowOrCommit() {
-      windowOrCommitInFlight = false;
+    private void doneCommit() {
+      commitInFlight = false;
+    }
+
+    private void doneWindow() {
+      windowInFlight = false;
     }
 
     private void doneProcess() {

http://git-wip-us.apache.org/repos/asf/samza/blob/a080ca21/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 90c1904..e49d74b 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -40,6 +40,7 @@ object TaskConfig {
   val GROUPER_FACTORY = "task.name.grouper.factory" // class name for task grouper
   val MAX_CONCURRENCY = "task.max.concurrency" // max number of concurrent process for a AsyncStreamTask
   val CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms"  // timeout period for triggering a callback
+  val ASYNC_COMMIT = "task.async.commit" // to enable async commit in a AsyncStreamTask
 
   /**
    * Samza's container polls for more messages under two conditions. The first
@@ -128,4 +129,9 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     case Some(ms) => Some(ms.toLong)
     case _ => None
   }
+
+  def getAsyncCommit: Option[Boolean] = getOption(TaskConfig.ASYNC_COMMIT) match {
+    case Some(asyncCommit) => Some(asyncCommit.toBoolean)
+    case _ => None
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a080ca21/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index 798977b..e67c3e2 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -58,6 +58,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.atLeastOnce;
 
 public class TestAsyncRunLoop {
   Map<TaskName, TaskInstance> tasks;
@@ -100,7 +101,8 @@ public class TestAsyncRunLoop {
         callbackTimeoutMs,
         maxThrottlingDelayMs,
         containerMetrics,
-        () -> 0L);
+        () -> 0L,
+        false);
   }
 
   TaskInstance createTaskInstance(AsyncStreamTask task, TaskName taskName, SystemStreamPartition ssp, OffsetManager manager, SystemConsumers consumers) {
@@ -120,8 +122,8 @@ public class TestAsyncRunLoop {
     callbackExecutor.submit(new Runnable() {
       @Override
       public void run() {
-        if (task.code != null) {
-          task.code.run(callback);
+        if (task.callbackHandler != null) {
+          task.callbackHandler.run(callback);
         }
 
         task.completed.incrementAndGet();
@@ -144,10 +146,12 @@ public class TestAsyncRunLoop {
     boolean commit = false;
     boolean success;
     int processed = 0;
+    int committed = 0;
     volatile int windowCount = 0;
 
     AtomicInteger completed = new AtomicInteger(0);
-    TestCode code = null;
+    TestCode callbackHandler = null;
+    TestCode commitHandler = null;
 
     TestTask(boolean success, boolean commit, boolean shutdown) {
       this.success = success;
@@ -166,7 +170,11 @@ public class TestAsyncRunLoop {
       processed++;
 
       if (commit) {
+        if (commitHandler != null) {
+          callbackExecutor.submit(() -> commitHandler.run(callback));
+        }
         coordinator.commit(commitRequest);
+        committed++;
       }
 
       if (shutdown) {
@@ -198,7 +206,7 @@ public class TestAsyncRunLoop {
     commitMs = -1;
     maxMessagesInFlight = 1;
     containerMetrics = new SamzaContainerMetrics("container", new MetricsRegistryMap());
-    callbackExecutor = Executors.newFixedThreadPool(2);
+    callbackExecutor = Executors.newFixedThreadPool(4);
     offsetManager = mock(OffsetManager.class);
     shutdownRequest = TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER;
 
@@ -271,7 +279,7 @@ public class TestAsyncRunLoop {
   public void testProcessOutOfOrder() throws Exception {
     maxMessagesInFlight = 2;
 
-    task0.code = buildOutofOrderCallback();
+    task0.callbackHandler = buildOutofOrderCallback();
 
     AsyncRunLoop runLoop = createRunLoop();
     when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(envelope3).thenReturn(envelope1).thenReturn(null);
@@ -396,7 +404,7 @@ public class TestAsyncRunLoop {
     tasks.put(taskName1, t1);
 
     final CountDownLatch latch = new CountDownLatch(1);
-    task0.code = buildOutofOrderCallback();
+    task0.callbackHandler = buildOutofOrderCallback();
     AsyncRunLoop runLoop = createRunLoop();
     when(consumerMultiplexer.choose(false))
         .thenReturn(envelope0)
@@ -504,9 +512,122 @@ public class TestAsyncRunLoop {
         callbackTimeoutMs,
         maxThrottlingDelayMs,
         containerMetrics,
-        () -> 0L);
+        () -> 0L,
+        false);
 
     runLoop.run();
     callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS);
   }
+
+  @Test
+  public void testCommitBehaviourWhenAsyncCommitIsEnabled() throws InterruptedException {
+    commitRequest = TaskCoordinator.RequestScope.CURRENT_TASK;
+    maxMessagesInFlight = 2;
+    TestTask task0 = new TestTask(true, true, false);
+    TestTask task1 = new TestTask(true, false, false);
+
+    IncomingMessageEnvelope firstMsg = new IncomingMessageEnvelope(ssp0, "0", "key0", "value0");
+    IncomingMessageEnvelope secondMsg = new IncomingMessageEnvelope(ssp0, "1", "key1", "value1");
+    IncomingMessageEnvelope thirdMsg = new IncomingMessageEnvelope(ssp0, "2", "key0", "value0");
+
+    final CountDownLatch firstMsgCompletionLatch = new CountDownLatch(1);
+    final CountDownLatch secondMsgCompletionLatch = new CountDownLatch(1);
+    task0.callbackHandler = callback -> {
+      IncomingMessageEnvelope envelope = ((TaskCallbackImpl) callback).envelope;
+      try {
+        if (envelope.equals(firstMsg)) {
+          firstMsgCompletionLatch.await();
+        } else if (envelope.equals(secondMsg)) {
+          firstMsgCompletionLatch.countDown();
+          secondMsgCompletionLatch.await();
+        } else if (envelope.equals(thirdMsg)) {
+          secondMsgCompletionLatch.countDown();
+          // OffsetManager.update with firstMsg offset, task.commit has happened when second message callback has not completed.
+          verify(offsetManager).update(taskName0, firstMsg.getSystemStreamPartition(), firstMsg.getOffset());
+          verify(offsetManager, atLeastOnce()).checkpoint(taskName0);
+        }
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    };
+
+    tasks.put(taskName0, createTaskInstance(task0, taskName0, ssp0));
+    tasks.put(taskName1, createTaskInstance(task1, taskName1, ssp0));
+    when(consumerMultiplexer.choose(false)).thenReturn(firstMsg)
+                                           .thenReturn(secondMsg)
+                                           .thenReturn(thirdMsg)
+                                           .thenReturn(ssp0EndOfStream);
+
+    AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
+                                            callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, true);
+    // Shutdown runLoop when all tasks are finished.
+    callbackExecutor.execute(() -> {
+        try {
+          firstMsgCompletionLatch.await();
+          secondMsgCompletionLatch.await();
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+        runLoop.shutdown();
+      });
+
+    runLoop.run();
+    callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS);
+
+    verify(offsetManager, atLeastOnce()).checkpoint(taskName0);
+    assertEquals(3, task0.processed);
+    assertEquals(3, task0.committed);
+    assertEquals(3, task1.processed);
+    assertEquals(0, task1.committed);
+  }
+
+  @Test
+  public void testProcessBehaviourWhenAsyncCommitIsEnabled() throws InterruptedException {
+    TestTask task0 = new TestTask(true, true, false);
+
+    CountDownLatch commitLatch = new CountDownLatch(1);
+    task0.commitHandler = callback -> {
+      TaskCallbackImpl taskCallback = (TaskCallbackImpl) callback;
+      if (taskCallback.envelope.equals(envelope3)) {
+        try {
+          commitLatch.await();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    };
+
+    task0.callbackHandler = callback -> {
+      TaskCallbackImpl taskCallback = (TaskCallbackImpl) callback;
+      if (taskCallback.envelope.equals(envelope0)) {
+        // Both the process call has gone through when the first commit is in progress.
+        assertEquals(2, containerMetrics.processes().getCount());
+        assertEquals(0, containerMetrics.commits().getCount());
+        commitLatch.countDown();
+      }
+    };
+    tasks.put(taskName0, createTaskInstance(task0, taskName0, ssp0));
+    when(consumerMultiplexer.choose(false)).thenReturn(envelope3)
+                                           .thenReturn(envelope0)
+                                           .thenReturn(ssp0EndOfStream);
+
+    AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
+                                            callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, true);
+
+    // Shutdown runLoop after the commit.
+    callbackExecutor.execute(() -> {
+        try {
+          commitLatch.await();
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+        runLoop.shutdown();
+      });
+
+    runLoop.run();
+
+    callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS);
+  }
 }