You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/03/17 18:46:46 UTC
samza git commit: SAMZA-1140; Non blocking commit in Async Runloop
Repository: samza
Updated Branches:
refs/heads/master 779c3d80e -> a080ca21b
SAMZA-1140; Non blocking commit in Async Runloop
Adds non-blocking commit to AsyncRunLoop. Clients can enable it by setting the opt-in config task.async.commit (disabled by default). This commit contains the following changes for tasks of type AsyncStreamTask.
a) Enable commit for a task while it still has uncompleted callbacks. Finished callbacks are committed progressively while other messages are still in flight for the task.
b) Enable processing of messages for a task while a commit is in progress for that task.
Documentation for this change will be done in a separate PR.
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Reviewers: Yi Pan <ni...@gmail.com>
Closes #85 from shanthoosh/asyncCommitSupport
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a080ca21
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a080ca21
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a080ca21
Branch: refs/heads/master
Commit: a080ca21ba24c476fe2136134db93eadc5748a90
Parents: 779c3d8
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Fri Mar 17 11:46:34 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Fri Mar 17 11:46:34 2017 -0700
----------------------------------------------------------------------
.../apache/samza/container/RunLoopFactory.java | 5 +-
.../org/apache/samza/task/AsyncRunLoop.java | 53 +++++--
.../org/apache/samza/config/TaskConfig.scala | 6 +
.../org/apache/samza/task/TestAsyncRunLoop.java | 137 +++++++++++++++++--
4 files changed, 178 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/a080ca21/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
index 32ab47a..05e9979 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
@@ -87,6 +87,8 @@ public class RunLoopFactory {
} else {
Integer taskMaxConcurrency = config.getMaxConcurrency().getOrElse(defaultValue(1));
+ boolean isAsyncCommitEnabled = config.getAsyncCommit().getOrElse(defaultValue(false));
+
log.info("Got max messages in flight: " + taskMaxConcurrency);
Long callbackTimeout = config.getCallbackTimeoutMs().getOrElse(defaultValue(DEFAULT_CALLBACK_TIMEOUT_MS));
@@ -105,7 +107,8 @@ public class RunLoopFactory {
callbackTimeout,
maxThrottlingDelayMs,
containerMetrics,
- clock);
+ clock,
+ isAsyncCommitEnabled);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a080ca21/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index 1561bcf..8e375f1 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -74,6 +74,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
private volatile boolean shutdownNow = false;
private volatile Throwable throwable = null;
private final HighResolutionClock clock;
+ private final boolean isAsyncCommitEnabled;
public AsyncRunLoop(Map<TaskName, TaskInstance> taskInstances,
ExecutorService threadPool,
@@ -84,7 +85,8 @@ public class AsyncRunLoop implements Runnable, Throttleable {
long callbackTimeoutMs,
long maxThrottlingDelayMs,
SamzaContainerMetrics containerMetrics,
- HighResolutionClock clock) {
+ HighResolutionClock clock,
+ boolean isAsyncCommitEnabled) {
this.threadPool = threadPool;
this.consumerMultiplexer = consumerMultiplexer;
@@ -106,6 +108,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
// Partions and tasks assigned to the container will not change during the run loop life time
this.sspToTaskWorkerMapping = Collections.unmodifiableMap(getSspToAsyncTaskWorkerMap(taskInstances, workers));
this.taskWorkers = Collections.unmodifiableList(new ArrayList<>(workers.values()));
+ this.isAsyncCommitEnabled = isAsyncCommitEnabled;
}
/**
@@ -442,7 +445,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
containerMetrics.windowNs().update(clock.nanoTime() - startTime);
coordinatorRequests.update(coordinator);
- state.doneWindowOrCommit();
+ state.doneWindow();
} catch (Throwable t) {
log.error("Task {} window failed", task.taskName(), t);
abort(t);
@@ -477,7 +480,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
task.commit();
containerMetrics.commitNs().update(clock.nanoTime() - startTime);
- state.doneWindowOrCommit();
+ state.doneCommit();
} catch (Throwable t) {
log.error("Task {} commit failed", task.taskName(), t);
abort(t);
@@ -569,7 +572,8 @@ public class AsyncRunLoop implements Runnable, Throttleable {
private volatile boolean needCommit = false;
private volatile boolean complete = false;
private volatile boolean endOfStream = false;
- private volatile boolean windowOrCommitInFlight = false;
+ private volatile boolean windowInFlight = false;
+ private volatile boolean commitInFlight = false;
private final AtomicInteger messagesInFlight = new AtomicInteger(0);
private final ArrayDeque<PendingEnvelope> pendingEnvelopeQueue;
@@ -601,6 +605,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
/**
* Returns whether the task is ready to do process/window/commit.
+ *
*/
private boolean isReady() {
if (checkEndOfStream()) {
@@ -609,14 +614,30 @@ public class AsyncRunLoop implements Runnable, Throttleable {
if (coordinatorRequests.commitRequests().remove(taskName)) {
needCommit = true;
}
- if (needWindow || needCommit || endOfStream) {
- // ready for window or commit only when no messages are in progress and
- // no window/commit in flight
+
+ boolean windowOrCommitInFlight = windowInFlight || commitInFlight;
+ /*
+ * A task is ready to commit, when task.commit(needCommit) is requested either by user or commit thread
+ * and either of the following conditions are true.
+ * a) When process, window, commit are not in progress.
+ * b) When task.async.commit is true and window, commit are not in progress.
+ */
+ if (needCommit) {
+ return (messagesInFlight.get() == 0 || isAsyncCommitEnabled) && !windowOrCommitInFlight;
+ } else if (needWindow || endOfStream) {
+ /*
+ * A task is ready for window operation, when task.window(needWindow) is requested by either user or window thread
+ * and window, commit are not in progress.
+ */
return messagesInFlight.get() == 0 && !windowOrCommitInFlight;
} else {
- // ready for process only when the inflight message count does not exceed threshold
- // and no window/commit in flight
- return messagesInFlight.get() < maxConcurrency && !windowOrCommitInFlight;
+ /*
+ * A task is ready to process new message, when number of task.process calls in progress < task.max.concurrency
+ * and either of the following conditions are true.
+ * a) When window, commit are not in progress.
+ * b) When task.async.commit is true and window is not in progress.
+ */
+ return messagesInFlight.get() < maxConcurrency && !windowInFlight && (isAsyncCommitEnabled || !commitInFlight);
}
}
@@ -646,12 +667,12 @@ public class AsyncRunLoop implements Runnable, Throttleable {
private void startWindow() {
needWindow = false;
- windowOrCommitInFlight = true;
+ windowInFlight = true;
}
private void startCommit() {
needCommit = false;
- windowOrCommitInFlight = true;
+ commitInFlight = true;
}
private void startProcess() {
@@ -659,8 +680,12 @@ public class AsyncRunLoop implements Runnable, Throttleable {
taskMetrics.messagesInFlight().set(count);
}
- private void doneWindowOrCommit() {
- windowOrCommitInFlight = false;
+ private void doneCommit() {
+ commitInFlight = false;
+ }
+
+ private void doneWindow() {
+ windowInFlight = false;
}
private void doneProcess() {
http://git-wip-us.apache.org/repos/asf/samza/blob/a080ca21/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 90c1904..e49d74b 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -40,6 +40,7 @@ object TaskConfig {
val GROUPER_FACTORY = "task.name.grouper.factory" // class name for task grouper
val MAX_CONCURRENCY = "task.max.concurrency" // max number of concurrent process for a AsyncStreamTask
val CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms" // timeout period for triggering a callback
+ val ASYNC_COMMIT = "task.async.commit" // to enable async commit in a AsyncStreamTask
/**
* Samza's container polls for more messages under two conditions. The first
@@ -128,4 +129,9 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
case Some(ms) => Some(ms.toLong)
case _ => None
}
+
+ def getAsyncCommit: Option[Boolean] = getOption(TaskConfig.ASYNC_COMMIT) match {
+ case Some(asyncCommit) => Some(asyncCommit.toBoolean)
+ case _ => None
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a080ca21/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index 798977b..e67c3e2 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -58,6 +58,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.atLeastOnce;
public class TestAsyncRunLoop {
Map<TaskName, TaskInstance> tasks;
@@ -100,7 +101,8 @@ public class TestAsyncRunLoop {
callbackTimeoutMs,
maxThrottlingDelayMs,
containerMetrics,
- () -> 0L);
+ () -> 0L,
+ false);
}
TaskInstance createTaskInstance(AsyncStreamTask task, TaskName taskName, SystemStreamPartition ssp, OffsetManager manager, SystemConsumers consumers) {
@@ -120,8 +122,8 @@ public class TestAsyncRunLoop {
callbackExecutor.submit(new Runnable() {
@Override
public void run() {
- if (task.code != null) {
- task.code.run(callback);
+ if (task.callbackHandler != null) {
+ task.callbackHandler.run(callback);
}
task.completed.incrementAndGet();
@@ -144,10 +146,12 @@ public class TestAsyncRunLoop {
boolean commit = false;
boolean success;
int processed = 0;
+ int committed = 0;
volatile int windowCount = 0;
AtomicInteger completed = new AtomicInteger(0);
- TestCode code = null;
+ TestCode callbackHandler = null;
+ TestCode commitHandler = null;
TestTask(boolean success, boolean commit, boolean shutdown) {
this.success = success;
@@ -166,7 +170,11 @@ public class TestAsyncRunLoop {
processed++;
if (commit) {
+ if (commitHandler != null) {
+ callbackExecutor.submit(() -> commitHandler.run(callback));
+ }
coordinator.commit(commitRequest);
+ committed++;
}
if (shutdown) {
@@ -198,7 +206,7 @@ public class TestAsyncRunLoop {
commitMs = -1;
maxMessagesInFlight = 1;
containerMetrics = new SamzaContainerMetrics("container", new MetricsRegistryMap());
- callbackExecutor = Executors.newFixedThreadPool(2);
+ callbackExecutor = Executors.newFixedThreadPool(4);
offsetManager = mock(OffsetManager.class);
shutdownRequest = TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER;
@@ -271,7 +279,7 @@ public class TestAsyncRunLoop {
public void testProcessOutOfOrder() throws Exception {
maxMessagesInFlight = 2;
- task0.code = buildOutofOrderCallback();
+ task0.callbackHandler = buildOutofOrderCallback();
AsyncRunLoop runLoop = createRunLoop();
when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(envelope3).thenReturn(envelope1).thenReturn(null);
@@ -396,7 +404,7 @@ public class TestAsyncRunLoop {
tasks.put(taskName1, t1);
final CountDownLatch latch = new CountDownLatch(1);
- task0.code = buildOutofOrderCallback();
+ task0.callbackHandler = buildOutofOrderCallback();
AsyncRunLoop runLoop = createRunLoop();
when(consumerMultiplexer.choose(false))
.thenReturn(envelope0)
@@ -504,9 +512,122 @@ public class TestAsyncRunLoop {
callbackTimeoutMs,
maxThrottlingDelayMs,
containerMetrics,
- () -> 0L);
+ () -> 0L,
+ false);
runLoop.run();
callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS);
}
+
+ @Test
+ public void testCommitBehaviourWhenAsyncCommitIsEnabled() throws InterruptedException {
+ commitRequest = TaskCoordinator.RequestScope.CURRENT_TASK;
+ maxMessagesInFlight = 2;
+ TestTask task0 = new TestTask(true, true, false);
+ TestTask task1 = new TestTask(true, false, false);
+
+ IncomingMessageEnvelope firstMsg = new IncomingMessageEnvelope(ssp0, "0", "key0", "value0");
+ IncomingMessageEnvelope secondMsg = new IncomingMessageEnvelope(ssp0, "1", "key1", "value1");
+ IncomingMessageEnvelope thirdMsg = new IncomingMessageEnvelope(ssp0, "2", "key0", "value0");
+
+ final CountDownLatch firstMsgCompletionLatch = new CountDownLatch(1);
+ final CountDownLatch secondMsgCompletionLatch = new CountDownLatch(1);
+ task0.callbackHandler = callback -> {
+ IncomingMessageEnvelope envelope = ((TaskCallbackImpl) callback).envelope;
+ try {
+ if (envelope.equals(firstMsg)) {
+ firstMsgCompletionLatch.await();
+ } else if (envelope.equals(secondMsg)) {
+ firstMsgCompletionLatch.countDown();
+ secondMsgCompletionLatch.await();
+ } else if (envelope.equals(thirdMsg)) {
+ secondMsgCompletionLatch.countDown();
+ // OffsetManager.update with firstMsg offset, task.commit has happened when second message callback has not completed.
+ verify(offsetManager).update(taskName0, firstMsg.getSystemStreamPartition(), firstMsg.getOffset());
+ verify(offsetManager, atLeastOnce()).checkpoint(taskName0);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ };
+
+ tasks.put(taskName0, createTaskInstance(task0, taskName0, ssp0));
+ tasks.put(taskName1, createTaskInstance(task1, taskName1, ssp0));
+ when(consumerMultiplexer.choose(false)).thenReturn(firstMsg)
+ .thenReturn(secondMsg)
+ .thenReturn(thirdMsg)
+ .thenReturn(ssp0EndOfStream);
+
+ AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
+ callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, true);
+ // Shutdown runLoop when all tasks are finished.
+ callbackExecutor.execute(() -> {
+ try {
+ firstMsgCompletionLatch.await();
+ secondMsgCompletionLatch.await();
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ runLoop.shutdown();
+ });
+
+ runLoop.run();
+ callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS);
+
+ verify(offsetManager, atLeastOnce()).checkpoint(taskName0);
+ assertEquals(3, task0.processed);
+ assertEquals(3, task0.committed);
+ assertEquals(3, task1.processed);
+ assertEquals(0, task1.committed);
+ }
+
+ @Test
+ public void testProcessBehaviourWhenAsyncCommitIsEnabled() throws InterruptedException {
+ TestTask task0 = new TestTask(true, true, false);
+
+ CountDownLatch commitLatch = new CountDownLatch(1);
+ task0.commitHandler = callback -> {
+ TaskCallbackImpl taskCallback = (TaskCallbackImpl) callback;
+ if (taskCallback.envelope.equals(envelope3)) {
+ try {
+ commitLatch.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ task0.callbackHandler = callback -> {
+ TaskCallbackImpl taskCallback = (TaskCallbackImpl) callback;
+ if (taskCallback.envelope.equals(envelope0)) {
+ // Both the process call has gone through when the first commit is in progress.
+ assertEquals(2, containerMetrics.processes().getCount());
+ assertEquals(0, containerMetrics.commits().getCount());
+ commitLatch.countDown();
+ }
+ };
+ tasks.put(taskName0, createTaskInstance(task0, taskName0, ssp0));
+ when(consumerMultiplexer.choose(false)).thenReturn(envelope3)
+ .thenReturn(envelope0)
+ .thenReturn(ssp0EndOfStream);
+
+ AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
+ callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, true);
+
+ // Shutdown runLoop after the commit.
+ callbackExecutor.execute(() -> {
+ try {
+ commitLatch.await();
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ runLoop.shutdown();
+ });
+
+ runLoop.run();
+
+ callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS);
+ }
}