You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this copy).
Posted to commits@aurora.apache.org by ma...@apache.org on 2016/05/16 19:23:11 UTC
aurora git commit: Batching explicit task reconciliation calls
Repository: aurora
Updated Branches:
refs/heads/master 8c900e585 -> 3cbff4117
Batching explicit task reconciliation calls
Reviewed at https://reviews.apache.org/r/47373/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/3cbff411
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/3cbff411
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/3cbff411
Branch: refs/heads/master
Commit: 3cbff4117ff7b95e6110a38881aada7708137573
Parents: 8c900e5
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Mon May 16 12:22:59 2016 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Mon May 16 12:22:59 2016 -0700
----------------------------------------------------------------------
.../reconciliation/ReconciliationModule.java | 15 +++++-
.../reconciliation/TaskReconciler.java | 53 +++++++++++++-------
.../reconciliation/TaskReconcilerTest.java | 40 +++++++++++----
.../testing/FakeScheduledExecutor.java | 5 ++
4 files changed, 86 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/3cbff411/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java
index cccee08..e076e80 100644
--- a/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java
@@ -88,6 +88,17 @@ public class ReconciliationModule extends AbstractModule {
private static final Arg<Amount<Long, Time>> RECONCILIATION_SCHEDULE_SPREAD =
Arg.create(Amount.of(30L, Time.MINUTES));
+ @Positive
+ @CmdLine(name = "reconciliation_explicit_batch_size",
+ help = "Number of tasks in a single batch request sent to Mesos for explicit reconciliation.")
+ private static final Arg<Integer> RECONCILIATION_BATCH_SIZE = Arg.create(1000);
+
+ @Positive
+ @CmdLine(name = "reconciliation_explicit_batch_interval",
+ help = "Interval between explicit batch reconciliation requests.")
+ private static final Arg<Amount<Long, Time>> RECONCILIATION_BATCH_INTERVAL =
+ Arg.create(Amount.of(5L, Time.SECONDS));
+
@Qualifier
@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
@interface BackgroundWorker { }
@@ -127,7 +138,9 @@ public class ReconciliationModule extends AbstractModule {
RECONCILIATION_INITIAL_DELAY.get(),
RECONCILIATION_EXPLICIT_INTERVAL.get(),
RECONCILIATION_IMPLICIT_INTERVAL.get(),
- RECONCILIATION_SCHEDULE_SPREAD.get()));
+ RECONCILIATION_SCHEDULE_SPREAD.get(),
+ RECONCILIATION_BATCH_INTERVAL.get(),
+ RECONCILIATION_BATCH_SIZE.get()));
bind(ScheduledExecutorService.class).annotatedWith(BackgroundWorker.class)
.toInstance(AsyncUtil.loggingScheduledExecutor(1, "TaskReconciler-%d", LOG));
bind(TaskReconciler.class).in(Singleton.class);
http://git-wip-us.apache.org/repos/asf/aurora/blob/3cbff411/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
index 57d2061..3275d72 100644
--- a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
+++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
@@ -13,6 +13,7 @@
*/
package org.apache.aurora.scheduler.reconciliation;
+import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
@@ -21,7 +22,9 @@ import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractIdleService;
import org.apache.aurora.common.quantity.Amount;
@@ -34,12 +37,14 @@ import org.apache.aurora.scheduler.reconciliation.ReconciliationModule.Backgroun
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.TaskStatus;
import static java.util.Objects.requireNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.aurora.common.quantity.Time.MINUTES;
+import static org.apache.aurora.common.quantity.Time.SECONDS;
/**
* A task reconciler that periodically triggers Mesos (implicit) and Aurora (explicit) task
@@ -66,24 +71,35 @@ public class TaskReconciler extends AbstractIdleService {
private final Amount<Long, Time> implicitInterval;
private final long explicitDelayMinutes;
private final long implicitDelayMinutes;
+ private final long explicitBatchDelaySeconds;
+ private final int explicitBatchSize;
@VisibleForTesting
TaskReconcilerSettings(
Amount<Long, Time> initialDelay,
Amount<Long, Time> explicitInterval,
Amount<Long, Time> implicitInterval,
- Amount<Long, Time> scheduleSpread) {
+ Amount<Long, Time> scheduleSpread,
+ Amount<Long, Time> explicitBatchInterval,
+ int explicitBatchSize) {
this.explicitInterval = requireNonNull(explicitInterval);
this.implicitInterval = requireNonNull(implicitInterval);
explicitDelayMinutes = requireNonNull(initialDelay).as(MINUTES);
implicitDelayMinutes = initialDelay.as(MINUTES) + scheduleSpread.as(MINUTES);
+ explicitBatchDelaySeconds = explicitBatchInterval.as(SECONDS);
+ this.explicitBatchSize = explicitBatchSize;
+
checkArgument(
explicitDelayMinutes >= 0,
"Invalid explicit reconciliation delay: " + explicitDelayMinutes);
checkArgument(
implicitDelayMinutes >= 0L,
"Invalid implicit reconciliation delay: " + implicitDelayMinutes);
+ checkArgument(
+ explicitBatchDelaySeconds >= 0L,
+ "Invalid explicit batch reconciliation delay: " + explicitBatchDelaySeconds
+ );
}
}
@@ -108,20 +124,24 @@ public class TaskReconciler extends AbstractIdleService {
// Schedule explicit reconciliation.
executor.scheduleAtFixedRate(
() -> {
- ImmutableSet<Protos.TaskStatus> active = FluentIterable
+ ImmutableList<TaskStatus> active = FluentIterable
.from(Storage.Util.fetchTasks(
storage,
Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES)))
.transform(TASK_TO_PROTO)
- .toSet();
-
- driver.reconcileTasks(active);
+ .toList();
+
+ List<List<TaskStatus>> batches = Lists.partition(active, settings.explicitBatchSize);
+ long delay = 0;
+ for (List<TaskStatus> batch : batches) {
+ executor.schedule(() -> driver.reconcileTasks(batch), delay, SECONDS.getTimeUnit());
+ delay += settings.explicitBatchDelaySeconds;
+ }
explicitRuns.incrementAndGet();
},
settings.explicitDelayMinutes,
settings.explicitInterval.as(MINUTES),
MINUTES.getTimeUnit());
-
// Schedule implicit reconciliation.
executor.scheduleAtFixedRate(
() -> {
@@ -139,15 +159,14 @@ public class TaskReconciler extends AbstractIdleService {
}
@VisibleForTesting
- static final Function<IScheduledTask, Protos.TaskStatus> TASK_TO_PROTO =
- t -> Protos.TaskStatus.newBuilder()
- // TODO(maxim): State is required by protobuf but ignored by Mesos for reconciliation
- // purposes. This is the artifact of the native API. The new HTTP Mesos API will be
- // accepting task IDs instead. AURORA-1326 tracks solution on the scheduler side.
- // Setting TASK_RUNNING as a safe dummy value here.
- .setState(Protos.TaskState.TASK_RUNNING)
- .setSlaveId(
- Protos.SlaveID.newBuilder().setValue(t.getAssignedTask().getSlaveId()).build())
- .setTaskId(Protos.TaskID.newBuilder().setValue(t.getAssignedTask().getTaskId()).build())
- .build();
+ static final Function<IScheduledTask, TaskStatus> TASK_TO_PROTO = t -> TaskStatus.newBuilder()
+ // TODO(maxim): State is required by protobuf but ignored by Mesos for reconciliation
+ // purposes. This is the artifact of the native API. The new HTTP Mesos API will be
+ // accepting task IDs instead. AURORA-1326 tracks solution on the scheduler side.
+ // Setting TASK_RUNNING as a safe dummy value here.
+ .setState(Protos.TaskState.TASK_RUNNING)
+ .setSlaveId(
+ Protos.SlaveID.newBuilder().setValue(t.getAssignedTask().getSlaveId()).build())
+ .setTaskId(Protos.TaskID.newBuilder().setValue(t.getAssignedTask().getTaskId()).build())
+ .build();
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/3cbff411/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java
index 5b4b3ac..b9317dc 100644
--- a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java
@@ -13,12 +13,13 @@
*/
package org.apache.aurora.scheduler.reconciliation;
+import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
@@ -36,10 +37,13 @@ import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
+import org.apache.mesos.Protos;
+import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
import static org.apache.aurora.common.quantity.Time.MINUTES;
+import static org.apache.aurora.common.quantity.Time.SECONDS;
import static org.apache.aurora.scheduler.reconciliation.TaskReconciler.EXPLICIT_STAT_NAME;
import static org.apache.aurora.scheduler.reconciliation.TaskReconciler.IMPLICIT_STAT_NAME;
import static org.apache.aurora.scheduler.reconciliation.TaskReconciler.TASK_TO_PROTO;
@@ -53,11 +57,15 @@ public class TaskReconcilerTest extends EasyMockTest {
private static final Amount<Long, Time> EXPLICIT_SCHEDULE = Amount.of(60L, MINUTES);
private static final Amount<Long, Time> IMPLICT_SCHEDULE = Amount.of(180L, MINUTES);
private static final Amount<Long, Time> SPREAD = Amount.of(30L, MINUTES);
+ private static final Amount<Long, Time> BATCH_DELAY = Amount.of(3L, SECONDS);
+ private static final int BATCH_SIZE = 1;
private static final TaskReconcilerSettings SETTINGS = new TaskReconcilerSettings(
INITIAL_DELAY,
EXPLICIT_SCHEDULE,
IMPLICT_SCHEDULE,
- SPREAD);
+ SPREAD,
+ BATCH_DELAY,
+ BATCH_SIZE);
private StorageTestUtil storageUtil;
private StatsProvider statsProvider;
@@ -83,15 +91,25 @@ public class TaskReconcilerTest extends EasyMockTest {
FakeScheduledExecutor clock =
FakeScheduledExecutor.scheduleAtFixedRateExecutor(executorService, 2, 5);
- IScheduledTask task = makeTask("id1", TaskTestUtil.makeConfig(TaskTestUtil.JOB));
+ IScheduledTask task1 = makeTask("id1", TaskTestUtil.makeConfig(TaskTestUtil.JOB));
+ IScheduledTask task2 = makeTask("id2", TaskTestUtil.makeConfig(TaskTestUtil.JOB));
storageUtil.expectOperations();
- storageUtil.expectTaskFetch(Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES), task)
- .times(5);
+ storageUtil.expectTaskFetch(
+ Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES),
+ task1,
+ task2).times(5);
- driver.reconcileTasks(ImmutableSet.of(TASK_TO_PROTO.apply(task)));
+ List<List<Protos.TaskStatus>> batches = Lists.partition(ImmutableList.of(
+ TASK_TO_PROTO.apply(task1),
+ TASK_TO_PROTO.apply(task2)), BATCH_SIZE);
+
+ driver.reconcileTasks(batches.get(0));
+ expectLastCall().times(5);
+
+ driver.reconcileTasks(batches.get(1));
expectLastCall().times(5);
- driver.reconcileTasks(ImmutableSet.of());
+ driver.reconcileTasks(EasyMock.anyObject());
expectLastCall().times(2);
control.replay();
@@ -130,7 +148,9 @@ public class TaskReconcilerTest extends EasyMockTest {
INITIAL_DELAY,
EXPLICIT_SCHEDULE,
IMPLICT_SCHEDULE,
- Amount.of(Long.MAX_VALUE, MINUTES));
+ Amount.of(Long.MAX_VALUE, MINUTES),
+ BATCH_DELAY,
+ BATCH_SIZE);
}
@Test(expected = IllegalArgumentException.class)
@@ -141,7 +161,9 @@ public class TaskReconcilerTest extends EasyMockTest {
Amount.of(Long.MAX_VALUE, MINUTES),
EXPLICIT_SCHEDULE,
IMPLICT_SCHEDULE,
- SPREAD);
+ SPREAD,
+ BATCH_DELAY,
+ BATCH_SIZE);
}
private static IScheduledTask makeTask(String id, ITaskConfig config) {
http://git-wip-us.apache.org/repos/asf/aurora/blob/3cbff411/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java b/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
index 9082a31..ce6e5a4 100644
--- a/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
+++ b/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
@@ -120,6 +120,11 @@ public final class FakeScheduledExecutor extends FakeClock {
int maxInvocations) {
FakeScheduledExecutor executor = new FakeScheduledExecutor();
+ mock.schedule(EasyMock.<Runnable>anyObject(), EasyMock.anyLong(), EasyMock.anyObject());
+ expectLastCall().andAnswer(() -> {
+ ((Runnable) EasyMock.getCurrentArguments()[0]).run();
+ return null;
+ }).anyTimes();
mock.scheduleAtFixedRate(
EasyMock.anyObject(),
EasyMock.anyLong(),