You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/12/03 19:50:44 UTC

incubator-aurora git commit: Add a caching ClusterState implementation.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 8959b4995 -> 4405031aa


Add a caching ClusterState implementation.

Bugs closed: AURORA-121

Reviewed at https://reviews.apache.org/r/28607/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/4405031a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/4405031a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/4405031a

Branch: refs/heads/master
Commit: 4405031aaccfafe4c526e55fee57c2ba2341339f
Parents: 8959b49
Author: Bill Farner <wf...@apache.org>
Authored: Wed Dec 3 10:25:16 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Dec 3 10:25:16 2014 -0800

----------------------------------------------------------------------
 .../async/preemptor/CachedClusterState.java     |  50 +++++++
 .../scheduler/async/preemptor/ClusterState.java |   2 +-
 .../async/preemptor/PreemptorModule.java        |  53 ++++++--
 .../async/preemptor/CachedClusterStateTest.java | 134 +++++++++++++++++++
 .../async/preemptor/PreemptionVictimTest.java   |  50 +++++++
 5 files changed, 274 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4405031a/src/main/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterState.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterState.java
new file mode 100644
index 0000000..03c2a8f
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterState.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.eventbus.Subscribe;
+
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+
+/**
+ * Caches a PreemptionVictim per slave-assigned task, keyed by slave ID, updated by pubsub events.
+ */
+class CachedClusterState implements ClusterState, PubsubEvent.EventSubscriber {
+
+  private final Multimap<String, PreemptionVictim> victims =  // Keyed by slave ID.
+      Multimaps.synchronizedMultimap(HashMultimap.<String, PreemptionVictim>create());
+
+  @Override
+  public Multimap<String, PreemptionVictim> getSlavesToActiveTasks() {
+    return Multimaps.unmodifiableMultimap(victims);  // NOTE(review): live view, not a copy.
+  }
+
+  @Subscribe
+  public void taskChangedState(TaskStateChange stateChange) {
+    synchronized (victims) {  // Handle one state change at a time.
+      String slaveId = stateChange.getTask().getAssignedTask().getSlaveId();
+      PreemptionVictim victim = PreemptionVictim.fromTask(stateChange.getTask().getAssignedTask());
+      if (Tasks.SLAVE_ASSIGNED_STATES.contains(stateChange.getNewState())) {
+        victims.put(slaveId, victim);  // New state is slave-assigned: cache the task.
+      } else {
+        victims.remove(slaveId, victim);  // Any other state: drop it (no-op if not cached).
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4405031a/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
index 3524dc5..f7e157c 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java
@@ -18,7 +18,7 @@ import com.google.common.collect.Multimap;
 /**
  * A facade for the preemptor to gain access to the state of scheduled tasks in the cluster.
  */
-public interface ClusterState {
+interface ClusterState {
 
   /**
    * Gets a snapshot of the active tasks in the cluster, indexed by the slave IDs they are

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4405031a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
index 489c0bf..2030a94 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
@@ -19,6 +19,7 @@ import javax.inject.Singleton;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import com.google.inject.AbstractModule;
 import com.google.inject.PrivateModule;
 import com.google.inject.TypeLiteral;
 import com.twitter.common.args.Arg;
@@ -27,9 +28,10 @@ import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 
 import org.apache.aurora.scheduler.async.preemptor.PreemptorImpl.PreemptionDelay;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 
-public class PreemptorModule extends PrivateModule {
+public class PreemptorModule extends AbstractModule {
 
   private static final Logger LOG = Logger.getLogger(PreemptorModule.class.getName());
 
@@ -37,6 +39,11 @@ public class PreemptorModule extends PrivateModule {
       help = "Enable the preemptor and preemption")
   private static final Arg<Boolean> ENABLE_PREEMPTOR = Arg.create(true);
 
+  @CmdLine(name = "enable_preemptor_caching",
+      help = "Cache the state consumed by the preemptor to improve scheduling throughput at the "
+          + "cost of higher memory consumption.")
+  private static final Arg<Boolean> ENABLE_PREEMPTOR_CACHING = Arg.create(true);
+
   @CmdLine(name = "preemption_delay",
       help = "Time interval after which a pending task becomes eligible to preempt other tasks")
   private static final Arg<Amount<Long, Time>> PREEMPTION_DELAY =
@@ -55,20 +62,38 @@ public class PreemptorModule extends PrivateModule {
 
   @Override
   protected void configure() {
-    if (enablePreemptor) {
-      bind(Preemptor.class).to(PreemptorImpl.class);
-      bind(PreemptorImpl.class).in(Singleton.class);
-      bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(PreemptionDelay.class)
-          .toInstance(PREEMPTION_DELAY.get());
-      bind(ClusterState.class).to(LiveClusterState.class);
-      bind(LiveClusterState.class).in(Singleton.class);
-      LOG.info("Preemptor Enabled.");
-    } else {
-      bind(Preemptor.class).toInstance(NULL_PREEMPTOR);
-      LOG.warning("Preemptor Disabled.");
-    }
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
+        if (enablePreemptor) {
+          LOG.info("Preemptor Enabled.");
+          bind(Preemptor.class).to(PreemptorImpl.class);
+          bind(PreemptorImpl.class).in(Singleton.class);
+          bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(PreemptionDelay.class)
+              .toInstance(PREEMPTION_DELAY.get());
+          if (ENABLE_PREEMPTOR_CACHING.get()) {
+            bind(ClusterState.class).to(CachedClusterState.class);
+            bind(CachedClusterState.class).in(Singleton.class);
+            expose(CachedClusterState.class);  // Lets the outer module bind it as a subscriber.
+          } else {
+            bind(ClusterState.class).to(LiveClusterState.class);
+            bind(LiveClusterState.class).in(Singleton.class);
+          }
+        } else {
+          bind(Preemptor.class).toInstance(NULL_PREEMPTOR);
+          LOG.warning("Preemptor Disabled.");
+        }
 
-    expose(Preemptor.class);
+        expose(Preemptor.class);
+      }
+    });
+
+    // This binding must live outside the private module: multibindings are known to conflict
+    // with private modules (multiple injectors).  We accept the added complexity here to keep
+    // the other bindings private.
+    if (enablePreemptor && ENABLE_PREEMPTOR_CACHING.get()) {
+      PubsubEventModule.bindSubscriber(binder(), CachedClusterState.class);
+    }
   }
 
   private static final Preemptor NULL_PREEMPTOR = new Preemptor() {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4405031a/src/test/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterStateTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterStateTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterStateTest.java
new file mode 100644
index 0000000..7cc04dd
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/CachedClusterStateTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMultimap;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FAILED;
+import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.junit.Assert.assertEquals;
+
+public class CachedClusterStateTest {
+
+  private CachedClusterState state;
+
+  @Before
+  public void setUp() {
+    state = new CachedClusterState();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testImmutable() {
+    state.getSlavesToActiveTasks().clear();  // Callers must not be able to mutate the cache.
+  }
+
+  @Test
+  public void testTaskLifecycle() {
+    IAssignedTask a = makeTask("a", "s1");
+
+    assertVictims();
+    changeState(a, THROTTLED);
+    assertVictims();  // THROTTLED is not cached.
+    changeState(a, PENDING);
+    assertVictims();  // PENDING is not cached.
+    changeState(a, ASSIGNED);
+    assertVictims(a);
+    changeState(a, RUNNING);
+    assertVictims(a);
+    changeState(a, KILLING);
+    assertVictims(a);  // Still cached while KILLING.
+    changeState(a, FINISHED);
+    assertVictims();  // FINISHED evicts the task.
+  }
+
+  @Test
+  public void testTaskChangesSlaves() {
+    // The cache does not reconcile a task ID that reappears on a different slave (e.g. after an
+    // external failure); both entries are retained.
+    IAssignedTask a = makeTask("a", "s1");
+    IAssignedTask a1 = makeTask("a", "s2");
+    changeState(a, RUNNING);
+    changeState(a1, RUNNING);
+    assertVictims(a, a1);
+  }
+
+  @Test
+  public void testMultipleTasks() {
+    IAssignedTask a = makeTask("a", "s1");
+    IAssignedTask b = makeTask("b", "s1");
+    IAssignedTask c = makeTask("c", "s2");
+    IAssignedTask d = makeTask("d", "s3");
+    IAssignedTask e = makeTask("e", "s3");
+    IAssignedTask f = makeTask("f", "s1");
+    changeState(a, RUNNING);
+    assertVictims(a);
+    changeState(b, RUNNING);
+    assertVictims(a, b);
+    changeState(c, RUNNING);
+    assertVictims(a, b, c);
+    changeState(d, RUNNING);
+    assertVictims(a, b, c, d);
+    changeState(e, RUNNING);
+    assertVictims(a, b, c, d, e);
+    changeState(c, FINISHED);
+    assertVictims(a, b, d, e);
+    changeState(a, FAILED);
+    changeState(e, KILLED);
+    assertVictims(b, d);
+    changeState(f, RUNNING);
+    assertVictims(b, d, f);
+  }
+
+  private void assertVictims(IAssignedTask... tasks) {
+    ImmutableMultimap.Builder<String, PreemptionVictim> victims = ImmutableMultimap.builder();
+    for (IAssignedTask task : tasks) {
+      victims.put(task.getSlaveId(), PreemptionVictim.fromTask(task));
+    }
+    assertEquals(HashMultimap.create(victims.build()), state.getSlavesToActiveTasks());
+  }
+
+  private IAssignedTask makeTask(String taskId, String slaveId) {
+    return IAssignedTask.build(new AssignedTask()
+        .setTaskId(taskId)
+        .setSlaveId(slaveId)
+        .setSlaveHost(slaveId + "host")
+        .setTask(new TaskConfig()
+            .setOwner(new Identity().setRole("role"))));
+  }
+
+  private void changeState(IAssignedTask assignedTask, ScheduleStatus status) {
+    IScheduledTask task = IScheduledTask.build(new ScheduledTask()
+        .setStatus(status)
+        .setAssignedTask(assignedTask.newBuilder()));
+    state.taskChangedState(TaskStateChange.transition(task, ScheduleStatus.INIT));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4405031a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimTest.java
new file mode 100644
index 0000000..b0380b3
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class PreemptionVictimTest {
+
+  @Test
+  public void testBeanMethods() {
+    PreemptionVictim a = makeVictim("a");
+    PreemptionVictim a1 = makeVictim("a");  // Built from the same inputs as a.
+    PreemptionVictim b = makeVictim("b");
+    assertEquals(a, a1);
+    assertEquals(a.hashCode(), a1.hashCode());
+    assertEquals(a.toString(), a1.toString());
+    assertNotEquals(a, b);
+    assertNotEquals(a.toString(), b.toString());
+    assertEquals(ImmutableSet.of(a, b), ImmutableSet.of(a, a1, b));  // a1 is a duplicate of a.
+  }
+
+  private PreemptionVictim makeVictim(String taskId) {
+    return PreemptionVictim.fromTask(IAssignedTask.build(new AssignedTask()
+        .setTaskId(taskId)
+        .setSlaveId(taskId + "slave")
+        .setSlaveHost(taskId + "host")
+        .setTask(new TaskConfig()
+            .setOwner(new Identity().setRole("role")))));
+  }
+}