You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2016/09/22 17:54:25 UTC
aurora git commit: Shutting down scheduler on unhandled BatchWorker
error.
Repository: aurora
Updated Branches:
refs/heads/master d3c5ca7cc -> 02ba97fbb
Shutting down scheduler on unhandled BatchWorker error.
Bugs closed: AURORA-1779
Reviewed at https://reviews.apache.org/r/52141/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/02ba97fb
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/02ba97fb
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/02ba97fb
Branch: refs/heads/master
Commit: 02ba97fbb2ead51c9f788ca58ac878b3fd2cfd8e
Parents: d3c5ca7
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Thu Sep 22 10:54:14 2016 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Thu Sep 22 10:54:14 2016 -0700
----------------------------------------------------------------------
.../apache/aurora/scheduler/BatchWorker.java | 51 +++++++++++---------
.../aurora/scheduler/BatchWorkerTest.java | 24 +++++++--
2 files changed, 47 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/02ba97fb/src/main/java/org/apache/aurora/scheduler/BatchWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/BatchWorker.java b/src/main/java/org/apache/aurora/scheduler/BatchWorker.java
index e05d4b4..c15a04c 100644
--- a/src/main/java/org/apache/aurora/scheduler/BatchWorker.java
+++ b/src/main/java/org/apache/aurora/scheduler/BatchWorker.java
@@ -128,7 +128,11 @@ public class BatchWorker<T> extends AbstractExecutionThreadService {
}
@Inject
- protected BatchWorker(Storage storage, StatsProvider statsProvider, int maxBatchSize) {
+ protected BatchWorker(
+ Storage storage,
+ StatsProvider statsProvider,
+ int maxBatchSize) {
+
this.storage = requireNonNull(storage);
this.maxBatchSize = maxBatchSize;
@@ -185,9 +189,15 @@ public class BatchWorker<T> extends AbstractExecutionThreadService {
protected void run() throws Exception {
while (isRunning()) {
List<WorkItem<T>> batch = new LinkedList<>();
- batch.add(workQueue.take());
- workQueue.drainTo(batch, maxBatchSize - batch.size());
- processBatch(batch);
+
+ // Make the loop responsive to shutdown under light load by using
+ // a short non-configurable timeout in poll().
+ Optional<WorkItem<T>> head = Optional.ofNullable(workQueue.poll(3, TimeUnit.SECONDS));
+ if (head.isPresent()) {
+ workQueue.add(head.get());
+ workQueue.drainTo(batch, maxBatchSize - batch.size());
+ processBatch(batch);
+ }
}
}
@@ -197,25 +207,20 @@ public class BatchWorker<T> extends AbstractExecutionThreadService {
storage.write((Storage.MutateWork.NoResult.Quiet) storeProvider -> {
long lockedStart = System.nanoTime();
for (WorkItem<T> item : batch) {
- try {
- Result<T> itemResult = item.work.apply(storeProvider);
- if (itemResult.isCompleted) {
- item.result.complete(itemResult.value);
- } else {
- // Work not finished yet - re-queue for a followup later.
- long backoffMsec = backoffFor(item);
- scheduledExecutor.schedule(
- () -> workQueue.add(new WorkItem<>(
- item.work,
- item.result,
- item.backoffStrategy,
- Optional.of(backoffMsec))),
- backoffMsec,
- TimeUnit.MILLISECONDS);
- }
- } catch (RuntimeException e) {
- LOG.error("{}: Failed to process batch item. Error: {}", serviceName(), e);
- item.result.completeExceptionally(e);
+ Result<T> itemResult = item.work.apply(storeProvider);
+ if (itemResult.isCompleted) {
+ item.result.complete(itemResult.value);
+ } else {
+ // Work not finished yet - re-queue for a followup later.
+ long backoffMsec = backoffFor(item);
+ scheduledExecutor.schedule(
+ () -> workQueue.add(new WorkItem<>(
+ item.work,
+ item.result,
+ item.backoffStrategy,
+ Optional.of(backoffMsec))),
+ backoffMsec,
+ TimeUnit.MILLISECONDS);
}
}
batchLocked.accumulate(System.nanoTime() - lockedStart);
http://git-wip-us.apache.org/repos/asf/aurora/blob/02ba97fb/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java b/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java
index a86dc82..67b6642 100644
--- a/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java
@@ -15,9 +15,11 @@ package org.apache.aurora.scheduler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Service;
+
import org.apache.aurora.common.testing.easymock.EasyMockTest;
import org.apache.aurora.common.util.BackoffStrategy;
import org.apache.aurora.scheduler.BatchWorker.Result;
@@ -63,15 +65,27 @@ public class BatchWorkerTest extends EasyMockTest {
assertTrue(result3.get());
}
- @Test(expected = ExecutionException.class)
+ @Test
public void testExecuteThrows() throws Exception {
control.replay();
- CompletableFuture<Boolean> result =
- batchWorker.execute(store -> { throw new IllegalArgumentException(); });
+ // Make sure BatchWorker service fails on unhandled error during batch processing.
+ CountDownLatch shutdownLatch = new CountDownLatch(1);
+ batchWorker.addListener(
+ new Service.Listener() {
+ @Override
+ public void failed(Service.State from, Throwable failure) {
+ shutdownLatch.countDown();
+ }
+ },
+ MoreExecutors.newDirectExecutorService());
+
batchWorker.startAsync().awaitRunning();
+ batchWorker.execute(store -> {
+ throw new IllegalArgumentException();
+ });
- result.get();
+ assertTrue(shutdownLatch.await(10L, TimeUnit.SECONDS));
}
@Test