You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:43 UTC
[50/51] [partial] Rename twitter* and com.twitter to apache and
org.apache directories to preserve all file history before the refactor.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/codec/ThriftBinaryCodec.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/codec/ThriftBinaryCodec.java b/src/main/java/com/twitter/aurora/codec/ThriftBinaryCodec.java
deleted file mode 100644
index 2443078..0000000
--- a/src/main/java/com/twitter/aurora/codec/ThriftBinaryCodec.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.codec;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.thrift.TBase;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-
-/**
- * Codec that works for thrift objects.
- */
-public final class ThriftBinaryCodec {
-
- /**
- * Protocol factory used for all thrift encoding and decoding.
- */
- public static final TProtocolFactory PROTOCOL_FACTORY = new TBinaryProtocol.Factory();
-
- private ThriftBinaryCodec() {
- // Utility class.
- }
-
- /**
- * Identical to {@link #decodeNonNull(Class, byte[])}, but allows for a null buffer.
- *
- * @param clazz Class to instantiate and deserialize to.
- * @param buffer Buffer to decode.
- * @param <T> Target type.
- * @return A populated message, or {@code null} if the buffer was {@code null}.
- * @throws CodingException If the message could not be decoded.
- */
- @Nullable
- public static <T extends TBase<T, ?>> T decode(Class<T> clazz, @Nullable byte[] buffer)
- throws CodingException {
-
- if (buffer == null) {
- return null;
- }
- return decodeNonNull(clazz, buffer);
- }
-
- /**
- * Decodes a binary-encoded byte array into a target type.
- *
- * @param clazz Class to instantiate and deserialize to.
- * @param buffer Buffer to decode.
- * @param <T> Target type.
- * @return A populated message.
- * @throws CodingException If the message could not be decoded.
- */
- public static <T extends TBase<T, ?>> T decodeNonNull(Class<T> clazz, byte[] buffer)
- throws CodingException {
-
- Preconditions.checkNotNull(clazz);
- Preconditions.checkNotNull(buffer);
-
- try {
- T t = clazz.newInstance();
- new TDeserializer(PROTOCOL_FACTORY).deserialize(t, buffer);
- return t;
- } catch (IllegalAccessException e) {
- throw new CodingException("Failed to access constructor for target type.", e);
- } catch (InstantiationException e) {
- throw new CodingException("Failed to instantiate target type.", e);
- } catch (TException e) {
- throw new CodingException("Failed to deserialize thrift object.", e);
- }
- }
-
- /**
- * Identical to {@link #encodeNonNull(TBase)}, but allows for a null input.
- *
- * @param tBase Object to encode.
- * @return Encoded object, or {@code null} if the argument was {@code null}.
- * @throws CodingException If the object could not be encoded.
- */
- @Nullable
- public static byte[] encode(@Nullable TBase<?, ?> tBase) throws CodingException {
- if (tBase == null) {
- return null;
- }
- return encodeNonNull(tBase);
- }
-
- /**
- * Encodes a thrift object into a binary array.
- *
- * @param tBase Object to encode.
- * @return Encoded object.
- * @throws CodingException If the object could not be encoded.
- */
- public static byte[] encodeNonNull(TBase<?, ?> tBase) throws CodingException {
- Preconditions.checkNotNull(tBase);
-
- try {
- return new TSerializer(PROTOCOL_FACTORY).serialize(tBase);
- } catch (TException e) {
- throw new CodingException("Failed to serialize: " + tBase, e);
- }
- }
-
- /**
- * Thrown when serialization or deserialization failed.
- */
- public static class CodingException extends Exception {
- public CodingException(String message) {
- super(message);
- }
- public CodingException(String msg, Throwable cause) {
- super(msg, cause);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/Driver.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/Driver.java b/src/main/java/com/twitter/aurora/scheduler/Driver.java
deleted file mode 100644
index aa77887..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/Driver.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableList;
-
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.Status;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.SchedulerDriver;
-
-import com.twitter.common.stats.Stats;
-import com.twitter.common.util.StateMachine;
-
-import static org.apache.mesos.Protos.Status.DRIVER_RUNNING;
-
-/**
- * Wraps the mesos Scheduler driver to ensure its used in a valid lifecycle; namely:
- * <pre>
- * (run -> kill*)? -> stop*
- * </pre>
- *
- * Also ensures the driver is only asked for when needed.
- */
-public interface Driver {
-
- /**
- * Launches a task.
- *
- * @param offerId ID of the resource offer to accept with the task.
- * @param task Task to launch.
- */
- void launchTask(OfferID offerId, TaskInfo task);
-
- /**
- * Declines a resource offer.
- *
- * @param offerId ID of the offer to decline.
- */
- void declineOffer(OfferID offerId);
-
- /**
- * Sends a kill task request for the given {@code taskId} to the mesos master.
- *
- * @param taskId The id of the task to kill.
- */
- void killTask(String taskId);
-
- /**
- * Stops the underlying driver if it is running, otherwise does nothing.
- */
- void stop();
-
- /**
- * Starts the underlying driver. Can only be called once.
- *
- * @return The status of the underlying driver run request.
- */
- Protos.Status start();
-
- /**
- * Blocks until the underlying driver is stopped or aborted.
- *
- * @return The status of the underlying driver upon exit.
- */
- Protos.Status join();
-
- /**
- * Mesos driver implementation.
- */
- static class DriverImpl implements Driver {
- private static final Logger LOG = Logger.getLogger(Driver.class.getName());
-
- /**
- * Driver states.
- */
- enum State {
- INIT,
- RUNNING,
- STOPPED
- }
-
- private final StateMachine<State> stateMachine;
- private final Supplier<Optional<SchedulerDriver>> driverSupplier;
- private final AtomicLong killFailures = Stats.exportLong("scheduler_driver_kill_failures");
-
- /**
- * Creates a driver manager that will only ask for the underlying mesos driver when actually
- * needed.
- *
- * @param driverSupplier A factory for the underlying driver.
- */
- @Inject
- DriverImpl(Supplier<Optional<SchedulerDriver>> driverSupplier) {
- this.driverSupplier = driverSupplier;
- this.stateMachine =
- StateMachine.<State>builder("scheduler_driver")
- .initialState(State.INIT)
- .addState(State.INIT, State.RUNNING, State.STOPPED)
- .addState(State.RUNNING, State.STOPPED)
- .logTransitions()
- .throwOnBadTransition(true)
- .build();
- }
-
- private synchronized SchedulerDriver get(State expected) {
- // TODO(William Farner): Formalize the failure case here by throwing a checked exception.
- stateMachine.checkState(expected);
- // This will and should fail if the driver is not present.
- return driverSupplier.get().get();
- }
-
- @Override
- public void launchTask(OfferID offerId, TaskInfo task) {
- get(State.RUNNING).launchTasks(offerId, ImmutableList.of(task));
- }
-
- @Override
- public void declineOffer(OfferID offerId) {
- get(State.RUNNING).declineOffer(offerId);
- }
-
- @Override
- public Protos.Status start() {
- SchedulerDriver driver = get(State.INIT);
- stateMachine.transition(State.RUNNING);
- return driver.start();
- }
-
- @Override
- public Status join() {
- return get(State.RUNNING).join();
- }
-
- @Override
- public synchronized void stop() {
- if (stateMachine.getState() == State.RUNNING) {
- SchedulerDriver driver = get(State.RUNNING);
- driver.stop(true /* failover */);
- stateMachine.transition(State.STOPPED);
- }
- }
-
- @Override
- public void killTask(String taskId) {
- SchedulerDriver driver = get(State.RUNNING);
- Protos.Status status = driver.killTask(Protos.TaskID.newBuilder().setValue(taskId).build());
-
- if (status != DRIVER_RUNNING) {
- LOG.severe(String.format("Attempt to kill task %s failed with code %s",
- taskId, status));
- killFailures.incrementAndGet();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/DriverFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/DriverFactory.java b/src/main/java/com/twitter/aurora/scheduler/DriverFactory.java
deleted file mode 100644
index e39bb09..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/DriverFactory.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Properties;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-import javax.inject.Inject;
-import javax.inject.Provider;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.protobuf.ByteString;
-
-import org.apache.mesos.MesosSchedulerDriver;
-import org.apache.mesos.Protos.Credential;
-import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.FrameworkInfo;
-import org.apache.mesos.Scheduler;
-import org.apache.mesos.SchedulerDriver;
-
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.args.constraints.NotNull;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-
-/**
- * Factory to create scheduler driver instances.
- */
-public interface DriverFactory extends Function<String, SchedulerDriver> {
-
- static class DriverFactoryImpl implements DriverFactory {
- private static final Logger LOG = Logger.getLogger(DriverFactory.class.getName());
-
- @NotNull
- @CmdLine(name = "mesos_master_address",
- help = "Address for the mesos master, can be a socket address or zookeeper path.")
- private static final Arg<String> MESOS_MASTER_ADDRESS = Arg.create();
-
- @VisibleForTesting
- static final String PRINCIPAL_KEY = "aurora_authentication_principal";
- @VisibleForTesting
- static final String SECRET_KEY = "aurora_authentication_secret";
- @CmdLine(name = "framework_authentication_file",
- help = "Properties file which contains framework credentials to authenticate with Mesos"
- + "master. Must contain the properties '" + PRINCIPAL_KEY + "' and "
- + "'" + SECRET_KEY + "'.")
- private static final Arg<File> FRAMEWORK_AUTHENTICATION_FILE = Arg.create();
-
- @CmdLine(name = "framework_failover_timeout",
- help = "Time after which a framework is considered deleted. SHOULD BE VERY HIGH.")
- private static final Arg<Amount<Long, Time>> FRAMEWORK_FAILOVER_TIMEOUT =
- Arg.create(Amount.of(21L, Time.DAYS));
-
- /**
- * Require Mesos slaves to have checkpointing enabled. Slaves with checkpointing enabled will
- * attempt to write checkpoints when required by a task's framework. These checkpoints allow
- * executors to be reattached rather than killed when a slave is restarted.
- *
- * This flag is dangerous! When enabled tasks will not launch on slaves without checkpointing
- * enabled.
- *
- * Behavior is as follows:
- * (Scheduler -require_slave_checkpoint=true, Slave --checkpoint=true):
- * Tasks will launch. Checkpoints will be written.
- * (Scheduler -require_slave_checkpoint=true, Slave --checkpoint=false):
- * Tasks WILL NOT launch.
- * (Scheduler -require_slave_checkpoint=false, Slave --checkpoint=true):
- * Tasks will launch. Checkpoints will not be written.
- * (Scheduler -require_slave_checkpoint=false, Slave --checkpoint=false):
- * Tasks will launch. Checkpoints will not be written.
- *
- * TODO(ksweeney): Remove warning table after https://issues.apache.org/jira/browse/MESOS-444
- * is resolved.
- */
- @CmdLine(name = "require_slave_checkpoint",
- help = "DANGEROUS! Require Mesos slaves to have checkpointing enabled. When enabled a "
- + "slave restart should not kill executors, but the scheduler will not be able to "
- + "launch tasks on slaves without --checkpoint=true in their command lines. See "
- + "DriverFactory.java for more information.")
- private static final Arg<Boolean> REQUIRE_SLAVE_CHECKPOINT = Arg.create(false);
-
- private static final String EXECUTOR_USER = "root";
-
- private static final String TWITTER_FRAMEWORK_NAME = "TwitterScheduler";
-
- private final Provider<Scheduler> scheduler;
-
- @Inject
- DriverFactoryImpl(Provider<Scheduler> scheduler) {
- this.scheduler = Preconditions.checkNotNull(scheduler);
- }
-
- @VisibleForTesting
- static Properties parseCredentials(InputStream credentialsStream) {
- Properties properties = new Properties();
- try {
- properties.load(credentialsStream);
- } catch (IOException e) {
- LOG.severe("Unable to load authentication file");
- throw Throwables.propagate(e);
- }
- Preconditions.checkState(properties.containsKey(PRINCIPAL_KEY),
- "The framework authentication file is missing the key: %s", PRINCIPAL_KEY);
- Preconditions.checkState(properties.containsKey(SECRET_KEY),
- "The framework authentication file is missing the key: %s", SECRET_KEY);
- return properties;
- }
-
- @Override
- public SchedulerDriver apply(@Nullable String frameworkId) {
- LOG.info("Connecting to mesos master: " + MESOS_MASTER_ADDRESS.get());
-
- FrameworkInfo.Builder frameworkInfo = FrameworkInfo.newBuilder()
- .setUser(EXECUTOR_USER)
- .setName(TWITTER_FRAMEWORK_NAME)
- .setCheckpoint(REQUIRE_SLAVE_CHECKPOINT.get())
- .setFailoverTimeout(FRAMEWORK_FAILOVER_TIMEOUT.get().as(Time.SECONDS));
-
- if (frameworkId != null) {
- LOG.info("Found persisted framework ID: " + frameworkId);
- frameworkInfo.setId(FrameworkID.newBuilder().setValue(frameworkId));
- } else {
- LOG.warning("Did not find a persisted framework ID, connecting as a new framework.");
- }
-
- if (FRAMEWORK_AUTHENTICATION_FILE.hasAppliedValue()) {
- Properties properties;
- try {
- properties = parseCredentials(new FileInputStream(FRAMEWORK_AUTHENTICATION_FILE.get()));
- } catch (FileNotFoundException e) {
- LOG.severe("Authentication File not Found");
- throw Throwables.propagate(e);
- }
-
- LOG.info(String.format("Connecting to master using authentication (principal: %s).",
- properties.get(PRINCIPAL_KEY)));
-
- Credential credential = Credential.newBuilder()
- .setPrincipal(properties.getProperty(PRINCIPAL_KEY))
- .setSecret(ByteString.copyFromUtf8(properties.getProperty(SECRET_KEY)))
- .build();
-
- return new MesosSchedulerDriver(
- scheduler.get(),
- frameworkInfo.build(),
- MESOS_MASTER_ADDRESS.get(),
- credential);
- } else {
- LOG.warning("Connecting to master without authentication!");
- return new MesosSchedulerDriver(
- scheduler.get(),
- frameworkInfo.build(),
- MESOS_MASTER_ADDRESS.get());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/MesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/MesosSchedulerImpl.java b/src/main/java/com/twitter/aurora/scheduler/MesosSchedulerImpl.java
deleted file mode 100644
index fb41405..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/MesosSchedulerImpl.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-import org.apache.mesos.Protos.ExecutorID;
-import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.MasterInfo;
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.Protos.TaskStatus;
-import org.apache.mesos.Scheduler;
-import org.apache.mesos.SchedulerDriver;
-
-import com.twitter.aurora.GuiceUtils.AllowUnchecked;
-import com.twitter.aurora.codec.ThriftBinaryCodec;
-import com.twitter.aurora.gen.comm.SchedulerMessage;
-import com.twitter.aurora.scheduler.base.Conversions;
-import com.twitter.aurora.scheduler.base.SchedulerException;
-import com.twitter.aurora.scheduler.configuration.Resources;
-import com.twitter.aurora.scheduler.events.PubsubEvent.Interceptors.Event;
-import com.twitter.aurora.scheduler.events.PubsubEvent.Interceptors.SendNotification;
-import com.twitter.aurora.scheduler.state.SchedulerCore;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
-import com.twitter.common.application.Lifecycle;
-import com.twitter.common.inject.TimedInterceptor.Timed;
-import com.twitter.common.stats.Stats;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Location for communication with mesos.
- */
-class MesosSchedulerImpl implements Scheduler {
- private static final Logger LOG = Logger.getLogger(MesosSchedulerImpl.class.getName());
-
- private final AtomicLong resourceOffers = Stats.exportLong("scheduler_resource_offers");
- private final AtomicLong failedOffers = Stats.exportLong("scheduler_failed_offers");
- private final AtomicLong failedStatusUpdates = Stats.exportLong("scheduler_status_updates");
- private final AtomicLong frameworkDisconnects =
- Stats.exportLong("scheduler_framework_disconnects");
- private final AtomicLong frameworkReregisters =
- Stats.exportLong("scheduler_framework_reregisters");
- private final AtomicLong lostExecutors = Stats.exportLong("scheduler_lost_executors");
-
- private final List<TaskLauncher> taskLaunchers;
-
- private final Storage storage;
- private final SchedulerCore schedulerCore;
- private final Lifecycle lifecycle;
- private volatile boolean registered = false;
-
- /**
- * Creates a new scheduler.
- *
- * @param schedulerCore Core scheduler.
- * @param lifecycle Application lifecycle manager.
- * @param taskLaunchers Task launchers.
- */
- @Inject
- public MesosSchedulerImpl(
- Storage storage,
- SchedulerCore schedulerCore,
- final Lifecycle lifecycle,
- List<TaskLauncher> taskLaunchers) {
-
- this.storage = checkNotNull(storage);
- this.schedulerCore = checkNotNull(schedulerCore);
- this.lifecycle = checkNotNull(lifecycle);
- this.taskLaunchers = checkNotNull(taskLaunchers);
- }
-
- @Override
- public void slaveLost(SchedulerDriver schedulerDriver, SlaveID slaveId) {
- LOG.info("Received notification of lost slave: " + slaveId);
- }
-
- @SendNotification(after = Event.DriverRegistered)
- @Override
- public void registered(
- SchedulerDriver driver,
- final FrameworkID frameworkId,
- MasterInfo masterInfo) {
-
- LOG.info("Registered with ID " + frameworkId + ", master: " + masterInfo);
-
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider storeProvider) {
- storeProvider.getSchedulerStore().saveFrameworkId(frameworkId.getValue());
- }
- });
- registered = true;
- }
-
- @SendNotification(after = Event.DriverDisconnected)
- @Override
- public void disconnected(SchedulerDriver schedulerDriver) {
- LOG.warning("Framework disconnected.");
- frameworkDisconnects.incrementAndGet();
- }
-
- @Override
- public void reregistered(SchedulerDriver schedulerDriver, MasterInfo masterInfo) {
- LOG.info("Framework re-registered with master " + masterInfo);
- frameworkReregisters.incrementAndGet();
- }
-
- private static boolean fitsInOffer(TaskInfo task, Offer offer) {
- return Resources.from(offer).greaterThanOrEqual(Resources.from(task.getResourcesList()));
- }
-
- @Timed("scheduler_resource_offers")
- @Override
- public void resourceOffers(SchedulerDriver driver, List<Offer> offers) {
- Preconditions.checkState(registered, "Must be registered before receiving offers.");
-
- for (final Offer offer : offers) {
- log(Level.FINE, "Received offer: %s", offer);
- resourceOffers.incrementAndGet();
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider storeProvider) {
- storeProvider.getAttributeStore().saveHostAttributes(Conversions.getAttributes(offer));
- }
- });
-
- // Ordering of task launchers is important here, since offers are consumed greedily.
- // TODO(William Farner): Refactor this area of code now that the primary task launcher
- // is asynchronous.
- for (TaskLauncher launcher : taskLaunchers) {
- Optional<TaskInfo> task = Optional.absent();
- try {
- task = launcher.createTask(offer);
- } catch (SchedulerException e) {
- LOG.log(Level.WARNING, "Failed to schedule offers.", e);
- failedOffers.incrementAndGet();
- }
-
- if (task.isPresent()) {
- if (fitsInOffer(task.get(), offer)) {
- driver.launchTasks(offer.getId(), ImmutableList.of(task.get()));
- break;
- } else {
- LOG.warning("Insufficient resources to launch task " + task);
- }
- }
- }
- }
- }
-
- @Override
- public void offerRescinded(SchedulerDriver schedulerDriver, OfferID offerId) {
- LOG.info("Offer rescinded: " + offerId);
- for (TaskLauncher launcher : taskLaunchers) {
- launcher.cancelOffer(offerId);
- }
- }
-
- @AllowUnchecked
- @Timed("scheduler_status_update")
- @Override
- public void statusUpdate(SchedulerDriver driver, TaskStatus status) {
- String info = status.hasData() ? status.getData().toStringUtf8() : null;
- String infoMsg = info != null ? " with info " + info : "";
- String coreMsg = status.hasMessage() ? " with core message " + status.getMessage() : "";
- LOG.info("Received status update for task " + status.getTaskId().getValue()
- + " in state " + status.getState() + infoMsg + coreMsg);
-
- try {
- for (TaskLauncher launcher : taskLaunchers) {
- if (launcher.statusUpdate(status)) {
- return;
- }
- }
- } catch (SchedulerException e) {
- LOG.log(Level.SEVERE, "Status update failed due to scheduler exception: " + e, e);
- // We re-throw the exception here in an effort to NACK the status update and trigger an
- // abort of the driver. Previously we directly issued driver.abort(), but the re-entrancy
- // guarantees of that are uncertain (and we believe it was not working). However, this
- // was difficult to discern since logging is unreliable during JVM shutdown and we would not
- // see the above log message.
- throw e;
- }
-
- LOG.warning("Unhandled status update " + status);
- failedStatusUpdates.incrementAndGet();
- }
-
- @Override
- public void error(SchedulerDriver driver, String message) {
- LOG.severe("Received error message: " + message);
- lifecycle.shutdown();
- }
-
- @Override
- public void executorLost(SchedulerDriver schedulerDriver, ExecutorID executorID, SlaveID slaveID,
- int status) {
-
- LOG.info("Lost executor " + executorID);
- lostExecutors.incrementAndGet();
- }
-
- @Timed("scheduler_framework_message")
- @Override
- public void frameworkMessage(SchedulerDriver driver, ExecutorID executor, SlaveID slave,
- byte[] data) {
-
- if (data == null) {
- LOG.info("Received empty framework message.");
- return;
- }
-
- try {
- SchedulerMessage schedulerMsg = ThriftBinaryCodec.decode(SchedulerMessage.class, data);
- if (schedulerMsg == null || !schedulerMsg.isSet()) {
- LOG.warning("Received empty scheduler message.");
- return;
- }
-
- switch (schedulerMsg.getSetField()) {
- case DELETED_TASKS:
- // TODO(William Farner): Refactor this to use a thinner interface here. As it stands
- // it is odd that we route the registered() call to schedulerCore via the
- // registeredListener and call the schedulerCore directly here.
- schedulerCore.tasksDeleted(schedulerMsg.getDeletedTasks().getTaskIds());
- break;
-
- default:
- LOG.warning("Received unhandled scheduler message type: " + schedulerMsg.getSetField());
- break;
- }
- } catch (ThriftBinaryCodec.CodingException e) {
- LOG.log(Level.SEVERE, "Failed to decode framework message.", e);
- }
- }
-
- private static void log(Level level, String message, Object... args) {
- if (LOG.isLoggable(level)) {
- LOG.log(level, String.format(message, args));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/MesosTaskFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/MesosTaskFactory.java b/src/main/java/com/twitter/aurora/scheduler/MesosTaskFactory.java
deleted file mode 100644
index 5f73f71..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/MesosTaskFactory.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.protobuf.ByteString;
-
-import org.apache.mesos.Protos.ExecutorID;
-import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.Resource;
-import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskInfo;
-
-import com.twitter.aurora.Protobufs;
-import com.twitter.aurora.codec.ThriftBinaryCodec;
-import com.twitter.aurora.scheduler.base.CommandUtil;
-import com.twitter.aurora.scheduler.base.JobKeys;
-import com.twitter.aurora.scheduler.base.SchedulerException;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.configuration.Resources;
-import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
-import com.twitter.aurora.scheduler.storage.entities.IJobKey;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-import com.twitter.common.quantity.Data;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static com.twitter.common.base.MorePreconditions.checkNotBlank;
-
-/**
- * A factory to create mesos task objects.
- */
-public interface MesosTaskFactory {
-
- /**
- * Creates a mesos task object.
- *
- * @param task Assigned task to translate into a task object.
- * @param slaveId Id of the slave the task is being assigned to.
- * @return A new task.
- * @throws SchedulerException If the task could not be encoded.
- */
- TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException;
-
- static class ExecutorConfig {
- private final String executorPath;
-
- public ExecutorConfig(String executorPath) {
- this.executorPath = checkNotBlank(executorPath);
- }
-
- String getExecutorPath() {
- return executorPath;
- }
- }
-
- static class MesosTaskFactoryImpl implements MesosTaskFactory {
- private static final Logger LOG = Logger.getLogger(MesosTaskFactoryImpl.class.getName());
- private static final String EXECUTOR_PREFIX = "thermos-";
-
- /**
- * Name to associate with task executors.
- */
- @VisibleForTesting
- static final String EXECUTOR_NAME = "aurora.task";
-
- private final String executorPath;
-
- @Inject
- MesosTaskFactoryImpl(ExecutorConfig executorConfig) {
- this.executorPath = executorConfig.getExecutorPath();
- }
-
- @VisibleForTesting
- static ExecutorID getExecutorId(String taskId) {
- return ExecutorID.newBuilder().setValue(EXECUTOR_PREFIX + taskId).build();
- }
-
- public static String getJobSourceName(IJobKey jobkey) {
- return String.format("%s.%s.%s", jobkey.getRole(), jobkey.getEnvironment(), jobkey.getName());
- }
-
- public static String getJobSourceName(ITaskConfig task) {
- return getJobSourceName(JobKeys.from(task));
- }
-
- public static String getInstanceSourceName(ITaskConfig task, int instanceId) {
- return String.format("%s.%s", getJobSourceName(task), instanceId);
- }
-
- @Override
- public TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException {
- checkNotNull(task);
- byte[] taskInBytes;
- try {
- taskInBytes = ThriftBinaryCodec.encode(task.newBuilder());
- } catch (ThriftBinaryCodec.CodingException e) {
- LOG.log(Level.SEVERE, "Unable to serialize task.", e);
- throw new SchedulerException("Internal error.", e);
- }
-
- ITaskConfig config = task.getTask();
- List<Resource> resources;
- if (task.isSetAssignedPorts()) {
- resources = Resources.from(config)
- .toResourceList(ImmutableSet.copyOf(task.getAssignedPorts().values()));
- } else {
- resources = ImmutableList.of();
- }
-
- if (LOG.isLoggable(Level.FINE)) {
- LOG.fine("Setting task resources to "
- + Iterables.transform(resources, Protobufs.SHORT_TOSTRING));
- }
- TaskInfo.Builder taskBuilder =
- TaskInfo.newBuilder()
- .setName(JobKeys.toPath(Tasks.ASSIGNED_TO_JOB_KEY.apply(task)))
- .setTaskId(TaskID.newBuilder().setValue(task.getTaskId()))
- .setSlaveId(slaveId)
- .addAllResources(resources)
- .setData(ByteString.copyFrom(taskInBytes));
-
- ExecutorInfo executor = ExecutorInfo.newBuilder()
- .setCommand(CommandUtil.create(executorPath))
- .setExecutorId(getExecutorId(task.getTaskId()))
- .setName(EXECUTOR_NAME)
- .setSource(getInstanceSourceName(config, task.getInstanceId()))
- .addResources(Resources.makeMesosResource(Resources.CPUS, ResourceSlot.EXECUTOR_CPUS))
- .addResources(
- Resources.makeMesosResource(Resources.RAM_MB, ResourceSlot.EXECUTOR_RAM.as(Data.MB)))
- .build();
- return taskBuilder
- .setExecutor(executor)
- .build();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/ResourceSlot.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/ResourceSlot.java b/src/main/java/com/twitter/aurora/scheduler/ResourceSlot.java
deleted file mode 100644
index a9c14e6..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/ResourceSlot.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import java.util.Arrays;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-
-import com.twitter.aurora.scheduler.configuration.Resources;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Data;
-
-import static org.apache.mesos.Protos.Offer;
-
-/**
- * Resource containing class that is aware of executor overhead.
- */
-public final class ResourceSlot {
-
- private final Resources resources;
-
- /**
- * CPU allocated for each executor.
- */
- @VisibleForTesting
- static final double EXECUTOR_CPUS = 0.25;
-
- /**
- * RAM required for the executor. Executors in the wild have been observed using 48-54MB RSS,
- * setting to 128MB to be extra vigilant initially.
- */
- @VisibleForTesting
- static final Amount<Long, Data> EXECUTOR_RAM = Amount.of(128L, Data.MB);
-
- private ResourceSlot(Resources r) {
- this.resources = r;
- }
-
- public static ResourceSlot from(ITaskConfig task) {
- double totalCPU = task.getNumCpus() + EXECUTOR_CPUS;
- Amount<Long, Data> totalRAM = Amount.of(task.getRamMb() + EXECUTOR_RAM.as(Data.MB), Data.MB);
- Amount<Long, Data> disk = Amount.of(task.getDiskMb(), Data.MB);
- return new ResourceSlot(
- new Resources(totalCPU, totalRAM, disk, task.getRequestedPorts().size()));
- }
-
- public static ResourceSlot from(Offer offer) {
- return new ResourceSlot(Resources.from(offer));
- }
-
- public double getNumCpus() {
- return resources.getNumCpus();
- }
-
- public Amount<Long, Data> getRam() {
- return resources.getRam();
- }
-
- public Amount<Long, Data> getDisk() {
- return resources.getDisk();
- }
-
- public int getNumPorts() {
- return resources.getNumPorts();
- }
-
- @VisibleForTesting
- public static ResourceSlot from(double cpu,
- Amount<Long, Data> ram,
- Amount<Long, Data> disk,
- int ports) {
- double totalCPU = cpu + EXECUTOR_CPUS;
- Amount<Long, Data> totalRAM = Amount.of(ram.as(Data.MB) + EXECUTOR_RAM.as(Data.MB), Data.MB);
-
- return new ResourceSlot(new Resources(totalCPU, totalRAM, disk, ports));
- }
-
- public static ResourceSlot sum(ResourceSlot... rs) {
- return sum(Arrays.asList(rs));
- }
-
- public static ResourceSlot sum(Iterable<ResourceSlot> rs) {
- Resources r = Resources.sum(Iterables.transform(rs, new Function<ResourceSlot, Resources>() {
- @Override public Resources apply(ResourceSlot input) {
- return input.resources;
- }
- }));
-
- return new ResourceSlot(r);
- }
-
- public static final Ordering<ResourceSlot> ORDER = new Ordering<ResourceSlot>() {
- @Override public int compare(ResourceSlot left, ResourceSlot right) {
- return Resources.RESOURCE_ORDER.compare(left.resources, right.resources);
- }
- };
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/SchedulerLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/com/twitter/aurora/scheduler/SchedulerLifecycle.java
deleted file mode 100644
index f90869d..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/SchedulerLifecycle.java
+++ /dev/null
@@ -1,417 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.base.Supplier;
-import com.google.common.eventbus.Subscribe;
-import com.google.common.util.concurrent.Atomics;
-
-import org.apache.mesos.Protos;
-import org.apache.mesos.SchedulerDriver;
-
-import com.twitter.aurora.scheduler.events.PubsubEvent.DriverRegistered;
-import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
-import com.twitter.aurora.scheduler.storage.Storage.NonVolatileStorage;
-import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.Work;
-import com.twitter.aurora.scheduler.storage.StorageBackfill;
-import com.twitter.common.application.Lifecycle;
-import com.twitter.common.base.Closure;
-import com.twitter.common.base.Closures;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.StatImpl;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.util.Clock;
-import com.twitter.common.util.StateMachine;
-import com.twitter.common.util.StateMachine.Transition;
-import com.twitter.common.zookeeper.Group.JoinException;
-import com.twitter.common.zookeeper.ServerSet;
-import com.twitter.common.zookeeper.SingletonService.LeaderControl;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static com.twitter.common.zookeeper.SingletonService.LeadershipListener;
-
-/**
- * The central driver of the scheduler runtime lifecycle. Handles the transitions from startup and
- * initialization through acting as a standby scheduler / log replica and finally to becoming the
- * scheduler leader.
- * <p>
- * The (enforced) call order to be used with this class is:
- * <ol>
- * <li>{@link #prepare()}, to initialize the storage system.</li>
- * <li>{@link LeadershipListener#onLeading(LeaderControl) onLeading()} on the
- * {@link LeadershipListener LeadershipListener}
- * returned from {@link #prepare()}, signaling that this process has exclusive control of the
- * cluster.</li>
- * <li>{@link #registered(DriverRegistered) registered()},
- * indicating that registration with the mesos master has succeeded.
- * At this point, the scheduler's presence will be announced via
- * {@link LeaderControl#advertise() advertise()}.</li>
- * </ol>
- * If this call order is broken, calls will fail by throwing
- * {@link java.lang.IllegalStateException}.
- * <p>
- * At any point in the lifecycle, the scheduler will respond to
- * {@link LeadershipListener#onDefeated(com.twitter.common.zookeeper.ServerSet.EndpointStatus)
- * onDefeated()} by initiating a clean shutdown using {@link Lifecycle#shutdown() shutdown()}.
- * A clean shutdown will also be initiated if control actions fail during normal state transitions.
- */
-public class SchedulerLifecycle implements EventSubscriber {
-
- private static final Logger LOG = Logger.getLogger(SchedulerLifecycle.class.getName());
-
- private enum State {
- IDLE,
- PREPARING_STORAGE,
- STORAGE_PREPARED,
- LEADER_AWAITING_REGISTRATION,
- REGISTERED_LEADER,
- RUNNING,
- DEAD
- }
-
- private static final Predicate<Transition<State>> IS_DEAD = new Predicate<Transition<State>>() {
- @Override public boolean apply(Transition<State> state) {
- return state.getTo() == State.DEAD;
- }
- };
-
- private static final Predicate<Transition<State>> NOT_DEAD = Predicates.not(IS_DEAD);
-
- private final LeadershipListener leadershipListener;
- private final AtomicBoolean registrationAcked = new AtomicBoolean(false);
- private final AtomicReference<LeaderControl> leaderControl = Atomics.newReference();
- private final StateMachine<State> stateMachine;
-
- @Inject
- SchedulerLifecycle(
- final DriverFactory driverFactory,
- final NonVolatileStorage storage,
- final Lifecycle lifecycle,
- final Driver driver,
- final DriverReference driverRef,
- final LeadingOptions leadingOptions,
- final ScheduledExecutorService executorService,
- final Clock clock) {
-
- this(
- driverFactory,
- storage,
- lifecycle,
- driver,
- driverRef,
- new DelayedActions() {
- @Override public void blockingDriverJoin(Runnable runnable) {
- executorService.execute(runnable);
- }
-
- @Override public void onAutoFailover(Runnable runnable) {
- executorService.schedule(
- runnable,
- leadingOptions.leadingTimeLimit.getValue(),
- leadingOptions.leadingTimeLimit.getUnit().getTimeUnit());
- }
-
- @Override public void onRegistrationTimeout(Runnable runnable) {
- LOG.info(
- "Giving up on registration in " + leadingOptions.registrationDelayLimit);
- executorService.schedule(
- runnable,
- leadingOptions.registrationDelayLimit.getValue(),
- leadingOptions.registrationDelayLimit.getUnit().getTimeUnit());
- }
- },
- clock);
- }
-
- @VisibleForTesting
- SchedulerLifecycle(
- final DriverFactory driverFactory,
- final NonVolatileStorage storage,
- final Lifecycle lifecycle,
- // TODO(wfarner): The presence of Driver and DriverReference is quite confusing. Figure out
- // a clean way to collapse the duties of DriverReference into DriverImpl.
- final Driver driver,
- final DriverReference driverRef,
- final DelayedActions delayedActions,
- final Clock clock) {
-
- Stats.export(new StatImpl<Integer>("framework_registered") {
- @Override public Integer read() {
- return registrationAcked.get() ? 1 : 0;
- }
- });
- for (final State state : State.values()) {
- Stats.export(new StatImpl<Integer>("scheduler_lifecycle_" + state) {
- @Override public Integer read() {
- return (state == stateMachine.getState()) ? 1 : 0;
- }
- });
- }
-
- final Closure<Transition<State>> prepareStorage = new Closure<Transition<State>>() {
- @Override public void execute(Transition<State> transition) {
- try {
- storage.prepare();
- stateMachine.transition(State.STORAGE_PREPARED);
- } catch (RuntimeException e) {
- stateMachine.transition(State.DEAD);
- throw e;
- }
- }
- };
-
- final Closure<Transition<State>> handleLeading = new Closure<Transition<State>>() {
- @Override public void execute(Transition<State> transition) {
- LOG.info("Elected as leading scheduler!");
- storage.start(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider storeProvider) {
- StorageBackfill.backfill(storeProvider, clock);
- }
- });
-
- @Nullable final String frameworkId = storage.consistentRead(
- new Work.Quiet<String>() {
- @Override public String apply(StoreProvider storeProvider) {
- return storeProvider.getSchedulerStore().fetchFrameworkId();
- }
- });
- driverRef.set(driverFactory.apply(frameworkId));
-
- delayedActions.onRegistrationTimeout(
- new Runnable() {
- @Override public void run() {
- if (!registrationAcked.get()) {
- LOG.severe(
- "Framework has not been registered within the tolerated delay.");
- stateMachine.transition(State.DEAD);
- }
- }
- });
-
- delayedActions.onAutoFailover(
- new Runnable() {
- @Override public void run() {
- LOG.info("Triggering automatic failover.");
- stateMachine.transition(State.DEAD);
- }
- });
-
- Protos.Status status = driver.start();
- LOG.info("Driver started with code " + status);
- delayedActions.blockingDriverJoin(new Runnable() {
- @Override public void run() {
- // Blocks until driver exits.
- driver.join();
- stateMachine.transition(State.DEAD);
- }
- });
- }
- };
-
- final Closure<Transition<State>> handleRegistered = new Closure<Transition<State>>() {
- @Override public void execute(Transition<State> transition) {
- registrationAcked.set(true);
- try {
- leaderControl.get().advertise();
- } catch (JoinException e) {
- LOG.log(Level.SEVERE, "Failed to advertise leader, shutting down.", e);
- stateMachine.transition(State.DEAD);
- } catch (InterruptedException e) {
- LOG.log(Level.SEVERE, "Interrupted while advertising leader, shutting down.", e);
- stateMachine.transition(State.DEAD);
- Thread.currentThread().interrupt();
- }
- }
- };
-
- final Closure<Transition<State>> shutDown = new Closure<Transition<State>>() {
- private final AtomicBoolean invoked = new AtomicBoolean(false);
- @Override public void execute(Transition<State> transition) {
- if (!invoked.compareAndSet(false, true)) {
- LOG.info("Shutdown already invoked, ignoring extra call.");
- return;
- }
-
- // TODO(wfarner): Consider using something like guava's Closer to abstractly tear down
- // resources here.
- try {
- LeaderControl control = leaderControl.get();
- if (control != null) {
- try {
- control.leave();
- } catch (JoinException e) {
- LOG.log(Level.WARNING, "Failed to leave leadership: " + e, e);
- } catch (ServerSet.UpdateException e) {
- LOG.log(Level.WARNING, "Failed to leave server set: " + e, e);
- }
- }
-
- // TODO(wfarner): Re-evaluate tear-down ordering here. Should the top-level shutdown
- // be invoked first, or the underlying critical components?
- driver.stop();
- storage.stop();
- } finally {
- lifecycle.shutdown();
- }
- }
- };
-
- stateMachine = StateMachine.<State>builder("SchedulerLifecycle")
- .initialState(State.IDLE)
- .logTransitions()
- .addState(
- Closures.filter(NOT_DEAD, prepareStorage),
- State.IDLE,
- State.PREPARING_STORAGE, State.DEAD)
- .addState(
- State.PREPARING_STORAGE,
- State.STORAGE_PREPARED, State.DEAD)
- .addState(
- Closures.filter(NOT_DEAD, handleLeading),
- State.STORAGE_PREPARED,
- State.LEADER_AWAITING_REGISTRATION, State.DEAD)
- .addState(
- Closures.filter(NOT_DEAD, handleRegistered),
- State.LEADER_AWAITING_REGISTRATION,
- State.REGISTERED_LEADER, State.DEAD)
- .addState(
- State.REGISTERED_LEADER,
- State.RUNNING, State.DEAD)
- .addState(
- State.RUNNING,
- State.DEAD)
- .addState(
- State.DEAD,
- // Allow cycles in DEAD to prevent throwing and avoid the need for call-site checking.
- State.DEAD
- )
- .onAnyTransition(
- Closures.filter(IS_DEAD, shutDown))
- .build();
-
- this.leadershipListener = new SchedulerCandidateImpl(stateMachine, leaderControl);
- }
-
- /**
- * Prepares a scheduler to offer itself as a leader candidate. After this call the scheduler will
- * host a live log replica and start syncing data from the leader via the log until it gets called
- * upon to lead.
- *
- * @return A listener that can be offered for leadership of a distributed election.
- */
- public LeadershipListener prepare() {
- stateMachine.transition(State.PREPARING_STORAGE);
- return leadershipListener;
- }
-
- @Subscribe
- public void registered(DriverRegistered event) {
- stateMachine.transition(State.REGISTERED_LEADER);
- }
-
- /**
- * Maintains a reference to the driver.
- */
- static class DriverReference implements Supplier<Optional<SchedulerDriver>> {
- private final AtomicReference<SchedulerDriver> driver = Atomics.newReference();
-
- @Override public Optional<SchedulerDriver> get() {
- return Optional.fromNullable(driver.get());
- }
-
- private void set(SchedulerDriver ref) {
- driver.set(ref);
- }
- }
-
- private static class SchedulerCandidateImpl implements LeadershipListener {
- private final StateMachine<State> stateMachine;
- private final AtomicReference<LeaderControl> leaderControl;
-
- SchedulerCandidateImpl(
- StateMachine<State> stateMachine,
- AtomicReference<LeaderControl> leaderControl) {
-
- this.stateMachine = stateMachine;
- this.leaderControl = leaderControl;
- }
-
- @Override public void onLeading(LeaderControl control) {
- leaderControl.set(control);
- stateMachine.transition(State.LEADER_AWAITING_REGISTRATION);
- }
-
- @Override public void onDefeated(@Nullable ServerSet.EndpointStatus status) {
- LOG.severe("Lost leadership, committing suicide.");
- stateMachine.transition(State.DEAD);
- }
- }
-
- public static class LeadingOptions {
- private final Amount<Long, Time> registrationDelayLimit;
- private final Amount<Long, Time> leadingTimeLimit;
-
- /**
- * Creates a new collection of options for tuning leadership behavior.
- *
- * @param registrationDelayLimit Maximum amount of time to wait for framework registration to
- * complete.
- * @param leadingTimeLimit Maximum amount of time to serve as leader before abdicating.
- */
- public LeadingOptions(
- Amount<Long, Time> registrationDelayLimit,
- Amount<Long, Time> leadingTimeLimit) {
-
- Preconditions.checkArgument(
- registrationDelayLimit.getValue() >= 0,
- "Registration delay limit must be positive.");
- Preconditions.checkArgument(
- leadingTimeLimit.getValue() >= 0,
- "Leading time limit must be positive.");
-
- this.registrationDelayLimit = checkNotNull(registrationDelayLimit);
- this.leadingTimeLimit = checkNotNull(leadingTimeLimit);
- }
- }
-
- @VisibleForTesting
- interface DelayedActions {
- void blockingDriverJoin(Runnable runnable);
-
- void onAutoFailover(Runnable runnable);
-
- void onRegistrationTimeout(Runnable runnable);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java b/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java
deleted file mode 100644
index be4c2b1..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import javax.inject.Singleton;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.inject.AbstractModule;
-import com.google.inject.PrivateModule;
-import com.google.inject.Provides;
-import com.google.inject.TypeLiteral;
-
-import org.apache.mesos.Scheduler;
-import org.apache.mesos.SchedulerDriver;
-
-import com.twitter.aurora.scheduler.Driver.DriverImpl;
-import com.twitter.aurora.scheduler.SchedulerLifecycle.DriverReference;
-import com.twitter.aurora.scheduler.SchedulerLifecycle.LeadingOptions;
-import com.twitter.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
-import com.twitter.aurora.scheduler.events.PubsubEventModule;
-import com.twitter.aurora.scheduler.periodic.GcExecutorLauncher;
-import com.twitter.aurora.scheduler.periodic.GcExecutorLauncher.GcExecutorSettings;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-
-/**
- * Binding module for top-level scheduling logic.
- */
-public class SchedulerModule extends AbstractModule {
-
- @CmdLine(name = "executor_gc_interval",
- help = "Max interval on which to run the GC executor on a host to clean up dead tasks.")
- private static final Arg<Amount<Long, Time>> EXECUTOR_GC_INTERVAL =
- Arg.create(Amount.of(1L, Time.HOURS));
-
- @CmdLine(name = "gc_executor_path", help = "Path to the gc executor launch script.")
- private static final Arg<String> GC_EXECUTOR_PATH = Arg.create(null);
-
- @CmdLine(name = "max_registration_delay",
- help = "Max allowable delay to allow the driver to register before aborting")
- private static final Arg<Amount<Long, Time>> MAX_REGISTRATION_DELAY =
- Arg.create(Amount.of(1L, Time.MINUTES));
-
- @CmdLine(name = "max_leading_duration",
- help = "After leading for this duration, the scheduler should commit suicide.")
- private static final Arg<Amount<Long, Time>> MAX_LEADING_DURATION =
- Arg.create(Amount.of(1L, Time.DAYS));
-
- @Override
- protected void configure() {
- bind(Driver.class).to(DriverImpl.class);
- bind(DriverImpl.class).in(Singleton.class);
- bind(new TypeLiteral<Supplier<Optional<SchedulerDriver>>>() { }).to(DriverReference.class);
- bind(DriverReference.class).in(Singleton.class);
-
- bind(Scheduler.class).to(MesosSchedulerImpl.class);
- bind(MesosSchedulerImpl.class).in(Singleton.class);
-
- bind(TaskIdGenerator.class).to(TaskIdGeneratorImpl.class);
-
- bind(GcExecutorSettings.class).toInstance(new GcExecutorSettings(
- EXECUTOR_GC_INTERVAL.get(),
- Optional.fromNullable(GC_EXECUTOR_PATH.get())));
-
- bind(GcExecutorLauncher.class).in(Singleton.class);
- bind(UserTaskLauncher.class).in(Singleton.class);
-
- install(new PrivateModule() {
- @Override protected void configure() {
- bind(LeadingOptions.class).toInstance(
- new LeadingOptions(MAX_REGISTRATION_DELAY.get(), MAX_LEADING_DURATION.get()));
- final ScheduledExecutorService executor = Executors.newScheduledThreadPool(
- 1,
- new ThreadFactoryBuilder().setNameFormat("Lifecycle-%d").setDaemon(true).build());
- bind(ScheduledExecutorService.class).toInstance(executor);
- bind(SchedulerLifecycle.class).in(Singleton.class);
- expose(SchedulerLifecycle.class);
- }
- });
-
- PubsubEventModule.bindSubscriber(binder(), SchedulerLifecycle.class);
- PubsubEventModule.bindSubscriber(binder(), TaskVars.class);
- }
-
- @Provides
- @Singleton
- List<TaskLauncher> provideTaskLaunchers(
- GcExecutorLauncher gcLauncher,
- UserTaskLauncher userTaskLauncher) {
-
- return ImmutableList.of(gcLauncher, userTaskLauncher);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/TaskIdGenerator.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/TaskIdGenerator.java b/src/main/java/com/twitter/aurora/scheduler/TaskIdGenerator.java
deleted file mode 100644
index 240649e..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/TaskIdGenerator.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import java.util.UUID;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Preconditions;
-
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-import com.twitter.common.util.Clock;
-
-/**
- * A function that generates universally-unique (not guaranteed, but highly confident) task IDs.
- */
-public interface TaskIdGenerator {
-
- /**
- * Generates a universally-unique ID for the task. This is not necessarily a repeatable
- * operation, two subsequent invocations with the same object need not return the same value.
- *
- * @param task Configuration of the task to create an ID for.
- * @param instanceId Instance ID for the task.
- * @return A universally-unique ID for the task.
- */
- String generate(ITaskConfig task, int instanceId);
-
- class TaskIdGeneratorImpl implements TaskIdGenerator {
- private final Clock clock;
-
- @Inject
- TaskIdGeneratorImpl(Clock clock) {
- this.clock = Preconditions.checkNotNull(clock);
- }
-
- @Override
- public String generate(ITaskConfig task, int instanceId) {
- String sep = "-";
- return new StringBuilder()
- .append(clock.nowMillis()) // Allows chronological sorting.
- .append(sep)
- .append(task.getOwner().getRole()) // Identification and collision prevention.
- .append(sep)
- .append(task.getEnvironment())
- .append(sep)
- .append(task.getJobName())
- .append(sep)
- .append(instanceId) // Collision prevention within job.
- .append(sep)
- .append(UUID.randomUUID()) // Just-in-case collision prevention.
- .toString().replaceAll("[^\\w-]", sep); // Constrain character set.
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/TaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/TaskLauncher.java b/src/main/java/com/twitter/aurora/scheduler/TaskLauncher.java
deleted file mode 100644
index aade6da..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/TaskLauncher.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import com.google.common.base.Optional;
-
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.Protos.TaskStatus;
-
-/**
- * A receiver of resource offers and task status updates.
- */
-public interface TaskLauncher {
-
- /**
- * Grants a resource offer to the task launcher, which will be passed to any subsequent task
- * launchers if this one does not accept.
- * <p>
- * A task launcher may choose to retain an offer for later use. Any retained offers must be
- * cleaned up with {@link #cancelOffer(OfferID)}.
- *
- * @param offer The resource offer.
- * @return A task, absent if the launcher chooses not to accept the offer.
- */
- Optional<TaskInfo> createTask(Offer offer);
-
- /**
- * Informs the launcher that a status update has been received for a task. If the task is not
- * associated with the launcher, it should return {@code false} so that another launcher may
- * receive it.
- *
- * @param status The status update.
- * @return {@code true} if the status is relevant to the launcher and should not be delivered to
- * other launchers, {@code false} otherwise.
- */
- boolean statusUpdate(TaskStatus status);
-
- /**
- * Informs the launcher that a previously-advertised offer is canceled and may not be used.
- *
- * @param offer The canceled offer.
- */
- void cancelOffer(OfferID offer);
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/TaskVars.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/TaskVars.java b/src/main/java/com/twitter/aurora/scheduler/TaskVars.java
deleted file mode 100644
index 5574631..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/TaskVars.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterables;
-import com.google.common.eventbus.Subscribe;
-
-import com.twitter.aurora.gen.Attribute;
-import com.twitter.aurora.gen.ScheduleStatus;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import com.twitter.aurora.scheduler.events.PubsubEvent.StorageStarted;
-import com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import com.twitter.aurora.scheduler.events.PubsubEvent.TasksDeleted;
-import com.twitter.aurora.scheduler.storage.AttributeStore;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.Work;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.common.stats.StatsProvider;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * A container that tracks and exports stat counters for tasks.
- * <p>
- * One counter per {@link ScheduleStatus} (named by {@link #getVarName(ScheduleStatus)}) holds the
- * number of tasks currently in that state. These counters are seeded from storage on
- * {@link StorageStarted} and then adjusted from {@link TaskStateChange} and {@link TasksDeleted}
- * events. One counter per rack (named by {@link #rackStatName(String)}) counts transitions to
- * {@link ScheduleStatus#LOST} for tasks whose assigned host carries a single-valued
- * {@code rack} attribute.
- */
-class TaskVars implements EventSubscriber {
-  private static final Logger LOG = Logger.getLogger(TaskVars.class.getName());
-
-  // Used to ignore pubsub events sent before storage has completely started. This avoids a
-  // miscount where a StorageStarted consumer is invoked before storageStarted is invoked here,
-  // and pubsub events are fired for tasks that we have not yet counted. For example, if
-  // tasksDeleted is invoked, we would end up with a negative count.
-  private volatile boolean storageStarted = false;
-
-  // Counters created on first access, keyed by full stat name (see getVarName).
-  private final LoadingCache<String, AtomicLong> countersByStatus;
-  // Counters created on first access, keyed by rack name (stat name derived via rackStatName).
-  private final LoadingCache<String, AtomicLong> countersByRack;
-
-  private final Storage storage;
-
-  @Inject
-  TaskVars(Storage storage, final StatsProvider statProvider) {
-    this.storage = checkNotNull(storage);
-    checkNotNull(statProvider);
-    countersByStatus = CacheBuilder.newBuilder().build(new CacheLoader<String, AtomicLong>() {
-      @Override public AtomicLong load(String statName) {
-        return statProvider.makeCounter(statName);
-      }
-    });
-    countersByRack = CacheBuilder.newBuilder().build(new CacheLoader<String, AtomicLong>() {
-      @Override public AtomicLong load(String rack) {
-        return statProvider.makeCounter(rackStatName(rack));
-      }
-    });
-  }
-
-  /**
-   * Returns the name of the stat counting tasks currently in {@code status}.
-   */
-  @VisibleForTesting
-  static String getVarName(ScheduleStatus status) {
-    return "task_store_" + status;
-  }
-
-  /**
-   * Returns the name of the stat counting tasks lost on hosts in {@code rack}.
-   */
-  @VisibleForTesting
-  static String rackStatName(String rack) {
-    return "tasks_lost_rack_" + rack;
-  }
-
-  /**
-   * Matches a well-formed rack attribute: one named {@code rack} that carries exactly one value.
-   * <p>
-   * The single-value requirement keeps {@link #ATTR_VALUE} safe. {@link Iterables#getOnlyElement}
-   * throws for a null, empty or multi-valued attribute. That exception would abort rack
-   * accounting for a LOST task instead of falling through to the missing-rack warning.
-   */
-  private static final Predicate<Attribute> IS_RACK = new Predicate<Attribute>() {
-    @Override public boolean apply(Attribute attr) {
-      return "rack".equals(attr.getName())
-          && attr.getValues() != null
-          && Iterables.size(attr.getValues()) == 1;
-    }
-  };
-
-  // Extracts the single value of an attribute already accepted by IS_RACK.
-  private static final Function<Attribute, String> ATTR_VALUE = new Function<Attribute, String>() {
-    @Override public String apply(Attribute attr) {
-      return Iterables.getOnlyElement(attr.getValues());
-    }
-  };
-
-  private AtomicLong getCounter(ScheduleStatus status) {
-    return countersByStatus.getUnchecked(getVarName(status));
-  }
-
-  private void incrementCount(ScheduleStatus status) {
-    getCounter(status).incrementAndGet();
-  }
-
-  private void decrementCount(ScheduleStatus status) {
-    getCounter(status).decrementAndGet();
-  }
-
-  /**
-   * Moves a task from its old state's counter to its current state's counter. If the task became
-   * LOST, this also bumps the rack counter for the task's assigned host.
-   *
-   * @param stateChange The state transition; ignored until storage has started.
-   */
-  @Subscribe
-  public void taskChangedState(TaskStateChange stateChange) {
-    if (!storageStarted) {
-      return;
-    }
-
-    IScheduledTask task = stateChange.getTask();
-    // NOTE(review): INIT is never decremented, presumably because a task in INIT was never
-    // counted by this class; confirm against where INIT tasks are created.
-    if (stateChange.getOldState() != ScheduleStatus.INIT) {
-      decrementCount(stateChange.getOldState());
-    }
-    incrementCount(task.getStatus());
-
-    if (stateChange.getNewState() == ScheduleStatus.LOST) {
-      final String host = task.getAssignedTask().getSlaveHost();
-      Optional<String> rack = storage.consistentRead(new Work.Quiet<Optional<String>>() {
-        @Override public Optional<String> apply(StoreProvider storeProvider) {
-          Optional<Attribute> rack = FluentIterable
-              .from(AttributeStore.Util.attributesOrNone(storeProvider, host))
-              .firstMatch(IS_RACK);
-          return rack.transform(ATTR_VALUE);
-        }
-      });
-
-      if (rack.isPresent()) {
-        countersByRack.getUnchecked(rack.get()).incrementAndGet();
-      } else {
-        // Covers both a missing rack attribute and a malformed (not single-valued) one.
-        LOG.warning("Failed to find rack attribute associated with host " + host);
-      }
-    }
-  }
-
-  /**
-   * Seeds the per-status counters from every task in storage and starts accepting events.
-   *
-   * @param event The storage-started notification.
-   */
-  @Subscribe
-  public void storageStarted(StorageStarted event) {
-    for (IScheduledTask task : Storage.Util.consistentFetchTasks(storage, Query.unscoped())) {
-      incrementCount(task.getStatus());
-    }
-
-    // Dummy read the counter for each status counter. This is important to guarantee a stat with
-    // value zero is present for each state, even if all states are not represented in the task
-    // store.
-    for (ScheduleStatus status : ScheduleStatus.values()) {
-      getCounter(status);
-    }
-    storageStarted = true;
-  }
-
-  /**
-   * Decrements the per-status counter of each deleted task.
-   *
-   * @param event The deletion event; ignored until storage has started.
-   */
-  @Subscribe
-  public void tasksDeleted(final TasksDeleted event) {
-    if (!storageStarted) {
-      return;
-    }
-
-    for (IScheduledTask task : event.getTasks()) {
-      decrementCount(task.getStatus());
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/UserTaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/UserTaskLauncher.java b/src/main/java/com/twitter/aurora/scheduler/UserTaskLauncher.java
deleted file mode 100644
index 0fb3bbb..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/UserTaskLauncher.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.Protos.TaskStatus;
-
-import com.twitter.aurora.gen.ScheduleStatus;
-import com.twitter.aurora.scheduler.async.OfferQueue;
-import com.twitter.aurora.scheduler.base.Conversions;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.SchedulerException;
-import com.twitter.aurora.scheduler.state.StateManager;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * A task launcher that matches resource offers against user tasks.
- */
-class UserTaskLauncher implements TaskLauncher {
-
- private static final Logger LOG = Logger.getLogger(UserTaskLauncher.class.getName());
-
- @VisibleForTesting
- static final String MEMORY_LIMIT_EXCEEDED = "MEMORY STATISTICS";
-
- @VisibleForTesting
- static final String MEMORY_LIMIT_DISPLAY = "Task used more memory than requested.";
-
- private final OfferQueue offerQueue;
- private final StateManager stateManager;
-
- @Inject
- UserTaskLauncher(OfferQueue offerQueue, StateManager stateManager) {
- this.offerQueue = checkNotNull(offerQueue);
- this.stateManager = checkNotNull(stateManager);
- }
-
- @Override
- public Optional<TaskInfo> createTask(Offer offer) {
- checkNotNull(offer);
-
- offerQueue.addOffer(offer);
- return Optional.absent();
- }
-
- @Override
- public synchronized boolean statusUpdate(TaskStatus status) {
- @Nullable String message = null;
- if (status.hasMessage()) {
- message = status.getMessage();
- }
-
- try {
- ScheduleStatus translatedState = Conversions.convertProtoState(status.getState());
- // TODO(William Farner): Remove this hack once Mesos API change is done.
- // Tracked by: https://issues.apache.org/jira/browse/MESOS-343
- if ((translatedState == ScheduleStatus.FAILED)
- && (message != null)
- && (message.contains(MEMORY_LIMIT_EXCEEDED))) {
- message = MEMORY_LIMIT_DISPLAY;
- }
-
- stateManager.changeState(
- Query.taskScoped(status.getTaskId().getValue()),
- translatedState,
- Optional.fromNullable(message));
- } catch (SchedulerException e) {
- LOG.log(Level.WARNING, "Failed to update status for: " + status, e);
- throw e;
- }
- return true;
- }
-
- @Override
- public void cancelOffer(OfferID offer) {
- offerQueue.cancelOffer(offer);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/app/AppModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/app/AppModule.java b/src/main/java/com/twitter/aurora/scheduler/app/AppModule.java
deleted file mode 100644
index 24702b0..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/app/AppModule.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.app;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-import javax.inject.Singleton;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Iterables;
-import com.google.inject.AbstractModule;
-import com.google.inject.Key;
-import com.google.inject.Provides;
-
-import org.apache.mesos.Scheduler;
-import org.apache.zookeeper.data.ACL;
-
-import com.twitter.aurora.GuiceUtils;
-import com.twitter.aurora.scheduler.SchedulerModule;
-import com.twitter.aurora.scheduler.async.AsyncModule;
-import com.twitter.aurora.scheduler.events.PubsubEventModule;
-import com.twitter.aurora.scheduler.filter.SchedulingFilterImpl;
-import com.twitter.aurora.scheduler.http.ClusterName;
-import com.twitter.aurora.scheduler.http.ServletModule;
-import com.twitter.aurora.scheduler.metadata.MetadataModule;
-import com.twitter.aurora.scheduler.quota.QuotaModule;
-import com.twitter.aurora.scheduler.state.StateModule;
-import com.twitter.aurora.scheduler.stats.AsyncStatsModule;
-import com.twitter.common.application.ShutdownRegistry;
-import com.twitter.common.application.modules.LifecycleModule;
-import com.twitter.common.base.Command;
-import com.twitter.common.inject.TimedInterceptor;
-import com.twitter.common.net.pool.DynamicHostSet;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.util.Clock;
-import com.twitter.common.zookeeper.ServerSet;
-import com.twitter.common.zookeeper.ServerSetImpl;
-import com.twitter.common.zookeeper.SingletonService;
-import com.twitter.common.zookeeper.ZooKeeperClient;
-import com.twitter.common.zookeeper.ZooKeeperClient.Credentials;
-import com.twitter.common.zookeeper.ZooKeeperUtils;
-import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule.ClientConfig;
-import com.twitter.thrift.ServiceInstance;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static com.twitter.common.base.MorePreconditions.checkNotBlank;
-
-/**
- * Binding module for the aurora scheduler application.
- * <p>
- * Installs the scheduler's component modules. It also provides the ZooKeeper ACLs, the server
- * set and the singleton service, all rooted at the configured server set path.
- */
-class AppModule extends AbstractModule {
-  private static final Logger LOG = Logger.getLogger(AppModule.class.getName());
-
-  private final String clusterName;
-  private final String serverSetPath;
-  private final ClientConfig zkClientConfig;
-
-  AppModule(String clusterName, String serverSetPath, ClientConfig zkClientConfig) {
-    this.clusterName = checkNotBlank(clusterName);
-    this.serverSetPath = checkNotBlank(serverSetPath);
-    this.zkClientConfig = checkNotNull(zkClientConfig);
-  }
-
-  @Override
-  protected void configure() {
-    // Enable intercepted method timings and context classloader repair.
-    TimedInterceptor.bind(binder());
-    GuiceUtils.bindJNIContextClassLoader(binder(), Scheduler.class);
-    GuiceUtils.bindExceptionTrap(binder(), Scheduler.class);
-
-    bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
-    bind(Key.get(String.class, ClusterName.class)).toInstance(clusterName);
-
-    // Filter layering: notifier filter -> base impl
-    PubsubEventModule.bind(binder(), SchedulingFilterImpl.class);
-    bind(SchedulingFilterImpl.class).in(Singleton.class);
-
-    LifecycleModule.bindStartupAction(binder(), RegisterShutdownStackPrinter.class);
-
-    // Component modules.
-    install(new AsyncModule());
-    install(new AsyncStatsModule());
-    install(new MetadataModule());
-    install(new QuotaModule());
-    install(new ServletModule());
-    install(new SchedulerModule());
-    install(new StateModule());
-
-    bind(StatsProvider.class).toInstance(Stats.STATS_PROVIDER);
-  }
-
-  /**
-   * Startup command that registers a shutdown action. The action logs the name, id and stack of
-   * the thread running the shutdown, identifying what initiated it.
-   */
-  private static class RegisterShutdownStackPrinter implements Command {
-    // Renders a frame as Class.method(File:line).
-    private static final Function<StackTraceElement, String> STACK_ELEM_TOSTRING =
-        new Function<StackTraceElement, String>() {
-          @Override public String apply(StackTraceElement frame) {
-            return String.format("%s.%s(%s:%s)",
-                frame.getClassName(),
-                frame.getMethodName(),
-                frame.getFileName(),
-                frame.getLineNumber());
-          }
-        };
-
-    private final ShutdownRegistry shutdownRegistry;
-
-    @Inject
-    RegisterShutdownStackPrinter(ShutdownRegistry shutdownRegistry) {
-      this.shutdownRegistry = shutdownRegistry;
-    }
-
-    @Override
-    public void execute() {
-      shutdownRegistry.addAction(new Command() {
-        @Override public void execute() {
-          Thread current = Thread.currentThread();
-          Iterable<String> frames =
-              Iterables.transform(Arrays.asList(current.getStackTrace()), STACK_ELEM_TOSTRING);
-          String message = "Thread: " + current.getName()
-              + " (id " + current.getId() + ")"
-              + "\n"
-              + Joiner.on("\n ").join(frames);
-
-          LOG.info("Shutdown initiated by: " + message);
-        }
-      });
-    }
-  }
-
-  /**
-   * Uses creator-owned ACLs when ZooKeeper credentials are configured, and open ACLs (with a
-   * warning) otherwise.
-   */
-  @Provides
-  @Singleton
-  List<ACL> provideAcls() {
-    if (zkClientConfig.credentials != Credentials.NONE) {
-      return ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL;
-    }
-    LOG.warning("Running without ZooKeeper digest credentials. ZooKeeper ACLs are disabled.");
-    return ZooKeeperUtils.OPEN_ACL_UNSAFE;
-  }
-
-  @Provides
-  @Singleton
-  ServerSet provideServerSet(ZooKeeperClient zkClient, List<ACL> acls) {
-    return new ServerSetImpl(zkClient, acls, serverSetPath);
-  }
-
-  // Re-exposes the server set under its DynamicHostSet<ServiceInstance> type.
-  @Provides
-  @Singleton
-  DynamicHostSet<ServiceInstance> provideSchedulerHostSet(ServerSet schedulers) {
-    return schedulers;
-  }
-
-  @Provides
-  @Singleton
-  SingletonService provideSingletonService(
-      ZooKeeperClient zkClient,
-      ServerSet schedulers,
-      List<ACL> acls) {
-
-    return new SingletonService(
-        schedulers,
-        SingletonService.createSingletonCandidate(zkClient, serverSetPath, acls));
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/app/Log4jConfigurator.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/app/Log4jConfigurator.java b/src/main/java/com/twitter/aurora/scheduler/app/Log4jConfigurator.java
deleted file mode 100644
index 0ea2204..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/app/Log4jConfigurator.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.app;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-
-import com.twitter.common.logging.RootLogConfig;
-import com.twitter.common.logging.RootLogConfig.Configuration;
-import com.twitter.common.logging.log4j.GlogLayout;
-
-/**
- * Configures log4j logging.
- */
-final class Log4jConfigurator {
- private static final java.util.logging.Logger LOG =
- java.util.logging.Logger.getLogger(Log4jConfigurator.class.getName());
-
- /**
- * Configures log4j to log to stderr with a glog format.
- *
- * @param glogConfig The glog configuration in effect.
- */
- static void configureConsole(Configuration glogConfig) {
- Preconditions.checkNotNull(glogConfig);
-
- BasicConfigurator.configure(
- new ConsoleAppender(new GlogLayout(), ConsoleAppender.SYSTEM_ERR));
- Logger.getRootLogger().setLevel(getLevel(glogConfig));
- }
-
- private static Level getLevel(RootLogConfig.Configuration logConfig) {
- switch (logConfig.getVlog()) {
- case FINEST: // fall through
- case FINER: // fall through
- case FINE: // fall through
- case CONFIG:
- return Level.TRACE;
- case INFO:
- return Level.INFO;
- case WARNING:
- return Level.WARN;
- case SEVERE:
- return Level.ERROR;
- default:
- LOG.warning("Mapping unexpected vlog value of " + logConfig.getVlog() + " to log4j TRACE");
- return Level.TRACE;
- }
- }
-
- private Log4jConfigurator() {
- // Utility class.
- }
-}