You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2016/09/22 17:54:25 UTC

aurora git commit: Shutting down scheduler on unhandled BatchWorker error.

Repository: aurora
Updated Branches:
  refs/heads/master d3c5ca7cc -> 02ba97fbb


Shutting down scheduler on unhandled BatchWorker error.

Bugs closed: AURORA-1779

Reviewed at https://reviews.apache.org/r/52141/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/02ba97fb
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/02ba97fb
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/02ba97fb

Branch: refs/heads/master
Commit: 02ba97fbb2ead51c9f788ca58ac878b3fd2cfd8e
Parents: d3c5ca7
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Thu Sep 22 10:54:14 2016 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Thu Sep 22 10:54:14 2016 -0700

----------------------------------------------------------------------
 .../apache/aurora/scheduler/BatchWorker.java    | 51 +++++++++++---------
 .../aurora/scheduler/BatchWorkerTest.java       | 24 +++++++--
 2 files changed, 47 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/02ba97fb/src/main/java/org/apache/aurora/scheduler/BatchWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/BatchWorker.java b/src/main/java/org/apache/aurora/scheduler/BatchWorker.java
index e05d4b4..c15a04c 100644
--- a/src/main/java/org/apache/aurora/scheduler/BatchWorker.java
+++ b/src/main/java/org/apache/aurora/scheduler/BatchWorker.java
@@ -128,7 +128,11 @@ public class BatchWorker<T> extends AbstractExecutionThreadService {
   }
 
   @Inject
-  protected BatchWorker(Storage storage, StatsProvider statsProvider, int maxBatchSize) {
+  protected BatchWorker(
+      Storage storage,
+      StatsProvider statsProvider,
+      int maxBatchSize) {
+
     this.storage = requireNonNull(storage);
     this.maxBatchSize = maxBatchSize;
 
@@ -185,9 +189,15 @@ public class BatchWorker<T> extends AbstractExecutionThreadService {
   protected void run() throws Exception {
     while (isRunning()) {
       List<WorkItem<T>> batch = new LinkedList<>();
-      batch.add(workQueue.take());
-      workQueue.drainTo(batch, maxBatchSize - batch.size());
-      processBatch(batch);
+
+      // Make the loop responsive to shutdown under light load by using
+      // a short non-configurable timeout in poll().
+      Optional<WorkItem<T>> head = Optional.ofNullable(workQueue.poll(3, TimeUnit.SECONDS));
+      if (head.isPresent()) {
+        workQueue.add(head.get());
+        workQueue.drainTo(batch, maxBatchSize - batch.size());
+        processBatch(batch);
+      }
     }
   }
 
@@ -197,25 +207,20 @@ public class BatchWorker<T> extends AbstractExecutionThreadService {
       storage.write((Storage.MutateWork.NoResult.Quiet) storeProvider -> {
         long lockedStart = System.nanoTime();
         for (WorkItem<T> item : batch) {
-          try {
-            Result<T> itemResult = item.work.apply(storeProvider);
-            if (itemResult.isCompleted) {
-              item.result.complete(itemResult.value);
-            } else {
-              // Work not finished yet - re-queue for a followup later.
-              long backoffMsec = backoffFor(item);
-              scheduledExecutor.schedule(
-                  () -> workQueue.add(new WorkItem<>(
-                      item.work,
-                      item.result,
-                      item.backoffStrategy,
-                      Optional.of(backoffMsec))),
-                  backoffMsec,
-                  TimeUnit.MILLISECONDS);
-            }
-          } catch (RuntimeException e) {
-            LOG.error("{}: Failed to process batch item. Error: {}", serviceName(), e);
-            item.result.completeExceptionally(e);
+          Result<T> itemResult = item.work.apply(storeProvider);
+          if (itemResult.isCompleted) {
+            item.result.complete(itemResult.value);
+          } else {
+            // Work not finished yet - re-queue for a followup later.
+            long backoffMsec = backoffFor(item);
+            scheduledExecutor.schedule(
+                () -> workQueue.add(new WorkItem<>(
+                    item.work,
+                    item.result,
+                    item.backoffStrategy,
+                    Optional.of(backoffMsec))),
+                backoffMsec,
+                TimeUnit.MILLISECONDS);
           }
         }
         batchLocked.accumulate(System.nanoTime() - lockedStart);

http://git-wip-us.apache.org/repos/asf/aurora/blob/02ba97fb/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java b/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java
index a86dc82..67b6642 100644
--- a/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java
@@ -15,9 +15,11 @@ package org.apache.aurora.scheduler;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Service;
+
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.common.util.BackoffStrategy;
 import org.apache.aurora.scheduler.BatchWorker.Result;
@@ -63,15 +65,27 @@ public class BatchWorkerTest extends EasyMockTest {
     assertTrue(result3.get());
   }
 
-  @Test(expected = ExecutionException.class)
+  @Test
   public void testExecuteThrows() throws Exception {
     control.replay();

-    CompletableFuture<Boolean> result =
-        batchWorker.execute(store -> { throw new IllegalArgumentException(); });
+    // An unhandled exception thrown by a batch item must drive the service into FAILED.
+    CountDownLatch shutdownLatch = new CountDownLatch(1);
+    batchWorker.addListener(
+        new Service.Listener() {
+          @Override
+          public void failed(Service.State from, Throwable failure) {
+            shutdownLatch.countDown();
+          }
+        },
+        MoreExecutors.newDirectExecutorService());  // invoke callback on the notifying thread
+
     batchWorker.startAsync().awaitRunning();
+    batchWorker.execute(store -> {
+      throw new IllegalArgumentException();
+    });

-    result.get();
+    assertTrue(shutdownLatch.await(10L, TimeUnit.SECONDS));  // bounded: fail rather than hang
   }
 
   @Test