You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:32 UTC

[39/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/aop/AopModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/AopModule.java b/src/main/java/com/twitter/aurora/scheduler/thrift/aop/AopModule.java
deleted file mode 100644
index 4afc263..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/AopModule.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift.aop;
-
-import java.lang.reflect.Method;
-import java.util.List;
-import java.util.Map;
-
-import javax.inject.Inject;
-import javax.inject.Singleton;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.Key;
-import com.google.inject.PrivateModule;
-import com.google.inject.TypeLiteral;
-import com.google.inject.matcher.Matcher;
-import com.google.inject.matcher.Matchers;
-
-import org.aopalliance.intercept.MethodInterceptor;
-
-import com.twitter.aurora.GuiceUtils;
-import com.twitter.aurora.auth.CapabilityValidator;
-import com.twitter.aurora.gen.AuroraAdmin;
-import com.twitter.aurora.gen.AuroraSchedulerManager;
-import com.twitter.aurora.scheduler.thrift.auth.DecoratedThrift;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-
-/**
- * Binding module for AOP-style decorations of the thrift API, including the method predicate
- * consumed by {@link FeatureToggleInterceptor}.
- */
-public class AopModule extends AbstractModule {
-
-  @CmdLine(name = "enable_job_updates", help = "Whether new job updates should be accepted.")
-  private static final Arg<Boolean> ENABLE_UPDATES = Arg.create(true);
-
-  @CmdLine(name = "enable_job_creation",
-      help = "Allow new jobs to be created, if false all job creation requests will be denied.")
-  private static final Arg<Boolean> ENABLE_JOB_CREATION = Arg.create(true);
-
-  private static final Matcher<? super Class<?>> THRIFT_IFACE_MATCHER =
-      Matchers.subclassesOf(AuroraAdmin.Iface.class)
-          .and(Matchers.annotatedWith(DecoratedThrift.class));
-
-  private final Map<String, Boolean> toggledMethods;
-
-  /**
-   * Creates a module whose feature toggles are read from the command line arguments.
-   */
-  public AopModule() {
-    this(ImmutableMap.of(
-        "createJob", ENABLE_JOB_CREATION.get(),
-        "acquireLock", ENABLE_UPDATES.get()));
-  }
-
-  /**
-   * Creates a module with explicit feature toggles.
-   *
-   * @param toggledMethods Mapping from thrift method name to whether that method is enabled.
-   */
-  @VisibleForTesting
-  AopModule(Map<String, Boolean> toggledMethods) {
-    this.toggledMethods = ImmutableMap.copyOf(toggledMethods);
-  }
-
-  private static final Function<Method, String> GET_NAME = new Function<Method, String>() {
-    @Override public String apply(Method method) {
-      return method.getName();
-    }
-  };
-
-  @Override
-  protected void configure() {
-    requireBinding(CapabilityValidator.class);
-
-    // Layer ordering:
-    // Log -> CapabilityValidator -> FeatureToggle -> StatsExporter -> APIVersion ->
-    // SchedulerThriftInterface
-
-    // TODO(Sathya): Consider using provider pattern for constructing interceptors to facilitate
-    // unit testing without the creation of Guice injectors.
-    bindThriftDecorator(new LoggingInterceptor());
-
-    // Note: it's important that the capability interceptor is only applied to AuroraAdmin.Iface
-    // methods, and does not pick up methods on AuroraSchedulerManager.Iface.
-    MethodInterceptor authInterceptor = new UserCapabilityInterceptor();
-    requestInjection(authInterceptor);
-    bindInterceptor(
-        THRIFT_IFACE_MATCHER,
-        GuiceUtils.interfaceMatcher(AuroraAdmin.Iface.class, true),
-        authInterceptor);
-
-    install(new PrivateModule() {
-      @Override protected void configure() {
-        // Ensure that the provided methods exist on the decorated interface.
-        List<Method> methods =
-            ImmutableList.copyOf(AuroraSchedulerManager.Iface.class.getMethods());
-        for (String toggledMethod : toggledMethods.keySet()) {
-          // The template form defers message formatting until the check actually fails.
-          Preconditions.checkArgument(
-              Iterables.any(methods,
-                  Predicates.compose(Predicates.equalTo(toggledMethod), GET_NAME)),
-              "Method %s was not found in class %s",
-              toggledMethod,
-              AuroraSchedulerManager.Iface.class);
-        }
-
-        bind(new TypeLiteral<Map<String, Boolean>>() { }).toInstance(toggledMethods);
-        bind(IsFeatureEnabled.class).in(Singleton.class);
-        Key<Predicate<Method>> predicateKey = Key.get(new TypeLiteral<Predicate<Method>>() { });
-        bind(predicateKey).to(IsFeatureEnabled.class);
-        expose(predicateKey);
-      }
-    });
-    bindThriftDecorator(new FeatureToggleInterceptor());
-    bindThriftDecorator(new ThriftStatsExporterInterceptor());
-    bindThriftDecorator(new APIVersionInterceptor());
-  }
-
-  private void bindThriftDecorator(MethodInterceptor interceptor) {
-    bindThriftDecorator(binder(), THRIFT_IFACE_MATCHER, interceptor);
-  }
-
-  /**
-   * Applies {@code interceptor} to every method of the classes selected by {@code classMatcher},
-   * and requests injection of the interceptor's own dependencies.
-   */
-  @VisibleForTesting
-  static void bindThriftDecorator(
-      Binder binder,
-      Matcher<? super Class<?>> classMatcher,
-      MethodInterceptor interceptor) {
-
-    binder.bindInterceptor(classMatcher, Matchers.any(), interceptor);
-    binder.requestInjection(interceptor);
-  }
-
-  /**
-   * Allows every method except those whose name is mapped to {@code false} in the toggle map.
-   */
-  private static class IsFeatureEnabled implements Predicate<Method> {
-    private final Map<String, Boolean> toggledMethods;
-
-    @Inject
-    IsFeatureEnabled(Map<String, Boolean> toggleMethods) {
-      this.toggledMethods = ImmutableMap.copyOf(toggleMethods);
-    }
-
-    @Override
-    public boolean apply(Method method) {
-      // A method without an entry is not subject to toggling, and is therefore enabled.
-      Boolean enabled = toggledMethods.get(method.getName());
-      return enabled == null || enabled;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/aop/FeatureToggleInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/FeatureToggleInterceptor.java b/src/main/java/com/twitter/aurora/scheduler/thrift/aop/FeatureToggleInterceptor.java
deleted file mode 100644
index 03c3d99..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/FeatureToggleInterceptor.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift.aop;
-
-import java.lang.reflect.Method;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Predicate;
-
-import org.aopalliance.intercept.MethodInterceptor;
-import org.aopalliance.intercept.MethodInvocation;
-
-import com.twitter.aurora.gen.ResponseCode;
-
-/**
- * Method interceptor that short-circuits calls to features rejected by an injected predicate.
- */
-public class FeatureToggleInterceptor implements MethodInterceptor {
-
-  @Inject private Predicate<Method> allowMethod;
-
-  @Override
-  public Object invoke(MethodInvocation invocation) throws Throwable {
-    Method target = invocation.getMethod();
-    if (allowMethod.apply(target)) {
-      return invocation.proceed();
-    }
-
-    // The predicate rejected the method: answer with an error response instead of invoking it.
-    String reason =
-        "The " + target.getName() + " feature is currently disabled on this scheduler.";
-    return Interceptors.properlyTypedResponse(target, ResponseCode.ERROR, reason);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/aop/Interceptors.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/Interceptors.java b/src/main/java/com/twitter/aurora/scheduler/thrift/aop/Interceptors.java
deleted file mode 100644
index d0cb9c1..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/Interceptors.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift.aop;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.logging.Logger;
-
-import com.google.common.base.Throwables;
-
-import com.twitter.aurora.gen.ResponseCode;
-
-/**
- * Static helpers shared by the interceptors that decorate the thrift interface.
- */
-final class Interceptors {
-
-  private static final Logger LOG = Logger.getLogger(Interceptors.class.getName());
-
-  private Interceptors() {
-    // Utility class.
-  }
-
-  /**
-   * Instantiates the return type of {@code method} and populates it through its
-   * {@code setResponseCode} and {@code setMessage} setters.
-   *
-   * @param method Method whose return type should be instantiated.
-   * @param responseCode Response code to store in the response.
-   * @param message Message to store in the response.
-   * @return A new instance of the method's return type.
-   */
-  static Object properlyTypedResponse(Method method, ResponseCode responseCode, String message)
-      throws IllegalAccessException, InstantiationException {
-
-    Class<?> responseType = method.getReturnType();
-    Object result = responseType.newInstance();
-    invoke(responseType, result, "setResponseCode", ResponseCode.class, responseCode);
-    invoke(responseType, result, "setMessage", String.class, message);
-    return result;
-  }
-
-  // Reflectively calls the single-argument method 'name' on 'obj', logging and rethrowing any
-  // reflection failure as an unchecked exception.
-  private static <T> void invoke(
-      Class<?> type,
-      Object obj,
-      String name,
-      Class<T> parameterType,
-      T argument) {
-
-    try {
-      type.getMethod(name, parameterType).invoke(obj, argument);
-    } catch (NoSuchMethodException e) {
-      LOG.severe(type + " does not support " + name);
-      throw Throwables.propagate(e);
-    } catch (IllegalAccessException e) {
-      LOG.severe("Method " + name + " is not accessible in " + type);
-      throw Throwables.propagate(e);
-    } catch (InvocationTargetException e) {
-      LOG.severe("Failed to invoke " + name + " on " + type);
-      throw Throwables.propagate(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/aop/LoggingInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/LoggingInterceptor.java b/src/main/java/com/twitter/aurora/scheduler/thrift/aop/LoggingInterceptor.java
deleted file mode 100644
index 5f773cc..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/LoggingInterceptor.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift.aop;
-
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
-import org.aopalliance.intercept.MethodInterceptor;
-import org.aopalliance.intercept.MethodInvocation;
-
-import com.twitter.aurora.auth.CapabilityValidator;
-import com.twitter.aurora.gen.ExecutorConfig;
-import com.twitter.aurora.gen.JobConfiguration;
-import com.twitter.aurora.gen.ResponseCode;
-import com.twitter.aurora.gen.SessionKey;
-
-import static com.twitter.aurora.scheduler.thrift.aop.Interceptors.properlyTypedResponse;
-
-/**
- * A method interceptor that logs every invocation, and turns unchecked exceptions escaping the
- * underlying call into an error response.
- */
-class LoggingInterceptor implements MethodInterceptor {
-
-  private static final Logger LOG = Logger.getLogger(LoggingInterceptor.class.getName());
-
-  @Inject private CapabilityValidator validator;
-
-  // TODO(wfarner): Scrub updateToken when it is identifiable by type.
-  private final Map<Class<?>, Function<Object, String>> printFunctions =
-      ImmutableMap.<Class<?>, Function<Object, String>>of(
-          JobConfiguration.class,
-          new Function<Object, String>() {
-            @Override public String apply(Object input) {
-              // Blank the executor config on a copy, leaving the real argument untouched.
-              JobConfiguration scrubbed = ((JobConfiguration) input).deepCopy();
-              if (scrubbed.isSetTaskConfig()) {
-                scrubbed.getTaskConfig().setExecutorConfig(
-                    new ExecutorConfig("BLANKED", "BLANKED"));
-              }
-              return scrubbed.toString();
-            }
-          },
-          SessionKey.class,
-          new Function<Object, String>() {
-            @Override public String apply(Object input) {
-              return validator.toString((SessionKey) input);
-            }
-          }
-      );
-
-  // Renders a non-null argument, using the printer registered for its exact class if any.
-  private String describe(Object arg) {
-    Function<Object, String> printer = printFunctions.get(arg.getClass());
-    return (printer != null) ? printer.apply(arg) : arg.toString();
-  }
-
-  @Override
-  public Object invoke(MethodInvocation invocation) throws Throwable {
-    List<String> rendered = Lists.newArrayList();
-    for (Object arg : invocation.getArguments()) {
-      rendered.add((arg == null) ? "null" : describe(arg));
-    }
-    String call = String.format(
-        "%s(%s)", invocation.getMethod().getName(), Joiner.on(", ").join(rendered));
-    LOG.info(call);
-    try {
-      return invocation.proceed();
-    } catch (RuntimeException e) {
-      LOG.log(Level.WARNING, "Uncaught exception while handling " + call, e);
-      return properlyTypedResponse(invocation.getMethod(), ResponseCode.ERROR, e.getMessage());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/aop/ThriftStatsExporterInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/ThriftStatsExporterInterceptor.java b/src/main/java/com/twitter/aurora/scheduler/thrift/aop/ThriftStatsExporterInterceptor.java
deleted file mode 100644
index d700ab5..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/ThriftStatsExporterInterceptor.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift.aop;
-
-import java.lang.reflect.Method;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-
-import org.aopalliance.intercept.MethodInterceptor;
-import org.aopalliance.intercept.MethodInvocation;
-
-import com.twitter.common.stats.SlidingStats;
-import com.twitter.common.stats.Stats;
-
-/**
- * A method interceptor that accumulates the elapsed time of each thrift call in a per-method
- * stat.
- */
-class ThriftStatsExporterInterceptor implements MethodInterceptor {
-
-  private final LoadingCache<Method, SlidingStats> stats =
-      CacheBuilder.newBuilder().build(new CacheLoader<Method, SlidingStats>() {
-        @Override public SlidingStats load(Method method) {
-          String statName = String.format("scheduler_thrift_%s", method.getName());
-          return new SlidingStats(Stats.normalizeName(statName), "nanos");
-        }
-      });
-
-  @Override
-  public Object invoke(MethodInvocation invocation) throws Throwable {
-    SlidingStats timing = stats.get(invocation.getMethod());
-    long startNanos = System.nanoTime();
-    try {
-      return invocation.proceed();
-    } finally {
-      // Recorded in a finally block so that calls ending in an exception are timed as well.
-      timing.accumulate(System.nanoTime() - startNanos);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/aop/UserCapabilityInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/UserCapabilityInterceptor.java b/src/main/java/com/twitter/aurora/scheduler/thrift/aop/UserCapabilityInterceptor.java
deleted file mode 100644
index d9240bc..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/UserCapabilityInterceptor.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift.aop;
-
-import java.lang.reflect.Method;
-import java.util.Arrays;
-import java.util.List;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-
-import org.aopalliance.intercept.MethodInterceptor;
-import org.aopalliance.intercept.MethodInvocation;
-
-import com.twitter.aurora.auth.CapabilityValidator;
-import com.twitter.aurora.auth.CapabilityValidator.AuditCheck;
-import com.twitter.aurora.auth.CapabilityValidator.Capability;
-import com.twitter.aurora.auth.SessionValidator.AuthFailedException;
-import com.twitter.aurora.gen.ResponseCode;
-import com.twitter.aurora.gen.SessionKey;
-import com.twitter.aurora.scheduler.thrift.auth.Requires;
-
-/**
- * A method interceptor that authorizes the caller identified by the {@link SessionKey} argument
- * of an invoked method.
- * <p>
- * {@link Capability#ROOT} is always accepted; a method may accept further capabilities by
- * declaring them in the whitelist of a {@link Requires} annotation.
- */
-class UserCapabilityInterceptor implements MethodInterceptor {
-  private static final Logger LOG = Logger.getLogger(UserCapabilityInterceptor.class.getName());
-
-  private static final Function<Object, SessionKey> CAST = new Function<Object, SessionKey>() {
-    @Override public SessionKey apply(Object o) {
-      return (SessionKey) o;
-    }
-  };
-
-  @Inject private CapabilityValidator capabilityValidator;
-
-  // Collects the capabilities that may invoke 'method': ROOT plus any @Requires whitelist.
-  private static List<Capability> permittedCapabilities(Method method) {
-    // Ensure ROOT is always permitted.
-    ImmutableList.Builder<Capability> permitted = ImmutableList.builder();
-    permitted.add(Capability.ROOT);
-    Requires requires = method.getAnnotation(Requires.class);
-    if (requires != null) {
-      permitted.add(requires.whitelist());
-    }
-    return permitted.build();
-  }
-
-  @Override
-  public Object invoke(MethodInvocation invocation) throws Throwable {
-    Preconditions.checkNotNull(capabilityValidator, "Session validator has not yet been set.");
-
-    Method method = invocation.getMethod();
-    List<Capability> whitelist = permittedCapabilities(method);
-    LOG.fine("Operation " + method.getName() + " may be performed by: " + whitelist);
-
-    Optional<SessionKey> sessionKey = FluentIterable.from(Arrays.asList(invocation.getArguments()))
-        .firstMatch(Predicates.instanceOf(SessionKey.class))
-        .transform(CAST);
-    if (!sessionKey.isPresent()) {
-      // NOTE(review): without a SessionKey argument the call proceeds with no capability check.
-      LOG.severe("Interceptor should only be applied to methods accepting a SessionKey, but "
-          + method + " does not.");
-      return invocation.proceed();
-    }
-
-    SessionKey session = sessionKey.get();
-    String key = capabilityValidator.toString(session);
-    for (Capability candidate : whitelist) {
-      LOG.fine("Attempting to validate " + key + " against " + candidate);
-      try {
-        capabilityValidator.checkAuthorized(session, candidate, AuditCheck.NONE);
-
-        LOG.info("Permitting " + key + " to act as "
-            + candidate + " and perform action " + method.getName());
-        return invocation.proceed();
-      } catch (AuthFailedException e) {
-        LOG.fine("Auth failed: " + e);
-      }
-    }
-
-    // User is not permitted to perform this operation.
-    String denial = "Session identified by '" + key
-        + "' does not have the required capability to perform this action: " + whitelist;
-    return Interceptors.properlyTypedResponse(method, ResponseCode.AUTH_FAILED, denial);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/auth/DecoratedThrift.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/auth/DecoratedThrift.java b/src/main/java/com/twitter/aurora/scheduler/thrift/auth/DecoratedThrift.java
deleted file mode 100644
index 4a667c5..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/auth/DecoratedThrift.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift.auth;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.ElementType.TYPE;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-/**
- * Marks a thrift interface implementation (or a parameter of that type) whose calls should be
- * decorated with additional functionality.
- */
-@Retention(RUNTIME) @Target({PARAMETER, TYPE})
-public @interface DecoratedThrift {
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/auth/Requires.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/auth/Requires.java b/src/main/java/com/twitter/aurora/scheduler/thrift/auth/Requires.java
deleted file mode 100644
index 0fff3f6..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/auth/Requires.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift.auth;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.twitter.aurora.auth.CapabilityValidator.Capability;
-
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-/**
- * Method annotation naming the capabilities, possibly other than ROOT, whose holders may
- * perform the annotated action.
- */
-@Retention(RUNTIME) @Target(METHOD)
-public @interface Requires {
-  /**
-   * Capabilities that are permitted to perform the action; defaults to ROOT only.
-   */
-  Capability[] whitelist() default { Capability.ROOT };
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/auth/ThriftAuthModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/auth/ThriftAuthModule.java b/src/main/java/com/twitter/aurora/scheduler/thrift/auth/ThriftAuthModule.java
deleted file mode 100644
index 66f9033..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/auth/ThriftAuthModule.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift.auth;
-
-import java.util.Map;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.inject.AbstractModule;
-import com.google.inject.TypeLiteral;
-
-import com.twitter.aurora.auth.CapabilityValidator;
-import com.twitter.aurora.auth.CapabilityValidator.Capability;
-import com.twitter.aurora.auth.SessionValidator;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.args.constraints.NotEmpty;
-
-/**
- * Binding module for authentication of users with special capabilities for admin functions.
- * <p>
- * Binds the capability-to-name mapping and requires that {@link SessionValidator} and
- * {@link CapabilityValidator} bindings are supplied elsewhere.
- */
-public class ThriftAuthModule extends AbstractModule {
-
-  private static final Map<Capability, String> DEFAULT_CAPABILITIES =
-      ImmutableMap.of(Capability.ROOT, "mesos");
-
-  @NotEmpty
-  @CmdLine(name = "user_capabilities",
-      help = "Concrete name mappings for administration capabilities.")
-  private static final Arg<Map<Capability, String>> USER_CAPABILITIES =
-      Arg.create(DEFAULT_CAPABILITIES);
-
-  private final Map<Capability, String> capabilities;
-
-  /**
-   * Creates a module using the mapping given by the {@code user_capabilities} argument.
-   */
-  public ThriftAuthModule() {
-    this(USER_CAPABILITIES.get());
-  }
-
-  /**
-   * Creates a module using an explicit capability mapping.
-   *
-   * @param capabilities Mapping from capability to the concrete name holding it; must not be
-   *     {@code null}.
-   */
-  @VisibleForTesting
-  public ThriftAuthModule(Map<Capability, String> capabilities) {
-    // Snapshot the mapping so the instance bound in configure() cannot change afterwards.
-    this.capabilities = ImmutableMap.copyOf(Preconditions.checkNotNull(capabilities));
-  }
-
-  @Override
-  protected void configure() {
-    Preconditions.checkArgument(
-        capabilities.containsKey(Capability.ROOT),
-        "A ROOT capability must be provided with --user_capabilities");
-    bind(new TypeLiteral<Map<Capability, String>>() { }).toInstance(capabilities);
-
-    requireBinding(SessionValidator.class);
-    requireBinding(CapabilityValidator.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/GuiceUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/GuiceUtils.java b/src/main/java/org/apache/aurora/GuiceUtils.java
new file mode 100644
index 0000000..207535d
--- /dev/null
+++ b/src/main/java/org/apache/aurora/GuiceUtils.java
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.lang.reflect.Method;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.inject.Binder;
+import com.google.inject.BindingAnnotation;
+import com.google.inject.matcher.AbstractMatcher;
+import com.google.inject.matcher.Matcher;
+import com.google.inject.matcher.Matchers;
+
+import org.aopalliance.intercept.MethodInterceptor;
+import org.aopalliance.intercept.MethodInvocation;
+
+import com.twitter.common.collections.Pair;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * Utilities for guice configuration in aurora.
+ */
+public final class GuiceUtils {
+
+  private static final Logger LOG = Logger.getLogger(GuiceUtils.class.getName());
+
+  // Method annotation that allows a trapped interface to whitelist methods that may throw
+  // unchecked exceptions.
+  @BindingAnnotation
+  @Target(METHOD) @Retention(RUNTIME)
+  public @interface AllowUnchecked { }
+
+  private GuiceUtils() {
+    // utility
+  }
+
+  // No wildcards on the Class here because it upsets checkstyle - complains with:
+  // '>' is followed by whitespace.
+  private static final Function<Method, Pair<String, Class[]>> CANONICALIZE =
+      new Function<Method, Pair<String, Class[]>>() {
+        @Override public Pair<String, Class[]> apply(Method method) {
+          return Pair.of(method.getName(), (Class[]) method.getParameterTypes());
+        }
+      };
+
+  /**
+   * Creates a matcher that will match methods of an interface, optionally excluding inherited
+   * methods.
+   *
+   * @param matchInterface The interface to match.
+   * @param declaredMethodsOnly if {@code true} only methods directly declared in the interface
+   *                            will be matched, otherwise all methods on the interface are matched.
+   * @return A new matcher instance.
+   */
+  public static Matcher<Method> interfaceMatcher(
+      Class<?> matchInterface,
+      boolean declaredMethodsOnly) {
+
+    Method[] methods =
+        declaredMethodsOnly ? matchInterface.getDeclaredMethods() : matchInterface.getMethods();
+    final Set<Pair<String, Class[]>> interfaceMethods =
+        ImmutableSet.copyOf(Iterables.transform(ImmutableList.copyOf(methods), CANONICALIZE));
+    final LoadingCache<Method, Pair<String, Class[]>> cache = CacheBuilder.newBuilder()
+        .build(CacheLoader.from(CANONICALIZE));
+
+    return new AbstractMatcher<Method>() {
+      @Override public boolean matches(Method method) {
+        return interfaceMethods.contains(cache.getUnchecked(method));
+      }
+    };
+  }
+
+  /**
+   * Binds an interceptor that ensures the main ClassLoader is bound as the thread context
+   * {@link ClassLoader} during JNI callbacks from mesos.  Some libraries require a thread
+   * context ClassLoader be set and this ensures those libraries work properly.
+   *
+   * @param binder The binder to use to register an interceptor with.
+   * @param wrapInterface Interface whose methods should be wrapped.
+   */
+  public static void bindJNIContextClassLoader(Binder binder, Class<?> wrapInterface) {
+    final ClassLoader mainClassLoader = GuiceUtils.class.getClassLoader();
+    binder.bindInterceptor(
+        Matchers.subclassesOf(wrapInterface),
+        interfaceMatcher(wrapInterface, false),
+        new MethodInterceptor() {
+          @Override public Object invoke(MethodInvocation invocation) throws Throwable {
+            Thread currentThread = Thread.currentThread();
+            ClassLoader prior = currentThread.getContextClassLoader();
+            try {
+              currentThread.setContextClassLoader(mainClassLoader);
+              return invocation.proceed();
+            } finally {
+              currentThread.setContextClassLoader(prior);
+            }
+          }
+        });
+  }
+
+  private static final Predicate<Method> IS_WHITELISTED = new Predicate<Method>() {
+    @Override public boolean apply(Method method) {
+      return method.getAnnotation(AllowUnchecked.class) != null;
+    }
+  };
+
+  private static final Matcher<Method> WHITELIST_MATCHER = new AbstractMatcher<Method>() {
+    @Override public boolean matches(Method method) {
+      return IS_WHITELISTED.apply(method);
+    }
+  };
+
+  private static final Predicate<Method> VOID_METHOD = new Predicate<Method>() {
+    @Override public boolean apply(Method method) {
+      return method.getReturnType() == Void.TYPE;
+    }
+  };
+
+  /**
+   * Binds an exception trap on all interface methods of all classes bound against an interface.
+   * Individual methods may opt out of trapping by annotating with {@link AllowUnchecked}.
+   * Only void methods are allowed, any non-void interface methods must explicitly opt out.
+   *
+   * @param binder The binder to register an interceptor with.
+   * @param wrapInterface Interface whose methods should be wrapped.
+   * @throws IllegalArgumentException If any of the non-whitelisted interface methods are non-void.
+   */
+  public static void bindExceptionTrap(Binder binder, Class<?> wrapInterface)
+      throws IllegalArgumentException {
+
+    Set<Method> disallowed = ImmutableSet.copyOf(Iterables.filter(
+        ImmutableList.copyOf(wrapInterface.getMethods()),
+        Predicates.and(Predicates.not(IS_WHITELISTED), Predicates.not(VOID_METHOD))));
+    Preconditions.checkArgument(disallowed.isEmpty(),
+        "Non-void methods must be explicitly whitelisted with @AllowUnchecked: " + disallowed);
+
+    Matcher<Method> matcher =
+        Matchers.<Method>not(WHITELIST_MATCHER).and(interfaceMatcher(wrapInterface, false));
+    binder.bindInterceptor(Matchers.subclassesOf(wrapInterface), matcher,
+        new MethodInterceptor() {
+          @Override public Object invoke(MethodInvocation invocation) throws Throwable {
+            try {
+              return invocation.proceed();
+            } catch (RuntimeException e) {
+              LOG.log(Level.WARNING, "Trapped uncaught exception: " + e, e);
+              return null;
+            }
+          }
+        });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/Protobufs.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/Protobufs.java b/src/main/java/org/apache/aurora/Protobufs.java
new file mode 100644
index 0000000..c3ddea4
--- /dev/null
+++ b/src/main/java/org/apache/aurora/Protobufs.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora;
+
+import com.google.common.base.Function;
+import com.google.protobuf.Message;
+import com.google.protobuf.TextFormat;
+
+/**
+ * Utility functions that are useful for working with protocol buffer messages.
+ */
+public final class Protobufs {
+
+  private Protobufs() {
+    // Utility class.
+  }
+
+  /**
+   * Function to call {@link #toString(Message)} on message objects.
+   */
+  public static final Function<Message, String> SHORT_TOSTRING = new Function<Message, String>() {
+    @Override public String apply(Message message) {
+      return Protobufs.toString(message);
+    }
+  };
+
+  /**
+   * Alternative to the default protobuf toString implementation, which omits newlines.
+   *
+   * @param message Message to print.
+   * @return String representation of the message.
+   */
+  public static String toString(Message message) {
+    return TextFormat.shortDebugString(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/auth/CapabilityValidator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/auth/CapabilityValidator.java b/src/main/java/org/apache/aurora/auth/CapabilityValidator.java
new file mode 100644
index 0000000..05958f0
--- /dev/null
+++ b/src/main/java/org/apache/aurora/auth/CapabilityValidator.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.auth;
+
+import com.twitter.aurora.gen.SessionKey;
+
+/**
+ * A session validator that supports user capability matching.
+ * <p>
+ * This supports asking whether a user has been granted a specific administration capability.
+ */
+public interface CapabilityValidator extends SessionValidator {
+
+  enum Capability {
+    ROOT,
+    PROVISIONER
+  }
+
+  /**
+   * Audit check to perform for a given action.
+   */
+  enum AuditCheck {
+    /**
+     * Default. No audit checks will be performed.
+     */
+    NONE,
+
+    /**
+     * A check will be performed to verify if a given action has
+     * all necessary data to generate a valid audit trail.
+     */
+    REQUIRED
+  }
+
+  /**
+   * Checks whether a session key is authenticated, and has the specified capability.
+   *
+   * @param sessionKey Key to validate.
+   * @param capability User capability to authenticate against.
+   * @param check Auditing data presence check required.
+   * @return A {@link SessionContext} object that provides information about the validated session.
+   * @throws AuthFailedException If the key cannot be validated as the role or lacks
+   * the requested capability.
+   */
+  SessionContext checkAuthorized(SessionKey sessionKey, Capability capability, AuditCheck check)
+      throws AuthFailedException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/auth/SessionValidator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/auth/SessionValidator.java b/src/main/java/org/apache/aurora/auth/SessionValidator.java
new file mode 100644
index 0000000..327c5ca
--- /dev/null
+++ b/src/main/java/org/apache/aurora/auth/SessionValidator.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.auth;
+
+import java.util.Set;
+
+import com.twitter.aurora.gen.SessionKey;
+
+/**
+ * Validator for RPC sessions with Aurora.
+ */
+public interface SessionValidator {
+
+  /**
+   * Checks whether a session key is authenticated, and has permission to act as all the roles
+   * provided. Authentication is successful only if the SessionKey is successfully validated against
+   * all the roles.
+   *
+   * @param sessionKey Key to validate.
+   * @param targetRoles A set of roles to validate the key against.
+   * @return A {@link SessionContext} object that provides information about the validated session.
+   * @throws AuthFailedException If the key cannot be validated against a role.
+   */
+  SessionContext checkAuthenticated(SessionKey sessionKey, Set<String> targetRoles)
+      throws AuthFailedException;
+
+  /**
+   * Translates a {@link SessionKey} to a string. Primarily provides a way for the binary data
+   * within a {@link SessionKey} to be decoded and converted into a string.
+   *
+   * @param sessionKey The session key to translate.
+   * @return A string representation of the {@link SessionKey}.
+   */
+  String toString(SessionKey sessionKey);
+
+  /**
+   * Provides information about a session.
+   */
+  interface SessionContext {
+
+    /**
+     * Provides the identity for a validated session.
+     *
+     * @return A string that identifies the session.
+     */
+    String getIdentity();
+  }
+
+  /**
+   * Thrown when authentication is not successful.
+   */
+  public static class AuthFailedException extends Exception {
+    public AuthFailedException(String msg) {
+      super(msg);
+    }
+
+    public AuthFailedException(String msg, Throwable cause) {
+      super(msg, cause);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/auth/UnsecureAuthModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/auth/UnsecureAuthModule.java b/src/main/java/org/apache/aurora/auth/UnsecureAuthModule.java
new file mode 100644
index 0000000..8fe9953
--- /dev/null
+++ b/src/main/java/org/apache/aurora/auth/UnsecureAuthModule.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.auth;
+
+import java.util.Set;
+import java.util.logging.Logger;
+
+import com.google.inject.AbstractModule;
+
+import com.twitter.aurora.gen.SessionKey;
+
+/**
+ * An authentication module that uses an {@link UnsecureSessionValidator}. This behavior
+ * can be overridden by binding a secure validator, querying an internal authentication system,
+ * to {@link SessionValidator}.
+ */
+public class UnsecureAuthModule extends AbstractModule {
+  private static final String UNSECURE = "UNSECURE";
+  private static final Logger LOG = Logger.getLogger(UnsecureAuthModule.class.getName());
+
+  @Override
+  protected void configure() {
+    LOG.info("Using default (UNSECURE!!!) authentication module.");
+    bind(SessionValidator.class).to(UnsecureSessionValidator.class);
+    bind(CapabilityValidator.class).to(UnsecureCapabilityValidator.class);
+  }
+
+  static class UnsecureSessionValidator implements SessionValidator {
+    @Override
+    public SessionContext checkAuthenticated(SessionKey key, Set<String> targetRoles)
+        throws AuthFailedException {
+
+      return new SessionContext() {
+        @Override public String getIdentity() {
+          return UNSECURE;
+        }
+      };
+    }
+
+    @Override
+    public String toString(SessionKey sessionKey) {
+      return sessionKey.toString();
+    }
+  }
+
+  static class UnsecureCapabilityValidator implements CapabilityValidator {
+    @Override
+    public SessionContext checkAuthorized(SessionKey key, Capability capability, AuditCheck check)
+        throws AuthFailedException {
+
+      return new SessionContext() {
+        @Override public String getIdentity() {
+          return UNSECURE;
+        }
+      };
+    }
+
+    @Override
+    public SessionContext checkAuthenticated(SessionKey key, Set<String> targetRoles)
+        throws AuthFailedException {
+
+      return new SessionContext() {
+        @Override public String getIdentity() {
+          return UNSECURE;
+        }
+      };
+    }
+
+    @Override
+    public String toString(SessionKey sessionKey) {
+      return sessionKey.toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java b/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
new file mode 100644
index 0000000..2443078
--- /dev/null
+++ b/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.codec;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+
+/**
+ * Codec that works for thrift objects.
+ */
+public final class ThriftBinaryCodec {
+
+  /**
+   * Protocol factory used for all thrift encoding and decoding.
+   */
+  public static final TProtocolFactory PROTOCOL_FACTORY = new TBinaryProtocol.Factory();
+
+  private ThriftBinaryCodec() {
+    // Utility class.
+  }
+
+  /**
+   * Identical to {@link #decodeNonNull(Class, byte[])}, but allows for a null buffer.
+   *
+   * @param clazz Class to instantiate and deserialize to.
+   * @param buffer Buffer to decode.
+   * @param <T> Target type.
+   * @return A populated message, or {@code null} if the buffer was {@code null}.
+   * @throws CodingException If the message could not be decoded.
+   */
+  @Nullable
+  public static <T extends TBase<T, ?>> T decode(Class<T> clazz, @Nullable byte[] buffer)
+      throws CodingException {
+
+    if (buffer == null) {
+      return null;
+    }
+    return decodeNonNull(clazz, buffer);
+  }
+
+  /**
+   * Decodes a binary-encoded byte array into a target type.
+   *
+   * @param clazz Class to instantiate and deserialize to.
+   * @param buffer Buffer to decode.
+   * @param <T> Target type.
+   * @return A populated message.
+   * @throws CodingException If the message could not be decoded.
+   */
+  public static <T extends TBase<T, ?>> T decodeNonNull(Class<T> clazz, byte[] buffer)
+      throws CodingException {
+
+    Preconditions.checkNotNull(clazz);
+    Preconditions.checkNotNull(buffer);
+
+    try {
+      T t = clazz.newInstance();
+      new TDeserializer(PROTOCOL_FACTORY).deserialize(t, buffer);
+      return t;
+    } catch (IllegalAccessException e) {
+      throw new CodingException("Failed to access constructor for target type.", e);
+    } catch (InstantiationException e) {
+      throw new CodingException("Failed to instantiate target type.", e);
+    } catch (TException e) {
+      throw new CodingException("Failed to deserialize thrift object.", e);
+    }
+  }
+
+  /**
+   * Identical to {@link #encodeNonNull(TBase)}, but allows for a null input.
+   *
+   * @param tBase Object to encode.
+   * @return Encoded object, or {@code null} if the argument was {@code null}.
+   * @throws CodingException If the object could not be encoded.
+   */
+  @Nullable
+  public static byte[] encode(@Nullable TBase<?, ?> tBase) throws CodingException {
+    if (tBase == null) {
+      return null;
+    }
+    return encodeNonNull(tBase);
+  }
+
+  /**
+   * Encodes a thrift object into a binary array.
+   *
+   * @param tBase Object to encode.
+   * @return Encoded object.
+   * @throws CodingException If the object could not be encoded.
+   */
+  public static byte[] encodeNonNull(TBase<?, ?> tBase) throws CodingException {
+    Preconditions.checkNotNull(tBase);
+
+    try {
+      return new TSerializer(PROTOCOL_FACTORY).serialize(tBase);
+    } catch (TException e) {
+      throw new CodingException("Failed to serialize: " + tBase, e);
+    }
+  }
+
+  /**
+   * Thrown when serialization or deserialization failed.
+   */
+  public static class CodingException extends Exception {
+    public CodingException(String message) {
+      super(message);
+    }
+    public CodingException(String msg, Throwable cause) {
+      super(msg, cause);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/Driver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/Driver.java b/src/main/java/org/apache/aurora/scheduler/Driver.java
new file mode 100644
index 0000000..aa77887
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/Driver.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.Status;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.SchedulerDriver;
+
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.StateMachine;
+
+import static org.apache.mesos.Protos.Status.DRIVER_RUNNING;
+
+/**
+ * Wraps the mesos Scheduler driver to ensure it's used in a valid lifecycle; namely:
+ * <pre>
+ *   (run -> kill*)? -> stop*
+ * </pre>
+ *
+ * Also ensures the driver is only asked for when needed.
+ */
+public interface Driver {
+
+  /**
+   * Launches a task.
+   *
+   * @param offerId ID of the resource offer to accept with the task.
+   * @param task Task to launch.
+   */
+  void launchTask(OfferID offerId, TaskInfo task);
+
+  /**
+   * Declines a resource offer.
+   *
+   * @param offerId ID of the offer to decline.
+   */
+  void declineOffer(OfferID offerId);
+
+  /**
+   * Sends a kill task request for the given {@code taskId} to the mesos master.
+   *
+   * @param taskId The id of the task to kill.
+   */
+  void killTask(String taskId);
+
+  /**
+   * Stops the underlying driver if it is running, otherwise does nothing.
+   */
+  void stop();
+
+  /**
+   * Starts the underlying driver.  Can only be called once.
+   *
+   * @return The status of the underlying driver run request.
+   */
+  Protos.Status start();
+
+  /**
+   * Blocks until the underlying driver is stopped or aborted.
+   *
+   * @return The status of the underlying driver upon exit.
+   */
+  Protos.Status join();
+
+  /**
+   * Mesos driver implementation.
+   */
+  static class DriverImpl implements Driver {
+    private static final Logger LOG = Logger.getLogger(Driver.class.getName());
+
+    /**
+     * Driver states.
+     */
+    enum State {
+      INIT,
+      RUNNING,
+      STOPPED
+    }
+
+    private final StateMachine<State> stateMachine;
+    private final Supplier<Optional<SchedulerDriver>> driverSupplier;
+    private final AtomicLong killFailures = Stats.exportLong("scheduler_driver_kill_failures");
+
+    /**
+     * Creates a driver manager that will only ask for the underlying mesos driver when actually
+     * needed.
+     *
+     * @param driverSupplier A factory for the underlying driver.
+     */
+    @Inject
+    DriverImpl(Supplier<Optional<SchedulerDriver>> driverSupplier) {
+      this.driverSupplier = driverSupplier;
+      this.stateMachine =
+          StateMachine.<State>builder("scheduler_driver")
+              .initialState(State.INIT)
+              .addState(State.INIT, State.RUNNING, State.STOPPED)
+              .addState(State.RUNNING, State.STOPPED)
+              .logTransitions()
+              .throwOnBadTransition(true)
+              .build();
+    }
+
+    private synchronized SchedulerDriver get(State expected) {
+      // TODO(William Farner): Formalize the failure case here by throwing a checked exception.
+      stateMachine.checkState(expected);
+      // This will and should fail if the driver is not present.
+      return driverSupplier.get().get();
+    }
+
+    @Override
+    public void launchTask(OfferID offerId, TaskInfo task) {
+      get(State.RUNNING).launchTasks(offerId, ImmutableList.of(task));
+    }
+
+    @Override
+    public void declineOffer(OfferID offerId) {
+      get(State.RUNNING).declineOffer(offerId);
+    }
+
+    @Override
+    public Protos.Status start() {
+      SchedulerDriver driver = get(State.INIT);
+      stateMachine.transition(State.RUNNING);
+      return driver.start();
+    }
+
+    @Override
+    public Status join() {
+      return get(State.RUNNING).join();
+    }
+
+    @Override
+    public synchronized void stop() {
+      if (stateMachine.getState() == State.RUNNING) {
+        SchedulerDriver driver = get(State.RUNNING);
+        driver.stop(true /* failover */);
+        stateMachine.transition(State.STOPPED);
+      }
+    }
+
+    @Override
+    public void killTask(String taskId) {
+      SchedulerDriver driver = get(State.RUNNING);
+      Protos.Status status = driver.killTask(Protos.TaskID.newBuilder().setValue(taskId).build());
+
+      if (status != DRIVER_RUNNING) {
+        LOG.severe(String.format("Attempt to kill task %s failed with code %s",
+            taskId, status));
+        killFailures.incrementAndGet();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/DriverFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/DriverFactory.java b/src/main/java/org/apache/aurora/scheduler/DriverFactory.java
new file mode 100644
index 0000000..e39bb09
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/DriverFactory.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import javax.inject.Provider;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.protobuf.ByteString;
+
+import org.apache.mesos.MesosSchedulerDriver;
+import org.apache.mesos.Protos.Credential;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.args.constraints.NotNull;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+/**
+ * Factory to create scheduler driver instances.
+ */
+public interface DriverFactory extends Function<String, SchedulerDriver> {
+
+  static class DriverFactoryImpl implements DriverFactory {
+    private static final Logger LOG = Logger.getLogger(DriverFactory.class.getName());
+
+    @NotNull
+    @CmdLine(name = "mesos_master_address",
+        help = "Address for the mesos master, can be a socket address or zookeeper path.")
+    private static final Arg<String> MESOS_MASTER_ADDRESS = Arg.create();
+
+    @VisibleForTesting
+    static final String PRINCIPAL_KEY = "aurora_authentication_principal";
+    @VisibleForTesting
+    static final String SECRET_KEY = "aurora_authentication_secret";
+    @CmdLine(name = "framework_authentication_file",
+        help = "Properties file which contains framework credentials to authenticate with Mesos"
+            + "master. Must contain the properties '" + PRINCIPAL_KEY + "' and "
+            + "'" + SECRET_KEY + "'.")
+    private static final Arg<File> FRAMEWORK_AUTHENTICATION_FILE = Arg.create();
+
+    @CmdLine(name = "framework_failover_timeout",
+        help = "Time after which a framework is considered deleted.  SHOULD BE VERY HIGH.")
+    private static final Arg<Amount<Long, Time>> FRAMEWORK_FAILOVER_TIMEOUT =
+        Arg.create(Amount.of(21L, Time.DAYS));
+
+    /**
+     * Require Mesos slaves to have checkpointing enabled. Slaves with checkpointing enabled will
+     * attempt to write checkpoints when required by a task's framework. These checkpoints allow
+     * executors to be reattached rather than killed when a slave is restarted.
+     *
+     * This flag is dangerous! When enabled tasks will not launch on slaves without checkpointing
+     * enabled.
+     *
+     * Behavior is as follows:
+     * (Scheduler -require_slave_checkpoint=true,  Slave --checkpoint=true):
+     *   Tasks will launch.        Checkpoints will be written.
+     * (Scheduler -require_slave_checkpoint=true,   Slave --checkpoint=false):
+     *   Tasks WILL NOT launch.
+     * (Scheduler -require_slave_checkpoint=false,  Slave --checkpoint=true):
+     *   Tasks will launch.        Checkpoints will not be written.
+     * (Scheduler -require_slave_checkpoint=false,  Slave --checkpoint=false):
+     *   Tasks will launch.        Checkpoints will not be written.
+     *
+     * TODO(ksweeney): Remove warning table after https://issues.apache.org/jira/browse/MESOS-444
+     * is resolved.
+     */
+    @CmdLine(name = "require_slave_checkpoint",
+        help = "DANGEROUS! Require Mesos slaves to have checkpointing enabled. When enabled a "
+            + "slave restart should not kill executors, but the scheduler will not be able to "
+            + "launch tasks on slaves without --checkpoint=true in their command lines. See "
+            + "DriverFactory.java for more information.")
+    private static final Arg<Boolean> REQUIRE_SLAVE_CHECKPOINT = Arg.create(false);
+
+    private static final String EXECUTOR_USER = "root";
+
+    private static final String TWITTER_FRAMEWORK_NAME = "TwitterScheduler";
+
+    private final Provider<Scheduler> scheduler;
+
+    @Inject
+    DriverFactoryImpl(Provider<Scheduler> scheduler) {
+      this.scheduler = Preconditions.checkNotNull(scheduler);
+    }
+
+    @VisibleForTesting
+    static Properties parseCredentials(InputStream credentialsStream) {
+      Properties properties = new Properties();
+      try {
+        properties.load(credentialsStream);
+      } catch (IOException e) {
+        LOG.severe("Unable to load authentication file");
+        throw Throwables.propagate(e);
+      }
+      Preconditions.checkState(properties.containsKey(PRINCIPAL_KEY),
+          "The framework authentication file is missing the key: %s", PRINCIPAL_KEY);
+      Preconditions.checkState(properties.containsKey(SECRET_KEY),
+          "The framework authentication file is missing the key: %s", SECRET_KEY);
+      return properties;
+    }
+
+    @Override
+    public SchedulerDriver apply(@Nullable String frameworkId) {
+      LOG.info("Connecting to mesos master: " + MESOS_MASTER_ADDRESS.get());
+
+      FrameworkInfo.Builder frameworkInfo = FrameworkInfo.newBuilder()
+          .setUser(EXECUTOR_USER)
+          .setName(TWITTER_FRAMEWORK_NAME)
+          .setCheckpoint(REQUIRE_SLAVE_CHECKPOINT.get())
+          .setFailoverTimeout(FRAMEWORK_FAILOVER_TIMEOUT.get().as(Time.SECONDS));
+
+      if (frameworkId != null) {
+        LOG.info("Found persisted framework ID: " + frameworkId);
+        frameworkInfo.setId(FrameworkID.newBuilder().setValue(frameworkId));
+      } else {
+        LOG.warning("Did not find a persisted framework ID, connecting as a new framework.");
+      }
+
+      if (FRAMEWORK_AUTHENTICATION_FILE.hasAppliedValue()) {
+        Properties properties;
+        try {
+          properties = parseCredentials(new FileInputStream(FRAMEWORK_AUTHENTICATION_FILE.get()));
+        } catch (FileNotFoundException e) {
+          LOG.severe("Authentication File not Found");
+          throw Throwables.propagate(e);
+        }
+
+        LOG.info(String.format("Connecting to master using authentication (principal: %s).",
+            properties.get(PRINCIPAL_KEY)));
+
+        Credential credential = Credential.newBuilder()
+            .setPrincipal(properties.getProperty(PRINCIPAL_KEY))
+            .setSecret(ByteString.copyFromUtf8(properties.getProperty(SECRET_KEY)))
+            .build();
+
+        return new MesosSchedulerDriver(
+            scheduler.get(),
+            frameworkInfo.build(),
+            MESOS_MASTER_ADDRESS.get(),
+            credential);
+      } else {
+        LOG.warning("Connecting to master without authentication!");
+        return new MesosSchedulerDriver(
+            scheduler.get(),
+            frameworkInfo.build(),
+            MESOS_MASTER_ADDRESS.get());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
new file mode 100644
index 0000000..fb41405
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
@@ -0,0 +1,268 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.MasterInfo;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskStatus;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import com.twitter.aurora.GuiceUtils.AllowUnchecked;
+import com.twitter.aurora.codec.ThriftBinaryCodec;
+import com.twitter.aurora.gen.comm.SchedulerMessage;
+import com.twitter.aurora.scheduler.base.Conversions;
+import com.twitter.aurora.scheduler.base.SchedulerException;
+import com.twitter.aurora.scheduler.configuration.Resources;
+import com.twitter.aurora.scheduler.events.PubsubEvent.Interceptors.Event;
+import com.twitter.aurora.scheduler.events.PubsubEvent.Interceptors.SendNotification;
+import com.twitter.aurora.scheduler.state.SchedulerCore;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.common.application.Lifecycle;
+import com.twitter.common.inject.TimedInterceptor.Timed;
+import com.twitter.common.stats.Stats;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Receives callbacks from the mesos scheduler driver and routes them to storage, the scheduler
+ * core and the configured task launchers.
+ */
+class MesosSchedulerImpl implements Scheduler {
+  private static final Logger LOG = Logger.getLogger(MesosSchedulerImpl.class.getName());
+
+  private final AtomicLong resourceOffers = Stats.exportLong("scheduler_resource_offers");
+  private final AtomicLong failedOffers = Stats.exportLong("scheduler_failed_offers");
+  // Incremented whenever no launcher accepts a status update.
+  private final AtomicLong failedStatusUpdates = Stats.exportLong("scheduler_status_updates");
+  private final AtomicLong frameworkDisconnects =
+      Stats.exportLong("scheduler_framework_disconnects");
+  private final AtomicLong frameworkReregisters =
+      Stats.exportLong("scheduler_framework_reregisters");
+  private final AtomicLong lostExecutors = Stats.exportLong("scheduler_lost_executors");
+
+  private final List<TaskLauncher> taskLaunchers;
+
+  private final Storage storage;
+  private final SchedulerCore schedulerCore;
+  private final Lifecycle lifecycle;
+  private volatile boolean registered = false;
+
+  /**
+   * Creates a new scheduler.
+   *
+   * @param storage Store used to persist the framework ID and host attributes.
+   * @param schedulerCore Core scheduler.
+   * @param lifecycle Application lifecycle manager.
+   * @param taskLaunchers Task launchers, consulted in list order.
+   */
+  @Inject
+  public MesosSchedulerImpl(
+      Storage storage,
+      SchedulerCore schedulerCore,
+      Lifecycle lifecycle,
+      List<TaskLauncher> taskLaunchers) {
+
+    this.storage = checkNotNull(storage);
+    this.schedulerCore = checkNotNull(schedulerCore);
+    this.lifecycle = checkNotNull(lifecycle);
+    this.taskLaunchers = checkNotNull(taskLaunchers);
+  }
+
+  @Override
+  public void slaveLost(SchedulerDriver schedulerDriver, SlaveID slaveId) {
+    LOG.info("Received notification of lost slave: " + slaveId);
+  }
+
+  @SendNotification(after = Event.DriverRegistered)
+  @Override
+  public void registered(SchedulerDriver driver, FrameworkID frameworkId, MasterInfo masterInfo) {
+    LOG.info("Registered with ID " + frameworkId + ", master: " + masterInfo);
+
+    persistFrameworkId(frameworkId);
+    // Offers are rejected until this flag is set, see resourceOffers().
+    registered = true;
+  }
+
+  /**
+   * Writes the value of the given framework ID to the scheduler store.
+   */
+  private void persistFrameworkId(final FrameworkID frameworkId) {
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override protected void execute(MutableStoreProvider stores) {
+        stores.getSchedulerStore().saveFrameworkId(frameworkId.getValue());
+      }
+    });
+  }
+
+  @SendNotification(after = Event.DriverDisconnected)
+  @Override
+  public void disconnected(SchedulerDriver schedulerDriver) {
+    LOG.warning("Framework disconnected.");
+    frameworkDisconnects.incrementAndGet();
+  }
+
+  @Override
+  public void reregistered(SchedulerDriver schedulerDriver, MasterInfo masterInfo) {
+    LOG.info("Framework re-registered with master " + masterInfo);
+    frameworkReregisters.incrementAndGet();
+  }
+
+  /**
+   * Tests whether the resources of an offer cover the resources requested by a task.
+   */
+  private static boolean fitsInOffer(TaskInfo task, Offer offer) {
+    Resources offered = Resources.from(offer);
+    Resources requested = Resources.from(task.getResourcesList());
+    return offered.greaterThanOrEqual(requested);
+  }
+
+  @Timed("scheduler_resource_offers")
+  @Override
+  public void resourceOffers(SchedulerDriver driver, List<Offer> offers) {
+    Preconditions.checkState(registered, "Must be registered before receiving offers.");
+
+    for (Offer offer : offers) {
+      log(Level.FINE, "Received offer: %s", offer);
+      resourceOffers.incrementAndGet();
+      persistHostAttributes(offer);
+      launchFirstAcceptedTask(driver, offer);
+    }
+  }
+
+  /**
+   * Writes the host attributes derived from an offer to the attribute store.
+   */
+  private void persistHostAttributes(final Offer offer) {
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override protected void execute(MutableStoreProvider stores) {
+        stores.getAttributeStore().saveHostAttributes(Conversions.getAttributes(offer));
+      }
+    });
+  }
+
+  /**
+   * Presents an offer to each launcher in turn and launches the first task that fits in it.
+   */
+  private void launchFirstAcceptedTask(SchedulerDriver driver, Offer offer) {
+    // Ordering of task launchers is important here, since offers are consumed greedily.
+    // TODO(William Farner): Refactor this area of code now that the primary task launcher
+    // is asynchronous.
+    for (TaskLauncher launcher : taskLaunchers) {
+      Optional<TaskInfo> candidate;
+      try {
+        candidate = launcher.createTask(offer);
+      } catch (SchedulerException e) {
+        LOG.log(Level.WARNING, "Failed to schedule offers.", e);
+        failedOffers.incrementAndGet();
+        // A failing launcher does not prevent the remaining launchers from seeing the offer.
+        continue;
+      }
+
+      if (!candidate.isPresent()) {
+        continue;
+      }
+
+      if (fitsInOffer(candidate.get(), offer)) {
+        driver.launchTasks(offer.getId(), ImmutableList.of(candidate.get()));
+        return;
+      }
+      LOG.warning("Insufficient resources to launch task " + candidate);
+    }
+  }
+
+  @Override
+  public void offerRescinded(SchedulerDriver schedulerDriver, OfferID offerId) {
+    LOG.info("Offer rescinded: " + offerId);
+    for (TaskLauncher taskLauncher : taskLaunchers) {
+      taskLauncher.cancelOffer(offerId);
+    }
+  }
+
+  @AllowUnchecked
+  @Timed("scheduler_status_update")
+  @Override
+  public void statusUpdate(SchedulerDriver driver, TaskStatus status) {
+    StringBuilder summary = new StringBuilder("Received status update for task ")
+        .append(status.getTaskId().getValue())
+        .append(" in state ")
+        .append(status.getState());
+    if (status.hasData()) {
+      summary.append(" with info ").append(status.getData().toStringUtf8());
+    }
+    if (status.hasMessage()) {
+      summary.append(" with core message ").append(status.getMessage());
+    }
+    LOG.info(summary.toString());
+
+    boolean handled = false;
+    try {
+      for (TaskLauncher launcher : taskLaunchers) {
+        if (launcher.statusUpdate(status)) {
+          handled = true;
+          break;
+        }
+      }
+    } catch (SchedulerException e) {
+      LOG.log(Level.SEVERE, "Status update failed due to scheduler exception: " + e, e);
+      // We re-throw the exception here in an effort to NACK the status update and trigger an
+      // abort of the driver.  Previously we directly issued driver.abort(), but the re-entrancy
+      // guarantees of that are uncertain (and we believe it was not working).  However, this
+      // was difficult to discern since logging is unreliable during JVM shutdown and we would not
+      // see the above log message.
+      throw e;
+    }
+
+    if (!handled) {
+      LOG.warning("Unhandled status update " + status);
+      failedStatusUpdates.incrementAndGet();
+    }
+  }
+
+  @Override
+  public void error(SchedulerDriver driver, String message) {
+    LOG.severe("Received error message: " + message);
+    lifecycle.shutdown();
+  }
+
+  @Override
+  public void executorLost(
+      SchedulerDriver schedulerDriver,
+      ExecutorID executorID,
+      SlaveID slaveID,
+      int status) {
+
+    LOG.info("Lost executor " + executorID);
+    lostExecutors.incrementAndGet();
+  }
+
+  @Timed("scheduler_framework_message")
+  @Override
+  public void frameworkMessage(
+      SchedulerDriver driver,
+      ExecutorID executor,
+      SlaveID slave,
+      byte[] data) {
+
+    if (data == null) {
+      LOG.info("Received empty framework message.");
+      return;
+    }
+
+    try {
+      SchedulerMessage decoded = ThriftBinaryCodec.decode(SchedulerMessage.class, data);
+      boolean empty = decoded == null || !decoded.isSet();
+      if (empty) {
+        LOG.warning("Received empty scheduler message.");
+        return;
+      }
+
+      switch (decoded.getSetField()) {
+        case DELETED_TASKS:
+          // TODO(William Farner): Refactor this to use a thinner interface here.  As it stands
+          // it is odd that we route the registered() call to schedulerCore via the
+          // registeredListener and call the schedulerCore directly here.
+          schedulerCore.tasksDeleted(decoded.getDeletedTasks().getTaskIds());
+          break;
+
+        default:
+          LOG.warning("Received unhandled scheduler message type: " + decoded.getSetField());
+          break;
+      }
+    } catch (ThriftBinaryCodec.CodingException e) {
+      LOG.log(Level.SEVERE, "Failed to decode framework message.", e);
+    }
+  }
+
+  /**
+   * Logs a formatted message, skipping the formatting work when the level is not enabled.
+   */
+  private static void log(Level level, String message, Object... args) {
+    if (!LOG.isLoggable(level)) {
+      return;
+    }
+    LOG.log(level, String.format(message, args));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
new file mode 100644
index 0000000..5f73f71
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.protobuf.ByteString;
+
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.Resource;
+import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+
+import com.twitter.aurora.Protobufs;
+import com.twitter.aurora.codec.ThriftBinaryCodec;
+import com.twitter.aurora.scheduler.base.CommandUtil;
+import com.twitter.aurora.scheduler.base.JobKeys;
+import com.twitter.aurora.scheduler.base.SchedulerException;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.configuration.Resources;
+import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+import com.twitter.common.quantity.Data;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.common.base.MorePreconditions.checkNotBlank;
+
+/**
+ * A factory to create mesos task objects.
+ */
+public interface MesosTaskFactory {
+
+  /**
+   * Creates a mesos task object.
+   *
+   * @param task Assigned task to translate into a task object.
+   * @param slaveId Id of the slave the task is being assigned to.
+   * @return A new task.
+   * @throws SchedulerException If the task could not be encoded.
+   */
+  TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException;
+
+  static class ExecutorConfig {
+    private final String executorPath;
+
+    public ExecutorConfig(String executorPath) {
+      this.executorPath = checkNotBlank(executorPath);
+    }
+
+    String getExecutorPath() {
+      return executorPath;
+    }
+  }
+
+  static class MesosTaskFactoryImpl implements MesosTaskFactory {
+    private static final Logger LOG = Logger.getLogger(MesosTaskFactoryImpl.class.getName());
+    private static final String EXECUTOR_PREFIX = "thermos-";
+
+    /**
+     * Name to associate with task executors.
+     */
+    @VisibleForTesting
+    static final String EXECUTOR_NAME = "aurora.task";
+
+    private final String executorPath;
+
+    @Inject
+    MesosTaskFactoryImpl(ExecutorConfig executorConfig) {
+      this.executorPath = executorConfig.getExecutorPath();
+    }
+
+    @VisibleForTesting
+    static ExecutorID getExecutorId(String taskId) {
+      return ExecutorID.newBuilder().setValue(EXECUTOR_PREFIX + taskId).build();
+    }
+
+    public static String getJobSourceName(IJobKey jobkey) {
+      return String.format("%s.%s.%s", jobkey.getRole(), jobkey.getEnvironment(), jobkey.getName());
+    }
+
+    public static String getJobSourceName(ITaskConfig task) {
+      return getJobSourceName(JobKeys.from(task));
+    }
+
+    public static String getInstanceSourceName(ITaskConfig task, int instanceId) {
+      return String.format("%s.%s", getJobSourceName(task), instanceId);
+    }
+
+    @Override
+    public TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException {
+      checkNotNull(task);
+      byte[] taskInBytes;
+      try {
+        taskInBytes = ThriftBinaryCodec.encode(task.newBuilder());
+      } catch (ThriftBinaryCodec.CodingException e) {
+        LOG.log(Level.SEVERE, "Unable to serialize task.", e);
+        throw new SchedulerException("Internal error.", e);
+      }
+
+      ITaskConfig config = task.getTask();
+      List<Resource> resources;
+      if (task.isSetAssignedPorts()) {
+        resources = Resources.from(config)
+            .toResourceList(ImmutableSet.copyOf(task.getAssignedPorts().values()));
+      } else {
+        resources = ImmutableList.of();
+      }
+
+      if (LOG.isLoggable(Level.FINE)) {
+        LOG.fine("Setting task resources to "
+            + Iterables.transform(resources, Protobufs.SHORT_TOSTRING));
+      }
+      TaskInfo.Builder taskBuilder =
+          TaskInfo.newBuilder()
+              .setName(JobKeys.toPath(Tasks.ASSIGNED_TO_JOB_KEY.apply(task)))
+              .setTaskId(TaskID.newBuilder().setValue(task.getTaskId()))
+              .setSlaveId(slaveId)
+              .addAllResources(resources)
+              .setData(ByteString.copyFrom(taskInBytes));
+
+      ExecutorInfo executor = ExecutorInfo.newBuilder()
+          .setCommand(CommandUtil.create(executorPath))
+          .setExecutorId(getExecutorId(task.getTaskId()))
+          .setName(EXECUTOR_NAME)
+          .setSource(getInstanceSourceName(config, task.getInstanceId()))
+          .addResources(Resources.makeMesosResource(Resources.CPUS, ResourceSlot.EXECUTOR_CPUS))
+          .addResources(
+              Resources.makeMesosResource(Resources.RAM_MB, ResourceSlot.EXECUTOR_RAM.as(Data.MB)))
+          .build();
+      return taskBuilder
+          .setExecutor(executor)
+          .build();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java b/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
new file mode 100644
index 0000000..a9c14e6
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import java.util.Arrays;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+
+import com.twitter.aurora.scheduler.configuration.Resources;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
+
+import static org.apache.mesos.Protos.Offer;
+
+/**
+ * Wrapper around {@link Resources} whose task-based factories add the fixed executor overhead
+ * to the requested CPU and RAM.
+ */
+public final class ResourceSlot {
+
+  private final Resources resources;
+
+  /**
+   * CPU allocated for each executor.
+   */
+  @VisibleForTesting
+  static final double EXECUTOR_CPUS = 0.25;
+
+  /**
+   * RAM required for the executor.  Executors in the wild have been observed using 48-54MB RSS,
+   * setting to 128MB to be extra vigilant initially.
+   */
+  @VisibleForTesting
+  static final Amount<Long, Data> EXECUTOR_RAM = Amount.of(128L, Data.MB);
+
+  /**
+   * Unwraps the resources held by a slot.
+   */
+  private static final Function<ResourceSlot, Resources> TO_RESOURCES =
+      new Function<ResourceSlot, Resources>() {
+        @Override public Resources apply(ResourceSlot slot) {
+          return slot.resources;
+        }
+      };
+
+  private ResourceSlot(Resources r) {
+    this.resources = r;
+  }
+
+  /**
+   * Builds a slot from raw requirements, adding the executor overhead to CPU and RAM.
+   */
+  private static ResourceSlot withExecutorOverhead(
+      double cpu,
+      long ramMb,
+      Amount<Long, Data> disk,
+      int ports) {
+
+    double cpuWithExecutor = cpu + EXECUTOR_CPUS;
+    Amount<Long, Data> ramWithExecutor = Amount.of(ramMb + EXECUTOR_RAM.as(Data.MB), Data.MB);
+    return new ResourceSlot(new Resources(cpuWithExecutor, ramWithExecutor, disk, ports));
+  }
+
+  /**
+   * Creates the slot needed to run a task, including the executor overhead.
+   */
+  public static ResourceSlot from(ITaskConfig task) {
+    return withExecutorOverhead(
+        task.getNumCpus(),
+        task.getRamMb(),
+        Amount.of(task.getDiskMb(), Data.MB),
+        task.getRequestedPorts().size());
+  }
+
+  /**
+   * Creates the slot described by an offer; no executor overhead is added.
+   */
+  public static ResourceSlot from(Offer offer) {
+    Resources offered = Resources.from(offer);
+    return new ResourceSlot(offered);
+  }
+
+  public double getNumCpus() {
+    return resources.getNumCpus();
+  }
+
+  public Amount<Long, Data> getRam() {
+    return resources.getRam();
+  }
+
+  public Amount<Long, Data> getDisk() {
+    return resources.getDisk();
+  }
+
+  public int getNumPorts() {
+    return resources.getNumPorts();
+  }
+
+  /**
+   * Creates a slot from explicit requirements, including the executor overhead.
+   */
+  @VisibleForTesting
+  public static ResourceSlot from(double cpu,
+                                  Amount<Long, Data> ram,
+                                  Amount<Long, Data> disk,
+                                  int ports) {
+    return withExecutorOverhead(cpu, ram.as(Data.MB), disk, ports);
+  }
+
+  /**
+   * Sums the resources of the given slots into a single slot.
+   */
+  public static ResourceSlot sum(ResourceSlot... rs) {
+    return sum(Arrays.asList(rs));
+  }
+
+  /**
+   * Sums the resources of the given slots into a single slot.
+   */
+  public static ResourceSlot sum(Iterable<ResourceSlot> rs) {
+    Resources total = Resources.sum(Iterables.transform(rs, TO_RESOURCES));
+    return new ResourceSlot(total);
+  }
+
+  /**
+   * Orders slots by applying {@link Resources#RESOURCE_ORDER} to their underlying resources.
+   */
+  public static final Ordering<ResourceSlot> ORDER = new Ordering<ResourceSlot>() {
+    @Override public int compare(ResourceSlot first, ResourceSlot second) {
+      return Resources.RESOURCE_ORDER.compare(first.resources, second.resources);
+    }
+  };
+}