You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:41 UTC
[48/51] [partial] Rename twitter* and com.twitter to apache and
org.apache directories to preserve all file history before the refactor.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/TaskScheduler.java b/src/main/java/com/twitter/aurora/scheduler/async/TaskScheduler.java
deleted file mode 100644
index 0ad9e13..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/async/TaskScheduler.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.async;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Ticker;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Iterables;
-import com.google.common.eventbus.Subscribe;
-import com.google.inject.BindingAnnotation;
-
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskInfo;
-
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import com.twitter.aurora.scheduler.state.StateManager;
-import com.twitter.aurora.scheduler.state.TaskAssigner;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.common.inject.TimedInterceptor.Timed;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.StatImpl;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.util.Clock;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static com.twitter.aurora.gen.ScheduleStatus.LOST;
-import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
-
-/**
- * Enables scheduling and preemption of tasks.
- */
-interface TaskScheduler extends EventSubscriber {
-
- /**
- * Attempts to schedule a task, possibly performing irreversible actions.
- *
- * @param taskId The task to attempt to schedule.
- * @return SUCCESS if the task was scheduled, TRY_AGAIN otherwise. The caller should call schedule
- * again if TRY_AGAIN is returned.
- */
- TaskSchedulerResult schedule(String taskId);
-
- enum TaskSchedulerResult {
- SUCCESS,
- TRY_AGAIN
- }
-
- /**
- * An asynchronous task scheduler. Scheduling of tasks is performed on a delay, where each task
- * backs off after a failed scheduling attempt.
- * <p>
- * Pending tasks are advertised to the scheduler via internal pubsub notifications.
- */
- class TaskSchedulerImpl implements TaskScheduler {
- /**
- * Binding annotation for the time duration of reservations
- */
- @BindingAnnotation
- @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
- @interface ReservationDuration { }
-
- private static final Logger LOG = Logger.getLogger(TaskSchedulerImpl.class.getName());
-
- private final Storage storage;
- private final StateManager stateManager;
- private final TaskAssigner assigner;
- private final OfferQueue offerQueue;
- private final Preemptor preemptor;
- private final Reservations reservations;
-
- private final AtomicLong scheduleAttemptsFired = Stats.exportLong("schedule_attempts_fired");
- private final AtomicLong scheduleAttemptsFailed = Stats.exportLong("schedule_attempts_failed");
-
- @Inject
- TaskSchedulerImpl(
- Storage storage,
- StateManager stateManager,
- TaskAssigner assigner,
- OfferQueue offerQueue,
- Preemptor preemptor,
- @ReservationDuration Amount<Long, Time> reservationDuration,
- final Clock clock) {
-
- this.storage = checkNotNull(storage);
- this.stateManager = checkNotNull(stateManager);
- this.assigner = checkNotNull(assigner);
- this.offerQueue = checkNotNull(offerQueue);
- this.preemptor = checkNotNull(preemptor);
- this.reservations = new Reservations(reservationDuration, clock);
- }
-
- private Function<Offer, Optional<TaskInfo>> getAssignerFunction(
- final String taskId,
- final IScheduledTask task) {
-
- return new Function<Offer, Optional<TaskInfo>>() {
- @Override public Optional<TaskInfo> apply(Offer offer) {
- Optional<String> reservedTaskId = reservations.getSlaveReservation(offer.getSlaveId());
- if (reservedTaskId.isPresent()) {
- if (taskId.equals(reservedTaskId.get())) {
- // Slave is reserved to satisfy this task.
- return assigner.maybeAssign(offer, task);
- } else {
- // Slave is reserved for another task.
- return Optional.absent();
- }
- } else {
- // Slave is not reserved.
- return assigner.maybeAssign(offer, task);
- }
- }
- };
- }
-
- @VisibleForTesting
- static final Optional<String> LAUNCH_FAILED_MSG =
- Optional.of("Unknown exception attempting to schedule task.");
-
- @Timed("task_schedule_attempt")
- @Override
- public TaskSchedulerResult schedule(final String taskId) {
- scheduleAttemptsFired.incrementAndGet();
- try {
- return storage.write(new MutateWork.Quiet<TaskSchedulerResult>() {
- @Override public TaskSchedulerResult apply(MutableStoreProvider store) {
- LOG.fine("Attempting to schedule task " + taskId);
- Query.Builder pendingTaskQuery = Query.taskScoped(taskId).byStatus(PENDING);
- final IScheduledTask task =
- Iterables.getOnlyElement(store.getTaskStore().fetchTasks(pendingTaskQuery), null);
- if (task == null) {
- LOG.warning("Failed to look up task " + taskId + ", it may have been deleted.");
- } else {
- try {
- if (!offerQueue.launchFirst(getAssignerFunction(taskId, task))) {
- // Task could not be scheduled.
- maybePreemptFor(taskId);
- return TaskSchedulerResult.TRY_AGAIN;
- }
- } catch (OfferQueue.LaunchException e) {
- LOG.log(Level.WARNING, "Failed to launch task.", e);
- scheduleAttemptsFailed.incrementAndGet();
-
- // The attempt to schedule the task failed, so we need to backpedal on the
- // assignment.
- // It is in the LOST state and a new task will move to PENDING to replace it.
- // Should the state change fail due to storage issues, that's okay. The task will
- // time out in the ASSIGNED state and be moved to LOST.
- stateManager.changeState(pendingTaskQuery, LOST, LAUNCH_FAILED_MSG);
- }
- }
-
- return TaskSchedulerResult.SUCCESS;
- }
- });
- } catch (RuntimeException e) {
- // We catch the generic unchecked exception here to ensure tasks are not abandoned
- // if there is a transient issue resulting in an unchecked exception.
- LOG.log(Level.WARNING, "Task scheduling unexpectedly failed, will be retried", e);
- scheduleAttemptsFailed.incrementAndGet();
- return TaskSchedulerResult.TRY_AGAIN;
- }
- }
-
- private void maybePreemptFor(String taskId) {
- if (reservations.hasReservationForTask(taskId)) {
- return;
- }
- Optional<String> slaveId = preemptor.findPreemptionSlotFor(taskId);
- if (slaveId.isPresent()) {
- this.reservations.add(SlaveID.newBuilder().setValue(slaveId.get()).build(), taskId);
- }
- }
-
- @Subscribe
- public void taskChanged(final TaskStateChange stateChangeEvent) {
- if (stateChangeEvent.getOldState() == PENDING) {
- reservations.invalidateTask(stateChangeEvent.getTaskId());
- }
- }
-
- private static class Reservations {
- private final Cache<SlaveID, String> reservations;
-
- Reservations(final Amount<Long, Time> duration, final Clock clock) {
- checkNotNull(duration);
- checkNotNull(clock);
- this.reservations = CacheBuilder.newBuilder()
- .expireAfterWrite(duration.as(Time.MINUTES), TimeUnit.MINUTES)
- .ticker(new Ticker() {
- @Override public long read() {
- return clock.nowNanos();
- }
- })
- .build();
- Stats.export(new StatImpl<Long>("reservation_cache_size") {
- @Override public Long read() {
- return reservations.size();
- }
- });
- }
-
- private synchronized void add(SlaveID slaveId, String taskId) {
- reservations.put(slaveId, taskId);
- }
-
- private synchronized boolean hasReservationForTask(String taskId) {
- return reservations.asMap().containsValue(taskId);
- }
-
- private synchronized Optional<String> getSlaveReservation(SlaveID slaveID) {
- return Optional.fromNullable(reservations.getIfPresent(slaveID));
- }
-
- private synchronized void invalidateTask(String taskId) {
- reservations.asMap().values().remove(taskId);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/async/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/TaskTimeout.java b/src/main/java/com/twitter/aurora/scheduler/async/TaskTimeout.java
deleted file mode 100644
index 19848c7..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/async/TaskTimeout.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.async;
-
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Ordering;
-import com.google.common.eventbus.Subscribe;
-
-import com.twitter.aurora.gen.ScheduleStatus;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import com.twitter.aurora.scheduler.events.PubsubEvent.StorageStarted;
-import com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import com.twitter.aurora.scheduler.state.StateManager;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.util.Clock;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Observes task transitions and identifies tasks that are 'stuck' in a transient state. Stuck
- * tasks will be transitioned to the LOST state.
- */
-class TaskTimeout implements EventSubscriber {
- private static final Logger LOG = Logger.getLogger(TaskTimeout.class.getName());
-
- @VisibleForTesting
- static final String TIMED_OUT_TASKS_COUNTER = "timed_out_tasks";
-
- @VisibleForTesting
- static final String TRANSIENT_COUNT_STAT_NAME = "transient_states";
-
- @VisibleForTesting
- static final Optional<String> TIMEOUT_MESSAGE = Optional.of("Task timed out");
-
- @VisibleForTesting
- static final Set<ScheduleStatus> TRANSIENT_STATES = EnumSet.of(
- ScheduleStatus.ASSIGNED,
- ScheduleStatus.PREEMPTING,
- ScheduleStatus.RESTARTING,
- ScheduleStatus.KILLING);
-
- @VisibleForTesting
- static final Query.Builder TRANSIENT_QUERY = Query.unscoped().byStatus(TRANSIENT_STATES);
-
- private final Map<TimeoutKey, Context> futures = Maps.newConcurrentMap();
-
- private static final class TimeoutKey {
- private final String taskId;
- private final ScheduleStatus status;
-
- private TimeoutKey(String taskId, ScheduleStatus status) {
- this.taskId = taskId;
- this.status = status;
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(taskId, status);
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof TimeoutKey)) {
- return false;
- }
- TimeoutKey key = (TimeoutKey) o;
- return Objects.equal(taskId, key.taskId)
- && (status == key.status);
- }
-
- @Override
- public String toString() {
- return taskId + ":" + status;
- }
- }
-
- private final Storage storage;
- private final ScheduledExecutorService executor;
- private final StateManager stateManager;
- private final long timeoutMillis;
- private final Clock clock;
- private final AtomicLong timedOutTasks;
-
- @Inject
- TaskTimeout(
- Storage storage,
- ScheduledExecutorService executor,
- StateManager stateManager,
- final Clock clock,
- Amount<Long, Time> timeout,
- StatsProvider statsProvider) {
-
- this.storage = checkNotNull(storage);
- this.executor = checkNotNull(executor);
- this.stateManager = checkNotNull(stateManager);
- this.timeoutMillis = timeout.as(Time.MILLISECONDS);
- this.clock = checkNotNull(clock);
- this.timedOutTasks = statsProvider.makeCounter(TIMED_OUT_TASKS_COUNTER);
-
- exportStats(statsProvider);
- }
-
- private void registerTimeout(TimeoutKey key) {
- // This is an obvious check-then-act, but:
- // - there isn't much of a better option, given that we have to get the Future before
- // inserting into the map
- // - a key collision only happens in practice if something is wrong externally to this class
- // (double event for the same state)
- // - the outcome is low-risk, we would wind up with a redundant Future that will eventually
- // no-op
- if (!futures.containsKey(key)) {
- Future<?> timeoutHandler = executor.schedule(
- new TimedOutTaskHandler(key),
- timeoutMillis,
- TimeUnit.MILLISECONDS);
- futures.put(key, new Context(clock.nowMillis(), timeoutHandler));
- }
- }
-
- private static boolean isTransient(ScheduleStatus status) {
- return TRANSIENT_STATES.contains(status);
- }
-
- @Subscribe
- public void recordStateChange(TaskStateChange change) {
- String taskId = change.getTaskId();
- ScheduleStatus newState = change.getNewState();
- if (isTransient(change.getOldState())) {
- TimeoutKey oldKey = new TimeoutKey(taskId, change.getOldState());
- Context context = futures.remove(oldKey);
- if (context != null) {
- LOG.fine("Canceling state timeout for task " + oldKey);
- context.future.cancel(false);
- }
- }
-
- if (isTransient(newState)) {
- registerTimeout(new TimeoutKey(taskId, change.getNewState()));
- }
- }
-
- @Subscribe
- public void storageStarted(StorageStarted event) {
- for (IScheduledTask task : Storage.Util.consistentFetchTasks(storage, TRANSIENT_QUERY)) {
- registerTimeout(new TimeoutKey(Tasks.id(task), task.getStatus()));
- }
- }
-
- private class TimedOutTaskHandler implements Runnable {
- private final TimeoutKey key;
-
- TimedOutTaskHandler(TimeoutKey key) {
- this.key = key;
- }
-
- @Override public void run() {
- Context context = futures.get(key);
- try {
- if (context == null) {
- LOG.warning("Timeout context not found for " + key);
- return;
- }
-
- LOG.info("Timeout reached for task " + key);
- // This query acts as a CAS by including the state that we expect the task to be in if the
- // timeout is still valid. Ideally, the future would have already been canceled, but in the
- // event of a state transition race, including transientState prevents an unintended
- // task timeout.
- Query.Builder query = Query.taskScoped(key.taskId).byStatus(key.status);
- // Note: This requires LOST transitions trigger Driver.killTask.
- if (stateManager.changeState(query, ScheduleStatus.LOST, TIMEOUT_MESSAGE) > 0) {
- timedOutTasks.incrementAndGet();
- } else {
- LOG.warning("Task " + key + " does not exist, or was not in the expected state.");
- }
- } finally {
- futures.remove(key);
- }
- }
- }
-
- private class Context {
- private final long timestampMillis;
- private final Future<?> future;
-
- Context(long timestampMillis, Future<?> future) {
- this.timestampMillis = timestampMillis;
- this.future = future;
- }
- }
-
- private static final Function<Context, Long> CONTEXT_TIMESTAMP = new Function<Context, Long>() {
- @Override public Long apply(Context context) {
- return context.timestampMillis;
- }
- };
-
- private static final Ordering<Context> TIMESTAMP_ORDER =
- Ordering.natural().onResultOf(CONTEXT_TIMESTAMP);
-
- @VisibleForTesting
- static String waitingTimeStatName(ScheduleStatus status) {
- return "scheduler_max_" + status + "_waiting_ms";
- }
-
- private void exportStats(StatsProvider statsProvider) {
- statsProvider.makeGauge(TRANSIENT_COUNT_STAT_NAME, new Supplier<Number>() {
- @Override public Number get() {
- return futures.size();
- }
- });
-
- for (final ScheduleStatus status : TRANSIENT_STATES) {
- statsProvider.makeGauge(waitingTimeStatName(status), new Supplier<Number>() {
- private final Predicate<TimeoutKey> statusMatcher = new Predicate<TimeoutKey>() {
- @Override public boolean apply(TimeoutKey key) {
- return key.status == status;
- }
- };
-
- @Override public Number get() {
- Iterable<Context> matches = Maps.filterKeys(futures, statusMatcher).values();
- if (Iterables.isEmpty(matches)) {
- return 0L;
- } else {
- return clock.nowMillis() - TIMESTAMP_ORDER.min(matches).timestampMillis;
- }
- }
- });
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/base/CommandUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/base/CommandUtil.java b/src/main/java/com/twitter/aurora/scheduler/base/CommandUtil.java
deleted file mode 100644
index b11c683..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/base/CommandUtil.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.base;
-
-import org.apache.mesos.Protos.CommandInfo;
-import org.apache.mesos.Protos.CommandInfo.URI;
-
-import com.twitter.common.base.MorePreconditions;
-
-/**
- * Utility class for constructing {@link CommandInfo} objects given an executor URI.
- */
-public final class CommandUtil {
-
- private CommandUtil() {
- // Utility class.
- }
-
- private static String uriBasename(String uri) {
- int lastSlash = uri.lastIndexOf("/");
- if (lastSlash == -1) {
- return uri;
- } else {
- String basename = uri.substring(lastSlash + 1);
- MorePreconditions.checkNotBlank(basename, "URI must not end with a slash.");
-
- return basename;
- }
- }
-
- /**
- * Creates a description of a command that will fetch and execute the given URI to an executor
- * binary.
- *
- * @param executorUri URI to the executor.
- * @return A command that will fetch and execute the executor.
- */
- public static CommandInfo create(String executorUri) {
- MorePreconditions.checkNotBlank(executorUri);
-
- return CommandInfo.newBuilder()
- .addUris(URI.newBuilder().setValue(executorUri).setExecutable(true))
- .setValue("./" + uriBasename(executorUri))
- .build();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/base/Conversions.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/base/Conversions.java b/src/main/java/com/twitter/aurora/scheduler/base/Conversions.java
deleted file mode 100644
index 2f84b5c..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/base/Conversions.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.base;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.logging.Logger;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.TaskState;
-
-import com.twitter.aurora.gen.Attribute;
-import com.twitter.aurora.gen.HostAttributes;
-import com.twitter.aurora.gen.ScheduleStatus;
-
-/**
- * Collection of utility functions to convert mesos protobuf types to internal thrift types.
- */
-public final class Conversions {
-
- private static final Logger LOG = Logger.getLogger(Conversions.class.getName());
-
- private Conversions() {
- // Utility class.
- }
-
- // Maps from mesos state to scheduler interface state.
- private static final Map<TaskState, ScheduleStatus> STATE_TRANSLATION =
- new ImmutableMap.Builder<TaskState, ScheduleStatus>()
- .put(TaskState.TASK_STARTING, ScheduleStatus.STARTING)
- .put(TaskState.TASK_RUNNING, ScheduleStatus.RUNNING)
- .put(TaskState.TASK_FINISHED, ScheduleStatus.FINISHED)
- .put(TaskState.TASK_FAILED, ScheduleStatus.FAILED)
- .put(TaskState.TASK_KILLED, ScheduleStatus.KILLED)
- .put(TaskState.TASK_LOST, ScheduleStatus.LOST)
- .build();
-
- /**
- * Converts a protobuf state to an internal schedule status.
- *
- * @param taskState Protobuf state.
- * @return Equivalent thrift-generated state.
- */
- public static ScheduleStatus convertProtoState(TaskState taskState) {
- ScheduleStatus status = STATE_TRANSLATION.get(taskState);
- Preconditions.checkArgument(status != null, "Unrecognized task state " + taskState);
- return status;
- }
-
- private static final Function<Protos.Attribute, String> ATTRIBUTE_NAME =
- new Function<Protos.Attribute, String>() {
- @Override public String apply(Protos.Attribute attr) {
- return attr.getName();
- }
- };
-
- /**
- * Typedef to make anonymous implementation more concise.
- */
- private abstract static class AttributeConverter
- implements Function<Entry<String, Collection<Protos.Attribute>>, Attribute> {
- }
-
- private static final Function<Protos.Attribute, String> VALUE_CONVERTER =
- new Function<Protos.Attribute, String>() {
- @Override public String apply(Protos.Attribute attribute) {
- switch (attribute.getType()) {
- case SCALAR:
- return String.valueOf(attribute.getScalar().getValue());
-
- case TEXT:
- return attribute.getText().getValue();
-
- default:
- LOG.finest("Unrecognized attribute type:" + attribute.getType() + " , ignoring.");
- return null;
- }
- }
- };
-
- private static final AttributeConverter ATTRIBUTE_CONVERTER = new AttributeConverter() {
- @Override public Attribute apply(Entry<String, Collection<Protos.Attribute>> entry) {
- // Convert values and filter any that were ignored.
- return new Attribute(
- entry.getKey(),
- FluentIterable.from(entry.getValue())
- .transform(VALUE_CONVERTER)
- .filter(Predicates.notNull())
- .toSet());
- }
- };
-
- /**
- * Converts protobuf attributes into thrift-generated attributes.
- *
- * @param offer Resource offer.
- * @return Equivalent thrift host attributes.
- */
- public static HostAttributes getAttributes(Offer offer) {
- // Group by attribute name.
- Multimap<String, Protos.Attribute> valuesByName =
- Multimaps.index(offer.getAttributesList(), ATTRIBUTE_NAME);
-
- // TODO(William Farner): Include slave id.
- return new HostAttributes(
- offer.getHostname(),
- FluentIterable.from(valuesByName.asMap().entrySet())
- .transform(ATTRIBUTE_CONVERTER)
- .toSet())
- .setSlaveId(offer.getSlaveId().getValue());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/base/JobKeys.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/base/JobKeys.java b/src/main/java/com/twitter/aurora/scheduler/base/JobKeys.java
deleted file mode 100644
index 008e1cb..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/base/JobKeys.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.base;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Optional;
-import com.google.common.base.Strings;
-
-import com.twitter.aurora.gen.JobKey;
-import com.twitter.aurora.gen.TaskQuery;
-import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
-import com.twitter.aurora.scheduler.storage.entities.IJobKey;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-/**
- * Utility class providing convenience functions relating to JobKeys.
- */
-public final class JobKeys {
- private JobKeys() {
- // Utility class.
- }
-
- public static final Function<IJobConfiguration, IJobKey> FROM_CONFIG =
- new Function<IJobConfiguration, IJobKey>() {
- @Override public IJobKey apply(IJobConfiguration job) {
- return job.getKey();
- }
- };
-
- public static final Function<IJobKey, String> TO_ROLE =
- new Function<IJobKey, String>() {
- @Override public String apply(IJobKey jobKey) {
- return jobKey.getRole();
- }
- };
-
- public static final Function<IJobKey, String> TO_ENVIRONMENT =
- new Function<IJobKey, String>() {
- @Override public String apply(IJobKey jobKey) {
- return jobKey.getEnvironment();
- }
- };
-
- public static final Function<IJobKey, String> TO_JOB_NAME =
- new Function<IJobKey, String>() {
- @Override public String apply(IJobKey jobKey) {
- return jobKey.getName();
- }
- };
-
- public static final Function<IJobConfiguration, String> CONFIG_TO_ROLE =
- Functions.compose(TO_ROLE, FROM_CONFIG);
-
- /**
- * Check that a jobKey struct is valid.
- *
- * @param jobKey The jobKey to validate.
- * @return {@code true} if the jobKey validates.
- */
- public static boolean isValid(@Nullable IJobKey jobKey) {
- return jobKey != null
- && !Strings.isNullOrEmpty(jobKey.getRole())
- && !Strings.isNullOrEmpty(jobKey.getEnvironment())
- && !Strings.isNullOrEmpty(jobKey.getName());
- }
-
- /**
- * Assert that a jobKey struct is valid.
- *
- * @param jobKey The key struct to validate.
- * @return The validated jobKey argument.
- * @throws IllegalArgumentException if the key struct fails to validate.
- */
- public static IJobKey assertValid(IJobKey jobKey) throws IllegalArgumentException {
- checkArgument(isValid(jobKey));
-
- return jobKey;
- }
-
- /**
- * Attempt to create a valid JobKey from the given (role, environment, name) triple.
- *
- * @param role The job's role.
- * @param environment The job's environment.
- * @param name The job's name.
- * @return A valid JobKey if it can be created.
- * @throws IllegalArgumentException if the key fails to validate.
- */
- public static IJobKey from(String role, String environment, String name)
- throws IllegalArgumentException {
-
- IJobKey job = IJobKey.build(new JobKey()
- .setRole(role)
- .setEnvironment(environment)
- .setName(name));
- return assertValid(job);
- }
-
- /**
- * Attempts to create a valid JobKey from the given task.
- *
- * @param task The task to create job key from.
- * @return A valid JobKey if it can be created.
- * @throws IllegalArgumentException if the key fails to validate.
- */
- public static IJobKey from(ITaskConfig task) throws IllegalArgumentException {
- return from(task.getOwner().getRole(), task.getEnvironment(), task.getJobName());
- }
-
- /**
- * Create a "/"-delimited String representation of a job key, suitable for logging but not
- * necessarily suitable for use as a unique identifier.
- *
- * @param jobKey Key to represent.
- * @return "/"-delimited representation of the key.
- */
- public static String toPath(IJobKey jobKey) {
- return jobKey.getRole() + "/" + jobKey.getEnvironment() + "/" + jobKey.getName();
- }
-
- /**
- * Create a "/"-delimited String representation of job key, suitable for logging but not
- * necessarily suitable for use as a unique identifier.
- *
- * @param job Job to represent.
- * @return "/"-delimited representation of the job's key.
- */
- public static String toPath(IJobConfiguration job) {
- return toPath(job.getKey());
- }
-
- /**
- * Attempt to extract a job key from the given query if it is scoped to a single job.
- *
- * @param query Query to extract the key from.
- * @return A present if one can be extracted, absent otherwise.
- */
- public static Optional<IJobKey> from(Query.Builder query) {
- if (Query.isJobScoped(query)) {
- TaskQuery taskQuery = query.get();
- return Optional.of(
- from(taskQuery.getOwner().getRole(), taskQuery.getEnvironment(), taskQuery.getJobName()));
-
- } else {
- return Optional.absent();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/base/Numbers.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/base/Numbers.java b/src/main/java/com/twitter/aurora/scheduler/base/Numbers.java
deleted file mode 100644
index 74b5e0b..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/base/Numbers.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.base;
-
-import java.util.Set;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.PeekingIterator;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
-
-/**
- * Utility class for working with numbers.
- */
-public final class Numbers {
-
- private Numbers() {
- // Utility class.
- }
-
- /**
- * Converts a set of integers into a set of contiguous closed ranges that equally represent the
- * input integers.
- * <p>
- * The resulting ranges will be in ascending order.
- *
- * @param values Values to transform to ranges.
- * @return Closed ranges with identical members to the input set.
- */
- public static Set<Range<Integer>> toRanges(Iterable<Integer> values) {
- ImmutableSet.Builder<Range<Integer>> builder = ImmutableSet.builder();
-
- PeekingIterator<Integer> iterator =
- Iterators.peekingIterator(Sets.newTreeSet(values).iterator());
-
- // Build ranges until there are no numbers left.
- while (iterator.hasNext()) {
- // Start a new range.
- int start = iterator.next();
- int end = start;
- // Increment the end until the range is non-contiguous.
- while (iterator.hasNext() && (iterator.peek() == (end + 1))) {
- end++;
- iterator.next();
- }
-
- builder.add(Range.closed(start, end));
- }
-
- return builder.build();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/base/Query.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/base/Query.java b/src/main/java/com/twitter/aurora/scheduler/base/Query.java
deleted file mode 100644
index d02ef87..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/base/Query.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.base;
-
-import java.util.EnumSet;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.primitives.Ints;
-
-import com.twitter.aurora.gen.Identity;
-import com.twitter.aurora.gen.InstanceKey;
-import com.twitter.aurora.gen.ScheduleStatus;
-import com.twitter.aurora.gen.TaskQuery;
-import com.twitter.aurora.scheduler.storage.entities.IJobKey;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static org.apache.commons.lang.StringUtils.isEmpty;
-
-/**
- * A utility class to construct storage queries.
- * TODO(Sathya): Add some basic unit tests for isJobScoped and isOnlyJobScoped.
- */
-public final class Query {
-
- private Query() {
- // Utility.
- }
-
- /**
- * Checks whether a query is scoped to a specific job.
- * A query scoped to a job specifies a role and job name.
- *
- * @param taskQuery Query to test.
- * @return {@code true} if the query specifies at least a role and job name,
- * otherwise {@code false}.
- */
- public static boolean isJobScoped(Builder taskQuery) {
- TaskQuery query = taskQuery.get();
- return (query.getOwner() != null)
- && !isEmpty(query.getOwner().getRole())
- && !isEmpty(query.getEnvironment())
- && !isEmpty(query.getJobName());
- }
-
- /**
- * Checks whether a query is strictly scoped to a specific job. A query is strictly job scoped,
- * iff it has the role, environment and jobName set.
- *
- * @param query Query to test.
- * @return {@code true} if the query is strictly job scoped, otherwise {@code false}.
- */
- public static boolean isOnlyJobScoped(Builder query) {
- Optional<IJobKey> jobKey = JobKeys.from(query);
- return jobKey.isPresent() && Query.jobScoped(jobKey.get()).equals(query);
- }
-
- public static Builder arbitrary(TaskQuery query) {
- return new Builder(query.deepCopy());
- }
-
- public static Builder unscoped() {
- return new Builder();
- }
-
- public static Builder roleScoped(String role) {
- return unscoped().byRole(role);
- }
-
- public static Builder envScoped(String role, String environment) {
- return unscoped().byEnv(role, environment);
- }
-
- public static Builder jobScoped(IJobKey jobKey) {
- return unscoped().byJob(jobKey);
- }
-
- public static Builder instanceScoped(InstanceKey instanceKey) {
- return instanceScoped(IJobKey.build(instanceKey.getJobKey()), instanceKey.getInstanceId());
- }
-
- public static Builder instanceScoped(IJobKey jobKey, int instanceId, int... instanceIds) {
- return unscoped().byInstances(jobKey, instanceId, instanceIds);
- }
-
- public static Builder instanceScoped(IJobKey jobKey, Iterable<Integer> instanceIds) {
- return unscoped().byInstances(jobKey, instanceIds);
- }
-
- public static Builder taskScoped(String taskId, String... taskIds) {
- return unscoped().byId(taskId, taskIds);
- }
-
- public static Builder taskScoped(Iterable<String> taskIds) {
- return unscoped().byId(taskIds);
- }
-
- public static Builder slaveScoped(String slaveHost) {
- return unscoped().bySlave(slaveHost);
- }
-
- public static Builder statusScoped(ScheduleStatus status, ScheduleStatus... statuses) {
- return unscoped().byStatus(status, statuses);
- }
-
- public static Builder statusScoped(Iterable<ScheduleStatus> statuses) {
- return unscoped().byStatus(statuses);
- }
-
- /**
- * A Builder of TaskQueries. Builders are immutable and provide access to a set of convenience
- * methods to return a new builder of another scope. Available scope filters include slave,
- * taskId, role, jobs of a role, and instances of a job.
- *
- * <p>
- * This class does not expose the full functionality of TaskQuery but rather subsets of it that
- * can be efficiently executed and make sense in the context of the scheduler datastores. This
- * builder should be preferred over constructing TaskQueries directly.
- * </p>
- *
- * TODO(ksweeney): Add an environment scope.
- */
- public static final class Builder implements Supplier<TaskQuery> {
- private final TaskQuery query;
-
- private Builder() {
- this.query = new TaskQuery();
- }
-
- private Builder(final TaskQuery query) {
- this.query = checkNotNull(query); // It is expected that the caller calls deepCopy.
- }
-
- /**
- * Build a query that is the combination of all the filters applied to a Builder. Mutating the
- * returned object will not affect the state of the builder. Can be called any number of times
- * and will return a new {@code TaskQuery} each time.
- *
- * @return A new TaskQuery satisfying this builder's constraints.
- */
- @Override
- public TaskQuery get() {
- return query.deepCopy();
- }
-
- @Override
- public boolean equals(Object that) {
- return that != null
- && that instanceof Builder
- && Objects.equal(query, ((Builder) that).query);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(query);
- }
-
- @Override
- public String toString() {
- return Objects.toStringHelper(this)
- .add("query", query)
- .toString();
- }
-
- /**
- * Create a builder scoped to tasks.
- *
- * @param taskId An ID of a task to scope the builder to.
- * @param taskIds Additional IDs of tasks to scope the builder to (they are ORed together).
- * @return A new Builder scoped to the given tasks.
- */
- public Builder byId(String taskId, String... taskIds) {
- checkNotNull(taskId);
-
- return new Builder(
- query.deepCopy()
- .setTaskIds(ImmutableSet.<String>builder().add(taskId).add(taskIds).build()));
- }
-
- /**
- * Create a builder scoped to tasks.
- *
- * @see #byId(String, String...)
- *
- * @param taskIds The IDs of the tasks to scope the query to (ORed together).
- * @return A new Builder scoped to the given tasks.
- */
- public Builder byId(Iterable<String> taskIds) {
- checkNotNull(taskIds);
-
- return new Builder(
- query.deepCopy().setTaskIds(ImmutableSet.copyOf(taskIds)));
- }
-
- /**
- * Create a builder scoped to a role. A role scope conflicts with job and instance scopes.
- *
- * @param role The role to scope the query to.
- * @return A new Builder scoped to the given role.
- */
- public Builder byRole(String role) {
- checkNotNull(role);
-
- return new Builder(
- query.deepCopy().setOwner(new Identity().setRole(role)));
- }
-
- /**
- * Create a builder scoped to an environment. An environment scope conflicts with role, job,
- * and instance scopes.
- *
- * @param role The role to scope the query to.
- * @param environment The environment to scope the query to.
- * @return A new Builder scoped to the given environment.
- */
- public Builder byEnv(String role, String environment) {
- checkNotNull(role);
- checkNotNull(environment);
-
- return new Builder(
- query.deepCopy()
- .setOwner(new Identity().setRole(role))
- .setEnvironment(environment));
- }
-
- /**
- * Returns a new builder scoped to the job uniquely identified by the given key. A job scope
- * conflicts with role and instance scopes.
- *
- * @param jobKey The key of the job to scope the query to.
- * @return A new Builder scoped to the given jobKey.
- */
- public Builder byJob(IJobKey jobKey) {
- JobKeys.assertValid(jobKey);
-
- return new Builder(
- query.deepCopy()
- .setOwner(new Identity().setRole(jobKey.getRole()))
- .setEnvironment(jobKey.getEnvironment())
- .setJobName(jobKey.getName()));
- }
-
- /**
- * Returns a new builder scoped to the slave uniquely identified by the given slaveHost. A
- * builder can only be scoped to slaves once.
- *
- * @param slaveHost The hostname of the slave to scope the query to.
- * @return A new Builder scoped to the given slave.
- */
- public Builder bySlave(String slaveHost) {
- checkNotNull(slaveHost);
-
- return new Builder(query.deepCopy().setSlaveHost(slaveHost));
- }
-
- /**
- * Returns a new builder scoped to the given statuses. A builder can only be scoped to statuses
- * once.
- *
- * @param status The status to scope this Builder to.
- * @param statuses Additional statuses to scope this Builder to (they are ORed together).
- * @return A new Builder scoped to the given statuses.
- */
- public Builder byStatus(ScheduleStatus status, ScheduleStatus... statuses) {
- checkNotNull(status);
-
- return new Builder(
- query.deepCopy().setStatuses(EnumSet.of(status, statuses)));
- }
-
- /**
- * Create a new Builder scoped to statuses.
- *
- * @see Builder#byStatus(ScheduleStatus, ScheduleStatus...)
- *
- * @param statuses The statuses to scope this Builder to.
- * @return A new Builder scoped to the given statuses.
- */
- public Builder byStatus(Iterable<ScheduleStatus> statuses) {
- checkNotNull(statuses);
-
- return new Builder(
- query.deepCopy().setStatuses(EnumSet.copyOf(ImmutableSet.copyOf(statuses))));
- }
-
- /**
- * Returns a new Builder scoped to the given instances of the given job. A builder can only
- * be scoped to a set of instances, a job, or a role once.
- *
- * @param jobKey The key identifying the job.
- * @param instanceId An instance id of the target job.
- * @param instanceIds Additional instance ids of the target job.
- * @return A new Builder scoped to the given instance ids.
- */
- public Builder byInstances(IJobKey jobKey, int instanceId, int... instanceIds) {
- JobKeys.assertValid(jobKey);
-
- return new Builder(
- query.deepCopy()
- .setOwner(new Identity().setRole(jobKey.getRole()))
- .setEnvironment(jobKey.getEnvironment())
- .setJobName(jobKey.getName())
- .setInstanceIds(ImmutableSet.<Integer>builder()
- .add(instanceId)
- .addAll(Ints.asList(instanceIds))
- .build()));
- }
-
- /**
- * Create a new Builder scoped to instances.
- *
- * @see Builder#byInstances
- *
- * @param jobKey The key identifying the job.
- * @param instanceIds Instances of the target job.
- * @return A new Builder scoped to the given instance ids.
- */
- public Builder byInstances(IJobKey jobKey, Iterable<Integer> instanceIds) {
- JobKeys.assertValid(jobKey);
- checkNotNull(instanceIds);
-
- return new Builder(
- query.deepCopy()
- .setOwner(new Identity().setRole(jobKey.getRole()))
- .setEnvironment(jobKey.getEnvironment())
- .setJobName(jobKey.getName())
- .setInstanceIds(ImmutableSet.copyOf(instanceIds)));
- }
-
- /**
- * A convenience method to scope this builder to {@link Tasks#ACTIVE_STATES}.
- *
- * @return A new Builder scoped to Tasks#ACTIVE_STATES.
- */
- public Builder active() {
- return byStatus(Tasks.ACTIVE_STATES);
- }
-
- /**
- * A convenience method to scope this builder to {@link Tasks#TERMINAL_STATES}.
- *
- * @return A new Builder scoped to Tasks#TERMINAL_STATES.
- */
- public Builder terminal() {
- return byStatus(Tasks.TERMINAL_STATES);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/base/ScheduleException.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/base/ScheduleException.java b/src/main/java/com/twitter/aurora/scheduler/base/ScheduleException.java
deleted file mode 100644
index 0420ee9..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/base/ScheduleException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.base;
-
-/**
- * Exception class to signal a failure to schedule a task or job.
- */
-public class ScheduleException extends Exception {
- public ScheduleException(String msg) {
- super(msg);
- }
-
- public ScheduleException(String msg, Throwable t) {
- super(msg, t);
- }
-
- public ScheduleException(Throwable t) {
- super(t);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/base/SchedulerException.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/base/SchedulerException.java b/src/main/java/com/twitter/aurora/scheduler/base/SchedulerException.java
deleted file mode 100644
index a51c4e0..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/base/SchedulerException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.base;
-
-/**
- * Indicates some form of unexpected scheduler exception.
- */
-public class SchedulerException extends RuntimeException {
- public SchedulerException(String message) {
- super(message);
- }
- public SchedulerException(String message, Throwable cause) {
- super(message, cause);
- }
- public SchedulerException(Throwable cause) {
- super(cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/base/Tasks.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/base/Tasks.java b/src/main/java/com/twitter/aurora/scheduler/base/Tasks.java
deleted file mode 100644
index d98da3f..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/base/Tasks.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.base;
-
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Ordering;
-
-import com.twitter.aurora.gen.ScheduleStatus;
-import com.twitter.aurora.gen.ScheduledTask;
-import com.twitter.aurora.gen.apiConstants;
-import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
-import com.twitter.aurora.scheduler.storage.entities.IJobKey;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-
-/**
- * Utility class providing convenience functions relating to tasks.
- */
-public final class Tasks {
-
- public static final Function<IScheduledTask, IAssignedTask> SCHEDULED_TO_ASSIGNED =
- new Function<IScheduledTask, IAssignedTask>() {
- @Override public IAssignedTask apply(IScheduledTask task) {
- return task.getAssignedTask();
- }
- };
-
- public static final Function<IAssignedTask, ITaskConfig> ASSIGNED_TO_INFO =
- new Function<IAssignedTask, ITaskConfig>() {
- @Override public ITaskConfig apply(IAssignedTask task) {
- return task.getTask();
- }
- };
-
- public static final Function<IScheduledTask, ITaskConfig> SCHEDULED_TO_INFO =
- Functions.compose(ASSIGNED_TO_INFO, SCHEDULED_TO_ASSIGNED);
-
- public static final Function<IAssignedTask, String> ASSIGNED_TO_ID =
- new Function<IAssignedTask, String>() {
- @Override public String apply(IAssignedTask task) {
- return task.getTaskId();
- }
- };
-
- public static final Function<IScheduledTask, String> SCHEDULED_TO_ID =
- Functions.compose(ASSIGNED_TO_ID, SCHEDULED_TO_ASSIGNED);
-
- public static final Function<IAssignedTask, Integer> ASSIGNED_TO_INSTANCE_ID =
- new Function<IAssignedTask, Integer>() {
- @Override public Integer apply(IAssignedTask task) {
- return task.getInstanceId();
- }
- };
-
- public static final Function<IScheduledTask, Integer> SCHEDULED_TO_INSTANCE_ID =
- Functions.compose(ASSIGNED_TO_INSTANCE_ID, SCHEDULED_TO_ASSIGNED);
-
- public static final Function<ITaskConfig, IJobKey> INFO_TO_JOB_KEY =
- new Function<ITaskConfig, IJobKey>() {
- @Override public IJobKey apply(ITaskConfig task) {
- return JobKeys.from(task);
- }
- };
-
- public static final Function<IAssignedTask, IJobKey> ASSIGNED_TO_JOB_KEY =
- Functions.compose(INFO_TO_JOB_KEY, ASSIGNED_TO_INFO);
-
- public static final Function<IScheduledTask, IJobKey> SCHEDULED_TO_JOB_KEY =
- Functions.compose(ASSIGNED_TO_JOB_KEY, SCHEDULED_TO_ASSIGNED);
-
- /**
- * Different states that an active task may be in.
- */
- public static final EnumSet<ScheduleStatus> ACTIVE_STATES =
- EnumSet.copyOf(apiConstants.ACTIVE_STATES);
-
- /**
- * Terminal states, which a task should not move from.
- */
- public static final Set<ScheduleStatus> TERMINAL_STATES =
- EnumSet.copyOf(apiConstants.TERMINAL_STATES);
-
- public static final Predicate<ITaskConfig> IS_PRODUCTION =
- new Predicate<ITaskConfig>() {
- @Override public boolean apply(ITaskConfig task) {
- return task.isProduction();
- }
- };
-
- public static final Function<IScheduledTask, ScheduleStatus> GET_STATUS =
- new Function<IScheduledTask, ScheduleStatus>() {
- @Override public ScheduleStatus apply(IScheduledTask task) {
- return task.getStatus();
- }
- };
-
- /**
- * Order by production flag (true, then false), subsorting by task ID.
- */
- public static final Ordering<IAssignedTask> SCHEDULING_ORDER =
- Ordering.explicit(true, false)
- .onResultOf(Functions.compose(Functions.forPredicate(IS_PRODUCTION), ASSIGNED_TO_INFO))
- .compound(Ordering.natural().onResultOf(ASSIGNED_TO_ID));
-
- private Tasks() {
- // Utility class.
- }
-
- /**
- * A utility method that returns a multi-map of tasks keyed by IJobKey.
- * @param tasks A list of tasks to be keyed by map
- * @return A multi-map of tasks keyed by job key.
- */
- public static Multimap<IJobKey, IScheduledTask> byJobKey(Iterable<IScheduledTask> tasks) {
- return Multimaps.index(tasks, Tasks.SCHEDULED_TO_JOB_KEY);
- }
-
- public static boolean isActive(ScheduleStatus status) {
- return ACTIVE_STATES.contains(status);
- }
-
- public static boolean isTerminated(ScheduleStatus status) {
- return TERMINAL_STATES.contains(status);
- }
-
- public static String id(IScheduledTask task) {
- return task.getAssignedTask().getTaskId();
- }
-
- // TODO(William Farner: Remove this once the code base is switched to IScheduledTask.
- public static String id(ScheduledTask task) {
- return task.getAssignedTask().getTaskId();
- }
-
- public static Set<String> ids(Iterable<IScheduledTask> tasks) {
- return ImmutableSet.copyOf(Iterables.transform(tasks, SCHEDULED_TO_ID));
- }
-
- public static Set<String> ids(IScheduledTask... tasks) {
- return ids(ImmutableList.copyOf(tasks));
- }
-
- public static Map<String, IScheduledTask> mapById(Iterable<IScheduledTask> tasks) {
- return Maps.uniqueIndex(tasks, SCHEDULED_TO_ID);
- }
-
- public static String getRole(IScheduledTask task) {
- return task.getAssignedTask().getTask().getOwner().getRole();
- }
-
- public static String getJob(IScheduledTask task) {
- return task.getAssignedTask().getTask().getJobName();
- }
-
- public static final Ordering<IScheduledTask> LATEST_ACTIVITY = Ordering.natural()
- .onResultOf(new Function<IScheduledTask, Long>() {
- @Override public Long apply(IScheduledTask task) {
- return Iterables.getLast(task.getTaskEvents()).getTimestamp();
- }
- });
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/configuration/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/configuration/ConfigurationManager.java b/src/main/java/com/twitter/aurora/scheduler/configuration/ConfigurationManager.java
deleted file mode 100644
index 4839d0f..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/configuration/ConfigurationManager.java
+++ /dev/null
@@ -1,413 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.configuration;
-
-import java.util.regex.Pattern;
-
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import org.apache.commons.lang.StringUtils;
-
-import com.twitter.aurora.gen.Constraint;
-import com.twitter.aurora.gen.JobConfiguration;
-import com.twitter.aurora.gen.LimitConstraint;
-import com.twitter.aurora.gen.TaskConfig;
-import com.twitter.aurora.gen.TaskConfig._Fields;
-import com.twitter.aurora.gen.TaskConstraint;
-import com.twitter.aurora.scheduler.base.JobKeys;
-import com.twitter.aurora.scheduler.storage.entities.IConstraint;
-import com.twitter.aurora.scheduler.storage.entities.IIdentity;
-import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConstraint;
-import com.twitter.aurora.scheduler.storage.entities.IValueConstraint;
-import com.twitter.common.base.Closure;
-import com.twitter.common.base.MorePreconditions;
-
-import static com.twitter.aurora.gen.apiConstants.DEFAULT_ENVIRONMENT;
-import static com.twitter.aurora.gen.apiConstants.GOOD_IDENTIFIER_PATTERN_JVM;
-
-/**
- * Manages translation from a string-mapped configuration to a concrete configuration type, and
- * defaults for optional values.
- *
- * TODO(William Farner): Add input validation to all fields (strings not empty, positive ints, etc).
- */
-public final class ConfigurationManager {
-
- public static final String DEDICATED_ATTRIBUTE = "dedicated";
-
- @VisibleForTesting public static final String HOST_CONSTRAINT = "host";
- @VisibleForTesting public static final String RACK_CONSTRAINT = "rack";
-
- private static final Pattern GOOD_IDENTIFIER = Pattern.compile(GOOD_IDENTIFIER_PATTERN_JVM);
-
- private static final int MAX_IDENTIFIER_LENGTH = 255;
-
- private static class DefaultField implements Closure<TaskConfig> {
- private final _Fields field;
- private final Object defaultValue;
-
- DefaultField(_Fields field, Object defaultValue) {
- this.field = field;
- this.defaultValue = defaultValue;
- }
-
- @Override public void execute(TaskConfig task) {
- if (!task.isSet(field)) {
- task.setFieldValue(field, defaultValue);
- }
- }
- }
-
- private interface Validator<T> {
- void validate(T value) throws TaskDescriptionException;
- }
-
- private static class GreaterThan implements Validator<Number> {
- private final double min;
- private final String label;
-
- GreaterThan(double min, String label) {
- this.min = min;
- this.label = label;
- }
-
- @Override public void validate(Number value) throws TaskDescriptionException {
- if (this.min >= value.doubleValue()) {
- throw new TaskDescriptionException(label + " must be greater than " + this.min);
- }
- }
- }
-
- private static class RequiredFieldValidator<T> implements Validator<TaskConfig> {
- private final _Fields field;
- private final Validator<T> validator;
-
- RequiredFieldValidator(_Fields field, Validator<T> validator) {
- this.field = field;
- this.validator = validator;
- }
-
- public void validate(TaskConfig task) throws TaskDescriptionException {
- if (!task.isSet(field)) {
- throw new TaskDescriptionException("Field " + field.getFieldName() + " is required.");
- }
- @SuppressWarnings("unchecked")
- T value = (T) task.getFieldValue(field);
- validator.validate(value);
- }
- }
-
- private static final Iterable<Closure<TaskConfig>> DEFAULT_FIELD_POPULATORS =
- ImmutableList.of(
- new DefaultField(_Fields.IS_SERVICE, false),
- new DefaultField(_Fields.PRIORITY, 0),
- new DefaultField(_Fields.PRODUCTION, false),
- new DefaultField(_Fields.MAX_TASK_FAILURES, 1),
- new DefaultField(_Fields.TASK_LINKS, Maps.<String, String>newHashMap()),
- new DefaultField(_Fields.REQUESTED_PORTS, Sets.<String>newHashSet()),
- new DefaultField(_Fields.CONSTRAINTS, Sets.<Constraint>newHashSet()),
- new DefaultField(_Fields.ENVIRONMENT, DEFAULT_ENVIRONMENT),
- new Closure<TaskConfig>() {
- @Override public void execute(TaskConfig task) {
- if (!Iterables.any(task.getConstraints(), hasName(HOST_CONSTRAINT))) {
- task.addToConstraints(hostLimitConstraint(1));
- }
- }
- },
- new Closure<TaskConfig>() {
- @Override public void execute(TaskConfig task) {
- if (!isDedicated(ITaskConfig.build(task))
- && task.isProduction()
- && task.isIsService()
- && !Iterables.any(task.getConstraints(), hasName(RACK_CONSTRAINT))) {
-
- task.addToConstraints(rackLimitConstraint(1));
- }
- }
- });
-
- private static final Iterable<RequiredFieldValidator<?>> REQUIRED_FIELDS_VALIDATORS =
- ImmutableList.<RequiredFieldValidator<?>>of(
- new RequiredFieldValidator<>(_Fields.NUM_CPUS, new GreaterThan(0.0, "num_cpus")),
- new RequiredFieldValidator<>(_Fields.RAM_MB, new GreaterThan(0.0, "ram_mb")),
- new RequiredFieldValidator<>(_Fields.DISK_MB, new GreaterThan(0.0, "disk_mb")));
-
- private ConfigurationManager() {
- // Utility class.
- }
-
- @VisibleForTesting
- static boolean isGoodIdentifier(String identifier) {
- return GOOD_IDENTIFIER.matcher(identifier).matches()
- && (identifier.length() <= MAX_IDENTIFIER_LENGTH);
- }
-
- private static void checkNotNull(Object value, String error) throws TaskDescriptionException {
- if (value == null) {
- throw new TaskDescriptionException(error);
- }
- }
-
- private static void assertOwnerValidity(IIdentity jobOwner) throws TaskDescriptionException {
- checkNotNull(jobOwner, "No job owner specified!");
- checkNotNull(jobOwner.getRole(), "No job role specified!");
- checkNotNull(jobOwner.getUser(), "No job user specified!");
-
- if (!isGoodIdentifier(jobOwner.getRole())) {
- throw new TaskDescriptionException(
- "Job role contains illegal characters: " + jobOwner.getRole());
- }
-
- if (!isGoodIdentifier(jobOwner.getUser())) {
- throw new TaskDescriptionException(
- "Job user contains illegal characters: " + jobOwner.getUser());
- }
- }
-
- private static String getRole(IValueConstraint constraint) {
- return Iterables.getOnlyElement(constraint.getValues()).split("/")[0];
- }
-
- private static boolean isValueConstraint(ITaskConstraint taskConstraint) {
- return taskConstraint.getSetField() == TaskConstraint._Fields.VALUE;
- }
-
- public static boolean isDedicated(ITaskConfig task) {
- return Iterables.any(task.getConstraints(), getConstraintByName(DEDICATED_ATTRIBUTE));
- }
-
- @Nullable
- private static IConstraint getDedicatedConstraint(ITaskConfig task) {
- return Iterables.find(task.getConstraints(), getConstraintByName(DEDICATED_ATTRIBUTE), null);
- }
-
- /**
- * Check validity of and populates defaults in a job configuration. This will return a deep copy
- * of the provided job configuration with default configuration values applied, and configuration
- * map values sanitized and applied to their respective struct fields.
- *
- * @param job Job to validate and populate.
- * @return A deep copy of {@code job} that has been populated.
- * @throws TaskDescriptionException If the job configuration is invalid.
- */
- public static IJobConfiguration validateAndPopulate(IJobConfiguration job)
- throws TaskDescriptionException {
-
- Preconditions.checkNotNull(job);
-
- if (!job.isSetTaskConfig()) {
- throw new TaskDescriptionException("Job configuration must have taskConfig set.");
- }
-
- if (!job.isSetInstanceCount()) {
- throw new TaskDescriptionException("Job configuration does not have shardCount set.");
- }
-
- if (job.getInstanceCount() <= 0) {
- throw new TaskDescriptionException("Shard count must be positive.");
- }
-
- JobConfiguration builder = job.newBuilder();
-
- assertOwnerValidity(job.getOwner());
-
- if (!JobKeys.isValid(job.getKey())) {
- throw new TaskDescriptionException("Job key " + job.getKey() + " is invalid.");
- }
- if (!job.getKey().getRole().equals(job.getOwner().getRole())) {
- throw new TaskDescriptionException("Role in job key must match job owner.");
- }
- if (!isGoodIdentifier(job.getKey().getRole())) {
- throw new TaskDescriptionException(
- "Job role contains illegal characters: " + job.getKey().getRole());
- }
- if (!isGoodIdentifier(job.getKey().getEnvironment())) {
- throw new TaskDescriptionException(
- "Job environment contains illegal characters: " + job.getKey().getEnvironment());
- }
- if (!isGoodIdentifier(job.getKey().getName())) {
- throw new TaskDescriptionException(
- "Job name contains illegal characters: " + job.getKey().getName());
- }
-
- builder.setTaskConfig(
- validateAndPopulate(ITaskConfig.build(builder.getTaskConfig())).newBuilder());
-
- // Only one of [service=true, cron_schedule] may be set.
- if (!StringUtils.isEmpty(job.getCronSchedule()) && builder.getTaskConfig().isIsService()) {
- throw new TaskDescriptionException(
- "A service task may not be run on a cron schedule: " + builder);
- }
-
- return IJobConfiguration.build(builder);
- }
-
- /**
- * Check validity of and populates defaults in a task configuration. This will return a deep copy
- * of the provided task configuration with default configuration values applied, and configuration
- * map values sanitized and applied to their respective struct fields.
- *
- *
- * @param config Task config to validate and populate.
- * @return A reference to the modified {@code config} (for chaining).
- * @throws TaskDescriptionException If the task is invalid.
- */
- public static ITaskConfig validateAndPopulate(ITaskConfig config)
- throws TaskDescriptionException {
-
- TaskConfig builder = config.newBuilder();
-
- if (!builder.isSetRequestedPorts()) {
- builder.setRequestedPorts(ImmutableSet.<String>of());
- }
-
- maybeFillLinks(builder);
-
- assertOwnerValidity(config.getOwner());
-
- if (!isGoodIdentifier(config.getJobName())) {
- throw new TaskDescriptionException(
- "Job name contains illegal characters: " + config.getJobName());
- }
-
- if (!isGoodIdentifier(config.getEnvironment())) {
- throw new TaskDescriptionException(
- "Environment contains illegal characters: " + config.getEnvironment());
- }
-
- if (!builder.isSetExecutorConfig()) {
- throw new TaskDescriptionException("Configuration may not be null");
- }
-
- // Maximize the usefulness of any thrown error message by checking required fields first.
- for (RequiredFieldValidator<?> validator : REQUIRED_FIELDS_VALIDATORS) {
- validator.validate(builder);
- }
-
- IConstraint constraint = getDedicatedConstraint(config);
- if (constraint != null) {
- if (!isValueConstraint(constraint.getConstraint())) {
- throw new TaskDescriptionException("A dedicated constraint must be of value type.");
- }
-
- IValueConstraint valueConstraint = constraint.getConstraint().getValue();
-
- if (!(valueConstraint.getValues().size() == 1)) {
- throw new TaskDescriptionException("A dedicated constraint must have exactly one value");
- }
-
- String dedicatedRole = getRole(valueConstraint);
- if (!config.getOwner().getRole().equals(dedicatedRole)) {
- throw new TaskDescriptionException(
- "Only " + dedicatedRole + " may use hosts dedicated for that role.");
- }
- }
-
- return ITaskConfig.build(applyDefaultsIfUnset(builder));
- }
-
- /**
- * Provides a filter for the given constraint name.
- *
- * @param name The name of the constraint.
- * @return A filter that matches the constraint.
- */
- public static Predicate<IConstraint> getConstraintByName(final String name) {
- return new Predicate<IConstraint>() {
- @Override public boolean apply(IConstraint constraint) {
- return constraint.getName().equals(name);
- }
- };
- }
-
- @VisibleForTesting
- public static Constraint hostLimitConstraint(int limit) {
- return new Constraint(HOST_CONSTRAINT, TaskConstraint.limit(new LimitConstraint(limit)));
- }
-
- @VisibleForTesting
- public static Constraint rackLimitConstraint(int limit) {
- return new Constraint(RACK_CONSTRAINT, TaskConstraint.limit(new LimitConstraint(limit)));
- }
-
- private static Predicate<Constraint> hasName(final String name) {
- MorePreconditions.checkNotBlank(name);
- return new Predicate<Constraint>() {
- @Override public boolean apply(Constraint constraint) {
- return name.equals(constraint.getName());
- }
- };
- }
-
- /**
- * Applies defaults to unset values in a task.
- *
- * @param task Task to apply defaults to.
- * @return A reference to the (modified) {@code task}.
- */
- @VisibleForTesting
- public static TaskConfig applyDefaultsIfUnset(TaskConfig task) {
- for (Closure<TaskConfig> populator : DEFAULT_FIELD_POPULATORS) {
- populator.execute(task);
- }
-
- return task;
- }
-
- /**
- * Applies defaults to unset values in a job and its tasks.
- *
- * @param job Job to apply defaults to.
- */
- @VisibleForTesting
- public static void applyDefaultsIfUnset(JobConfiguration job) {
- ConfigurationManager.applyDefaultsIfUnset(job.getTaskConfig());
- }
-
- private static void maybeFillLinks(TaskConfig task) {
- if (task.getTaskLinksSize() == 0) {
- ImmutableMap.Builder<String, String> links = ImmutableMap.builder();
- if (task.getRequestedPorts().contains("health")) {
- links.put("health", "http://%host%:%port:health%");
- }
- if (task.getRequestedPorts().contains("http")) {
- links.put("http", "http://%host%:%port:http%");
- }
- task.setTaskLinks(links.build());
- }
- }
-
- /**
- * Thrown when an invalid task or job configuration is encountered.
- */
- public static class TaskDescriptionException extends Exception {
- public TaskDescriptionException(String msg) {
- super(msg);
- }
- }
-}