You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/12/03 19:50:44 UTC
incubator-aurora git commit: Add a caching ClusterState implementation.
Repository: incubator-aurora
Updated Branches:
refs/heads/master 8959b4995 -> 4405031aa
Add a caching ClusterState implementation.
Bugs closed: AURORA-121
Reviewed at https://reviews.apache.org/r/28607/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/4405031a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/4405031a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/4405031a
Branch: refs/heads/master
Commit: 4405031aaccfafe4c526e55fee57c2ba2341339f
Parents: 8959b49
Author: Bill Farner <wf...@apache.org>
Authored: Wed Dec 3 10:25:16 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Dec 3 10:25:16 2014 -0800
----------------------------------------------------------------------
.../async/preemptor/CachedClusterState.java | 50 +++++++
.../scheduler/async/preemptor/ClusterState.java | 2 +-
.../async/preemptor/PreemptorModule.java | 53 ++++++--
.../async/preemptor/CachedClusterStateTest.java | 134 +++++++++++++++++++
.../async/preemptor/PreemptionVictimTest.java | 50 +++++++
5 files changed, 274 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4405031a/src/main/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterState.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterState.java
new file mode 100644
index 0000000..03c2a8f
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterState.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.eventbus.Subscribe;
+
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+
+/**
+ * A cached view of cluster state, kept up to date by pubsub notifications.
+ */
+class CachedClusterState implements ClusterState, PubsubEvent.EventSubscriber {
+
+ private final Multimap<String, PreemptionVictim> victims =
+ Multimaps.synchronizedMultimap(HashMultimap.<String, PreemptionVictim>create());
+
+ @Override
+ public Multimap<String, PreemptionVictim> getSlavesToActiveTasks() {
+ return Multimaps.unmodifiableMultimap(victims);
+ }
+
+ @Subscribe
+ public void taskChangedState(TaskStateChange stateChange) {
+ synchronized (victims) {
+ String slaveId = stateChange.getTask().getAssignedTask().getSlaveId();
+ PreemptionVictim victim = PreemptionVictim.fromTask(stateChange.getTask().getAssignedTask());
+ if (Tasks.SLAVE_ASSIGNED_STATES.contains(stateChange.getNewState())) {
+ victims.put(slaveId, victim);
+ } else {
+ victims.remove(slaveId, victim);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4405031a/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
index 3524dc5..f7e157c 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
@@ -18,7 +18,7 @@ import com.google.common.collect.Multimap;
/**
* A facade for the preemptor to gain access to the state of scheduled tasks in the cluster.
*/
-public interface ClusterState {
+interface ClusterState {
/**
* Gets a snapshot of the active tasks in the cluster, indexed by the slave IDs they are
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4405031a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
index 489c0bf..2030a94 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
@@ -19,6 +19,7 @@ import javax.inject.Singleton;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
+import com.google.inject.AbstractModule;
import com.google.inject.PrivateModule;
import com.google.inject.TypeLiteral;
import com.twitter.common.args.Arg;
@@ -27,9 +28,10 @@ import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import org.apache.aurora.scheduler.async.preemptor.PreemptorImpl.PreemptionDelay;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
-public class PreemptorModule extends PrivateModule {
+public class PreemptorModule extends AbstractModule {
private static final Logger LOG = Logger.getLogger(PreemptorModule.class.getName());
@@ -37,6 +39,11 @@ public class PreemptorModule extends PrivateModule {
help = "Enable the preemptor and preemption")
private static final Arg<Boolean> ENABLE_PREEMPTOR = Arg.create(true);
+ @CmdLine(name = "enable_preemptor_caching",
+ help = "Cache the state consumed by the preemptor to improve scheduling throughput at the "
+ + "cost of higher memory consumption.")
+ private static final Arg<Boolean> ENABLE_PREEMPTOR_CACHING = Arg.create(true);
+
@CmdLine(name = "preemption_delay",
help = "Time interval after which a pending task becomes eligible to preempt other tasks")
private static final Arg<Amount<Long, Time>> PREEMPTION_DELAY =
@@ -55,20 +62,38 @@ public class PreemptorModule extends PrivateModule {
@Override
protected void configure() {
- if (enablePreemptor) {
- bind(Preemptor.class).to(PreemptorImpl.class);
- bind(PreemptorImpl.class).in(Singleton.class);
- bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(PreemptionDelay.class)
- .toInstance(PREEMPTION_DELAY.get());
- bind(ClusterState.class).to(LiveClusterState.class);
- bind(LiveClusterState.class).in(Singleton.class);
- LOG.info("Preemptor Enabled.");
- } else {
- bind(Preemptor.class).toInstance(NULL_PREEMPTOR);
- LOG.warning("Preemptor Disabled.");
- }
+ install(new PrivateModule() {
+ @Override
+ protected void configure() {
+ if (enablePreemptor) {
+ LOG.info("Preemptor Enabled.");
+ bind(Preemptor.class).to(PreemptorImpl.class);
+ bind(PreemptorImpl.class).in(Singleton.class);
+ bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(PreemptionDelay.class)
+ .toInstance(PREEMPTION_DELAY.get());
+ if (ENABLE_PREEMPTOR_CACHING.get()) {
+ bind(ClusterState.class).to(CachedClusterState.class);
+ bind(CachedClusterState.class).in(Singleton.class);
+ expose(CachedClusterState.class);
+ } else {
+ bind(ClusterState.class).to(LiveClusterState.class);
+ bind(LiveClusterState.class).in(Singleton.class);
+ }
+ } else {
+ bind(Preemptor.class).toInstance(NULL_PREEMPTOR);
+ LOG.warning("Preemptor Disabled.");
+ }
- expose(Preemptor.class);
+ expose(Preemptor.class);
+ }
+ });
+
+ // We can't do this in the private module due to the known conflict between multibindings
+ // and private modules due to multiple injectors. We accept the added complexity here to keep
+ // the other bindings private.
+ if (enablePreemptor && ENABLE_PREEMPTOR_CACHING.get()) {
+ PubsubEventModule.bindSubscriber(binder(), CachedClusterState.class);
+ }
}
private static final Preemptor NULL_PREEMPTOR = new Preemptor() {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4405031a/src/test/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterStateTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterStateTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterStateTest.java
new file mode 100644
index 0000000..7cc04dd
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterStateTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMultimap;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FAILED;
+import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Exercises {@link CachedClusterState} by feeding it task state change events directly and
+ * comparing the resulting view against the expected set of victims.
+ */
+public class CachedClusterStateTest {
+
+  private CachedClusterState clusterState;
+
+  @Before
+  public void setUp() {
+    clusterState = new CachedClusterState();
+  }
+
+  /** The exposed view must reject mutation. */
+  @Test(expected = UnsupportedOperationException.class)
+  public void testImmutable() {
+    clusterState.getSlavesToActiveTasks().clear();
+  }
+
+  /** Walks one task through its states and checks when it is listed as a victim. */
+  @Test
+  public void testTaskLifecycle() {
+    IAssignedTask task = makeTask("a", "s1");
+
+    // Nothing is cached before any event arrives, nor for THROTTLED or PENDING.
+    assertVictims();
+    changeState(task, THROTTLED);
+    assertVictims();
+    changeState(task, PENDING);
+    assertVictims();
+
+    // ASSIGNED, RUNNING and KILLING all keep the task listed.
+    changeState(task, ASSIGNED);
+    assertVictims(task);
+    changeState(task, RUNNING);
+    assertVictims(task);
+    changeState(task, KILLING);
+    assertVictims(task);
+
+    // FINISHED drops it again.
+    changeState(task, FINISHED);
+    assertVictims();
+  }
+
+  @Test
+  public void testTaskChangesSlaves() {
+    // We do not intend to handle the case of an external failure leading to the same task ID
+    // on a different slave.
+    IAssignedTask onFirstSlave = makeTask("a", "s1");
+    IAssignedTask onSecondSlave = makeTask("a", "s2");
+    changeState(onFirstSlave, RUNNING);
+    changeState(onSecondSlave, RUNNING);
+    assertVictims(onFirstSlave, onSecondSlave);
+  }
+
+  /** Interleaves additions and removals of several tasks spread over three slaves. */
+  @Test
+  public void testMultipleTasks() {
+    IAssignedTask taskA = makeTask("a", "s1");
+    IAssignedTask taskB = makeTask("b", "s1");
+    IAssignedTask taskC = makeTask("c", "s2");
+    IAssignedTask taskD = makeTask("d", "s3");
+    IAssignedTask taskE = makeTask("e", "s3");
+    IAssignedTask taskF = makeTask("f", "s1");
+
+    changeState(taskA, RUNNING);
+    assertVictims(taskA);
+    changeState(taskB, RUNNING);
+    assertVictims(taskA, taskB);
+    changeState(taskC, RUNNING);
+    assertVictims(taskA, taskB, taskC);
+    changeState(taskD, RUNNING);
+    assertVictims(taskA, taskB, taskC, taskD);
+    changeState(taskE, RUNNING);
+    assertVictims(taskA, taskB, taskC, taskD, taskE);
+
+    changeState(taskC, FINISHED);
+    assertVictims(taskA, taskB, taskD, taskE);
+    changeState(taskA, FAILED);
+    changeState(taskE, KILLED);
+    assertVictims(taskB, taskD);
+
+    changeState(taskF, RUNNING);
+    assertVictims(taskB, taskD, taskF);
+  }
+
+  /**
+   * Asserts that the cache holds exactly the victims derived from the given tasks, each keyed
+   * by the task's slave ID.
+   */
+  private void assertVictims(IAssignedTask... tasks) {
+    ImmutableMultimap.Builder<String, PreemptionVictim> builder = ImmutableMultimap.builder();
+    for (IAssignedTask assigned : tasks) {
+      String slaveId = assigned.getSlaveId();
+      builder.put(slaveId, PreemptionVictim.fromTask(assigned));
+    }
+    HashMultimap<String, PreemptionVictim> expected = HashMultimap.create(builder.build());
+    assertEquals(expected, clusterState.getSlavesToActiveTasks());
+  }
+
+  /** Builds an assigned task with the given IDs, a derived slave host and a fixed role. */
+  private IAssignedTask makeTask(String taskId, String slaveId) {
+    TaskConfig config = new TaskConfig().setOwner(new Identity().setRole("role"));
+    AssignedTask assigned = new AssignedTask()
+        .setTaskId(taskId)
+        .setSlaveId(slaveId)
+        .setSlaveHost(slaveId + "host")
+        .setTask(config);
+    return IAssignedTask.build(assigned);
+  }
+
+  /**
+   * Delivers a state change event to the cache for the given task in the given status. The event
+   * is built with {@code TaskStateChange.transition} from the task and
+   * {@code ScheduleStatus.INIT}.
+   */
+  private void changeState(IAssignedTask assignedTask, ScheduleStatus status) {
+    ScheduledTask scheduled = new ScheduledTask()
+        .setStatus(status)
+        .setAssignedTask(assignedTask.newBuilder());
+    IScheduledTask task = IScheduledTask.build(scheduled);
+    clusterState.taskChangedState(TaskStateChange.transition(task, ScheduleStatus.INIT));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4405031a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimTest.java
new file mode 100644
index 0000000..b0380b3
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Verifies the {@code equals}, {@code hashCode} and {@code toString} behavior of
+ * {@link PreemptionVictim}.
+ */
+public class PreemptionVictimTest {
+
+  @Test
+  public void testBeanMethods() {
+    PreemptionVictim first = makeVictim("a");
+    PreemptionVictim firstAgain = makeVictim("a");
+    PreemptionVictim other = makeVictim("b");
+
+    // Victims built from identical tasks are interchangeable.
+    assertEquals(first, firstAgain);
+    assertEquals(first.hashCode(), firstAgain.hashCode());
+    assertEquals(first.toString(), firstAgain.toString());
+
+    // Victims built from different tasks are distinguishable.
+    assertNotEquals(first, other);
+    assertNotEquals(first.toString(), other.toString());
+
+    // Equal victims collapse to one element in a set.
+    assertEquals(ImmutableSet.of(first, other), ImmutableSet.of(first, firstAgain, other));
+  }
+
+  /**
+   * Builds a victim from an assigned task whose slave ID and slave host are derived from the
+   * task ID, with a fixed role.
+   */
+  private PreemptionVictim makeVictim(String taskId) {
+    TaskConfig config = new TaskConfig().setOwner(new Identity().setRole("role"));
+    AssignedTask assigned = new AssignedTask()
+        .setTaskId(taskId)
+        .setSlaveId(taskId + "slave")
+        .setSlaveHost(taskId + "host")
+        .setTask(config);
+    return PreemptionVictim.fromTask(IAssignedTask.build(assigned));
+  }
+}