You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2016/05/16 19:23:11 UTC

aurora git commit: Batching explicit task reconciliation calls

Repository: aurora
Updated Branches:
  refs/heads/master 8c900e585 -> 3cbff4117


Batching explicit task reconciliation calls

Reviewed at https://reviews.apache.org/r/47373/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/3cbff411
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/3cbff411
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/3cbff411

Branch: refs/heads/master
Commit: 3cbff4117ff7b95e6110a38881aada7708137573
Parents: 8c900e5
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Mon May 16 12:22:59 2016 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Mon May 16 12:22:59 2016 -0700

----------------------------------------------------------------------
 .../reconciliation/ReconciliationModule.java    | 15 +++++-
 .../reconciliation/TaskReconciler.java          | 53 +++++++++++++-------
 .../reconciliation/TaskReconcilerTest.java      | 40 +++++++++++----
 .../testing/FakeScheduledExecutor.java          |  5 ++
 4 files changed, 86 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/3cbff411/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java
index cccee08..e076e80 100644
--- a/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java
@@ -88,6 +88,17 @@ public class ReconciliationModule extends AbstractModule {
   private static final Arg<Amount<Long, Time>> RECONCILIATION_SCHEDULE_SPREAD =
       Arg.create(Amount.of(30L, Time.MINUTES));
 
+  @Positive
+  @CmdLine(name = "reconciliation_explicit_batch_size",
+      help = "Number of tasks in a single batch request sent to Mesos for explicit reconciliation.")
+  private static final Arg<Integer> RECONCILIATION_BATCH_SIZE = Arg.create(1000);
+
+  @Positive
+  @CmdLine(name = "reconciliation_explicit_batch_interval",
+      help = "Interval between explicit batch reconciliation requests.")
+  private static final Arg<Amount<Long, Time>> RECONCILIATION_BATCH_INTERVAL =
+      Arg.create(Amount.of(5L, Time.SECONDS));
+
   @Qualifier
   @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
   @interface BackgroundWorker { }
@@ -127,7 +138,9 @@ public class ReconciliationModule extends AbstractModule {
             RECONCILIATION_INITIAL_DELAY.get(),
             RECONCILIATION_EXPLICIT_INTERVAL.get(),
             RECONCILIATION_IMPLICIT_INTERVAL.get(),
-            RECONCILIATION_SCHEDULE_SPREAD.get()));
+            RECONCILIATION_SCHEDULE_SPREAD.get(),
+            RECONCILIATION_BATCH_INTERVAL.get(),
+            RECONCILIATION_BATCH_SIZE.get()));
         bind(ScheduledExecutorService.class).annotatedWith(BackgroundWorker.class)
             .toInstance(AsyncUtil.loggingScheduledExecutor(1, "TaskReconciler-%d", LOG));
         bind(TaskReconciler.class).in(Singleton.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/3cbff411/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
index 57d2061..3275d72 100644
--- a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
+++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.scheduler.reconciliation;
 
+import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -21,7 +22,9 @@ import javax.inject.Inject;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AbstractIdleService;
 
 import org.apache.aurora.common.quantity.Amount;
@@ -34,12 +37,14 @@ import org.apache.aurora.scheduler.reconciliation.ReconciliationModule.Backgroun
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.TaskStatus;
 
 import static java.util.Objects.requireNonNull;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
 import static org.apache.aurora.common.quantity.Time.MINUTES;
+import static org.apache.aurora.common.quantity.Time.SECONDS;
 
 /**
  * A task reconciler that periodically triggers Mesos (implicit) and Aurora (explicit) task
@@ -66,24 +71,35 @@ public class TaskReconciler extends AbstractIdleService {
     private final Amount<Long, Time> implicitInterval;
     private final long explicitDelayMinutes;
     private final long implicitDelayMinutes;
+    private final long explicitBatchDelaySeconds;
+    private final int explicitBatchSize;
 
     @VisibleForTesting
     TaskReconcilerSettings(
         Amount<Long, Time> initialDelay,
         Amount<Long, Time> explicitInterval,
         Amount<Long, Time> implicitInterval,
-        Amount<Long, Time> scheduleSpread) {
+        Amount<Long, Time> scheduleSpread,
+        Amount<Long, Time> explicitBatchInterval,
+        int explicitBatchSize) {
 
       this.explicitInterval = requireNonNull(explicitInterval);
       this.implicitInterval = requireNonNull(implicitInterval);
       explicitDelayMinutes = requireNonNull(initialDelay).as(MINUTES);
       implicitDelayMinutes = initialDelay.as(MINUTES) + scheduleSpread.as(MINUTES);
+      explicitBatchDelaySeconds = explicitBatchInterval.as(SECONDS);
+      this.explicitBatchSize = explicitBatchSize;
+
       checkArgument(
           explicitDelayMinutes >= 0,
           "Invalid explicit reconciliation delay: " + explicitDelayMinutes);
       checkArgument(
           implicitDelayMinutes >= 0L,
           "Invalid implicit reconciliation delay: " + implicitDelayMinutes);
+      checkArgument(
+          explicitBatchDelaySeconds >= 0L,
+          "Invalid explicit batch reconciliation delay: " + explicitBatchDelaySeconds
+      );
     }
   }
 
@@ -108,20 +124,24 @@ public class TaskReconciler extends AbstractIdleService {
     // Schedule explicit reconciliation.
     executor.scheduleAtFixedRate(
         () -> {
-          ImmutableSet<Protos.TaskStatus> active = FluentIterable
+          ImmutableList<TaskStatus> active = FluentIterable
               .from(Storage.Util.fetchTasks(
                   storage,
                   Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES)))
               .transform(TASK_TO_PROTO)
-              .toSet();
-
-          driver.reconcileTasks(active);
+              .toList();
+
+          List<List<TaskStatus>> batches = Lists.partition(active, settings.explicitBatchSize);
+          long delay = 0;
+          for (List<TaskStatus> batch : batches) {
+            executor.schedule(() -> driver.reconcileTasks(batch), delay, SECONDS.getTimeUnit());
+            delay += settings.explicitBatchDelaySeconds;
+          }
           explicitRuns.incrementAndGet();
         },
         settings.explicitDelayMinutes,
         settings.explicitInterval.as(MINUTES),
         MINUTES.getTimeUnit());
-
     // Schedule implicit reconciliation.
     executor.scheduleAtFixedRate(
         () -> {
@@ -139,15 +159,14 @@ public class TaskReconciler extends AbstractIdleService {
   }
 
   @VisibleForTesting
-  static final Function<IScheduledTask, Protos.TaskStatus> TASK_TO_PROTO =
-      t -> Protos.TaskStatus.newBuilder()
-          // TODO(maxim): State is required by protobuf but ignored by Mesos for reconciliation
-          // purposes. This is the artifact of the native API. The new HTTP Mesos API will be
-          // accepting task IDs instead. AURORA-1326 tracks solution on the scheduler side.
-          // Setting TASK_RUNNING as a safe dummy value here.
-          .setState(Protos.TaskState.TASK_RUNNING)
-          .setSlaveId(
-              Protos.SlaveID.newBuilder().setValue(t.getAssignedTask().getSlaveId()).build())
-          .setTaskId(Protos.TaskID.newBuilder().setValue(t.getAssignedTask().getTaskId()).build())
-          .build();
+  static final Function<IScheduledTask, TaskStatus> TASK_TO_PROTO = t -> TaskStatus.newBuilder()
+      // TODO(maxim): State is required by protobuf but ignored by Mesos for reconciliation
+      // purposes. This is the artifact of the native API. The new HTTP Mesos API will be
+      // accepting task IDs instead. AURORA-1326 tracks solution on the scheduler side.
+      // Setting TASK_RUNNING as a safe dummy value here.
+      .setState(Protos.TaskState.TASK_RUNNING)
+      .setSlaveId(
+          Protos.SlaveID.newBuilder().setValue(t.getAssignedTask().getSlaveId()).build())
+      .setTaskId(Protos.TaskID.newBuilder().setValue(t.getAssignedTask().getTaskId()).build())
+      .build();
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/3cbff411/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java
index 5b4b3ac..b9317dc 100644
--- a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java
@@ -13,12 +13,13 @@
  */
 package org.apache.aurora.scheduler.reconciliation;
 
+import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
@@ -36,10 +37,13 @@ import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
+import org.apache.mesos.Protos;
+import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.common.quantity.Time.MINUTES;
+import static org.apache.aurora.common.quantity.Time.SECONDS;
 import static org.apache.aurora.scheduler.reconciliation.TaskReconciler.EXPLICIT_STAT_NAME;
 import static org.apache.aurora.scheduler.reconciliation.TaskReconciler.IMPLICIT_STAT_NAME;
 import static org.apache.aurora.scheduler.reconciliation.TaskReconciler.TASK_TO_PROTO;
@@ -53,11 +57,15 @@ public class TaskReconcilerTest extends EasyMockTest {
   private static final Amount<Long, Time> EXPLICIT_SCHEDULE = Amount.of(60L, MINUTES);
   private static final Amount<Long, Time> IMPLICT_SCHEDULE = Amount.of(180L, MINUTES);
   private static final Amount<Long, Time> SPREAD = Amount.of(30L, MINUTES);
+  private static final Amount<Long, Time> BATCH_DELAY = Amount.of(3L, SECONDS);
+  private static final int BATCH_SIZE = 1;
   private static final TaskReconcilerSettings SETTINGS = new TaskReconcilerSettings(
       INITIAL_DELAY,
       EXPLICIT_SCHEDULE,
       IMPLICT_SCHEDULE,
-      SPREAD);
+      SPREAD,
+      BATCH_DELAY,
+      BATCH_SIZE);
 
   private StorageTestUtil storageUtil;
   private StatsProvider statsProvider;
@@ -83,15 +91,25 @@ public class TaskReconcilerTest extends EasyMockTest {
     FakeScheduledExecutor clock =
         FakeScheduledExecutor.scheduleAtFixedRateExecutor(executorService, 2, 5);
 
-    IScheduledTask task = makeTask("id1", TaskTestUtil.makeConfig(TaskTestUtil.JOB));
+    IScheduledTask task1 = makeTask("id1", TaskTestUtil.makeConfig(TaskTestUtil.JOB));
+    IScheduledTask task2 = makeTask("id2", TaskTestUtil.makeConfig(TaskTestUtil.JOB));
     storageUtil.expectOperations();
-    storageUtil.expectTaskFetch(Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES), task)
-        .times(5);
+    storageUtil.expectTaskFetch(
+        Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES),
+        task1,
+        task2).times(5);
 
-    driver.reconcileTasks(ImmutableSet.of(TASK_TO_PROTO.apply(task)));
+    List<List<Protos.TaskStatus>> batches = Lists.partition(ImmutableList.of(
+        TASK_TO_PROTO.apply(task1),
+        TASK_TO_PROTO.apply(task2)), BATCH_SIZE);
+
+    driver.reconcileTasks(batches.get(0));
+    expectLastCall().times(5);
+
+    driver.reconcileTasks(batches.get(1));
     expectLastCall().times(5);
 
-    driver.reconcileTasks(ImmutableSet.of());
+    driver.reconcileTasks(EasyMock.anyObject());
     expectLastCall().times(2);
 
     control.replay();
@@ -130,7 +148,9 @@ public class TaskReconcilerTest extends EasyMockTest {
         INITIAL_DELAY,
         EXPLICIT_SCHEDULE,
         IMPLICT_SCHEDULE,
-        Amount.of(Long.MAX_VALUE, MINUTES));
+        Amount.of(Long.MAX_VALUE, MINUTES),
+        BATCH_DELAY,
+        BATCH_SIZE);
   }
 
   @Test(expected = IllegalArgumentException.class)
@@ -141,7 +161,9 @@ public class TaskReconcilerTest extends EasyMockTest {
         Amount.of(Long.MAX_VALUE, MINUTES),
         EXPLICIT_SCHEDULE,
         IMPLICT_SCHEDULE,
-        SPREAD);
+        SPREAD,
+        BATCH_DELAY,
+        BATCH_SIZE);
   }
 
   private static IScheduledTask makeTask(String id, ITaskConfig config) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/3cbff411/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java b/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
index 9082a31..ce6e5a4 100644
--- a/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
+++ b/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
@@ -120,6 +120,11 @@ public final class FakeScheduledExecutor extends FakeClock {
       int maxInvocations) {
 
     FakeScheduledExecutor executor = new FakeScheduledExecutor();
+    mock.schedule(EasyMock.<Runnable>anyObject(), EasyMock.anyLong(), EasyMock.anyObject());
+    expectLastCall().andAnswer(() -> {
+      ((Runnable) EasyMock.getCurrentArguments()[0]).run();
+      return null;
+    }).anyTimes();
     mock.scheduleAtFixedRate(
         EasyMock.anyObject(),
         EasyMock.anyLong(),