You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:28 UTC
[35/51] [partial] Rename twitter* and com.twitter to apache and
org.apache directories to preserve all file history before the refactor.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
new file mode 100644
index 0000000..400d8b7
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
@@ -0,0 +1,319 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.events;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.Set;
+
+import com.google.common.base.Objects;
+
+import com.twitter.aurora.gen.HostStatus;
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.filter.SchedulingFilter.Veto;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Event notifications related to tasks.
+ */
+public interface PubsubEvent {
+
+ /**
+ * Interface with no functionality, but identifies a class as supporting task pubsub events.
+ */
+ public interface EventSubscriber {
+ }
+
+ /**
+ * Event sent when tasks were deleted.
+ */
+ public static class TasksDeleted implements PubsubEvent {
+ private final Set<IScheduledTask> tasks;
+
+ public TasksDeleted(Set<IScheduledTask> tasks) {
+ this.tasks = checkNotNull(tasks);
+ }
+
+ public Set<IScheduledTask> getTasks() {
+ return tasks;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof TasksDeleted)) {
+ return false;
+ }
+
+ TasksDeleted other = (TasksDeleted) o;
+ return Objects.equal(tasks, other.tasks);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(tasks);
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("tasks", Tasks.ids(tasks))
+ .toString();
+ }
+ }
+
+ /**
+ * Event sent when a task changed state.
+ */
+ public static class TaskStateChange implements PubsubEvent {
+ private final IScheduledTask task;
+ private final ScheduleStatus oldState;
+
+ public TaskStateChange(IScheduledTask task, ScheduleStatus oldState) {
+ this.task = checkNotNull(task);
+ this.oldState = checkNotNull(oldState);
+ }
+
+ public String getTaskId() {
+ return Tasks.id(task);
+ }
+
+ public ScheduleStatus getOldState() {
+ return oldState;
+ }
+
+ public IScheduledTask getTask() {
+ return task;
+ }
+
+ public ScheduleStatus getNewState() {
+ return task.getStatus();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof TaskStateChange)) {
+ return false;
+ }
+
+ TaskStateChange other = (TaskStateChange) o;
+ return Objects.equal(task, other.task)
+ && Objects.equal(oldState, other.oldState);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(task, oldState);
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("task", Tasks.id(task))
+ .add("oldState", getOldState())
+ .add("newState", getNewState())
+ .toString();
+ }
+ }
+
+ /**
+ * Event sent when a host changed maintenance state.
+ */
+ public static class HostMaintenanceStateChange implements PubsubEvent {
+ private final HostStatus status;
+
+ public HostMaintenanceStateChange(HostStatus status) {
+ this.status = checkNotNull(status);
+ }
+
+ public HostStatus getStatus() {
+ return status;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof HostMaintenanceStateChange)) {
+ return false;
+ }
+
+ HostMaintenanceStateChange other = (HostMaintenanceStateChange) o;
+ return Objects.equal(status, other.status);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(status);
+ }
+ }
+
+ /**
+ * Event sent when a scheduling assignment was vetoed.
+ */
+ public static class Vetoed implements PubsubEvent {
+ private final String taskId;
+ private final Set<Veto> vetoes;
+
+ public Vetoed(String taskId, Set<Veto> vetoes) {
+ this.taskId = checkNotNull(taskId);
+ this.vetoes = checkNotNull(vetoes);
+ }
+
+ public String getTaskId() {
+ return taskId;
+ }
+
+ public Set<Veto> getVetoes() {
+ return vetoes;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof Vetoed)) {
+ return false;
+ }
+
+ Vetoed other = (Vetoed) o;
+ return Objects.equal(taskId, other.taskId)
+ && Objects.equal(vetoes, other.vetoes);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(taskId, vetoes);
+ }
+ }
+
+ public static class TaskRescheduled implements PubsubEvent {
+ private final String role;
+ private final String job;
+ private final int instance;
+
+ public TaskRescheduled(String role, String job, int instance) {
+ this.role = role;
+ this.job = job;
+ this.instance = instance;
+ }
+
+ public String getRole() {
+ return role;
+ }
+
+ public String getJob() {
+ return job;
+ }
+
+ public int getInstance() {
+ return instance;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof TaskRescheduled)) {
+ return false;
+ }
+
+ TaskRescheduled other = (TaskRescheduled) o;
+ return Objects.equal(role, other.role)
+ && Objects.equal(job, other.job)
+ && Objects.equal(instance, other.instance);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(role, job, instance);
+ }
+ }
+
+ public static class StorageStarted implements PubsubEvent {
+ @Override
+ public boolean equals(Object o) {
+ return (o != null) && getClass().equals(o.getClass());
+ }
+
+ @Override
+ public int hashCode() {
+ return getClass().hashCode();
+ }
+ }
+
+ public static class DriverRegistered implements PubsubEvent {
+ @Override
+ public boolean equals(Object o) {
+ return (o != null) && getClass().equals(o.getClass());
+ }
+
+ @Override
+ public int hashCode() {
+ return getClass().hashCode();
+ }
+ }
+
+ public static class DriverDisconnected implements PubsubEvent {
+ @Override
+ public boolean equals(Object o) {
+ return (o != null) && getClass().equals(o.getClass());
+ }
+
+ @Override
+ public int hashCode() {
+ return getClass().hashCode();
+ }
+ }
+
+ public static final class Interceptors {
+ private Interceptors() {
+ // Utility class.
+ }
+
+ public enum Event {
+ None(null),
+ StorageStarted(new StorageStarted()),
+ DriverRegistered(new DriverRegistered()),
+ DriverDisconnected(new DriverDisconnected());
+
+ private final PubsubEvent event;
+ private Event(PubsubEvent event) {
+ this.event = event;
+ }
+
+ public PubsubEvent getEvent() {
+ return event;
+ }
+ }
+
+ /**
+ * An annotation to place on methods of injected classes that which to fire events before
+ * and/or after their invocation.
+ */
+ @Target(METHOD) @Retention(RUNTIME)
+ public @interface SendNotification {
+ /**
+ * Event to fire prior to invocation.
+ */
+ Event before() default Event.None;
+
+ /**
+ * Event to fire after invocation.
+ */
+ Event after() default Event.None;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
new file mode 100644
index 0000000..a8b5633
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.events;
+
+import java.util.Set;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.eventbus.DeadEvent;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.TypeLiteral;
+import com.google.inject.matcher.Matchers;
+import com.google.inject.multibindings.Multibinder;
+
+import org.aopalliance.intercept.MethodInterceptor;
+
+import com.twitter.aurora.scheduler.events.NotifyingSchedulingFilter.NotifyDelegate;
+import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import com.twitter.aurora.scheduler.events.PubsubEvent.Interceptors.SendNotification;
+import com.twitter.aurora.scheduler.filter.SchedulingFilter;
+import com.twitter.common.application.modules.LifecycleModule;
+import com.twitter.common.base.Closure;
+import com.twitter.common.base.Command;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Binding module for plumbing event notifications.
+ */
+public final class PubsubEventModule extends AbstractModule {
+
+ private static final Logger LOG = Logger.getLogger(PubsubEventModule.class.getName());
+
+ private PubsubEventModule() {
+ // Must be constructed through factory.
+ }
+
+ @VisibleForTesting
+ public static void installForTest(Binder binder) {
+ binder.install(new PubsubEventModule());
+ }
+
+ @Override
+ protected void configure() {
+ final EventBus eventBus = new EventBus("TaskEvents");
+ eventBus.register(new Object() {
+ @Subscribe public void logDeadEvent(DeadEvent event) {
+ LOG.warning("Captured dead event " + event.getEvent());
+ }
+ });
+
+ bind(EventBus.class).toInstance(eventBus);
+
+ Closure<PubsubEvent> eventPoster = new Closure<PubsubEvent>() {
+ @Override public void execute(PubsubEvent event) {
+ eventBus.post(event);
+ }
+ };
+ bind(new TypeLiteral<Closure<PubsubEvent>>() { }).toInstance(eventPoster);
+
+ // Ensure at least an empty binding is present.
+ getSubscriberBinder(binder());
+ LifecycleModule.bindStartupAction(binder(), RegisterSubscribers.class);
+ bindNotifyingInterceptor(binder());
+ }
+
+ static class RegisterSubscribers implements Command {
+ private final EventBus eventBus;
+ private final Set<EventSubscriber> subscribers;
+
+ @Inject
+ RegisterSubscribers(EventBus eventBus, Set<EventSubscriber> subscribers) {
+ this.eventBus = checkNotNull(eventBus);
+ this.subscribers = checkNotNull(subscribers);
+ }
+
+ @Override
+ public void execute() {
+ for (EventSubscriber subscriber : subscribers) {
+ eventBus.register(subscriber);
+ }
+ }
+ }
+
+ /**
+ * Binds a task event module.
+ *
+ * @param binder Binder to bind against.
+ * @param filterClass Delegate scheduling filter implementation class.
+ */
+ public static void bind(Binder binder, final Class<? extends SchedulingFilter> filterClass) {
+ binder.bind(SchedulingFilter.class).annotatedWith(NotifyDelegate.class).to(filterClass);
+ binder.bind(SchedulingFilter.class).to(NotifyingSchedulingFilter.class);
+ binder.bind(NotifyingSchedulingFilter.class).in(Singleton.class);
+ binder.install(new PubsubEventModule());
+ }
+
+ private static Multibinder<EventSubscriber> getSubscriberBinder(Binder binder) {
+ return Multibinder.newSetBinder(binder, EventSubscriber.class);
+ }
+
+ /**
+ * Binds a subscriber to receive task events.
+ *
+ * @param binder Binder to bind the subscriber with.
+ * @param subscriber Subscriber implementation class to register for events.
+ */
+ public static void bindSubscriber(Binder binder, Class<? extends EventSubscriber> subscriber) {
+ getSubscriberBinder(binder).addBinding().to(subscriber);
+ }
+
+ /**
+ * Binds a method interceptor to all methods annotated with {@link SendNotification}.
+ * <p>
+ * The interceptor will send notifications before and/or after the wrapped method invocation.
+ *
+ * @param binder Guice binder.
+ */
+ @VisibleForTesting
+ public static void bindNotifyingInterceptor(Binder binder) {
+ MethodInterceptor interceptor = new NotifyingMethodInterceptor();
+ binder.requestInjection(interceptor);
+ binder.bindInterceptor(
+ Matchers.any(),
+ Matchers.annotatedWith(SendNotification.class),
+ interceptor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java
new file mode 100644
index 0000000..66327fa
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.filter;
+
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import com.twitter.aurora.gen.Attribute;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.filter.SchedulingFilterImpl.AttributeLoader;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.IValueConstraint;
+
+/**
+ * Utility class that matches attributes to constraints.
+ */
+final class AttributeFilter {
+
+ private static final Function<Attribute, Set<String>> GET_VALUES =
+ new Function<Attribute, Set<String>>() {
+ @Override public Set<String> apply(Attribute attribute) {
+ return attribute.getValues();
+ }
+ };
+
+ private AttributeFilter() {
+ // Utility class.
+ }
+
+ /**
+ * Tests whether a constraint is satisfied by attributes.
+ *
+ * @param attributes Host attributes.
+ * @param constraint Constraint to match.
+ * @return {@code true} if the attribute satisfies the constraint, {@code false} otherwise.
+ */
+ static boolean matches(Set<Attribute> attributes, IValueConstraint constraint) {
+ Set<String> allAttributes =
+ ImmutableSet.copyOf(Iterables.concat(Iterables.transform(attributes, GET_VALUES)));
+ boolean match = Iterables.any(constraint.getValues(), Predicates.in(allAttributes));
+ return constraint.isNegated() ^ match;
+ }
+
+ /**
+ * Tests whether an attribute matches a limit constraint.
+ *
+ * @param attributes Attributes to match against.
+ * @param jobKey Key of the job with the limited constraint.
+ * @param limit Limit value.
+ * @param activeTasks All active tasks in the system.
+ * @param attributeFetcher Interface for fetching attributes for hosts in the system.
+ * @return {@code true} if the limit constraint is satisfied, {@code false} otherwise.
+ */
+ static boolean matches(final Set<Attribute> attributes,
+ final IJobKey jobKey,
+ int limit,
+ Iterable<IScheduledTask> activeTasks,
+ final AttributeLoader attributeFetcher) {
+
+ Predicate<IScheduledTask> sameJob =
+ Predicates.compose(Predicates.equalTo(jobKey), Tasks.SCHEDULED_TO_JOB_KEY);
+
+ Predicate<IScheduledTask> hasAttribute = new Predicate<IScheduledTask>() {
+ @Override public boolean apply(IScheduledTask task) {
+ Iterable<Attribute> hostAttributes =
+ attributeFetcher.apply(task.getAssignedTask().getSlaveHost());
+ return Iterables.any(hostAttributes, Predicates.in(attributes));
+ }
+ };
+
+ return limit > Iterables.size(
+ Iterables.filter(activeTasks, Predicates.and(sameJob, hasAttribute)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
new file mode 100644
index 0000000..2612cb8
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.filter;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import com.twitter.aurora.gen.Attribute;
+import com.twitter.aurora.scheduler.base.SchedulerException;
+import com.twitter.aurora.scheduler.filter.SchedulingFilter.Veto;
+import com.twitter.aurora.scheduler.filter.SchedulingFilterImpl.AttributeLoader;
+import com.twitter.aurora.scheduler.storage.entities.IConstraint;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConstraint;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Filter that determines whether a task's constraints are satisfied.
+ */
+class ConstraintFilter implements Function<IConstraint, Optional<Veto>> {
+
+ private static final Logger LOG = Logger.getLogger(ConstraintFilter.class.getName());
+
+ private final IJobKey jobKey;
+ private final Supplier<Collection<IScheduledTask>> activeTasksSupplier;
+ private final AttributeLoader attributeLoader;
+ private final Iterable<Attribute> hostAttributes;
+
+ /**
+ * Creates a new constraint filer for a given job.
+ *
+ * @param jobKey Key for the job.
+ * @param activeTasksSupplier Supplier to fetch active tasks (if necessary).
+ * @param attributeLoader Interface to fetch host attributes (if necessary).
+ * @param hostAttributes The attributes of the host to test against.
+ */
+ ConstraintFilter(
+ IJobKey jobKey,
+ Supplier<Collection<IScheduledTask>> activeTasksSupplier,
+ AttributeLoader attributeLoader,
+ Iterable<Attribute> hostAttributes) {
+
+ this.jobKey = checkNotNull(jobKey);
+ this.activeTasksSupplier = checkNotNull(activeTasksSupplier);
+ this.attributeLoader = checkNotNull(attributeLoader);
+ this.hostAttributes = checkNotNull(hostAttributes);
+ }
+
+ @VisibleForTesting
+ static Veto limitVeto(String limit) {
+ return new Veto("Limit not satisfied: " + limit, Veto.MAX_SCORE);
+ }
+
+ @VisibleForTesting
+ static Veto mismatchVeto(String constraint) {
+ return Veto.constraintMismatch("Constraint not satisfied: " + constraint);
+ }
+
+ @VisibleForTesting
+ static Veto maintenanceVeto(String reason) {
+ return new Veto("Host " + reason + " for maintenance", Veto.MAX_SCORE);
+ }
+
+ @Override
+ public Optional<Veto> apply(IConstraint constraint) {
+ Set<Attribute> attributes =
+ ImmutableSet.copyOf(Iterables.filter(hostAttributes, new NameFilter(constraint.getName())));
+
+ ITaskConstraint taskConstraint = constraint.getConstraint();
+ switch (taskConstraint.getSetField()) {
+ case VALUE:
+ boolean matches =
+ AttributeFilter.matches(attributes, taskConstraint.getValue());
+ return matches
+ ? Optional.<Veto>absent()
+ : Optional.of(mismatchVeto(constraint.getName()));
+
+ case LIMIT:
+ if (attributes.isEmpty()) {
+ return Optional.of(mismatchVeto(constraint.getName()));
+ }
+
+ boolean satisfied = AttributeFilter.matches(
+ attributes,
+ jobKey,
+ taskConstraint.getLimit().getLimit(),
+ activeTasksSupplier.get(),
+ attributeLoader);
+ return satisfied
+ ? Optional.<Veto>absent()
+ : Optional.of(limitVeto(constraint.getName()));
+
+ default:
+ LOG.warning("Unrecognized constraint type: " + taskConstraint.getSetField());
+ throw new SchedulerException("Failed to recognize the constraint type: "
+ + taskConstraint.getSetField());
+ }
+ }
+
+ /**
+ * A filter to find attributes matching a name.
+ */
+ static class NameFilter implements Predicate<Attribute> {
+ private final String attributeName;
+
+ NameFilter(String attributeName) {
+ this.attributeName = attributeName;
+ }
+
+ @Override public boolean apply(Attribute attribute) {
+ return attributeName.equals(attribute.getName());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
new file mode 100644
index 0000000..7e8fbdb
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.filter;
+
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+
+import com.twitter.aurora.scheduler.ResourceSlot;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+
+/**
+ * Determines whether a proposed scheduling assignment should be allowed.
+ */
+public interface SchedulingFilter {
+
+ /**
+ * Reason for a proposed scheduling assignment to be filtered out.
+ * A veto also contains a score, which is an opaque indicator as to how strong a veto is. This
+ * is only intended to be used for relative ranking of vetoes for determining which veto against
+ * a scheduling assignment is 'weakest'.
+ */
+ public static class Veto {
+ public static final int MAX_SCORE = 1000;
+
+ private final String reason;
+ private final int score;
+ private final boolean valueMismatch;
+
+ private Veto(String reason, int score, boolean valueMismatch) {
+ this.reason = reason;
+ this.score = Math.min(MAX_SCORE, score);
+ this.valueMismatch = valueMismatch;
+ }
+
+ @VisibleForTesting
+ public Veto(String reason, int score) {
+ this(reason, score, false);
+ }
+
+ /**
+ * Creates a special veto that represents a mismatch between the server and task's configuration
+ * for an attribute.
+ *
+ * @param reason Information about the value mismatch.
+ * @return A constraint mismatch veto.
+ */
+ public static Veto constraintMismatch(String reason) {
+ return new Veto(reason, MAX_SCORE, true);
+ }
+
+ public String getReason() {
+ return reason;
+ }
+
+ public int getScore() {
+ return score;
+ }
+
+ public boolean isConstraintMismatch() {
+ return valueMismatch;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof Veto)) {
+ return false;
+ }
+
+ Veto other = (Veto) o;
+ return Objects.equal(reason, other.reason)
+ && (score == other.score)
+ && (valueMismatch == other.valueMismatch);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(reason, score, valueMismatch);
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("reason", reason)
+ .add("score", score)
+ .add("valueMismatch", valueMismatch)
+ .toString();
+ }
+ }
+
+ /**
+ * Applies a task against the filter with the given resources, and on the host.
+ *
+ * @param offer Resources offered.
+ * @param slaveHost Host that the resources are associated with.
+ * @param task Task.
+ * @param taskId Canonical ID of the task.
+ * @return A set of vetoes indicating reasons the task cannot be scheduled. If the task may be
+ * scheduled, the set will be empty.
+ */
+ Set<Veto> filter(ResourceSlot offer, String slaveHost, ITaskConfig task, String taskId);
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
new file mode 100644
index 0000000..33272ce
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
@@ -0,0 +1,294 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.filter;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.Set;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+
+import com.twitter.aurora.gen.Attribute;
+import com.twitter.aurora.gen.MaintenanceMode;
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.gen.TaskConstraint;
+import com.twitter.aurora.scheduler.ResourceSlot;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.configuration.ConfigurationManager;
+import com.twitter.aurora.scheduler.state.MaintenanceController;
+import com.twitter.aurora.scheduler.storage.AttributeStore;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.Work.Quiet;
+import com.twitter.aurora.scheduler.storage.entities.IConstraint;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.gen.MaintenanceMode.DRAINED;
+import static com.twitter.aurora.gen.MaintenanceMode.DRAINING;
+import static com.twitter.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
+import static com.twitter.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.CPU;
+import static com.twitter.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.DISK;
+import static com.twitter.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.PORTS;
+import static com.twitter.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.RAM;
+
+/**
+ * Implementation of the scheduling filter that ensures resource requirements of tasks are
+ * fulfilled, and that tasks are allowed to run on the given machine.
+ *
+ */
+public class SchedulingFilterImpl implements SchedulingFilter {
+
+ @VisibleForTesting static final Veto DEDICATED_HOST_VETO =
+ Veto.constraintMismatch("Host is dedicated");
+
+ private static final Optional<Veto> NO_VETO = Optional.absent();
+
+ private static final Set<MaintenanceMode> VETO_MODES = EnumSet.of(DRAINING, DRAINED);
+
+ private final Storage storage;
+ private final MaintenanceController maintenance;
+
+ /**
+ * Creates a new scheduling filter.
+ *
+ * @param storage Interface to accessing the task store.
+ * @param maintenance Interface to accessing the maintenance controller
+ */
+ @Inject
+ public SchedulingFilterImpl(Storage storage, MaintenanceController maintenance) {
+ this.storage = checkNotNull(storage);
+ this.maintenance = checkNotNull(maintenance);
+ }
+
+ /**
+ * A function that fetches attributes associated with a given host.
+ */
+ public interface AttributeLoader extends Function<String, Iterable<Attribute>> { }
+
+ /**
+ * A function that may veto a task.
+ */
+ private interface FilterRule extends Function<ITaskConfig, Iterable<Veto>> { }
+
+ /**
+ * Convenience class for a rule that will only ever have a single veto.
+ */
+ private abstract static class SingleVetoRule implements FilterRule {
+ @Override public final Iterable<Veto> apply(ITaskConfig task) {
+ return doApply(task).asSet();
+ }
+
+ abstract Optional<Veto> doApply(ITaskConfig task);
+ }
+
+ // Scaling ranges to use for comparison of vetos. This has no real bearing besides trying to
+ // determine if a veto along one resource vector is a 'stronger' veto than that of another vector.
+ // The values below represent the maximum resources on a typical slave machine.
+ @VisibleForTesting
+ enum ResourceVector {
+ CPU("CPU", 16),
+ RAM("RAM", Amount.of(24, Data.GB).as(Data.MB)),
+ DISK("disk", Amount.of(450, Data.GB).as(Data.MB)),
+ PORTS("ports", 1000);
+
+ private final String name;
+ private final int range;
+ @VisibleForTesting
+ int getRange() {
+ return range;
+ }
+
+ private ResourceVector(String name, int range) {
+ this.name = name;
+ this.range = range;
+ }
+
+ Optional<Veto> maybeVeto(double available, double requested) {
+ double tooLarge = requested - available;
+ if (tooLarge <= 0) {
+ return NO_VETO;
+ } else {
+ return Optional.of(veto(tooLarge));
+ }
+ }
+
+ private static int scale(double value, int range) {
+ return Math.min(Veto.MAX_SCORE, (int) ((Veto.MAX_SCORE * value)) / range);
+ }
+
+ @VisibleForTesting
+ Veto veto(double excess) {
+ return new Veto("Insufficient " + name, scale(excess, range));
+ }
+ }
+
+ private Iterable<FilterRule> rulesFromOffer(final ResourceSlot available) {
+ return ImmutableList.<FilterRule>of(
+ new SingleVetoRule() {
+ @Override public Optional<Veto> doApply(ITaskConfig task) {
+ return CPU.maybeVeto(
+ available.getNumCpus(),
+ ResourceSlot.from(task).getNumCpus());
+ }
+ },
+ new SingleVetoRule() {
+ @Override public Optional<Veto> doApply(ITaskConfig task) {
+ return RAM.maybeVeto(
+ available.getRam().as(Data.MB),
+ ResourceSlot.from(task).getRam().as(Data.MB));
+ }
+ },
+ new SingleVetoRule() {
+ @Override public Optional<Veto> doApply(ITaskConfig task) {
+ return DISK.maybeVeto(available.getDisk().as(Data.MB),
+ ResourceSlot.from(task).getDisk().as(Data.MB));
+ }
+ },
+ new SingleVetoRule() {
+ @Override public Optional<Veto> doApply(ITaskConfig task) {
+ return PORTS.maybeVeto(available.getNumPorts(),
+ ResourceSlot.from(task).getNumPorts());
+ }
+ }
+ );
+ }
+
+ private static boolean isValueConstraint(IConstraint constraint) {
+ return constraint.getConstraint().getSetField() == TaskConstraint._Fields.VALUE;
+ }
+
+ private static final Ordering<IConstraint> VALUES_FIRST = Ordering.from(
+ new Comparator<IConstraint>() {
+ @Override public int compare(IConstraint a, IConstraint b) {
+ if (a.getConstraint().getSetField() == b.getConstraint().getSetField()) {
+ return 0;
+ }
+ return isValueConstraint(a) ? -1 : 1;
+ }
+ });
+
+ private static final Iterable<ScheduleStatus> ACTIVE_NOT_PENDING_STATES =
+ EnumSet.copyOf(Sets.difference(Tasks.ACTIVE_STATES, EnumSet.of(ScheduleStatus.PENDING)));
+
+ private FilterRule getConstraintFilter(final String slaveHost) {
+ return new FilterRule() {
+ @Override public Iterable<Veto> apply(final ITaskConfig task) {
+ if (!task.isSetConstraints()) {
+ return ImmutableList.of();
+ }
+
+ // In the interest of performance, we perform a weakly consistent read here. The biggest
+ // risk of this is that we might schedule against stale host attributes, or we might fail
+ // to correctly satisfy a diversity constraint. Given that the likelihood is relatively low
+ // for both of these, and the impact is also low, the weak consistency is acceptable.
+ return storage.weaklyConsistentRead(new Quiet<Iterable<Veto>>() {
+ @Override public Iterable<Veto> apply(final StoreProvider storeProvider) {
+ AttributeLoader attributeLoader = new AttributeLoader() {
+ @Override public Iterable<Attribute> apply(String host) {
+ return AttributeStore.Util.attributesOrNone(storeProvider, host);
+ }
+ };
+
+ Supplier<Collection<IScheduledTask>> activeTasksSupplier =
+ Suppliers.memoize(new Supplier<Collection<IScheduledTask>>() {
+ @Override public Collection<IScheduledTask> get() {
+ return storeProvider.getTaskStore().fetchTasks(
+ Query.jobScoped(Tasks.INFO_TO_JOB_KEY.apply(task))
+ .byStatus(ACTIVE_NOT_PENDING_STATES));
+ }
+ });
+
+ ConstraintFilter constraintFilter = new ConstraintFilter(
+ Tasks.INFO_TO_JOB_KEY.apply(task),
+ activeTasksSupplier,
+ attributeLoader,
+ attributeLoader.apply(slaveHost));
+ ImmutableList.Builder<Veto> vetoes = ImmutableList.builder();
+ for (IConstraint constraint : VALUES_FIRST.sortedCopy(task.getConstraints())) {
+ Optional<Veto> veto = constraintFilter.apply(constraint);
+ if (veto.isPresent()) {
+ vetoes.add(veto.get());
+ if (isValueConstraint(constraint)) {
+ // Break when a value constraint mismatch is found to avoid other
+ // potentially-expensive operations to satisfy other constraints.
+ break;
+ }
+ }
+ }
+
+ return vetoes.build();
+ }
+ });
+ }
+ };
+ }
+
+ private Optional<Veto> getMaintenanceVeto(String slaveHost) {
+ MaintenanceMode mode = maintenance.getMode(slaveHost);
+ return VETO_MODES.contains(mode)
+ ? Optional.of(ConstraintFilter.maintenanceVeto(mode.toString().toLowerCase()))
+ : NO_VETO;
+ }
+
+ private Set<Veto> getResourceVetoes(ResourceSlot offer, ITaskConfig task) {
+ ImmutableSet.Builder<Veto> builder = ImmutableSet.builder();
+ for (FilterRule rule : rulesFromOffer(offer)) {
+ builder.addAll(rule.apply(task));
+ }
+ return builder.build();
+ }
+
+ private boolean isDedicated(final String slaveHost) {
+ Iterable<Attribute> slaveAttributes =
+ storage.weaklyConsistentRead(new Quiet<Iterable<Attribute>>() {
+ @Override public Iterable<Attribute> apply(final StoreProvider storeProvider) {
+ return AttributeStore.Util.attributesOrNone(storeProvider, slaveHost);
+ }
+ });
+
+ return Iterables.any(slaveAttributes, new ConstraintFilter.NameFilter(DEDICATED_ATTRIBUTE));
+ }
+
+ @Override
+ public Set<Veto> filter(ResourceSlot offer, String slaveHost, ITaskConfig task, String taskId) {
+ if (!ConfigurationManager.isDedicated(task) && isDedicated(slaveHost)) {
+ return ImmutableSet.of(DEDICATED_HOST_VETO);
+ }
+ return ImmutableSet.<Veto>builder()
+ .addAll(getConstraintFilter(slaveHost).apply(task))
+ .addAll(getResourceVetoes(offer, task))
+ .addAll(getMaintenanceVeto(slaveHost).asSet())
+ .build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/http/ClusterName.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/ClusterName.java b/src/main/java/org/apache/aurora/scheduler/http/ClusterName.java
new file mode 100644
index 0000000..90c87a7
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/http/ClusterName.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.http;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * Binding annotation for the cluster name.
+ */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface ClusterName { }
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/http/Cron.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Cron.java b/src/main/java/org/apache/aurora/scheduler/http/Cron.java
new file mode 100644
index 0000000..8c97b5e
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/http/Cron.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.http;
+
+import java.util.Map;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import com.google.common.collect.ImmutableMap;
+
+import com.twitter.aurora.scheduler.state.CronJobManager;
+
+/**
+ * HTTP interface to dump state of the internal cron scheduler.
+ */
+@Path("/cron")
+public class Cron {
+ private final CronJobManager cronManager;
+
+ @Inject
+ Cron(CronJobManager cronManager) {
+ this.cronManager = cronManager;
+ }
+
+ /**
+ * Dumps the state of the cron manager.
+ *
+ * @return An HTTP response containing the cron manager's state.
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response dumpContents() {
+ Map<String, Object> response = ImmutableMap.<String, Object>builder()
+ .put("scheduled", cronManager.getScheduledJobs())
+ .put("pending", cronManager.getPendingRuns())
+ .build();
+
+ return Response.ok(response).build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/http/DisplayUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/DisplayUtils.java b/src/main/java/org/apache/aurora/scheduler/http/DisplayUtils.java
new file mode 100644
index 0000000..b4a2762
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/http/DisplayUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.http;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Ordering;
+
+import com.twitter.aurora.scheduler.MesosTaskFactory.MesosTaskFactoryImpl;
+import com.twitter.aurora.scheduler.http.SchedulerzHome.Role;
+import com.twitter.aurora.scheduler.http.SchedulerzRole.Job;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+
+/**
+ * Utility class to hold common display helper functions.
+ */
+public final class DisplayUtils {
+
+ @CmdLine(name = "viz_job_url_prefix", help = "URL prefix for job container stats.")
+ private static final Arg<String> VIZ_JOB_URL_PREFIX = Arg.create("");
+
+ private DisplayUtils() {
+ // Utility class.
+ }
+
+ static final Ordering<Role> ROLE_ORDERING = Ordering.natural().onResultOf(
+ new Function<Role, String>() {
+ @Override public String apply(Role role) {
+ return role.getRole();
+ }
+ });
+
+ static final Ordering<Job> JOB_ORDERING = Ordering.natural().onResultOf(
+ new Function<Job, String>() {
+ @Override public String apply(Job job) {
+ return job.getName();
+ }
+ });
+
+ static String getJobDashboardUrl(IJobKey jobKey) {
+ return VIZ_JOB_URL_PREFIX.get() + MesosTaskFactoryImpl.getJobSourceName(jobKey);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/http/HttpStatsFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/HttpStatsFilter.java b/src/main/java/org/apache/aurora/scheduler/http/HttpStatsFilter.java
new file mode 100644
index 0000000..a846383
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/http/HttpStatsFilter.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.http;
+
+import java.io.IOException;
+
+import javax.servlet.FilterChain;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpServletResponseWrapper;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+import com.twitter.common.net.http.filters.AbstractHttpFilter;
+import com.twitter.common.stats.SlidingStats;
+
+/**
+ * An HTTP filter that exports counts and timing for requests based on response code.
+ */
+public class HttpStatsFilter extends AbstractHttpFilter {
+
+ private final LoadingCache<Integer, SlidingStats> counters = CacheBuilder.newBuilder()
+ .build(new CacheLoader<Integer, SlidingStats>() {
+ @Override public SlidingStats load(Integer status) {
+ return new SlidingStats("http_" + status + "_responses", "nanos");
+ }
+ });
+
+ private static class ResponseWithStatus extends HttpServletResponseWrapper {
+ // 200 response code is the default if none is explicitly set.
+ private int wrappedStatus = 200;
+
+ ResponseWithStatus(HttpServletResponse resp) {
+ super(resp);
+ }
+
+ @Override public void setStatus(int sc) {
+ super.setStatus(sc);
+ wrappedStatus = sc;
+ }
+
+ @Override public void setStatus(int sc, String sm) {
+ super.setStatus(sc, sm);
+ wrappedStatus = sc;
+ }
+ }
+
+ @Override
+ public void doFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)
+ throws IOException, ServletException {
+
+ long start = System.nanoTime();
+ ResponseWithStatus wrapper = new ResponseWithStatus(response);
+ chain.doFilter(request, wrapper);
+ counters.getUnchecked(wrapper.wrappedStatus).accumulate(System.nanoTime() - start);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/http/JerseyTemplateServlet.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/JerseyTemplateServlet.java b/src/main/java/org/apache/aurora/scheduler/http/JerseyTemplateServlet.java
new file mode 100644
index 0000000..5c13de5
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/http/JerseyTemplateServlet.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.http;
+
+import java.io.StringWriter;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+
+import org.antlr.stringtemplate.StringTemplate;
+
+import com.twitter.common.base.Closure;
+import com.twitter.common.util.templating.StringTemplateHelper;
+import com.twitter.common.util.templating.StringTemplateHelper.TemplateException;
+
+/**
+ * Base class for common functions needed in a jersey stringtemplate servlet.
+ */
+abstract class JerseyTemplateServlet {
+
+ private final StringTemplateHelper templateHelper;
+
+ JerseyTemplateServlet(String templatePath) {
+ templateHelper = new StringTemplateHelper(getClass(), templatePath, true);
+ }
+
+ protected final Response fillTemplate(Closure<StringTemplate> populator) {
+ StringWriter output = new StringWriter();
+ try {
+ templateHelper.writeTemplate(output, populator);
+ } catch (TemplateException e) {
+ throw new WebApplicationException(e);
+ }
+ return Response.ok(output.toString()).build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java b/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java
new file mode 100644
index 0000000..20e4446
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.http;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletRequest;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.net.HostAndPort;
+import com.google.common.util.concurrent.Atomics;
+
+import com.twitter.common.application.modules.LocalServiceRegistry;
+import com.twitter.common.net.pool.DynamicHostSet;
+import com.twitter.common.net.pool.DynamicHostSet.HostChangeMonitor;
+import com.twitter.common.net.pool.DynamicHostSet.MonitorException;
+import com.twitter.thrift.Endpoint;
+import com.twitter.thrift.ServiceInstance;
+
+/**
+ * Redirect logic for finding the leading scheduler in the event that this process is not the
+ * leader.
+ */
+public class LeaderRedirect {
+
+ // TODO(wfarner): Should we tie this directly to the producer of the node (HttpModule? It seems
+ // like the right thing to do, but would introduce an otherwise unnecessary dependency.
+ @VisibleForTesting
+ static final String HTTP_PORT_NAME = "http";
+
+ private static final Logger LOG = Logger.getLogger(LeaderRedirect.class.getName());
+
+ private final LocalServiceRegistry serviceRegistry;
+ private final DynamicHostSet<ServiceInstance> schedulers;
+
+ private final AtomicReference<ServiceInstance> leader = Atomics.newReference();
+
+ @Inject
+ LeaderRedirect(LocalServiceRegistry serviceRegistry, DynamicHostSet<ServiceInstance> schedulers) {
+ this.serviceRegistry = Preconditions.checkNotNull(serviceRegistry);
+ this.schedulers = Preconditions.checkNotNull(schedulers);
+ }
+
+ /**
+ * Initiates the monitor that will watch the scheduler host set.
+ *
+ * @throws MonitorException If monitoring failed to initialize.
+ */
+ public void monitor() throws MonitorException {
+ schedulers.watch(new SchedulerMonitor());
+ }
+
+ private Optional<HostAndPort> getLeaderHttp() {
+ ServiceInstance leadingScheduler = leader.get();
+ if (leadingScheduler == null) {
+ return Optional.absent();
+ }
+
+ if (leadingScheduler.isSetAdditionalEndpoints()) {
+ Endpoint leaderHttp = leadingScheduler.getAdditionalEndpoints().get(HTTP_PORT_NAME);
+ if (leaderHttp != null && leaderHttp.isSetHost() && leaderHttp.isSetPort()) {
+ return Optional.of(HostAndPort.fromParts(leaderHttp.getHost(), leaderHttp.getPort()));
+ }
+ }
+
+ LOG.warning("Leader service instance seems to be incomplete: " + leadingScheduler);
+ return Optional.absent();
+ }
+
+ private Optional<HostAndPort> getLocalHttp() {
+ InetSocketAddress localHttp = serviceRegistry.getAuxiliarySockets().get(HTTP_PORT_NAME);
+ return (localHttp == null) ? Optional.<HostAndPort>absent()
+ : Optional.of(HostAndPort.fromParts(localHttp.getHostName(), localHttp.getPort()));
+ }
+
+ /**
+ * Gets the optional HTTP endpoint that should be redirected to in the event that this
+ * scheduler is not the leader.
+ *
+ * @return Optional redirect target.
+ */
+ @VisibleForTesting
+ Optional<HostAndPort> getRedirect() {
+ Optional<HostAndPort> leaderHttp = getLeaderHttp();
+ Optional<HostAndPort> localHttp = getLocalHttp();
+
+ if (leaderHttp.isPresent()) {
+ if (leaderHttp.equals(localHttp)) {
+ return Optional.absent();
+ } else {
+ return leaderHttp;
+ }
+ } else {
+ LOG.info("No leader found, not redirecting.");
+ return Optional.absent();
+ }
+ }
+
+ /**
+ * Gets the optional redirect URI target in the event that this process is not the leading
+ * scheduler.
+ *
+ * @param req HTTP request.
+ * @return An optional redirect destination to route the request to the leading scheduler.
+ */
+ public Optional<String> getRedirectTarget(HttpServletRequest req) {
+ Optional<HostAndPort> redirectTarget = getRedirect();
+ if (redirectTarget.isPresent()) {
+ HostAndPort target = redirectTarget.get();
+ StringBuilder redirect = new StringBuilder()
+ .append(req.getScheme())
+ .append("://")
+ .append(target.getHostText())
+ .append(":")
+ .append(target.getPort())
+ .append(req.getRequestURI());
+
+ String queryString = req.getQueryString();
+ if (queryString != null) {
+ redirect.append("?").append(queryString);
+ }
+
+ return Optional.of(redirect.toString());
+ } else {
+ return Optional.absent();
+ }
+ }
+
+ /**
+ * Monitor to track scheduler leader changes.
+ */
+ private class SchedulerMonitor implements HostChangeMonitor<ServiceInstance> {
+ @Override public void onChange(ImmutableSet<ServiceInstance> hostSet) {
+ switch (hostSet.size()) {
+ case 0:
+ LOG.warning("No schedulers in host set, will not redirect despite not being leader.");
+ leader.set(null);
+ break;
+
+ case 1:
+ LOG.info("Found leader scheduler at " + hostSet);
+ leader.set(Iterables.getOnlyElement(hostSet));
+ break;
+
+ default:
+ LOG.severe("Multiple schedulers detected, will not redirect: " + hostSet);
+ leader.set(null);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirectFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirectFilter.java b/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirectFilter.java
new file mode 100644
index 0000000..86f0567
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirectFilter.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.http;
+
+import java.io.IOException;
+
+import javax.inject.Inject;
+import javax.servlet.FilterChain;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+import com.twitter.common.net.http.filters.AbstractHttpFilter;
+
+/**
+ * An HTTP filter that will redirect the request to the leading scheduler.
+ */
+public class LeaderRedirectFilter extends AbstractHttpFilter {
+
+ private final LeaderRedirect redirector;
+
+ @Inject
+ LeaderRedirectFilter(LeaderRedirect redirector) {
+ this.redirector = Preconditions.checkNotNull(redirector);
+ }
+
+ @Override
+ public void doFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)
+ throws IOException, ServletException {
+
+ Optional<String> leaderRedirect = redirector.getRedirectTarget(request);
+ if (leaderRedirect.isPresent()) {
+ response.sendRedirect(leaderRedirect.get());
+ } else {
+ chain.doFilter(request, response);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java b/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
new file mode 100644
index 0000000..fb71539
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.http;
+
+import java.util.Map;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+
+import com.twitter.aurora.gen.HostAttributes;
+import com.twitter.aurora.gen.MaintenanceMode;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+
+import static com.twitter.aurora.gen.MaintenanceMode.DRAINED;
+import static com.twitter.aurora.gen.MaintenanceMode.DRAINING;
+import static com.twitter.aurora.gen.MaintenanceMode.SCHEDULED;
+
+/**
+ * Servlet that exposes the maintenance state of hosts.
+ */
+@Path("/maintenance")
+public class Maintenance {
+ private final Storage storage;
+
+ @Inject
+ Maintenance(Storage storage) {
+ this.storage = Preconditions.checkNotNull(storage);
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getHosts() {
+ return storage.weaklyConsistentRead(new Work.Quiet<Response>() {
+ @Override public Response apply(StoreProvider storeProvider) {
+ Multimap<MaintenanceMode, String> hostsByMode =
+ Multimaps.transformValues(
+ Multimaps.index(storeProvider.getAttributeStore().getHostAttributes(), GET_MODE),
+ HOST_NAME);
+
+ Map<MaintenanceMode, Object> hosts = Maps.newHashMap();
+ hosts.put(DRAINED, ImmutableSet.copyOf(hostsByMode.get(DRAINED)));
+ hosts.put(SCHEDULED, ImmutableSet.copyOf(hostsByMode.get(SCHEDULED)));
+ hosts.put(DRAINING, getTasksByHosts(storeProvider, hostsByMode.get(DRAINING)).asMap());
+ return Response.ok(hosts).build();
+ }
+ });
+ }
+
+ private Multimap<String, String> getTasksByHosts(StoreProvider provider, Iterable<String> hosts) {
+ ImmutableSet.Builder<IScheduledTask> drainingTasks = ImmutableSet.builder();
+ for (String host : hosts) {
+ drainingTasks.addAll(provider.getTaskStore().fetchTasks(Query.slaveScoped(host).active()));
+ }
+ return Multimaps.transformValues(
+ Multimaps.index(drainingTasks.build(), TASK_TO_HOST),
+ Tasks.SCHEDULED_TO_ID);
+ }
+
+ private static final Function<IScheduledTask, String> TASK_TO_HOST =
+ new Function<IScheduledTask, String>() {
+ @Override public String apply(IScheduledTask task) {
+ return task.getAssignedTask().getSlaveHost();
+ }
+ };
+
+ private static final Function<HostAttributes, String> HOST_NAME =
+ new Function<HostAttributes, String>() {
+ @Override public String apply(HostAttributes attributes) {
+ return attributes.getHost();
+ }
+ };
+
+ private static final Function<HostAttributes, MaintenanceMode> GET_MODE =
+ new Function<HostAttributes, MaintenanceMode>() {
+ @Override public MaintenanceMode apply(HostAttributes attrs) {
+ return attrs.getMode();
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/http/Mname.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Mname.java b/src/main/java/org/apache/aurora/scheduler/http/Mname.java
new file mode 100644
index 0000000..c97560f
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/http/Mname.java
@@ -0,0 +1,248 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.http;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import javax.inject.Inject;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriInfo;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import com.twitter.aurora.scheduler.base.JobKeys;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+
+import static javax.ws.rs.core.Response.Status.NOT_FOUND;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.gen.ScheduleStatus.RUNNING;
+
+/**
+ * Simple redirector from the canonical name of a task to its configured HTTP port.
+ *
+ * <p>Forwards for GET, PUT, POST and DELETE requests using HTTP 307 allowing compliant clients to
+ * seamlessly perform re-directed mutations.
+ */
+@Path("/mname")
+public class Mname {
+
+ private static final Set<String> HTTP_PORT_NAMES = ImmutableSet.of(
+ "health", "http", "HTTP", "web");
+
+ private final Storage storage;
+
+ @Inject
+ public Mname(Storage storage) {
+ this.storage = checkNotNull(storage);
+ }
+
+ @GET
+ @Produces(MediaType.TEXT_HTML)
+ public Response getUsage() {
+ return Response
+ .status(Status.BAD_REQUEST)
+ .entity("<html>Usage: /mname/{role}/{env}/{job}/{instance}</html>")
+ .build();
+ }
+
+ @GET
+ @Path("/{role}/{env}/{job}/{instance}/{forward:.+}")
+ @Produces(MediaType.TEXT_HTML)
+ public Response getWithForwardRequest(
+ @PathParam("role") String role,
+ @PathParam("env") String env,
+ @PathParam("job") String job,
+ @PathParam("instance") int instanceId,
+ @PathParam("forward") String forward,
+ @Context UriInfo uriInfo) {
+
+ return get(role, env, job, instanceId, uriInfo, Optional.of(forward));
+ }
+
+ @PUT
+ @Path("/{role}/{env}/{job}/{instance}/{forward:.+}")
+ @Produces(MediaType.TEXT_HTML)
+ public Response putWithForwardRequest(
+ @PathParam("role") String role,
+ @PathParam("env") String env,
+ @PathParam("job") String job,
+ @PathParam("instance") int instanceId,
+ @PathParam("forward") String forward,
+ @Context UriInfo uriInfo) {
+
+ return get(role, env, job, instanceId, uriInfo, Optional.of(forward));
+ }
+
+ @POST
+ @Path("/{role}/{env}/{job}/{instance}/{forward:.+}")
+ @Produces(MediaType.TEXT_HTML)
+ public Response postWithForwardRequest(
+ @PathParam("role") String role,
+ @PathParam("env") String env,
+ @PathParam("job") String job,
+ @PathParam("instance") int instanceId,
+ @PathParam("forward") String forward,
+ @Context UriInfo uriInfo) {
+
+ return get(role, env, job, instanceId, uriInfo, Optional.of(forward));
+ }
+
+ @DELETE
+ @Path("/{role}/{env}/{job}/{instance}/{forward:.+}")
+ @Produces(MediaType.TEXT_HTML)
+ public Response deleteWithForwardRequest(
+ @PathParam("role") String role,
+ @PathParam("env") String env,
+ @PathParam("job") String job,
+ @PathParam("instance") int instanceId,
+ @PathParam("forward") String forward,
+ @Context UriInfo uriInfo) {
+
+ return get(role, env, job, instanceId, uriInfo, Optional.of(forward));
+ }
+
+ @GET
+ @Path("/{role}/{env}/{job}/{instance}")
+ @Produces(MediaType.TEXT_HTML)
+ public Response get(
+ @PathParam("role") String role,
+ @PathParam("env") String env,
+ @PathParam("job") String job,
+ @PathParam("instance") int instanceId,
+ @Context UriInfo uriInfo) {
+
+ return get(role, env, job, instanceId, uriInfo, Optional.<String>absent());
+ }
+
+ @PUT
+ @Path("/{role}/{env}/{job}/{instance}")
+ @Produces(MediaType.TEXT_HTML)
+ public Response put(
+ @PathParam("role") String role,
+ @PathParam("env") String env,
+ @PathParam("job") String job,
+ @PathParam("instance") int instanceId,
+ @Context UriInfo uriInfo) {
+
+ return get(role, env, job, instanceId, uriInfo, Optional.<String>absent());
+ }
+
+ @POST
+ @Path("/{role}/{env}/{job}/{instance}")
+ @Produces(MediaType.TEXT_HTML)
+ public Response post(
+ @PathParam("role") String role,
+ @PathParam("env") String env,
+ @PathParam("job") String job,
+ @PathParam("instance") int instanceId,
+ @Context UriInfo uriInfo) {
+
+ return get(role, env, job, instanceId, uriInfo, Optional.<String>absent());
+ }
+
+ @DELETE
+ @Path("/{role}/{env}/{job}/{instance}")
+ @Produces(MediaType.TEXT_HTML)
+ public Response delete(
+ @PathParam("role") String role,
+ @PathParam("env") String env,
+ @PathParam("job") String job,
+ @PathParam("instance") int instanceId,
+ @Context UriInfo uriInfo) {
+
+ return get(role, env, job, instanceId, uriInfo, Optional.<String>absent());
+ }
+
+ private Response get(
+ String role,
+ String env,
+ String job,
+ int instanceId,
+ UriInfo uriInfo,
+ Optional<String> forwardRequest) {
+
+ IScheduledTask task = Iterables.getOnlyElement(
+ Storage.Util.consistentFetchTasks(storage,
+ Query.instanceScoped(JobKeys.from(role, env, job), instanceId).active()),
+ null);
+ if (task == null) {
+ return respond(NOT_FOUND, "No such live instance found.");
+ }
+
+ if (task.getStatus() != RUNNING) {
+ return respond(NOT_FOUND, "The selected instance is currently in state " + task.getStatus());
+ }
+
+ IAssignedTask assignedTask = task.getAssignedTask();
+ Optional<Integer> port = getRedirectPort(assignedTask);
+ if (!port.isPresent()) {
+ return respond(NOT_FOUND, "The task does not have a registered http port.");
+ }
+
+ UriBuilder redirect = UriBuilder
+ .fromPath(forwardRequest.or("/"))
+ .scheme("http")
+ .host(assignedTask.getSlaveHost())
+ .port(port.get());
+ for (Entry<String, List<String>> entry : uriInfo.getQueryParameters().entrySet()) {
+ for (String value : entry.getValue()) {
+ redirect.queryParam(entry.getKey(), value);
+ }
+ }
+
+ return Response.temporaryRedirect(redirect.build()).build();
+ }
+
+ @VisibleForTesting
+ static Optional<Integer> getRedirectPort(IAssignedTask task) {
+ Map<String, Integer> ports = task.isSetAssignedPorts()
+ ? task.getAssignedPorts() : ImmutableMap.<String, Integer>of();
+ for (String httpPortName : HTTP_PORT_NAMES) {
+ Integer port = ports.get(httpPortName);
+ if (port != null) {
+ return Optional.of(port);
+ }
+ }
+ return Optional.absent();
+ }
+
+ private Response respond(Status status, String message) {
+ return Response.status(status).entity(message).build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/http/Offers.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Offers.java b/src/main/java/org/apache/aurora/scheduler/http/Offers.java
new file mode 100644
index 0000000..e90f1cc
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/http/Offers.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.http;
+
+import java.util.Map;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.mesos.Protos.Attribute;
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.Resource;
+import org.apache.mesos.Protos.Value.Range;
+
+import com.twitter.aurora.scheduler.async.OfferQueue;
+
+/**
+ * Servlet that exposes resource offers that the scheduler is currently retaining.
+ */
+@Path("/offers")
+public class Offers {
+
+ private final OfferQueue offerQueue;
+
+ @Inject
+ Offers(OfferQueue offerQueue) {
+ this.offerQueue = Preconditions.checkNotNull(offerQueue);
+ }
+
+ /**
+ * Dumps the offers queued in the scheduler.
+ *
+ * @return HTTP response.
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getOffers() {
+ return Response.ok(
+ FluentIterable.from(offerQueue.getOffers()).transform(TO_BEAN).toList()).build();
+ }
+
+ private static final Function<ExecutorID, String> EXECUTOR_ID_TOSTRING =
+ new Function<ExecutorID, String>() {
+ @Override public String apply(ExecutorID id) {
+ return id.getValue();
+ }
+ };
+
+ private static final Function<Range, Object> RANGE_TO_BEAN = new Function<Range, Object>() {
+ @Override public Object apply(Range range) {
+ return range.getBegin() + "-" + range.getEnd();
+ }
+ };
+
+ private static final Function<Attribute, Object> ATTRIBUTE_TO_BEAN =
+ new Function<Attribute, Object>() {
+ @Override public Object apply(Attribute attr) {
+ ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
+ builder.put("name", attr.getName());
+ if (attr.hasScalar()) {
+ builder.put("scalar", attr.getScalar().getValue());
+ }
+ if (attr.hasRanges()) {
+ builder.put("ranges", immutable(attr.getRanges().getRangeList(), RANGE_TO_BEAN));
+ }
+ if (attr.hasSet()) {
+ builder.put("set", attr.getSet().getItemList());
+ }
+ if (attr.hasText()) {
+ builder.put("text", attr.getText().getValue());
+ }
+ return builder.build();
+ }
+ };
+
+ private static final Function<Resource, Object> RESOURCE_TO_BEAN =
+ new Function<Resource, Object>() {
+ @Override public Object apply(Resource resource) {
+ ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
+ builder.put("name", resource.getName());
+ if (resource.hasScalar()) {
+ builder.put("scalar", resource.getScalar().getValue());
+ }
+ if (resource.hasRanges()) {
+ builder.put("ranges", immutable(resource.getRanges().getRangeList(), RANGE_TO_BEAN));
+ }
+ if (resource.hasSet()) {
+ builder.put("set", resource.getSet().getItemList());
+ }
+ return builder.build();
+ }
+ };
+
+ private static <A, B> Iterable<B> immutable(Iterable<A> iterable, Function<A, B> transform) {
+ return FluentIterable.from(iterable).transform(transform).toList();
+ }
+
+ private static final Function<Offer, Map<String, ?>> TO_BEAN =
+ new Function<Offer, Map<String, ?>>() {
+ @Override public Map<String, ?> apply(Offer offer) {
+ return ImmutableMap.<String, Object>builder()
+ .put("id", offer.getId().getValue())
+ .put("framework_id", offer.getFrameworkId().getValue())
+ .put("slave_id", offer.getSlaveId().getValue())
+ .put("hostname", offer.getHostname())
+ .put("resources", immutable(offer.getResourcesList(), RESOURCE_TO_BEAN))
+ .put("attributes", immutable(offer.getAttributesList(), ATTRIBUTE_TO_BEAN))
+ .put("executor_ids", immutable(offer.getExecutorIdsList(), EXECUTOR_ID_TOSTRING))
+ .build();
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/http/PendingTasks.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/PendingTasks.java b/src/main/java/org/apache/aurora/scheduler/http/PendingTasks.java
new file mode 100644
index 0000000..359fb97
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/http/PendingTasks.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.http;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import com.google.common.base.Preconditions;
+
+import com.twitter.aurora.scheduler.async.TaskGroups;
+
+/**
+ * Servlet that exposes detailed information about tasks that are pending.
+ */
+@Path("/pendingtasks")
+public class PendingTasks {
+
+ private final TaskGroups taskGroups;
+
+ @Inject
+ PendingTasks(TaskGroups taskGroups) {
+ this.taskGroups = Preconditions.checkNotNull(taskGroups);
+ }
+
+ /**
+ * Returns information about pending tasks.
+ *
+ * @return HTTP response.
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getOffers() {
+ return Response.ok(taskGroups.getGroups()).build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/http/Quotas.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Quotas.java b/src/main/java/org/apache/aurora/scheduler/http/Quotas.java
new file mode 100644
index 0000000..1183275
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/http/Quotas.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.http;
+
+import java.util.Map;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.aurora.scheduler.storage.entities.IQuota;
+
+/**
+ * Servlet that exposes allocated resource quotas.
+ */
+@Path("/quotas")
+public class Quotas {
+
+ private final Storage storage;
+
+ @Inject
+ Quotas(Storage storage) {
+ this.storage = Preconditions.checkNotNull(storage);
+ }
+
+ /**
+ * Dumps allocated resource quotas.
+ *
+ * @return HTTP response.
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getOffers(@QueryParam("role") final String role) {
+ return storage.weaklyConsistentRead(new Work.Quiet<Response>() {
+ @Override public Response apply(StoreProvider storeProvider) {
+ Map<String, IQuota> quotas;
+ if (role == null) {
+ quotas = storeProvider.getQuotaStore().fetchQuotas();
+ } else {
+ Optional<IQuota> quota = storeProvider.getQuotaStore().fetchQuota(role);
+ if (quota.isPresent()) {
+ quotas = ImmutableMap.of(role, quota.get());
+ } else {
+ quotas = ImmutableMap.of();
+ }
+ }
+
+ return Response.ok(Maps.transformValues(quotas, TO_BEAN)).build();
+ }
+ });
+ }
+
+ private static final Function<IQuota, QuotaBean> TO_BEAN = new Function<IQuota, QuotaBean>() {
+ @Override public QuotaBean apply(IQuota quota) {
+ return new QuotaBean(quota.getNumCpus(), quota.getRamMb(), quota.getDiskMb());
+ }
+ };
+
+ private static final class QuotaBean {
+ private final double cpu;
+ private final long ramMb;
+ private final long diskMb;
+
+ private QuotaBean(double cpu, long ramMb, long diskMb) {
+ this.cpu = cpu;
+ this.ramMb = ramMb;
+ this.diskMb = diskMb;
+ }
+
+ @JsonProperty("cpu_cores")
+ public double getCpu() {
+ return cpu;
+ }
+
+ @JsonProperty("ram_mb")
+ public long getRamMb() {
+ return ramMb;
+ }
+
+ @JsonProperty("disk_mb")
+ public long getDiskMb() {
+ return diskMb;
+ }
+ }
+}