You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:32 UTC
[39/51] [partial] Rename twitter* and com.twitter to apache and
org.apache directories to preserve all file history before the refactor.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/aop/AopModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/AopModule.java b/src/main/java/com/twitter/aurora/scheduler/thrift/aop/AopModule.java
deleted file mode 100644
index 4afc263..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/AopModule.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift.aop;
-
-import java.lang.reflect.Method;
-import java.util.List;
-import java.util.Map;
-
-import javax.inject.Inject;
-import javax.inject.Singleton;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.Key;
-import com.google.inject.PrivateModule;
-import com.google.inject.TypeLiteral;
-import com.google.inject.matcher.Matcher;
-import com.google.inject.matcher.Matchers;
-
-import org.aopalliance.intercept.MethodInterceptor;
-
-import com.twitter.aurora.GuiceUtils;
-import com.twitter.aurora.auth.CapabilityValidator;
-import com.twitter.aurora.gen.AuroraAdmin;
-import com.twitter.aurora.gen.AuroraSchedulerManager;
-import com.twitter.aurora.scheduler.thrift.auth.DecoratedThrift;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-
-/**
- * Binding module for AOP-style decorations of the thrift API.
- */
-public class AopModule extends AbstractModule {
-
- @CmdLine(name = "enable_job_updates", help = "Whether new job updates should be accepted.")
- private static final Arg<Boolean> ENABLE_UPDATES = Arg.create(true);
-
- @CmdLine(name = "enable_job_creation",
- help = "Allow new jobs to be created, if false all job creation requests will be denied.")
- private static final Arg<Boolean> ENABLE_JOB_CREATION = Arg.create(true);
-
- private static final Matcher<? super Class<?>> THRIFT_IFACE_MATCHER =
- Matchers.subclassesOf(AuroraAdmin.Iface.class)
- .and(Matchers.annotatedWith(DecoratedThrift.class));
-
- private final Map<String, Boolean> toggledMethods;
-
- public AopModule() {
- this(ImmutableMap.of(
- "createJob", ENABLE_JOB_CREATION.get(),
- "acquireLock", ENABLE_UPDATES.get()));
- }
-
- @VisibleForTesting
- AopModule(Map<String, Boolean> toggledMethods) {
- this.toggledMethods = ImmutableMap.copyOf(toggledMethods);
- }
-
- private static final Function<Method, String> GET_NAME = new Function<Method, String>() {
- @Override public String apply(Method method) {
- return method.getName();
- }
- };
-
- @Override
- protected void configure() {
- requireBinding(CapabilityValidator.class);
-
- // Layer ordering:
- // Log -> CapabilityValidator -> FeatureToggle -> StatsExporter -> APIVersion ->
- // SchedulerThriftInterface
-
- // TODO(Sathya): Consider using provider pattern for constructing interceptors to facilitate
- // unit testing without the creation of Guice injectors.
- bindThriftDecorator(new LoggingInterceptor());
-
- // Note: it's important that the capability interceptor is only applied to AuroraAdmin.Iface
- // methods, and does not pick up methods on AuroraSchedulerManager.Iface.
- MethodInterceptor authInterceptor = new UserCapabilityInterceptor();
- requestInjection(authInterceptor);
- bindInterceptor(
- THRIFT_IFACE_MATCHER,
- GuiceUtils.interfaceMatcher(AuroraAdmin.Iface.class, true),
- authInterceptor);
-
- install(new PrivateModule() {
- @Override protected void configure() {
- // Ensure that the provided methods exist on the decorated interface.
- List<Method> methods =
- ImmutableList.copyOf(AuroraSchedulerManager.Iface.class.getMethods());
- for (String toggledMethod : toggledMethods.keySet()) {
- Preconditions.checkArgument(
- Iterables.any(methods,
- Predicates.compose(Predicates.equalTo(toggledMethod), GET_NAME)),
- String.format("Method %s was not found in class %s",
- toggledMethod,
- AuroraSchedulerManager.Iface.class));
- }
-
- bind(new TypeLiteral<Map<String, Boolean>>() { }).toInstance(toggledMethods);
- bind(IsFeatureEnabled.class).in(Singleton.class);
- Key<Predicate<Method>> predicateKey = Key.get(new TypeLiteral<Predicate<Method>>() { });
- bind(predicateKey).to(IsFeatureEnabled.class);
- expose(predicateKey);
- }
- });
- bindThriftDecorator(new FeatureToggleInterceptor());
- bindThriftDecorator(new ThriftStatsExporterInterceptor());
- bindThriftDecorator(new APIVersionInterceptor());
- }
-
- private void bindThriftDecorator(MethodInterceptor interceptor) {
- bindThriftDecorator(binder(), THRIFT_IFACE_MATCHER, interceptor);
- }
-
- @VisibleForTesting
- static void bindThriftDecorator(
- Binder binder,
- Matcher<? super Class<?>> classMatcher,
- MethodInterceptor interceptor) {
-
- binder.bindInterceptor(classMatcher, Matchers.any(), interceptor);
- binder.requestInjection(interceptor);
- }
-
- private static class IsFeatureEnabled implements Predicate<Method> {
- private final Predicate<String> methodEnabled;
-
- @Inject
- IsFeatureEnabled(Map<String, Boolean> toggleMethods) {
- Predicate<String> builder = Predicates.alwaysTrue();
- for (Map.Entry<String, Boolean> toggleMethod : toggleMethods.entrySet()) {
- Predicate<String> enableMethod = Predicates.or(
- toggleMethod.getValue()
- ? Predicates.<String>alwaysTrue()
- : Predicates.<String>alwaysFalse(),
- Predicates.not(Predicates.equalTo(toggleMethod.getKey())));
- builder = Predicates.and(builder, enableMethod);
- }
- methodEnabled = builder;
- }
-
- @Override
- public boolean apply(Method method) {
- return methodEnabled.apply(method.getName());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/aop/FeatureToggleInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/FeatureToggleInterceptor.java b/src/main/java/com/twitter/aurora/scheduler/thrift/aop/FeatureToggleInterceptor.java
deleted file mode 100644
index 03c3d99..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/FeatureToggleInterceptor.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift.aop;
-
-import java.lang.reflect.Method;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Predicate;
-
-import org.aopalliance.intercept.MethodInterceptor;
-import org.aopalliance.intercept.MethodInvocation;
-
-import com.twitter.aurora.gen.ResponseCode;
-
-/**
- * A method interceptor that blocks access to features based on a supplied predicate.
- */
-public class FeatureToggleInterceptor implements MethodInterceptor {
-
- @Inject private Predicate<Method> allowMethod;
-
- @Override
- public Object invoke(MethodInvocation invocation) throws Throwable {
- Method method = invocation.getMethod();
- if (!allowMethod.apply(method)) {
- return Interceptors.properlyTypedResponse(
- method,
- ResponseCode.ERROR,
- "The " + method.getName() + " feature is currently disabled on this scheduler.");
- } else {
- return invocation.proceed();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/aop/Interceptors.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/Interceptors.java b/src/main/java/com/twitter/aurora/scheduler/thrift/aop/Interceptors.java
deleted file mode 100644
index d0cb9c1..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/Interceptors.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift.aop;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.logging.Logger;
-
-import com.google.common.base.Throwables;
-
-import com.twitter.aurora.gen.ResponseCode;
-
-/**
- * Utility class for functions useful when implementing an interceptor on the thrift interface.
- */
-final class Interceptors {
-
- private Interceptors() {
- // Utility class.
- }
-
- private static final Logger LOG = Logger.getLogger(Interceptors.class.getName());
-
- static Object properlyTypedResponse(Method method, ResponseCode responseCode, String message)
- throws IllegalAccessException, InstantiationException {
-
- Class<?> returnType = method.getReturnType();
- Object response = returnType.newInstance();
- invoke(returnType, response, "setResponseCode", ResponseCode.class, responseCode);
- invoke(returnType, response, "setMessage", String.class, message);
- return response;
- }
-
- private static <T> void invoke(
- Class<?> type,
- Object obj,
- String name,
- Class<T> parameterType,
- T argument) {
-
- Method method;
- try {
- method = type.getMethod(name, parameterType);
- } catch (NoSuchMethodException e) {
- LOG.severe(type + " does not support " + name);
- throw Throwables.propagate(e);
- }
- try {
- method.invoke(obj, argument);
- } catch (IllegalAccessException e) {
- LOG.severe("Method " + name + " is not accessible in " + type);
- throw Throwables.propagate(e);
- } catch (InvocationTargetException e) {
- LOG.severe("Failed to invoke " + name + " on " + type);
- throw Throwables.propagate(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/aop/LoggingInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/LoggingInterceptor.java b/src/main/java/com/twitter/aurora/scheduler/thrift/aop/LoggingInterceptor.java
deleted file mode 100644
index 5f773cc..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/LoggingInterceptor.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift.aop;
-
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
-import org.aopalliance.intercept.MethodInterceptor;
-import org.aopalliance.intercept.MethodInvocation;
-
-import com.twitter.aurora.auth.CapabilityValidator;
-import com.twitter.aurora.gen.ExecutorConfig;
-import com.twitter.aurora.gen.JobConfiguration;
-import com.twitter.aurora.gen.ResponseCode;
-import com.twitter.aurora.gen.SessionKey;
-
-import static com.twitter.aurora.scheduler.thrift.aop.Interceptors.properlyTypedResponse;
-
-/**
- * A method interceptor that logs all invocations as well as any unchecked exceptions thrown from
- * the underlying call.
- */
-class LoggingInterceptor implements MethodInterceptor {
-
- private static final Logger LOG = Logger.getLogger(LoggingInterceptor.class.getName());
-
- @Inject private CapabilityValidator validator;
-
- // TODO(wfarner): Scrub updateToken when it is identifiable by type.
- private final Map<Class<?>, Function<Object, String>> printFunctions =
- ImmutableMap.<Class<?>, Function<Object, String>>of(
- JobConfiguration.class,
- new Function<Object, String>() {
- @Override public String apply(Object input) {
- JobConfiguration configuration = ((JobConfiguration) input).deepCopy();
- if (configuration.isSetTaskConfig()) {
- configuration.getTaskConfig().setExecutorConfig(
- new ExecutorConfig("BLANKED", "BLANKED"));
- }
- return configuration.toString();
- }
- },
- SessionKey.class,
- new Function<Object, String>() {
- @Override public String apply(Object input) {
- SessionKey key = (SessionKey) input;
- return validator.toString(key);
- }
- }
- );
-
- @Override
- public Object invoke(MethodInvocation invocation) throws Throwable {
- List<String> argStrings = Lists.newArrayList();
- for (Object arg : invocation.getArguments()) {
- if (arg == null) {
- argStrings.add("null");
- } else {
- Function<Object, String> printFunction = printFunctions.get(arg.getClass());
- argStrings.add((printFunction == null) ? arg.toString() : printFunction.apply(arg));
- }
- }
- String methodName = invocation.getMethod().getName();
- String message = String.format("%s(%s)", methodName, Joiner.on(", ").join(argStrings));
- LOG.info(message);
- try {
- return invocation.proceed();
- } catch (RuntimeException e) {
- LOG.log(Level.WARNING, "Uncaught exception while handling " + message, e);
- return properlyTypedResponse(invocation.getMethod(), ResponseCode.ERROR, e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/aop/ThriftStatsExporterInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/ThriftStatsExporterInterceptor.java b/src/main/java/com/twitter/aurora/scheduler/thrift/aop/ThriftStatsExporterInterceptor.java
deleted file mode 100644
index d700ab5..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/ThriftStatsExporterInterceptor.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift.aop;
-
-import java.lang.reflect.Method;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-
-import org.aopalliance.intercept.MethodInterceptor;
-import org.aopalliance.intercept.MethodInvocation;
-
-import com.twitter.common.stats.SlidingStats;
-import com.twitter.common.stats.Stats;
-
-/**
- * A method interceptor that exports counterStats about thrift calls.
- */
-class ThriftStatsExporterInterceptor implements MethodInterceptor {
-
- private final LoadingCache<Method, SlidingStats> stats =
- CacheBuilder.newBuilder().build(new CacheLoader<Method, SlidingStats>() {
- @Override public SlidingStats load(Method method) {
- return new SlidingStats(
- Stats.normalizeName(String.format("scheduler_thrift_%s", method.getName())),
- "nanos");
- }
- });
-
- @Override
- public Object invoke(MethodInvocation invocation) throws Throwable {
- SlidingStats stat = stats.get(invocation.getMethod());
- long start = System.nanoTime();
- try {
- return invocation.proceed();
- } finally {
- stat.accumulate(System.nanoTime() - start);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/aop/UserCapabilityInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/UserCapabilityInterceptor.java b/src/main/java/com/twitter/aurora/scheduler/thrift/aop/UserCapabilityInterceptor.java
deleted file mode 100644
index d9240bc..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/aop/UserCapabilityInterceptor.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift.aop;
-
-import java.lang.reflect.Method;
-import java.util.Arrays;
-import java.util.List;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-
-import org.aopalliance.intercept.MethodInterceptor;
-import org.aopalliance.intercept.MethodInvocation;
-
-import com.twitter.aurora.auth.CapabilityValidator;
-import com.twitter.aurora.auth.CapabilityValidator.AuditCheck;
-import com.twitter.aurora.auth.CapabilityValidator.Capability;
-import com.twitter.aurora.auth.SessionValidator.AuthFailedException;
-import com.twitter.aurora.gen.ResponseCode;
-import com.twitter.aurora.gen.SessionKey;
-import com.twitter.aurora.scheduler.thrift.auth.Requires;
-
-/**
- * A method interceptor that will authenticate users identified by a {@link SessionKey} argument
- * to invoked methods.
- * <p>
- * Intercepted methods will require {@link Capability#ROOT}, but additional capabilities
- * may be specified by annotating methods with {@link Requires} and supplying a whitelist.
- */
-class UserCapabilityInterceptor implements MethodInterceptor {
- private static final Logger LOG = Logger.getLogger(UserCapabilityInterceptor.class.getName());
-
- @Inject private CapabilityValidator capabilityValidator;
-
- private static final Function<Object, SessionKey> CAST = new Function<Object, SessionKey>() {
- @Override public SessionKey apply(Object o) {
- return (SessionKey) o;
- }
- };
-
- @Override
- public Object invoke(MethodInvocation invocation) throws Throwable {
- Preconditions.checkNotNull(capabilityValidator, "Session validator has not yet been set.");
-
- // Ensure ROOT is always permitted.
- ImmutableList.Builder<Capability> whitelistBuilder =
- ImmutableList.<Capability>builder().add(Capability.ROOT);
-
- Method method = invocation.getMethod();
- Requires requires = method.getAnnotation(Requires.class);
- if (requires != null) {
- whitelistBuilder.add(requires.whitelist());
- }
-
- List<Capability> whitelist = whitelistBuilder.build();
- LOG.fine("Operation " + method.getName() + " may be performed by: " + whitelist);
- Optional<SessionKey> sessionKey = FluentIterable.from(Arrays.asList(invocation.getArguments()))
- .firstMatch(Predicates.instanceOf(SessionKey.class)).transform(CAST);
- if (!sessionKey.isPresent()) {
- LOG.severe("Interceptor should only be applied to methods accepting a SessionKey, but "
- + method + " does not.");
- return invocation.proceed();
- }
-
- String key = capabilityValidator.toString(sessionKey.get());
- for (Capability user : whitelist) {
- LOG.fine("Attempting to validate " + key + " against " + user);
- try {
- capabilityValidator.checkAuthorized(sessionKey.get(), user, AuditCheck.NONE);
-
- LOG.info("Permitting " + key + " to act as "
- + user + " and perform action " + method.getName());
- return invocation.proceed();
- } catch (AuthFailedException e) {
- LOG.fine("Auth failed: " + e);
- }
- }
-
- // User is not permitted to perform this operation.
- return Interceptors.properlyTypedResponse(
- method,
- ResponseCode.AUTH_FAILED,
- "Session identified by '" + key
- + "' does not have the required capability to perform this action: " + whitelist);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/auth/DecoratedThrift.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/auth/DecoratedThrift.java b/src/main/java/com/twitter/aurora/scheduler/thrift/auth/DecoratedThrift.java
deleted file mode 100644
index 4a667c5..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/auth/DecoratedThrift.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift.auth;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.ElementType.TYPE;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-/**
- * Type annotation to apply to a thrift interface implementation that should be decorated with
- * additional functionality.
- */
-@Target({PARAMETER, TYPE}) @Retention(RUNTIME)
-public @interface DecoratedThrift {
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/auth/Requires.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/auth/Requires.java b/src/main/java/com/twitter/aurora/scheduler/thrift/auth/Requires.java
deleted file mode 100644
index 0fff3f6..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/auth/Requires.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift.auth;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.twitter.aurora.auth.CapabilityValidator.Capability;
-
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-/**
- * Annotation applied to a method that may allow users with non-ROOT capabilities to perform
- * an action.
- */
-@Target(METHOD) @Retention(RUNTIME)
-public @interface Requires {
- /**
- * The list of capabilities required to perform an action.
- */
- Capability[] whitelist() default { Capability.ROOT };
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/thrift/auth/ThriftAuthModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/thrift/auth/ThriftAuthModule.java b/src/main/java/com/twitter/aurora/scheduler/thrift/auth/ThriftAuthModule.java
deleted file mode 100644
index 66f9033..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/thrift/auth/ThriftAuthModule.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.thrift.auth;
-
-import java.util.Map;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.inject.AbstractModule;
-import com.google.inject.TypeLiteral;
-
-import com.twitter.aurora.auth.CapabilityValidator;
-import com.twitter.aurora.auth.CapabilityValidator.Capability;
-import com.twitter.aurora.auth.SessionValidator;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.args.constraints.NotEmpty;
-
-/**
- * Binding module for authentication of users with special capabilities for admin functions.
- */
-public class ThriftAuthModule extends AbstractModule {
-
- private static final Map<Capability, String> DEFAULT_CAPABILITIES =
- ImmutableMap.of(Capability.ROOT, "mesos");
-
- @NotEmpty
- @CmdLine(name = "user_capabilities",
- help = "Concrete name mappings for administration capabilities.")
- private static final Arg<Map<Capability, String>> USER_CAPABILITIES =
- Arg.create(DEFAULT_CAPABILITIES);
-
- private Map<Capability, String> capabilities;
-
- public ThriftAuthModule() {
- this(USER_CAPABILITIES.get());
- }
-
- @VisibleForTesting
- public ThriftAuthModule(Map<Capability, String> capabilities) {
- this.capabilities = Preconditions.checkNotNull(capabilities);
- }
-
- @Override
- protected void configure() {
- Preconditions.checkArgument(
- capabilities.containsKey(Capability.ROOT),
- "A ROOT capability must be provided with --user_capabilities");
- bind(new TypeLiteral<Map<Capability, String>>() { }).toInstance(capabilities);
-
- requireBinding(SessionValidator.class);
- requireBinding(CapabilityValidator.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/GuiceUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/GuiceUtils.java b/src/main/java/org/apache/aurora/GuiceUtils.java
new file mode 100644
index 0000000..207535d
--- /dev/null
+++ b/src/main/java/org/apache/aurora/GuiceUtils.java
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.lang.reflect.Method;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.inject.Binder;
+import com.google.inject.BindingAnnotation;
+import com.google.inject.matcher.AbstractMatcher;
+import com.google.inject.matcher.Matcher;
+import com.google.inject.matcher.Matchers;
+
+import org.aopalliance.intercept.MethodInterceptor;
+import org.aopalliance.intercept.MethodInvocation;
+
+import com.twitter.common.collections.Pair;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * Utilities for guice configuration in aurora.
+ */
+public final class GuiceUtils {
+
+ private static final Logger LOG = Logger.getLogger(GuiceUtils.class.getName());
+
+ // Method annotation that allows a trapped interface to whitelist methods that may throw
+ // unchecked exceptions.
+ @BindingAnnotation
+ @Target(METHOD) @Retention(RUNTIME)
+ public @interface AllowUnchecked { }
+
+ private GuiceUtils() {
+ // utility
+ }
+
+ // No wildcards on the Class here because it upsets checkstyle - complains with:
+ // '>' is followed by whitespace.
+ private static final Function<Method, Pair<String, Class[]>> CANONICALIZE =
+ new Function<Method, Pair<String, Class[]>>() {
+ @Override public Pair<String, Class[]> apply(Method method) {
+ return Pair.of(method.getName(), (Class[]) method.getParameterTypes());
+ }
+ };
+
+ /**
+ * Creates a matcher that will match methods of an interface, optionally excluding inherited
+ * methods.
+ *
+ * @param matchInterface The interface to match.
+ * @param declaredMethodsOnly if {@code true} only methods directly declared in the interface
+ * will be matched, otherwise all methods on the interface are matched.
+ * @return A new matcher instance.
+ */
+ public static Matcher<Method> interfaceMatcher(
+ Class<?> matchInterface,
+ boolean declaredMethodsOnly) {
+
+ Method[] methods =
+ declaredMethodsOnly ? matchInterface.getDeclaredMethods() : matchInterface.getMethods();
+ final Set<Pair<String, Class[]>> interfaceMethods =
+ ImmutableSet.copyOf(Iterables.transform(ImmutableList.copyOf(methods), CANONICALIZE));
+ final LoadingCache<Method, Pair<String, Class[]>> cache = CacheBuilder.newBuilder()
+ .build(CacheLoader.from(CANONICALIZE));
+
+ return new AbstractMatcher<Method>() {
+ @Override public boolean matches(Method method) {
+ return interfaceMethods.contains(cache.getUnchecked(method));
+ }
+ };
+ }
+
+ /**
+ * Binds an interceptor that ensures the main ClassLoader is bound as the thread context
+ * {@link ClassLoader} during JNI callbacks from mesos. Some libraries require a thread
+ * context ClassLoader be set and this ensures those libraries work properly.
+ *
+ * @param binder The binder to use to register an interceptor with.
+ * @param wrapInterface Interface whose methods should wrapped.
+ */
+ public static void bindJNIContextClassLoader(Binder binder, Class<?> wrapInterface) {
+ final ClassLoader mainClassLoader = GuiceUtils.class.getClassLoader();
+ binder.bindInterceptor(
+ Matchers.subclassesOf(wrapInterface),
+ interfaceMatcher(wrapInterface, false),
+ new MethodInterceptor() {
+ @Override public Object invoke(MethodInvocation invocation) throws Throwable {
+ Thread currentThread = Thread.currentThread();
+ ClassLoader prior = currentThread.getContextClassLoader();
+ try {
+ currentThread.setContextClassLoader(mainClassLoader);
+ return invocation.proceed();
+ } finally {
+ currentThread.setContextClassLoader(prior);
+ }
+ }
+ });
+ }
+
+ private static final Predicate<Method> IS_WHITELISTED = new Predicate<Method>() {
+ @Override public boolean apply(Method method) {
+ return method.getAnnotation(AllowUnchecked.class) != null;
+ }
+ };
+
+ private static final Matcher<Method> WHITELIST_MATCHER = new AbstractMatcher<Method>() {
+ @Override public boolean matches(Method method) {
+ return IS_WHITELISTED.apply(method);
+ }
+ };
+
+ private static final Predicate<Method> VOID_METHOD = new Predicate<Method>() {
+ @Override public boolean apply(Method method) {
+ return method.getReturnType() == Void.TYPE;
+ }
+ };
+
+ /**
+ * Binds an exception trap on all interface methods of all classes bound against an interface.
+ * Individual methods may opt out of trapping by annotating with {@link AllowUnchecked}.
+ * Only void methods are allowed, any non-void interface methods must explicitly opt out.
+ *
+ * @param binder The binder to register an interceptor with.
+ * @param wrapInterface Interface whose methods should be wrapped.
+ * @throws IllegalArgumentException If any of the non-whitelisted interface methods are non-void.
+ */
+ public static void bindExceptionTrap(Binder binder, Class<?> wrapInterface)
+ throws IllegalArgumentException {
+
+ Set<Method> disallowed = ImmutableSet.copyOf(Iterables.filter(
+ ImmutableList.copyOf(wrapInterface.getMethods()),
+ Predicates.and(Predicates.not(IS_WHITELISTED), Predicates.not(VOID_METHOD))));
+ Preconditions.checkArgument(disallowed.isEmpty(),
+ "Non-void methods must be explicitly whitelisted with @AllowUnchecked: " + disallowed);
+
+ Matcher<Method> matcher =
+ Matchers.<Method>not(WHITELIST_MATCHER).and(interfaceMatcher(wrapInterface, false));
+ binder.bindInterceptor(Matchers.subclassesOf(wrapInterface), matcher,
+ new MethodInterceptor() {
+ @Override public Object invoke(MethodInvocation invocation) throws Throwable {
+ try {
+ return invocation.proceed();
+ } catch (RuntimeException e) {
+ LOG.log(Level.WARNING, "Trapped uncaught exception: " + e, e);
+ return null;
+ }
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/Protobufs.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/Protobufs.java b/src/main/java/org/apache/aurora/Protobufs.java
new file mode 100644
index 0000000..c3ddea4
--- /dev/null
+++ b/src/main/java/org/apache/aurora/Protobufs.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora;
+
+import com.google.common.base.Function;
+import com.google.protobuf.Message;
+import com.google.protobuf.TextFormat;
+
+/**
+ * Utility functions that are useful for working with protocol buffer messages.
+ */
+public final class Protobufs {
+
+ private Protobufs() {
+ // Utility class.
+ }
+
+ /**
+ * Function to call {@link #toString(Message)} on message objects.
+ */
+ public static final Function<Message, String> SHORT_TOSTRING = new Function<Message, String>() {
+ @Override public String apply(Message message) {
+ return Protobufs.toString(message);
+ }
+ };
+
+ /**
+ * Alternative to the default protobuf toString implementation, which omits newlines.
+ *
+ * @param message Message to print.
+ * @return String representation of the message.
+ */
+ public static String toString(Message message) {
+ return TextFormat.shortDebugString(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/auth/CapabilityValidator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/auth/CapabilityValidator.java b/src/main/java/org/apache/aurora/auth/CapabilityValidator.java
new file mode 100644
index 0000000..05958f0
--- /dev/null
+++ b/src/main/java/org/apache/aurora/auth/CapabilityValidator.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.auth;
+
+import com.twitter.aurora.gen.SessionKey;
+
+/**
+ * A session validator that supports user capability matching.
+ * <p>
+ * This supports asking whether a user has been granted a specific administration capability.
+ */
+public interface CapabilityValidator extends SessionValidator {
+
+ enum Capability {
+ ROOT,
+ PROVISIONER
+ }
+
+ /**
+ * Audit check to perform for a given action.
+ */
+ enum AuditCheck {
+ /**
+ * Default. No audit checks will be performed.
+ */
+ NONE,
+
+ /**
+ * A check will be performed to verify if a given action has
+ * all necessary data to generate a valid audit trail.
+ */
+ REQUIRED
+ }
+
+ /**
+ * Checks whether a session key is authenticated, and has the specified capability.
+ *
+ * @param sessionKey Key to validate.
+ * @param capability User capability to authenticate against.
+ * @param check Auditing data presence check required.
+ * @return A {@link SessionContext} object that provides information about the validated session.
+ * @throws AuthFailedException If the key cannot be validated as the role or lacks
+ * the requested capability.
+ */
+ SessionContext checkAuthorized(SessionKey sessionKey, Capability capability, AuditCheck check)
+ throws AuthFailedException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/auth/SessionValidator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/auth/SessionValidator.java b/src/main/java/org/apache/aurora/auth/SessionValidator.java
new file mode 100644
index 0000000..327c5ca
--- /dev/null
+++ b/src/main/java/org/apache/aurora/auth/SessionValidator.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.auth;
+
+import java.util.Set;
+
+import com.twitter.aurora.gen.SessionKey;
+
+/**
+ * Validator for RPC sessions with Aurora.
+ */
+public interface SessionValidator {
+
+ /**
+ * Checks whether a session key is authenticated, and has permission to act as all the roles
+ * provided. Authentication is successful only if the SessionKey is successfully validated against
+ * all the roles.
+ *
+ * @param sessionKey Key to validate.
+ * @param targetRoles A set of roles to validate the key against.
+ * @return A {@link SessionContext} object that provides information about the validated session.
+ * @throws AuthFailedException If the key cannot be validated against a role.
+ */
+ SessionContext checkAuthenticated(SessionKey sessionKey, Set<String> targetRoles)
+ throws AuthFailedException;
+
+ /**
+ * Translates a {@link SessionKey} to a string. Primarily provides a way for the binary data
+ * within a {@link SessionKey} to be decoded and converted into a string.
+ *
+ * @param sessionKey The session key to translate.
+ * @return A string representation of the {@link SessionKey}.
+ */
+ String toString(SessionKey sessionKey);
+
+ /**
+ * Provides information about a session.
+ */
+ interface SessionContext {
+
+ /**
+ * Provides the identity for a validated session.
+ *
+ * @return A string that identifies the session.
+ */
+ String getIdentity();
+ }
+
+ /**
+ * Thrown when authentication is not successful.
+ */
+ public static class AuthFailedException extends Exception {
+ public AuthFailedException(String msg) {
+ super(msg);
+ }
+
+ public AuthFailedException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/auth/UnsecureAuthModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/auth/UnsecureAuthModule.java b/src/main/java/org/apache/aurora/auth/UnsecureAuthModule.java
new file mode 100644
index 0000000..8fe9953
--- /dev/null
+++ b/src/main/java/org/apache/aurora/auth/UnsecureAuthModule.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.auth;
+
+import java.util.Set;
+import java.util.logging.Logger;
+
+import com.google.inject.AbstractModule;
+
+import com.twitter.aurora.gen.SessionKey;
+
+/**
+ * An authentication module that uses an {@link UnsecureSessionValidator}. This behavior
+ * can be overridden by binding a secure validator, querying an internal authentication system,
+ * to {@link SessionValidator}.
+ */
+public class UnsecureAuthModule extends AbstractModule {
+ private static final String UNSECURE = "UNSECURE";
+ private static final Logger LOG = Logger.getLogger(UnsecureAuthModule.class.getName());
+
+ @Override
+ protected void configure() {
+ LOG.info("Using default (UNSECURE!!!) authentication module.");
+ bind(SessionValidator.class).to(UnsecureSessionValidator.class);
+ bind(CapabilityValidator.class).to(UnsecureCapabilityValidator.class);
+ }
+
+ static class UnsecureSessionValidator implements SessionValidator {
+ @Override
+ public SessionContext checkAuthenticated(SessionKey key, Set<String> targetRoles)
+ throws AuthFailedException {
+
+ return new SessionContext() {
+ @Override public String getIdentity() {
+ return UNSECURE;
+ }
+ };
+ }
+
+ @Override
+ public String toString(SessionKey sessionKey) {
+ return sessionKey.toString();
+ }
+ }
+
+ static class UnsecureCapabilityValidator implements CapabilityValidator {
+ @Override
+ public SessionContext checkAuthorized(SessionKey key, Capability capability, AuditCheck check)
+ throws AuthFailedException {
+
+ return new SessionContext() {
+ @Override public String getIdentity() {
+ return UNSECURE;
+ }
+ };
+ }
+
+ @Override
+ public SessionContext checkAuthenticated(SessionKey key, Set<String> targetRoles)
+ throws AuthFailedException {
+
+ return new SessionContext() {
+ @Override public String getIdentity() {
+ return UNSECURE;
+ }
+ };
+ }
+
+ @Override
+ public String toString(SessionKey sessionKey) {
+ return sessionKey.toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java b/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
new file mode 100644
index 0000000..2443078
--- /dev/null
+++ b/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.codec;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+
+/**
+ * Codec that works for thrift objects.
+ */
+public final class ThriftBinaryCodec {
+
+ /**
+ * Protocol factory used for all thrift encoding and decoding.
+ */
+ public static final TProtocolFactory PROTOCOL_FACTORY = new TBinaryProtocol.Factory();
+
+ private ThriftBinaryCodec() {
+ // Utility class.
+ }
+
+ /**
+ * Identical to {@link #decodeNonNull(Class, byte[])}, but allows for a null buffer.
+ *
+ * @param clazz Class to instantiate and deserialize to.
+ * @param buffer Buffer to decode.
+ * @param <T> Target type.
+ * @return A populated message, or {@code null} if the buffer was {@code null}.
+ * @throws CodingException If the message could not be decoded.
+ */
+ @Nullable
+ public static <T extends TBase<T, ?>> T decode(Class<T> clazz, @Nullable byte[] buffer)
+ throws CodingException {
+
+ if (buffer == null) {
+ return null;
+ }
+ return decodeNonNull(clazz, buffer);
+ }
+
+ /**
+ * Decodes a binary-encoded byte array into a target type.
+ *
+ * @param clazz Class to instantiate and deserialize to.
+ * @param buffer Buffer to decode.
+ * @param <T> Target type.
+ * @return A populated message.
+ * @throws CodingException If the message could not be decoded.
+ */
+ public static <T extends TBase<T, ?>> T decodeNonNull(Class<T> clazz, byte[] buffer)
+ throws CodingException {
+
+ Preconditions.checkNotNull(clazz);
+ Preconditions.checkNotNull(buffer);
+
+ try {
+ T t = clazz.newInstance();
+ new TDeserializer(PROTOCOL_FACTORY).deserialize(t, buffer);
+ return t;
+ } catch (IllegalAccessException e) {
+ throw new CodingException("Failed to access constructor for target type.", e);
+ } catch (InstantiationException e) {
+ throw new CodingException("Failed to instantiate target type.", e);
+ } catch (TException e) {
+ throw new CodingException("Failed to deserialize thrift object.", e);
+ }
+ }
+
+ /**
+ * Identical to {@link #encodeNonNull(TBase)}, but allows for a null input.
+ *
+ * @param tBase Object to encode.
+ * @return Encoded object, or {@code null} if the argument was {@code null}.
+ * @throws CodingException If the object could not be encoded.
+ */
+ @Nullable
+ public static byte[] encode(@Nullable TBase<?, ?> tBase) throws CodingException {
+ if (tBase == null) {
+ return null;
+ }
+ return encodeNonNull(tBase);
+ }
+
+ /**
+ * Encodes a thrift object into a binary array.
+ *
+ * @param tBase Object to encode.
+ * @return Encoded object.
+ * @throws CodingException If the object could not be encoded.
+ */
+ public static byte[] encodeNonNull(TBase<?, ?> tBase) throws CodingException {
+ Preconditions.checkNotNull(tBase);
+
+ try {
+ return new TSerializer(PROTOCOL_FACTORY).serialize(tBase);
+ } catch (TException e) {
+ throw new CodingException("Failed to serialize: " + tBase, e);
+ }
+ }
+
+ /**
+ * Thrown when serialization or deserialization failed.
+ */
+ public static class CodingException extends Exception {
+ public CodingException(String message) {
+ super(message);
+ }
+ public CodingException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/Driver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/Driver.java b/src/main/java/org/apache/aurora/scheduler/Driver.java
new file mode 100644
index 0000000..aa77887
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/Driver.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.Status;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.SchedulerDriver;
+
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.StateMachine;
+
+import static org.apache.mesos.Protos.Status.DRIVER_RUNNING;
+
+/**
+ * Wraps the mesos Scheduler driver to ensure its used in a valid lifecycle; namely:
+ * <pre>
+ * (run -> kill*)? -> stop*
+ * </pre>
+ *
+ * Also ensures the driver is only asked for when needed.
+ */
+public interface Driver {
+
+ /**
+ * Launches a task.
+ *
+ * @param offerId ID of the resource offer to accept with the task.
+ * @param task Task to launch.
+ */
+ void launchTask(OfferID offerId, TaskInfo task);
+
+ /**
+ * Declines a resource offer.
+ *
+ * @param offerId ID of the offer to decline.
+ */
+ void declineOffer(OfferID offerId);
+
+ /**
+ * Sends a kill task request for the given {@code taskId} to the mesos master.
+ *
+ * @param taskId The id of the task to kill.
+ */
+ void killTask(String taskId);
+
+ /**
+ * Stops the underlying driver if it is running, otherwise does nothing.
+ */
+ void stop();
+
+ /**
+ * Starts the underlying driver. Can only be called once.
+ *
+ * @return The status of the underlying driver run request.
+ */
+ Protos.Status start();
+
+ /**
+ * Blocks until the underlying driver is stopped or aborted.
+ *
+ * @return The status of the underlying driver upon exit.
+ */
+ Protos.Status join();
+
+ /**
+ * Mesos driver implementation.
+ */
+ static class DriverImpl implements Driver {
+ private static final Logger LOG = Logger.getLogger(Driver.class.getName());
+
+ /**
+ * Driver states.
+ */
+ enum State {
+ INIT,
+ RUNNING,
+ STOPPED
+ }
+
+ private final StateMachine<State> stateMachine;
+ private final Supplier<Optional<SchedulerDriver>> driverSupplier;
+ private final AtomicLong killFailures = Stats.exportLong("scheduler_driver_kill_failures");
+
+ /**
+ * Creates a driver manager that will only ask for the underlying mesos driver when actually
+ * needed.
+ *
+ * @param driverSupplier A factory for the underlying driver.
+ */
+ @Inject
+ DriverImpl(Supplier<Optional<SchedulerDriver>> driverSupplier) {
+ this.driverSupplier = driverSupplier;
+ this.stateMachine =
+ StateMachine.<State>builder("scheduler_driver")
+ .initialState(State.INIT)
+ .addState(State.INIT, State.RUNNING, State.STOPPED)
+ .addState(State.RUNNING, State.STOPPED)
+ .logTransitions()
+ .throwOnBadTransition(true)
+ .build();
+ }
+
+ private synchronized SchedulerDriver get(State expected) {
+ // TODO(William Farner): Formalize the failure case here by throwing a checked exception.
+ stateMachine.checkState(expected);
+ // This will and should fail if the driver is not present.
+ return driverSupplier.get().get();
+ }
+
+ @Override
+ public void launchTask(OfferID offerId, TaskInfo task) {
+ get(State.RUNNING).launchTasks(offerId, ImmutableList.of(task));
+ }
+
+ @Override
+ public void declineOffer(OfferID offerId) {
+ get(State.RUNNING).declineOffer(offerId);
+ }
+
+ @Override
+ public Protos.Status start() {
+ SchedulerDriver driver = get(State.INIT);
+ stateMachine.transition(State.RUNNING);
+ return driver.start();
+ }
+
+ @Override
+ public Status join() {
+ return get(State.RUNNING).join();
+ }
+
+ @Override
+ public synchronized void stop() {
+ if (stateMachine.getState() == State.RUNNING) {
+ SchedulerDriver driver = get(State.RUNNING);
+ driver.stop(true /* failover */);
+ stateMachine.transition(State.STOPPED);
+ }
+ }
+
+ @Override
+ public void killTask(String taskId) {
+ SchedulerDriver driver = get(State.RUNNING);
+ Protos.Status status = driver.killTask(Protos.TaskID.newBuilder().setValue(taskId).build());
+
+ if (status != DRIVER_RUNNING) {
+ LOG.severe(String.format("Attempt to kill task %s failed with code %s",
+ taskId, status));
+ killFailures.incrementAndGet();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/DriverFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/DriverFactory.java b/src/main/java/org/apache/aurora/scheduler/DriverFactory.java
new file mode 100644
index 0000000..e39bb09
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/DriverFactory.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import javax.inject.Provider;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.protobuf.ByteString;
+
+import org.apache.mesos.MesosSchedulerDriver;
+import org.apache.mesos.Protos.Credential;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.args.constraints.NotNull;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+/**
+ * Factory to create scheduler driver instances.
+ */
+public interface DriverFactory extends Function<String, SchedulerDriver> {
+
+ static class DriverFactoryImpl implements DriverFactory {
+ private static final Logger LOG = Logger.getLogger(DriverFactory.class.getName());
+
+ @NotNull
+ @CmdLine(name = "mesos_master_address",
+ help = "Address for the mesos master, can be a socket address or zookeeper path.")
+ private static final Arg<String> MESOS_MASTER_ADDRESS = Arg.create();
+
+ @VisibleForTesting
+ static final String PRINCIPAL_KEY = "aurora_authentication_principal";
+ @VisibleForTesting
+ static final String SECRET_KEY = "aurora_authentication_secret";
+ @CmdLine(name = "framework_authentication_file",
+ help = "Properties file which contains framework credentials to authenticate with Mesos"
+ + "master. Must contain the properties '" + PRINCIPAL_KEY + "' and "
+ + "'" + SECRET_KEY + "'.")
+ private static final Arg<File> FRAMEWORK_AUTHENTICATION_FILE = Arg.create();
+
+ @CmdLine(name = "framework_failover_timeout",
+ help = "Time after which a framework is considered deleted. SHOULD BE VERY HIGH.")
+ private static final Arg<Amount<Long, Time>> FRAMEWORK_FAILOVER_TIMEOUT =
+ Arg.create(Amount.of(21L, Time.DAYS));
+
+ /**
+ * Require Mesos slaves to have checkpointing enabled. Slaves with checkpointing enabled will
+ * attempt to write checkpoints when required by a task's framework. These checkpoints allow
+ * executors to be reattached rather than killed when a slave is restarted.
+ *
+ * This flag is dangerous! When enabled tasks will not launch on slaves without checkpointing
+ * enabled.
+ *
+ * Behavior is as follows:
+ * (Scheduler -require_slave_checkpoint=true, Slave --checkpoint=true):
+ * Tasks will launch. Checkpoints will be written.
+ * (Scheduler -require_slave_checkpoint=true, Slave --checkpoint=false):
+ * Tasks WILL NOT launch.
+ * (Scheduler -require_slave_checkpoint=false, Slave --checkpoint=true):
+ * Tasks will launch. Checkpoints will not be written.
+ * (Scheduler -require_slave_checkpoint=false, Slave --checkpoint=false):
+ * Tasks will launch. Checkpoints will not be written.
+ *
+ * TODO(ksweeney): Remove warning table after https://issues.apache.org/jira/browse/MESOS-444
+ * is resolved.
+ */
+ @CmdLine(name = "require_slave_checkpoint",
+ help = "DANGEROUS! Require Mesos slaves to have checkpointing enabled. When enabled a "
+ + "slave restart should not kill executors, but the scheduler will not be able to "
+ + "launch tasks on slaves without --checkpoint=true in their command lines. See "
+ + "DriverFactory.java for more information.")
+ private static final Arg<Boolean> REQUIRE_SLAVE_CHECKPOINT = Arg.create(false);
+
+ private static final String EXECUTOR_USER = "root";
+
+ private static final String TWITTER_FRAMEWORK_NAME = "TwitterScheduler";
+
+ private final Provider<Scheduler> scheduler;
+
+ @Inject
+ DriverFactoryImpl(Provider<Scheduler> scheduler) {
+ this.scheduler = Preconditions.checkNotNull(scheduler);
+ }
+
+ @VisibleForTesting
+ static Properties parseCredentials(InputStream credentialsStream) {
+ Properties properties = new Properties();
+ try {
+ properties.load(credentialsStream);
+ } catch (IOException e) {
+ LOG.severe("Unable to load authentication file");
+ throw Throwables.propagate(e);
+ }
+ Preconditions.checkState(properties.containsKey(PRINCIPAL_KEY),
+ "The framework authentication file is missing the key: %s", PRINCIPAL_KEY);
+ Preconditions.checkState(properties.containsKey(SECRET_KEY),
+ "The framework authentication file is missing the key: %s", SECRET_KEY);
+ return properties;
+ }
+
+ @Override
+ public SchedulerDriver apply(@Nullable String frameworkId) {
+ LOG.info("Connecting to mesos master: " + MESOS_MASTER_ADDRESS.get());
+
+ FrameworkInfo.Builder frameworkInfo = FrameworkInfo.newBuilder()
+ .setUser(EXECUTOR_USER)
+ .setName(TWITTER_FRAMEWORK_NAME)
+ .setCheckpoint(REQUIRE_SLAVE_CHECKPOINT.get())
+ .setFailoverTimeout(FRAMEWORK_FAILOVER_TIMEOUT.get().as(Time.SECONDS));
+
+ if (frameworkId != null) {
+ LOG.info("Found persisted framework ID: " + frameworkId);
+ frameworkInfo.setId(FrameworkID.newBuilder().setValue(frameworkId));
+ } else {
+ LOG.warning("Did not find a persisted framework ID, connecting as a new framework.");
+ }
+
+ if (FRAMEWORK_AUTHENTICATION_FILE.hasAppliedValue()) {
+ Properties properties;
+ try {
+ properties = parseCredentials(new FileInputStream(FRAMEWORK_AUTHENTICATION_FILE.get()));
+ } catch (FileNotFoundException e) {
+ LOG.severe("Authentication File not Found");
+ throw Throwables.propagate(e);
+ }
+
+ LOG.info(String.format("Connecting to master using authentication (principal: %s).",
+ properties.get(PRINCIPAL_KEY)));
+
+ Credential credential = Credential.newBuilder()
+ .setPrincipal(properties.getProperty(PRINCIPAL_KEY))
+ .setSecret(ByteString.copyFromUtf8(properties.getProperty(SECRET_KEY)))
+ .build();
+
+ return new MesosSchedulerDriver(
+ scheduler.get(),
+ frameworkInfo.build(),
+ MESOS_MASTER_ADDRESS.get(),
+ credential);
+ } else {
+ LOG.warning("Connecting to master without authentication!");
+ return new MesosSchedulerDriver(
+ scheduler.get(),
+ frameworkInfo.build(),
+ MESOS_MASTER_ADDRESS.get());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
new file mode 100644
index 0000000..fb41405
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
@@ -0,0 +1,268 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.MasterInfo;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskStatus;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import com.twitter.aurora.GuiceUtils.AllowUnchecked;
+import com.twitter.aurora.codec.ThriftBinaryCodec;
+import com.twitter.aurora.gen.comm.SchedulerMessage;
+import com.twitter.aurora.scheduler.base.Conversions;
+import com.twitter.aurora.scheduler.base.SchedulerException;
+import com.twitter.aurora.scheduler.configuration.Resources;
+import com.twitter.aurora.scheduler.events.PubsubEvent.Interceptors.Event;
+import com.twitter.aurora.scheduler.events.PubsubEvent.Interceptors.SendNotification;
+import com.twitter.aurora.scheduler.state.SchedulerCore;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.common.application.Lifecycle;
+import com.twitter.common.inject.TimedInterceptor.Timed;
+import com.twitter.common.stats.Stats;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Location for communication with mesos.
+ */
+class MesosSchedulerImpl implements Scheduler {
+ private static final Logger LOG = Logger.getLogger(MesosSchedulerImpl.class.getName());
+
+ private final AtomicLong resourceOffers = Stats.exportLong("scheduler_resource_offers");
+ private final AtomicLong failedOffers = Stats.exportLong("scheduler_failed_offers");
+ private final AtomicLong failedStatusUpdates = Stats.exportLong("scheduler_status_updates");
+ private final AtomicLong frameworkDisconnects =
+ Stats.exportLong("scheduler_framework_disconnects");
+ private final AtomicLong frameworkReregisters =
+ Stats.exportLong("scheduler_framework_reregisters");
+ private final AtomicLong lostExecutors = Stats.exportLong("scheduler_lost_executors");
+
+ private final List<TaskLauncher> taskLaunchers;
+
+ private final Storage storage;
+ private final SchedulerCore schedulerCore;
+ private final Lifecycle lifecycle;
+ private volatile boolean registered = false;
+
+ /**
+ * Creates a new scheduler.
+ *
+ * @param schedulerCore Core scheduler.
+ * @param lifecycle Application lifecycle manager.
+ * @param taskLaunchers Task launchers.
+ */
+ @Inject
+ public MesosSchedulerImpl(
+ Storage storage,
+ SchedulerCore schedulerCore,
+ final Lifecycle lifecycle,
+ List<TaskLauncher> taskLaunchers) {
+
+ this.storage = checkNotNull(storage);
+ this.schedulerCore = checkNotNull(schedulerCore);
+ this.lifecycle = checkNotNull(lifecycle);
+ this.taskLaunchers = checkNotNull(taskLaunchers);
+ }
+
+ @Override
+ public void slaveLost(SchedulerDriver schedulerDriver, SlaveID slaveId) {
+ LOG.info("Received notification of lost slave: " + slaveId);
+ }
+
+ @SendNotification(after = Event.DriverRegistered)
+ @Override
+ public void registered(
+ SchedulerDriver driver,
+ final FrameworkID frameworkId,
+ MasterInfo masterInfo) {
+
+ LOG.info("Registered with ID " + frameworkId + ", master: " + masterInfo);
+
+ storage.write(new MutateWork.NoResult.Quiet() {
+ @Override protected void execute(MutableStoreProvider storeProvider) {
+ storeProvider.getSchedulerStore().saveFrameworkId(frameworkId.getValue());
+ }
+ });
+ registered = true;
+ }
+
+ @SendNotification(after = Event.DriverDisconnected)
+ @Override
+ public void disconnected(SchedulerDriver schedulerDriver) {
+ LOG.warning("Framework disconnected.");
+ frameworkDisconnects.incrementAndGet();
+ }
+
+ @Override
+ public void reregistered(SchedulerDriver schedulerDriver, MasterInfo masterInfo) {
+ LOG.info("Framework re-registered with master " + masterInfo);
+ frameworkReregisters.incrementAndGet();
+ }
+
+ private static boolean fitsInOffer(TaskInfo task, Offer offer) {
+ return Resources.from(offer).greaterThanOrEqual(Resources.from(task.getResourcesList()));
+ }
+
+ @Timed("scheduler_resource_offers")
+ @Override
+ public void resourceOffers(SchedulerDriver driver, List<Offer> offers) {
+ Preconditions.checkState(registered, "Must be registered before receiving offers.");
+
+ for (final Offer offer : offers) {
+ log(Level.FINE, "Received offer: %s", offer);
+ resourceOffers.incrementAndGet();
+ storage.write(new MutateWork.NoResult.Quiet() {
+ @Override protected void execute(MutableStoreProvider storeProvider) {
+ storeProvider.getAttributeStore().saveHostAttributes(Conversions.getAttributes(offer));
+ }
+ });
+
+ // Ordering of task launchers is important here, since offers are consumed greedily.
+ // TODO(William Farner): Refactor this area of code now that the primary task launcher
+ // is asynchronous.
+ for (TaskLauncher launcher : taskLaunchers) {
+ Optional<TaskInfo> task = Optional.absent();
+ try {
+ task = launcher.createTask(offer);
+ } catch (SchedulerException e) {
+ LOG.log(Level.WARNING, "Failed to schedule offers.", e);
+ failedOffers.incrementAndGet();
+ }
+
+ if (task.isPresent()) {
+ if (fitsInOffer(task.get(), offer)) {
+ driver.launchTasks(offer.getId(), ImmutableList.of(task.get()));
+ break;
+ } else {
+ LOG.warning("Insufficient resources to launch task " + task);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void offerRescinded(SchedulerDriver schedulerDriver, OfferID offerId) {
+ LOG.info("Offer rescinded: " + offerId);
+ for (TaskLauncher launcher : taskLaunchers) {
+ launcher.cancelOffer(offerId);
+ }
+ }
+
+ @AllowUnchecked
+ @Timed("scheduler_status_update")
+ @Override
+ public void statusUpdate(SchedulerDriver driver, TaskStatus status) {
+ String info = status.hasData() ? status.getData().toStringUtf8() : null;
+ String infoMsg = info != null ? " with info " + info : "";
+ String coreMsg = status.hasMessage() ? " with core message " + status.getMessage() : "";
+ LOG.info("Received status update for task " + status.getTaskId().getValue()
+ + " in state " + status.getState() + infoMsg + coreMsg);
+
+ try {
+ for (TaskLauncher launcher : taskLaunchers) {
+ if (launcher.statusUpdate(status)) {
+ return;
+ }
+ }
+ } catch (SchedulerException e) {
+ LOG.log(Level.SEVERE, "Status update failed due to scheduler exception: " + e, e);
+ // We re-throw the exception here in an effort to NACK the status update and trigger an
+ // abort of the driver. Previously we directly issued driver.abort(), but the re-entrancy
+ // guarantees of that are uncertain (and we believe it was not working). However, this
+ // was difficult to discern since logging is unreliable during JVM shutdown and we would not
+ // see the above log message.
+ throw e;
+ }
+
+ LOG.warning("Unhandled status update " + status);
+ failedStatusUpdates.incrementAndGet();
+ }
+
+ @Override
+ public void error(SchedulerDriver driver, String message) {
+ LOG.severe("Received error message: " + message);
+ lifecycle.shutdown();
+ }
+
+ @Override
+ public void executorLost(SchedulerDriver schedulerDriver, ExecutorID executorID, SlaveID slaveID,
+ int status) {
+
+ LOG.info("Lost executor " + executorID);
+ lostExecutors.incrementAndGet();
+ }
+
+ @Timed("scheduler_framework_message")
+ @Override
+ public void frameworkMessage(SchedulerDriver driver, ExecutorID executor, SlaveID slave,
+ byte[] data) {
+
+ if (data == null) {
+ LOG.info("Received empty framework message.");
+ return;
+ }
+
+ try {
+ SchedulerMessage schedulerMsg = ThriftBinaryCodec.decode(SchedulerMessage.class, data);
+ if (schedulerMsg == null || !schedulerMsg.isSet()) {
+ LOG.warning("Received empty scheduler message.");
+ return;
+ }
+
+ switch (schedulerMsg.getSetField()) {
+ case DELETED_TASKS:
+ // TODO(William Farner): Refactor this to use a thinner interface here. As it stands
+ // it is odd that we route the registered() call to schedulerCore via the
+ // registeredListener and call the schedulerCore directly here.
+ schedulerCore.tasksDeleted(schedulerMsg.getDeletedTasks().getTaskIds());
+ break;
+
+ default:
+ LOG.warning("Received unhandled scheduler message type: " + schedulerMsg.getSetField());
+ break;
+ }
+ } catch (ThriftBinaryCodec.CodingException e) {
+ LOG.log(Level.SEVERE, "Failed to decode framework message.", e);
+ }
+ }
+
+ private static void log(Level level, String message, Object... args) {
+ if (LOG.isLoggable(level)) {
+ LOG.log(level, String.format(message, args));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
new file mode 100644
index 0000000..5f73f71
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.protobuf.ByteString;
+
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.Resource;
+import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+
+import com.twitter.aurora.Protobufs;
+import com.twitter.aurora.codec.ThriftBinaryCodec;
+import com.twitter.aurora.scheduler.base.CommandUtil;
+import com.twitter.aurora.scheduler.base.JobKeys;
+import com.twitter.aurora.scheduler.base.SchedulerException;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.configuration.Resources;
+import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+import com.twitter.common.quantity.Data;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.common.base.MorePreconditions.checkNotBlank;
+
+/**
+ * A factory to create mesos task objects.
+ */
+public interface MesosTaskFactory {
+
+ /**
+ * Creates a mesos task object.
+ *
+ * @param task Assigned task to translate into a task object.
+ * @param slaveId Id of the slave the task is being assigned to.
+ * @return A new task.
+ * @throws SchedulerException If the task could not be encoded.
+ */
+ TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException;
+
+ static class ExecutorConfig {
+ private final String executorPath;
+
+ public ExecutorConfig(String executorPath) {
+ this.executorPath = checkNotBlank(executorPath);
+ }
+
+ String getExecutorPath() {
+ return executorPath;
+ }
+ }
+
+ static class MesosTaskFactoryImpl implements MesosTaskFactory {
+ private static final Logger LOG = Logger.getLogger(MesosTaskFactoryImpl.class.getName());
+ private static final String EXECUTOR_PREFIX = "thermos-";
+
+ /**
+ * Name to associate with task executors.
+ */
+ @VisibleForTesting
+ static final String EXECUTOR_NAME = "aurora.task";
+
+ private final String executorPath;
+
+ @Inject
+ MesosTaskFactoryImpl(ExecutorConfig executorConfig) {
+ this.executorPath = executorConfig.getExecutorPath();
+ }
+
+ @VisibleForTesting
+ static ExecutorID getExecutorId(String taskId) {
+ return ExecutorID.newBuilder().setValue(EXECUTOR_PREFIX + taskId).build();
+ }
+
+ public static String getJobSourceName(IJobKey jobkey) {
+ return String.format("%s.%s.%s", jobkey.getRole(), jobkey.getEnvironment(), jobkey.getName());
+ }
+
+ public static String getJobSourceName(ITaskConfig task) {
+ return getJobSourceName(JobKeys.from(task));
+ }
+
+ public static String getInstanceSourceName(ITaskConfig task, int instanceId) {
+ return String.format("%s.%s", getJobSourceName(task), instanceId);
+ }
+
+ @Override
+ public TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException {
+ checkNotNull(task);
+ byte[] taskInBytes;
+ try {
+ taskInBytes = ThriftBinaryCodec.encode(task.newBuilder());
+ } catch (ThriftBinaryCodec.CodingException e) {
+ LOG.log(Level.SEVERE, "Unable to serialize task.", e);
+ throw new SchedulerException("Internal error.", e);
+ }
+
+ ITaskConfig config = task.getTask();
+ List<Resource> resources;
+ if (task.isSetAssignedPorts()) {
+ resources = Resources.from(config)
+ .toResourceList(ImmutableSet.copyOf(task.getAssignedPorts().values()));
+ } else {
+ resources = ImmutableList.of();
+ }
+
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.fine("Setting task resources to "
+ + Iterables.transform(resources, Protobufs.SHORT_TOSTRING));
+ }
+ TaskInfo.Builder taskBuilder =
+ TaskInfo.newBuilder()
+ .setName(JobKeys.toPath(Tasks.ASSIGNED_TO_JOB_KEY.apply(task)))
+ .setTaskId(TaskID.newBuilder().setValue(task.getTaskId()))
+ .setSlaveId(slaveId)
+ .addAllResources(resources)
+ .setData(ByteString.copyFrom(taskInBytes));
+
+ ExecutorInfo executor = ExecutorInfo.newBuilder()
+ .setCommand(CommandUtil.create(executorPath))
+ .setExecutorId(getExecutorId(task.getTaskId()))
+ .setName(EXECUTOR_NAME)
+ .setSource(getInstanceSourceName(config, task.getInstanceId()))
+ .addResources(Resources.makeMesosResource(Resources.CPUS, ResourceSlot.EXECUTOR_CPUS))
+ .addResources(
+ Resources.makeMesosResource(Resources.RAM_MB, ResourceSlot.EXECUTOR_RAM.as(Data.MB)))
+ .build();
+ return taskBuilder
+ .setExecutor(executor)
+ .build();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java b/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
new file mode 100644
index 0000000..a9c14e6
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import java.util.Arrays;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+
+import com.twitter.aurora.scheduler.configuration.Resources;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
+
+import static org.apache.mesos.Protos.Offer;
+
+/**
+ * Resource containing class that is aware of executor overhead.
+ */
+public final class ResourceSlot {
+
+ private final Resources resources;
+
+ /**
+ * CPU allocated for each executor.
+ */
+ @VisibleForTesting
+ static final double EXECUTOR_CPUS = 0.25;
+
+ /**
+ * RAM required for the executor. Executors in the wild have been observed using 48-54MB RSS,
+ * setting to 128MB to be extra vigilant initially.
+ */
+ @VisibleForTesting
+ static final Amount<Long, Data> EXECUTOR_RAM = Amount.of(128L, Data.MB);
+
+ private ResourceSlot(Resources r) {
+ this.resources = r;
+ }
+
+ public static ResourceSlot from(ITaskConfig task) {
+ double totalCPU = task.getNumCpus() + EXECUTOR_CPUS;
+ Amount<Long, Data> totalRAM = Amount.of(task.getRamMb() + EXECUTOR_RAM.as(Data.MB), Data.MB);
+ Amount<Long, Data> disk = Amount.of(task.getDiskMb(), Data.MB);
+ return new ResourceSlot(
+ new Resources(totalCPU, totalRAM, disk, task.getRequestedPorts().size()));
+ }
+
+ public static ResourceSlot from(Offer offer) {
+ return new ResourceSlot(Resources.from(offer));
+ }
+
+ public double getNumCpus() {
+ return resources.getNumCpus();
+ }
+
+ public Amount<Long, Data> getRam() {
+ return resources.getRam();
+ }
+
+ public Amount<Long, Data> getDisk() {
+ return resources.getDisk();
+ }
+
+ public int getNumPorts() {
+ return resources.getNumPorts();
+ }
+
+ @VisibleForTesting
+ public static ResourceSlot from(double cpu,
+ Amount<Long, Data> ram,
+ Amount<Long, Data> disk,
+ int ports) {
+ double totalCPU = cpu + EXECUTOR_CPUS;
+ Amount<Long, Data> totalRAM = Amount.of(ram.as(Data.MB) + EXECUTOR_RAM.as(Data.MB), Data.MB);
+
+ return new ResourceSlot(new Resources(totalCPU, totalRAM, disk, ports));
+ }
+
+ public static ResourceSlot sum(ResourceSlot... rs) {
+ return sum(Arrays.asList(rs));
+ }
+
+ public static ResourceSlot sum(Iterable<ResourceSlot> rs) {
+ Resources r = Resources.sum(Iterables.transform(rs, new Function<ResourceSlot, Resources>() {
+ @Override public Resources apply(ResourceSlot input) {
+ return input.resources;
+ }
+ }));
+
+ return new ResourceSlot(r);
+ }
+
+ public static final Ordering<ResourceSlot> ORDER = new Ordering<ResourceSlot>() {
+ @Override public int compare(ResourceSlot left, ResourceSlot right) {
+ return Resources.RESOURCE_ORDER.compare(left.resources, right.resources);
+ }
+ };
+}