You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by zm...@apache.org on 2015/08/26 23:00:24 UTC
[34/51] [partial] aurora git commit: Move packages from
com.twitter.common to org.apache.aurora.common
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/concurrent/ExecutorServiceShutdown.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/ExecutorServiceShutdown.java b/commons/src/main/java/com/twitter/common/util/concurrent/ExecutorServiceShutdown.java
deleted file mode 100644
index cfea299..0000000
--- a/commons/src/main/java/com/twitter/common/util/concurrent/ExecutorServiceShutdown.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.util.concurrent;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
-
-import com.google.common.base.Preconditions;
-
-import com.twitter.common.base.Command;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-
-/**
- * An implementation of the graceful shutdown sequence recommended by {@link ExecutorService}.
- *
- * @author John Sirois
- */
-public class ExecutorServiceShutdown implements Command {
-  private static final Logger LOG = Logger.getLogger(ExecutorServiceShutdown.class.getName());
-
-  private final ExecutorService executor;
-  private final Amount<Long, Time> gracePeriod;
-
-  /**
-   * Builds a command that, when executed, attempts an orderly shutdown of {@code executor}.
-   * A grace period of zero or less still issues the shutdown requests, but no waiting is done
-   * after them.
-   *
-   * @param executor The executor service to shut down when this command runs.
-   * @param gracePeriod Upper bound on the wait that follows each shutdown request before the
-   *     next shutdown phase begins.
-   */
-  public ExecutorServiceShutdown(ExecutorService executor, Amount<Long, Time> gracePeriod) {
-    Preconditions.checkNotNull(executor);
-    this.executor = executor;
-    Preconditions.checkNotNull(gracePeriod);
-    this.gracePeriod = gracePeriod;
-  }
-
-  /**
-   * Runs the two-phase shutdown: an orderly {@code shutdown()}, followed by {@code shutdownNow()}
-   * if the pool has not terminated within the grace period. If the calling thread is interrupted
-   * while waiting, the pool is cancelled immediately and the interrupt flag is restored.
-   */
-  @Override
-  public void execute() {
-    // Phase 1: refuse new submissions; tasks that were already accepted keep running.
-    executor.shutdown();
-    try {
-      if (terminatedWithinGracePeriod()) {
-        return;
-      }
-
-      // Phase 2: cancel the tasks that are still executing and allow one more grace period.
-      executor.shutdownNow();
-      if (!terminatedWithinGracePeriod()) {
-        LOG.warning("Pool did not terminate");
-      }
-    } catch (InterruptedException interrupted) {
-      // The waiting thread itself was interrupted: (re-)cancel and keep the interrupt visible.
-      executor.shutdownNow();
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  /** Blocks for at most one grace period and reports whether the executor has terminated. */
-  private boolean terminatedWithinGracePeriod() throws InterruptedException {
-    return executor.awaitTermination(gracePeriod.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS);
-  }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/concurrent/ForwardingExecutorService.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/ForwardingExecutorService.java b/commons/src/main/java/com/twitter/common/util/concurrent/ForwardingExecutorService.java
deleted file mode 100644
index 91a403e..0000000
--- a/commons/src/main/java/com/twitter/common/util/concurrent/ForwardingExecutorService.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.util.concurrent;
-
-import com.google.common.base.Preconditions;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * An executor service that forwards all calls to another executor service. Subclasses should
- * override one or more methods to modify the behavior of the backing executor service as desired
- * per the <a href="http://en.wikipedia.org/wiki/Decorator_pattern">decorator pattern</a>.
- *
- * @author John Sirois
- */
-public class ForwardingExecutorService<T extends ExecutorService> implements ExecutorService {
-  /** The backing executor service that receives every forwarded call. */
-  protected final T delegate;
-
-  /**
-   * Creates a forwarding executor service backed by {@code delegate}.
-   *
-   * @param delegate The executor service all calls are forwarded to.
-   * @throws NullPointerException if {@code delegate} is null.
-   */
-  public ForwardingExecutorService(T delegate) {
-    this.delegate = Preconditions.checkNotNull(delegate);
-  }
-
-  @Override
-  public void shutdown() {
-    delegate.shutdown();
-  }
-
-  @Override
-  public List<Runnable> shutdownNow() {
-    return delegate.shutdownNow();
-  }
-
-  @Override
-  public boolean isShutdown() {
-    return delegate.isShutdown();
-  }
-
-  @Override
-  public boolean isTerminated() {
-    return delegate.isTerminated();
-  }
-
-  @Override
-  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
-    return delegate.awaitTermination(timeout, unit);
-  }
-
-  // The method-level type parameters below are named V so that they do not shadow the
-  // class-level type parameter T (the delegate type).
-
-  @Override
-  public <V> Future<V> submit(Callable<V> task) {
-    return delegate.submit(task);
-  }
-
-  @Override
-  public <V> Future<V> submit(Runnable task, V result) {
-    return delegate.submit(task, result);
-  }
-
-  @Override
-  public Future<?> submit(Runnable task) {
-    return delegate.submit(task);
-  }
-
-  @Override
-  public <V> List<Future<V>> invokeAll(Collection<? extends Callable<V>> tasks)
-      throws InterruptedException {
-
-    return delegate.invokeAll(tasks);
-  }
-
-  @Override
-  public <V> List<Future<V>> invokeAll(Collection<? extends Callable<V>> tasks, long timeout,
-      TimeUnit unit) throws InterruptedException {
-
-    return delegate.invokeAll(tasks, timeout, unit);
-  }
-
-  @Override
-  public <V> V invokeAny(Collection<? extends Callable<V>> tasks)
-      throws InterruptedException, ExecutionException {
-
-    return delegate.invokeAny(tasks);
-  }
-
-  @Override
-  public <V> V invokeAny(Collection<? extends Callable<V>> tasks, long timeout, TimeUnit unit)
-      throws InterruptedException, ExecutionException, TimeoutException {
-
-    return delegate.invokeAny(tasks, timeout, unit);
-  }
-
-  @Override
-  public void execute(Runnable command) {
-    delegate.execute(command);
-  }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/concurrent/MoreExecutors.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/MoreExecutors.java b/commons/src/main/java/com/twitter/common/util/concurrent/MoreExecutors.java
deleted file mode 100644
index 70a4a13..0000000
--- a/commons/src/main/java/com/twitter/common/util/concurrent/MoreExecutors.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.util.concurrent;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-
-/**
- * Utility class that provides factory functions to decorate
- * {@link java.util.concurrent.ExecutorService}s.
- */
-public final class MoreExecutors {
-  private MoreExecutors() {
-    // utility
-  }
-
-  /**
-   * Decorates an {@link ExecutorService} so that uncaught exceptions are handed to the given
-   * {@link java.lang.Thread.UncaughtExceptionHandler}.
-   * <p>
-   * This may be useful because {@link java.util.concurrent.ThreadPoolExecutor} and
-   * {@link java.util.concurrent.ScheduledThreadPoolExecutor} provide no built-in propagation of
-   * unchecked exceptions thrown from submitted work. Some users are surprised to find that
-   * even the default uncaught exception handler is not invoked.
-   *
-   * @param executorService delegate
-   * @param uncaughtExceptionHandler exception handler that will receive exceptions generated
-   *     from executor tasks.
-   * @return a decorated executor service
-   */
-  public static ExecutorService exceptionHandlingExecutor(
-      ExecutorService executorService,
-      Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
-
-    Preconditions.checkNotNull(uncaughtExceptionHandler);
-    Supplier<Thread.UncaughtExceptionHandler> fixedHandler =
-        Suppliers.ofInstance(uncaughtExceptionHandler);
-    return new ExceptionHandlingExecutorService(executorService, fixedHandler);
-  }
-
-  /**
-   * Decorates an {@link ExecutorService} so that uncaught exceptions are handed to the handler
-   * returned by {@code Thread.currentThread().getUncaughtExceptionHandler()}, looked up at the
-   * time the exception is thrown.
-   *
-   * @see MoreExecutors#exceptionHandlingExecutor(java.util.concurrent.ExecutorService,
-   *     Thread.UncaughtExceptionHandler)
-   * @param executorService delegate
-   * @return a decorated executor service
-   */
-  public static ExecutorService exceptionHandlingExecutor(ExecutorService executorService) {
-    return new ExceptionHandlingExecutorService(executorService, currentThreadHandler());
-  }
-
-  /**
-   * Decorates a {@link ScheduledExecutorService} so that uncaught exceptions are handed to the
-   * given {@link java.lang.Thread.UncaughtExceptionHandler}.
-   * <p>
-   * This may be useful because {@link java.util.concurrent.ThreadPoolExecutor} and
-   * {@link java.util.concurrent.ScheduledThreadPoolExecutor} provide no built-in propagation of
-   * unchecked exceptions thrown from submitted work. Some users are surprised to find that
-   * even the default uncaught exception handler is not invoked.
-   *
-   * @param executorService delegate
-   * @param uncaughtExceptionHandler exception handler that will receive exceptions generated
-   *     from executor tasks.
-   * @return a decorated executor service
-   */
-  public static ScheduledExecutorService exceptionHandlingExecutor(
-      ScheduledExecutorService executorService,
-      Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
-
-    Preconditions.checkNotNull(uncaughtExceptionHandler);
-    Supplier<Thread.UncaughtExceptionHandler> fixedHandler =
-        Suppliers.ofInstance(uncaughtExceptionHandler);
-    return new ExceptionHandlingScheduledExecutorService(executorService, fixedHandler);
-  }
-
-  /**
-   * Decorates a {@link ScheduledExecutorService} so that uncaught exceptions are handed to the
-   * handler returned by {@code Thread.currentThread().getUncaughtExceptionHandler()}, looked up
-   * at the time the exception is thrown.
-   *
-   * @see MoreExecutors#exceptionHandlingExecutor(java.util.concurrent.ScheduledExecutorService,
-   *     Thread.UncaughtExceptionHandler)
-   * @param executorService delegate
-   * @return a decorated executor service
-   */
-  public static ScheduledExecutorService exceptionHandlingExecutor(
-      ScheduledExecutorService executorService) {
-
-    return new ExceptionHandlingScheduledExecutorService(executorService, currentThreadHandler());
-  }
-
-  /**
-   * Creates a new supplier that, on each {@code get()}, returns the uncaught exception handler of
-   * whichever thread is calling it.
-   */
-  private static Supplier<Thread.UncaughtExceptionHandler> currentThreadHandler() {
-    return new Supplier<Thread.UncaughtExceptionHandler>() {
-      @Override
-      public Thread.UncaughtExceptionHandler get() {
-        return Thread.currentThread().getUncaughtExceptionHandler();
-      }
-    };
-  }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/concurrent/RetryingFutureTask.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/RetryingFutureTask.java b/commons/src/main/java/com/twitter/common/util/concurrent/RetryingFutureTask.java
deleted file mode 100644
index cca7001..0000000
--- a/commons/src/main/java/com/twitter/common/util/concurrent/RetryingFutureTask.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.util.concurrent;
-
-import com.google.common.base.Preconditions;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.FutureTask;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * A future task that supports retries by resubmitting itself to an {@link ExecutorService}.
- *
- * @author William Farner
- */
-public class RetryingFutureTask extends FutureTask<Boolean> {
-  private static final Logger LOG = Logger.getLogger(RetryingFutureTask.class.getName());
-
-  protected final ExecutorService executor;
-  protected final int maxRetries;
-  protected int numRetries = 0;
-  protected final Callable<Boolean> callable;
-
-  /**
-   * Creates a new retrying future task that will execute a unit of work until successfully
-   * completed, or the retry limit has been reached.
-   *
-   * @param executor The executor service to resubmit the task to upon failure.
-   * @param callable The unit of work. The work is considered successful when {@code true} is
-   *     returned. It may return {@code false} or throw an exception when unsuccessful.
-   * @param maxRetries The maximum number of times to retry the task.
-   */
-  public RetryingFutureTask(ExecutorService executor, Callable<Boolean> callable, int maxRetries) {
-    super(callable);
-    this.callable = Preconditions.checkNotNull(callable);
-    this.executor = Preconditions.checkNotNull(executor);
-    this.maxRetries = maxRetries;
-  }
-
-  /**
-   * Invokes a retry of this task by resubmitting it to the executor.
-   */
-  protected void retry() {
-    executor.execute(this);
-  }
-
-  /**
-   * Runs one attempt of the unit of work. On success the future completes with {@code true}.
-   * On failure the task resubmits itself until {@code maxRetries} retries have been used, at
-   * which point the future completes with {@code false} so that callers blocked in
-   * {@code get()} are released. Nothing is run once the future is cancelled or already complete.
-   */
-  @Override
-  public void run() {
-    // This method bypasses FutureTask.run(), so cancellation has to be honoured explicitly;
-    // otherwise a cancelled task would keep executing and resubmitting itself.
-    if (isDone()) {
-      return;
-    }
-
-    boolean success = false;
-    try {
-      success = callable.call();
-    } catch (Exception e) {
-      LOG.log(Level.WARNING, "Exception while executing task.", e);
-    }
-
-    if (success) {
-      set(true);
-      return;
-    }
-
-    numRetries++;
-    if (numRetries > maxRetries) {
-      LOG.severe("Task did not complete after " + maxRetries + " retries, giving up.");
-      // Complete the future; without this, get() would block forever after giving up.
-      set(false);
-    } else {
-      LOG.info("Task was not successful, resubmitting (num retries: " + numRetries + ")");
-      retry();
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/concurrent/TaskConverter.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/TaskConverter.java b/commons/src/main/java/com/twitter/common/util/concurrent/TaskConverter.java
deleted file mode 100644
index 5535662..0000000
--- a/commons/src/main/java/com/twitter/common/util/concurrent/TaskConverter.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.util.concurrent;
-
-import java.util.Collection;
-import java.util.concurrent.Callable;
-
-import com.google.common.base.Function;
-import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Collections2;
-
-final class TaskConverter {
- private TaskConverter() {
- // utility
- }
-
- /**
- * Returns a wrapped {@link Runnable} that passes uncaught exceptions thrown from the
- * original Runnable to {@link Thread.UncaughtExceptionHandler}.
- *
- * @param runnable runnable to be wrapped
- * @param handler exception handler that will receive exceptions generated in the runnable
- * @return wrapped runnable
- */
- static Runnable alertingRunnable(
- final Runnable runnable,
- final Supplier<Thread.UncaughtExceptionHandler> handler) {
-
- return new Runnable() {
- @Override
- public void run() {
- try {
- runnable.run();
- } catch (Throwable t) {
- handler.get().uncaughtException(Thread.currentThread(), t);
- throw Throwables.propagate(t);
- }
- }
- };
- }
-
- /**
- * Returns a wrapped {@link java.util.concurrent.Callable} that passes uncaught exceptions
- * thrown from the original Callable to {@link Thread.UncaughtExceptionHandler}.
- *
- * @param callable callable to be wrapped
- * @param handler exception handler that will receive exceptions generated in the callable
- * @return wrapped callable
- */
- static <V> Callable<V> alertingCallable(
- final Callable<V> callable,
- final Supplier<Thread.UncaughtExceptionHandler> handler) {
-
- return new Callable<V>() {
- @Override
- public V call() throws Exception {
- try {
- return callable.call();
- } catch (Throwable t) {
- handler.get().uncaughtException(Thread.currentThread(), t);
- throw Throwables.propagate(t);
- }
- }
- };
- }
-
- /*
- * Calls #alertingCallable on a collection of callables
- */
- static <V> Collection<? extends Callable<V>> alertingCallables(
- Collection<? extends Callable<V>> callables,
- final Supplier<Thread.UncaughtExceptionHandler> handler) {
-
- return Collections2.transform(callables, new Function<Callable<V>, Callable<V>>() {
- @Override
- public Callable<V> apply(Callable<V> callable) {
- return alertingCallable(callable, handler);
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/logging/ResourceLoggingConfigurator.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/logging/ResourceLoggingConfigurator.java b/commons/src/main/java/com/twitter/common/util/logging/ResourceLoggingConfigurator.java
deleted file mode 100644
index 2cc1692..0000000
--- a/commons/src/main/java/com/twitter/common/util/logging/ResourceLoggingConfigurator.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.util.logging;
-
-import com.google.common.base.Preconditions;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.logging.LogManager;
-
-/**
- * A custom java.util.logging configuration class that loads the logging configuration from a
- * properties file resource (as opposed to a file as natively supported by LogManager via
- * java.util.logging.config.file). By default this configurator will look for the resource at
- * /logging.properties but the resource path can be overridden by setting the system property with
- * key {@link #LOGGING_PROPERTIES_RESOURCE_PATH java.util.logging.config.resource}. To install this
- * configurator you must specify the following system property:
- * java.util.logging.config.class=com.twitter.common.util.logging.ResourceLoggingConfigurator
- *
- * @author John Sirois
- */
-public class ResourceLoggingConfigurator {
-
-  /**
-   * A system property that controls where ResourceLoggingConfigurator looks for the logging
-   * configuration on the process classpath.
-   */
-  public static final String LOGGING_PROPERTIES_RESOURCE_PATH = "java.util.logging.config.resource";
-
-  /**
-   * Loads the java.util.logging configuration from a classpath resource. The resource path is
-   * taken from the {@link #LOGGING_PROPERTIES_RESOURCE_PATH} system property and defaults to
-   * {@code /logging.properties}.
-   *
-   * @throws IOException if the configuration could not be read from the resource.
-   * @throws NullPointerException if no resource exists at the configured path.
-   */
-  public ResourceLoggingConfigurator() throws IOException {
-    String loggingPropertiesResourcePath =
-        System.getProperty(LOGGING_PROPERTIES_RESOURCE_PATH, "/logging.properties");
-    InputStream loggingConfig = getClass().getResourceAsStream(loggingPropertiesResourcePath);
-    Preconditions.checkNotNull(loggingConfig,
-        "Could not locate logging config file at resource path: %s", loggingPropertiesResourcePath);
-    try {
-      LogManager.getLogManager().readConfiguration(loggingConfig);
-    } finally {
-      // Always release the resource stream, whether or not the configuration was read.
-      loggingConfig.close();
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/logging/UnresettableLogManager.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/logging/UnresettableLogManager.java b/commons/src/main/java/com/twitter/common/util/logging/UnresettableLogManager.java
deleted file mode 100644
index aa57572..0000000
--- a/commons/src/main/java/com/twitter/common/util/logging/UnresettableLogManager.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.util.logging;
-
-import java.util.logging.LogManager;
-
-/**
- * A LogManager which by default ignores calls to {@link #reset()}. This is useful to avoid missing
- * log statements that occur during vm shutdown. The standard LogManager installs a
- * {@link Runtime#addShutdownHook(Thread) shutdown hook} that disables logging and this subclass
- * nullifies that shutdown hook by disabling any reset of the LogManager by default.
- *
- * @author John Sirois
- */
-public class UnresettableLogManager extends LogManager {
-
- /**
- * The system property that controls which LogManager the java.util.logging subsystem should load.
- */
- public static final String LOGGING_MANAGER = "java.util.logging.manager";
-
- /**
- * A system property which can be used to control an {@code UnresettableLogManager}'s behavior.
- * If the UnresettableLogManager is installed, but an application still wants
- * {@link LogManager#reset()} behavior, they can set this property to "false".
- */
- private static final String LOGGING_MANAGER_IGNORERESET = "java.util.logging.manager.ignorereset";
-
- @Override
- public void reset() throws SecurityException {
- if (Boolean.parseBoolean(System.getProperty(LOGGING_MANAGER_IGNORERESET, "true"))) {
- System.err.println("UnresettableLogManager is ignoring a reset() request.");
- } else {
- super.reset();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/templating/StringTemplateHelper.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/templating/StringTemplateHelper.java b/commons/src/main/java/com/twitter/common/util/templating/StringTemplateHelper.java
deleted file mode 100644
index 52c80ae..0000000
--- a/commons/src/main/java/com/twitter/common/util/templating/StringTemplateHelper.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.util.templating;
-
-import java.io.IOException;
-import java.io.Writer;
-
-import com.google.common.base.Preconditions;
-
-import org.antlr.stringtemplate.AutoIndentWriter;
-import org.antlr.stringtemplate.StringTemplate;
-import org.antlr.stringtemplate.StringTemplateGroup;
-
-import com.twitter.common.base.Closure;
-import com.twitter.common.base.MorePreconditions;
-
-/**
- * A class to simplify the operations required to load a stringtemplate template file from the
- * classpath and populate it.
- */
-public class StringTemplateHelper {
-
-  private final StringTemplateGroup group;
-  private final String templatePath;
-
-  /**
-   * Builds a helper bound to a single template on the classpath.
-   *
-   * @param templateContextClass Class whose package locates the template file.
-   * @param templateName Name of the template file, without the .st suffix, relative to
-   *     {@code templateContextClass}.
-   * @param cacheTemplates Whether the template should be cached.
-   */
-  public StringTemplateHelper(
-      Class<?> templateContextClass,
-      String templateName,
-      boolean cacheTemplates) {
-
-    MorePreconditions.checkNotBlank(templateName);
-    String packageDir = templateContextClass.getPackage().getName().replace('.', '/');
-    String resolvedPath = packageDir + "/" + templateName;
-    StringTemplateGroup templates = new StringTemplateGroup(templateName);
-    // Look the template up once at construction time so a bad path is reported immediately.
-    Preconditions.checkNotNull(templates.getInstanceOf(resolvedPath),
-        "Failed to load template at: %s", resolvedPath);
-
-    if (!cacheTemplates) {
-      templates.setRefreshInterval(0);
-    }
-    this.group = templates;
-    this.templatePath = resolvedPath;
-  }
-
-  /**
-   * Signals that populating or writing a template failed.
-   */
-  public static class TemplateException extends Exception {
-    public TemplateException(String msg, Throwable cause) {
-      super(msg, cause);
-    }
-  }
-
-  /**
-   * Populates the template through the supplied closure and writes the result to {@code out}.
-   *
-   * @param out Template output writer.
-   * @param parameterSetter Closure that receives the unpopulated template and fills it in.
-   * @throws TemplateException If an exception was encountered while populating the template.
-   */
-  public void writeTemplate(
-      Writer out,
-      Closure<StringTemplate> parameterSetter) throws TemplateException {
-
-    Preconditions.checkNotNull(out);
-    Preconditions.checkNotNull(parameterSetter);
-
-    StringTemplate template = group.getInstanceOf(templatePath);
-    try {
-      parameterSetter.execute(template);
-      AutoIndentWriter indentingOut = new AutoIndentWriter(out);
-      template.write(indentingOut);
-    } catch (IOException e) {
-      throw new TemplateException("Failed to write template: " + e, e);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/testing/FakeClock.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/testing/FakeClock.java b/commons/src/main/java/com/twitter/common/util/testing/FakeClock.java
deleted file mode 100644
index 34d3bc9..0000000
--- a/commons/src/main/java/com/twitter/common/util/testing/FakeClock.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.util.testing;
-
-import com.google.common.base.Preconditions;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.Clock;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * A clock for use in testing with a configurable value for {@link #nowMillis()}.
- *
- * @author John Sirois
- */
-public class FakeClock implements Clock {
- // Tests may need to use the clock from multiple threads, ensure liveness.
- private volatile long nowNanos;
-
- /**
- * Sets what {@link #nowMillis()} will return until this method is called again with a new value
- * for {@code now}.
- *
- * @param nowMillis the current time in milliseconds
- */
- public void setNowMillis(long nowMillis) {
- Preconditions.checkArgument(nowMillis >= 0);
- this.nowNanos = TimeUnit.MILLISECONDS.toNanos(nowMillis);
- }
-
- /**
- * Advances the current time by {@code millis} milliseconds. Time can be retarded by passing a
- * negative value.
- *
- * @param period the amount of time to advance the current time by
- */
- public void advance(Amount<Long, Time> period) {
- Preconditions.checkNotNull(period);
- long newNanos = nowNanos + period.as(Time.NANOSECONDS);
- Preconditions.checkArgument(newNanos >= 0,
- "invalid period %s - would move current time to a negative value: %sns", period, newNanos);
- nowNanos = newNanos;
- }
-
- @Override
- public long nowMillis() {
- return TimeUnit.NANOSECONDS.toMillis(nowNanos);
- }
-
- @Override
- public long nowNanos() {
- return nowNanos;
- }
-
- /**
- * Waits in fake time, immediately returning in real time; however a check of {@link #nowMillis}
- * after this method completes will consistently reveal that {@code millis} did in fact pass while
- * waiting.
- *
- * @param millis the amount of time to wait in milliseconds
- */
- @Override
- public void waitFor(long millis) {
- advance(Amount.of(millis, Time.MILLISECONDS));
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/testing/FakeTicker.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/testing/FakeTicker.java b/commons/src/main/java/com/twitter/common/util/testing/FakeTicker.java
deleted file mode 100644
index dfe374e..0000000
--- a/commons/src/main/java/com/twitter/common/util/testing/FakeTicker.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.util.testing;
-
-
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Ticker;
-
-import org.omg.CORBA.PUBLIC_MEMBER;
-
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-
-/**
- * A ticker for use in testing with a configurable value for {@link #Ticker#read()}.
- */
-public class FakeTicker extends Ticker{
- private long nowNanos;
-
- /**
- * Sets what {@link #read()} will return until this method is called again with a new value
- * for {@code now}.
- *
- * @param nowNanos the current time in nanoseconds
- */
- public void setNowNanos(long nowNanos) {
- this.nowNanos = nowNanos;
- }
-
- @Override
- public long read(){
- return nowNanos;
- }
-
- /**
- * Advances the current time by the given {@code period}. Time can be retarded by passing a
- * negative value.
- *
- * @param period the amount of time to advance the current time by
- */
- public void advance(Amount<Long, Time> period) {
- Preconditions.checkNotNull(period);
- nowNanos = nowNanos + period.as(Time.NANOSECONDS);
- }
-
- /**
- * Waits in fake time, immediately returning in real time; however a check of {@link #Ticker#read()}
- * after this method completes will consistently reveal that {@code nanos} did in fact pass while
- * waiting.
- *
- * @param nanos the amount of time to wait in nanoseconds
- */
- public void waitNanos(long nanos) {
- advance(Amount.of(nanos, Time.NANOSECONDS));
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/webassets/bootstrap/BootstrapModule.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/webassets/bootstrap/BootstrapModule.java b/commons/src/main/java/com/twitter/common/webassets/bootstrap/BootstrapModule.java
deleted file mode 100644
index 1ee0e40..0000000
--- a/commons/src/main/java/com/twitter/common/webassets/bootstrap/BootstrapModule.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.webassets.bootstrap;
-
-import com.google.common.io.Resources;
-import com.google.common.net.MediaType;
-import com.google.inject.AbstractModule;
-
-import com.twitter.common.application.http.Registration;
-
-/**
- * A Guice module that registers the static assets of one Bootstrap release as HTTP assets.
- */
-public final class BootstrapModule extends AbstractModule {
-  /**
-   * The Bootstrap releases that can be selected.
-   */
-  public enum BootstrapVersion {
-    VERSION_2_1_1("2.1.1"),
-    VERSION_2_3_2("2.3.2");
-
-    // Version string; used as the leading segment of every resource path for this release.
-    private final String version;
-
-    BootstrapVersion(String s) {
-      version = s;
-    }
-  }
-
-  private final String version;
-
-  /**
-   * Creates a module that serves {@link BootstrapVersion#VERSION_2_1_1}.
-   */
-  public BootstrapModule() {
-    this(BootstrapVersion.VERSION_2_1_1);
-  }
-
-  /**
-   * Creates a module that serves the given Bootstrap release.
-   *
-   * @param version supplies the bootstrap version to select.
-   */
-  public BootstrapModule(BootstrapVersion version) {
-    this.version = version.version;
-  }
-
-  // Registers one resource, looked up relative to this class, under "/" + mountPath.
-  private void register(String mountPath, String resourcePath, String contentType) {
-    Registration.registerHttpAsset(
-        binder(),
-        "/" + mountPath,
-        Resources.getResource(BootstrapModule.class, resourcePath),
-        contentType,
-        true);
-  }
-
-  // Registers an asset whose resource path is its mount path prefixed with the selected version.
-  private void registerVersioned(String assetPath, MediaType mediaType) {
-    register(assetPath, version + "/" + assetPath, mediaType.toString());
-  }
-
-  @Override
-  protected void configure() {
-    registerVersioned("css/bootstrap-responsive.min.css", MediaType.CSS_UTF_8);
-    registerVersioned("css/bootstrap.min.css", MediaType.CSS_UTF_8);
-    registerVersioned("img/glyphicons-halflings-white.png", MediaType.PNG);
-    registerVersioned("img/glyphicons-halflings.png", MediaType.PNG);
-    registerVersioned("js/bootstrap.min.js", MediaType.JAVASCRIPT_UTF_8);
-  }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/webassets/jquery/JQueryModule.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/webassets/jquery/JQueryModule.java b/commons/src/main/java/com/twitter/common/webassets/jquery/JQueryModule.java
deleted file mode 100644
index 5c5e65d..0000000
--- a/commons/src/main/java/com/twitter/common/webassets/jquery/JQueryModule.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.webassets.jquery;
-
-import com.google.common.io.Resources;
-import com.google.common.net.MediaType;
-import com.google.inject.AbstractModule;
-
-import com.twitter.common.application.http.Registration;
-
-/**
- * A Guice module that registers the jQuery script as an HTTP asset.
- */
-public final class JQueryModule extends AbstractModule {
-  // Path the script is served under.
-  private static final String MOUNT_PATH = "/js/jquery.min.js";
-
-  // Resource name of the script, resolved relative to this class.
-  private static final String RESOURCE_NAME = "js/jquery-1.8.2.min.js";
-
-  @Override
-  protected void configure() {
-    Registration.registerHttpAsset(
-        binder(),
-        MOUNT_PATH,
-        Resources.getResource(JQueryModule.class, RESOURCE_NAME),
-        MediaType.JAVASCRIPT_UTF_8.toString(),
-        true);
-  }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/Candidate.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/Candidate.java b/commons/src/main/java/com/twitter/common/zookeeper/Candidate.java
deleted file mode 100644
index 3945700..0000000
--- a/commons/src/main/java/com/twitter/common/zookeeper/Candidate.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.zookeeper;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-
-import org.apache.zookeeper.KeeperException;
-
-import com.twitter.common.base.ExceptionalCommand;
-import com.twitter.common.zookeeper.Group.JoinException;
-import com.twitter.common.zookeeper.Group.WatchException;
-import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-
-/**
- * Contract for taking part in, and querying the outcome of, ZooKeeper-based group leader election.
- */
-public interface Candidate {
-
-  /**
-   * Synchronously asks ZooKeeper who currently leads the group.
-   *
-   * @return the identifying data of the current group leader, or {@link Optional#absent()} when
-   *     there is no leader
-   * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
-   * @throws KeeperException if there was a problem reading the leader information
-   * @throws InterruptedException if this thread is interrupted getting the leader
-   */
-  Optional<byte[]> getLeaderData()
-      throws ZooKeeperConnectionException, KeeperException, InterruptedException;
-
-  /**
-   * Enters this candidate into leadership elections for as long as the current jvm process is
-   * alive. On election the {@code onElected} callback runs and receives a command that can be used
-   * to abdicate. A new leader is elected when the elected leader's jvm process dies or when the
-   * elected leader successfully abdicates. A leader that successfully abdicates is removed from
-   * the group and only becomes eligible again through another call to
-   * {@link #offerLeadership(Leader)}.
-   *
-   * @param leader the leader to notify of election and defeat events
-   * @return a supplier that can be queried to find out if this leader is currently elected
-   * @throws JoinException if there was a problem joining the group
-   * @throws WatchException if there is a problem generating the 1st group membership list
-   * @throws InterruptedException if interrupted waiting to join the group and determine initial
-   *     election results
-   */
-  Supplier<Boolean> offerLeadership(Leader leader)
-      throws JoinException, WatchException, InterruptedException;
-
-  /**
-   * Callbacks for a leader that can be elected and subsequently defeated.
-   */
-  interface Leader {
-
-    /**
-     * Invoked once this leader has been elected.
-     *
-     * @param abdicate a command that can be used to abdicate leadership and force a new election
-     */
-    void onElected(ExceptionalCommand<JoinException> abdicate);
-
-    /**
-     * Invoked when the leader has been ousted, either because it abdicated or because an external
-     * event (session expiration) cost it the leadership role.
-     */
-    void onDefeated();
-  }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/CandidateImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/CandidateImpl.java b/commons/src/main/java/com/twitter/common/zookeeper/CandidateImpl.java
deleted file mode 100644
index c77945b..0000000
--- a/commons/src/main/java/com/twitter/common/zookeeper/CandidateImpl.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.zookeeper;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-
-import org.apache.zookeeper.KeeperException;
-
-import com.twitter.common.base.Command;
-import com.twitter.common.base.ExceptionalCommand;
-import com.twitter.common.zookeeper.Group.GroupChangeListener;
-import com.twitter.common.zookeeper.Group.JoinException;
-import com.twitter.common.zookeeper.Group.Membership;
-import com.twitter.common.zookeeper.Group.WatchException;
-import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-
-/**
- * Implements leader election for small groups of candidates. This implementation is subject to the
- * <a href="http://hadoop.apache.org/zookeeper/docs/r3.2.1/recipes.html#sc_leaderElection">
- * herd effect</a> for a given group and should only be used for small (~10 member) candidate pools.
- */
-public class CandidateImpl implements Candidate {
-  private static final Logger LOG = Logger.getLogger(CandidateImpl.class.getName());
-
-  // Leader data published when the local host address cannot be determined.
-  private static final byte[] UNKNOWN_CANDIDATE_DATA = "<unknown>".getBytes(Charsets.UTF_8);
-
-  // Default leader data: the textual IP address of this host.
-  private static final Supplier<byte[]> IP_ADDRESS_DATA_SUPPLIER = new Supplier<byte[]>() {
-    @Override public byte[] get() {
-      try {
-        // Encode with an explicit charset rather than the platform default, so the published
-        // bytes do not depend on host configuration and match UNKNOWN_CANDIDATE_DATA's encoding.
-        return InetAddress.getLocalHost().getHostAddress().getBytes(Charsets.UTF_8);
-      } catch (UnknownHostException e) {
-        LOG.log(Level.WARNING, "Failed to determine local address!", e);
-        return UNKNOWN_CANDIDATE_DATA;
-      }
-    }
-  };
-
-  // Default judge. Despite its name it selects the smallest member id in natural String order.
-  private static final Function<Iterable<String>, String> MOST_RECENT_JUDGE =
-      new Function<Iterable<String>, String>() {
-        @Override public String apply(Iterable<String> candidates) {
-          return Ordering.natural().min(candidates);
-        }
-      };
-
-  private final Group group;
-  private final Function<Iterable<String>, String> judge;
-  private final Supplier<byte[]> dataSupplier;
-
-  /**
-   * Equivalent to {@link #CandidateImpl(Group, com.google.common.base.Function, Supplier)} using a
-   * judge that always picks the lowest numbered candidate ephemeral node - by proxy the oldest or
-   * 1st candidate and a default supplier that provides the ip address of this host according to
-   * {@link java.net.InetAddress#getLocalHost()} as the leader identifying data.
-   *
-   * @param group the group to run elections in; must not be null
-   */
-  public CandidateImpl(Group group) {
-    this(group, MOST_RECENT_JUDGE, IP_ADDRESS_DATA_SUPPLIER);
-  }
-
-  /**
-   * Creates a candidate that can be used to offer leadership for the given {@code group} using
-   * a judge that always picks the lowest numbered candidate ephemeral node - by proxy the oldest
-   * or 1st. The dataSupplier should produce bytes that identify this process as leader. These bytes
-   * will become available to all participants via the {@link Candidate#getLeaderData()} method.
-   *
-   * @param group the group to run elections in; must not be null
-   * @param dataSupplier supplies the bytes identifying this process; must not be null
-   */
-  public CandidateImpl(Group group, Supplier<byte[]> dataSupplier) {
-    this(group, MOST_RECENT_JUDGE, dataSupplier);
-  }
-
-  /**
-   * Creates a candidate that can be used to offer leadership for the given {@code group}. The
-   * {@code judge} is used to pick the current leader from all group members whenever the group
-   * membership changes. To form a well-behaved election group with one leader, all candidates
-   * should use the same judge. The dataSupplier should produce bytes that identify this process
-   * as leader. These bytes will become available to all participants via the
-   * {@link Candidate#getLeaderData()} method.
-   *
-   * @param group the group to run elections in; must not be null
-   * @param judge picks the leader's member id from the current member ids; must not be null
-   * @param dataSupplier supplies the bytes identifying this process; must not be null
-   */
-  public CandidateImpl(
-      Group group,
-      Function<Iterable<String>, String> judge,
-      Supplier<byte[]> dataSupplier) {
-    this.group = Preconditions.checkNotNull(group);
-    this.judge = Preconditions.checkNotNull(judge);
-    this.dataSupplier = Preconditions.checkNotNull(dataSupplier);
-  }
-
-  @Override
-  public Optional<byte[]> getLeaderData()
-      throws ZooKeeperConnectionException, KeeperException, InterruptedException {
-
-    // The judge is only consulted when the group has members; an empty group has no leader.
-    String leaderId = getLeader(group.getMemberIds());
-    return leaderId == null
-        ? Optional.<byte[]>absent()
-        : Optional.of(group.getMemberData(leaderId));
-  }
-
-  @Override
-  public Supplier<Boolean> offerLeadership(final Leader leader)
-      throws JoinException, WatchException, InterruptedException {
-
-    // NOTE(review): the command handed to join() is presumably run by the group when this
-    // membership is lost - confirm against Group#join.
-    final Membership membership = group.join(dataSupplier, new Command() {
-      @Override public void execute() {
-        leader.onDefeated();
-      }
-    });
-
-    // elected tracks the outcome of the most recent election this member took part in;
-    // abdicated latches to true once the abdicate command has cancelled the membership.
-    final AtomicBoolean elected = new AtomicBoolean(false);
-    final AtomicBoolean abdicated = new AtomicBoolean(false);
-    group.watch(new GroupChangeListener() {
-        @Override public void onGroupChange(Iterable<String> memberIds) {
-          boolean noCandidates = Iterables.isEmpty(memberIds);
-          String memberId = membership.getMemberId();
-
-          if (noCandidates) {
-            LOG.warning("All candidates have temporarily left the group: " + group);
-          } else if (!Iterables.contains(memberIds, memberId)) {
-            LOG.severe(String.format(
-                "Current member ID %s is not a candidate for leader, current voting: %s",
-                memberId, memberIds));
-          } else {
-            boolean electedLeader = memberId.equals(getLeader(memberIds));
-            boolean previouslyElected = elected.getAndSet(electedLeader);
-
-            if (!previouslyElected && electedLeader) {
-              // Transition from not-leader to leader: notify and hand out the abdicate command.
-              LOG.info(String.format("Candidate %s is now leader of group: %s",
-                  membership.getMemberPath(), memberIds));
-
-              leader.onElected(new ExceptionalCommand<JoinException>() {
-                @Override public void execute() throws JoinException {
-                  membership.cancel();
-                  abdicated.set(true);
-                }
-              });
-            } else if (!electedLeader) {
-              // Only a member that was leader before this change is told it has been defeated.
-              if (previouslyElected) {
-                leader.onDefeated();
-              }
-              LOG.info(String.format(
-                  "Candidate %s waiting for the next leader election, current voting: %s",
-                  membership.getMemberPath(), memberIds));
-            }
-          }
-        }
-      });
-
-    return new Supplier<Boolean>() {
-        @Override public Boolean get() {
-          return !abdicated.get() && elected.get();
-        }
-      };
-  }
-
-  /**
-   * Applies the judge to the given member ids.
-   *
-   * @param memberIds the ids of the current group members
-   * @return the member id chosen by the judge, or {@code null} when there are no members
-   */
-  @Nullable
-  private String getLeader(Iterable<String> memberIds) {
-    return Iterables.isEmpty(memberIds) ? null : judge.apply(memberIds);
-  }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/CompoundServerSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/CompoundServerSet.java b/commons/src/main/java/com/twitter/common/zookeeper/CompoundServerSet.java
deleted file mode 100644
index afe3e6f..0000000
--- a/commons/src/main/java/com/twitter/common/zookeeper/CompoundServerSet.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.zookeeper;
-
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import com.twitter.common.base.Command;
-import com.twitter.common.base.Commands;
-import com.twitter.common.base.MorePreconditions;
-import com.twitter.common.zookeeper.Group.JoinException;
-import com.twitter.thrift.ServiceInstance;
-import com.twitter.thrift.Status;
-
-/**
- * A ServerSet that delegates all calls to other ServerSets.
- */
-public class CompoundServerSet implements ServerSet {
- private static final Joiner STACK_TRACE_JOINER = Joiner.on('\n');
-
- private final List<ServerSet> serverSets;
- private final Map<ServerSet, ImmutableSet<ServiceInstance>> instanceCache = Maps.newHashMap();
- private final List<HostChangeMonitor<ServiceInstance>> monitors = Lists.newArrayList();
- private Command stopWatching = null;
- private ImmutableSet<ServiceInstance> allHosts = ImmutableSet.of();
-
- /**
- * Create new ServerSet from a list of serverSets.
- *
- * @param serverSets serverSets to which the calls will be delegated.
- */
- public CompoundServerSet(Iterable<ServerSet> serverSets) {
- MorePreconditions.checkNotBlank(serverSets);
- this.serverSets = ImmutableList.copyOf(serverSets);
- }
-
- private interface JoinOp {
- EndpointStatus doJoin(ServerSet serverSet) throws JoinException, InterruptedException;
- }
-
- private interface StatusOp {
- void changeStatus(EndpointStatus status) throws UpdateException;
- }
-
- private void changeStatus(
- ImmutableList<EndpointStatus> statuses,
- StatusOp statusOp) throws UpdateException {
-
- ImmutableList.Builder<String> builder = ImmutableList.builder();
- int errorIdx = 1;
- for (EndpointStatus endpointStatus : statuses) {
- try {
- statusOp.changeStatus(endpointStatus);
- } catch (UpdateException exception) {
- builder.add(String.format("[%d] %s", errorIdx++,
- Throwables.getStackTraceAsString(exception)));
- }
- }
- if (errorIdx > 1) {
- throw new UpdateException(
- "One or more ServerSet update failed: " + STACK_TRACE_JOINER.join(builder.build()));
- }
- }
-
- private EndpointStatus doJoin(JoinOp joiner) throws JoinException, InterruptedException {
- // Get the list of endpoint status from the serverSets.
- ImmutableList.Builder<EndpointStatus> builder = ImmutableList.builder();
- for (ServerSet serverSet : serverSets) {
- builder.add(joiner.doJoin(serverSet));
- }
-
- final ImmutableList<EndpointStatus> statuses = builder.build();
-
- return new EndpointStatus() {
- @Override public void leave() throws UpdateException {
- changeStatus(statuses, new StatusOp() {
- @Override public void changeStatus(EndpointStatus status) throws UpdateException {
- status.leave();
- }
- });
- }
-
- @Override public void update(final Status newStatus) throws UpdateException {
- changeStatus(statuses, new StatusOp() {
- @Override public void changeStatus(EndpointStatus status) throws UpdateException {
- status.update(newStatus);
- }
- });
- }
- };
- }
-
- @Override
- public EndpointStatus join(
- final InetSocketAddress endpoint,
- final Map<String, InetSocketAddress> additionalEndpoints)
- throws Group.JoinException, InterruptedException {
-
- return doJoin(new JoinOp() {
- @Override public EndpointStatus doJoin(ServerSet serverSet)
- throws JoinException, InterruptedException {
- return serverSet.join(endpoint, additionalEndpoints);
- }
- });
- }
-
- /*
- * If any one of the serverSet throws an exception during respective join, the exception is
- * propagated. Join is successful only if all the joins are successful.
- *
- * NOTE: If an exception occurs during the join, the serverSets in the composite can be in a
- * partially joined state.
- *
- * @see ServerSet#join(InetSocketAddress, Map, Status)
- */
- @Override
- public EndpointStatus join(
- final InetSocketAddress endpoint,
- final Map<String, InetSocketAddress> additionalEndpoints,
- final Status status) throws Group.JoinException, InterruptedException {
-
- return doJoin(new JoinOp() {
- @Override public EndpointStatus doJoin(ServerSet serverSet)
- throws JoinException, InterruptedException {
-
- return serverSet.join(endpoint, additionalEndpoints, status);
- }
- });
- }
-
- @Override
- public EndpointStatus join(
- final InetSocketAddress endpoint,
- final Map<String, InetSocketAddress> additionalEndpoints,
- final int shardId) throws JoinException, InterruptedException {
-
- return doJoin(new JoinOp() {
- @Override public EndpointStatus doJoin(ServerSet serverSet)
- throws JoinException, InterruptedException {
-
- return serverSet.join(endpoint, additionalEndpoints, shardId);
- }
- });
- }
-
- // Handles changes to the union of hosts.
- private synchronized void handleChange(ServerSet serverSet, ImmutableSet<ServiceInstance> hosts) {
- instanceCache.put(serverSet, hosts);
-
- // Get the union of hosts.
- ImmutableSet<ServiceInstance> currentHosts =
- ImmutableSet.copyOf(Iterables.concat(instanceCache.values()));
-
- // Check if the hosts have changed.
- if (!currentHosts.equals(allHosts)) {
- allHosts = currentHosts;
-
- // Notify the monitors.
- for (HostChangeMonitor<ServiceInstance> monitor : monitors) {
- monitor.onChange(allHosts);
- }
- }
- }
-
- /**
- * Monitor the CompoundServerSet.
- *
- * If any one of the monitor calls to the underlying serverSet raises a MonitorException, the
- * exception is propagated. The call is successful only if all the monitor calls to the
- * underlying serverSets are successful.
- *
- * NOTE: If an exception occurs during the monitor call, the serverSets in the composite will not
- * be monitored.
- *
- * @param monitor HostChangeMonitor instance used to monitor host changes.
- * @return A command that, when executed, will stop monitoring all underlying server sets.
- * @throws MonitorException If there was a problem monitoring any of the underlying server sets.
- */
- @Override
- public synchronized Command watch(HostChangeMonitor<ServiceInstance> monitor)
- throws MonitorException {
- if (stopWatching == null) {
- monitors.add(monitor);
- ImmutableList.Builder<Command> commandsBuilder = ImmutableList.builder();
-
- for (final ServerSet serverSet : serverSets) {
- commandsBuilder.add(serverSet.watch(new HostChangeMonitor<ServiceInstance>() {
- @Override public void onChange(ImmutableSet<ServiceInstance> hostSet) {
- handleChange(serverSet, hostSet);
- }
- }));
- }
-
- stopWatching = Commands.compound(commandsBuilder.build());
- }
-
- return stopWatching;
- }
-
- @Override
- public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
- watch(monitor);
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/DistributedLock.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/DistributedLock.java b/commons/src/main/java/com/twitter/common/zookeeper/DistributedLock.java
deleted file mode 100644
index 4ddbb90..0000000
--- a/commons/src/main/java/com/twitter/common/zookeeper/DistributedLock.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.zookeeper;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * A lock with blocking, timed and release operations.
- *
- * @author Florian Leibert
- */
-public interface DistributedLock {
-  /**
-   * Acquires the lock.
-   *
-   * @throws LockingException if the lock cannot be acquired
-   */
-  void lock() throws LockingException;
-
-  /**
-   * Attempts to acquire the lock within the given time.
-   *
-   * @param timeout how long to keep trying, expressed in {@code unit}
-   * @param unit the unit of {@code timeout}
-   * @return whether the lock was acquired
-   */
-  boolean tryLock(long timeout, TimeUnit unit);
-
-  /**
-   * Releases the lock.
-   *
-   * @throws LockingException if the lock cannot be released
-   */
-  void unlock() throws LockingException;
-
-  /**
-   * Unchecked exception signalling a failed lock operation.
-   */
-  class LockingException extends RuntimeException {
-    public LockingException(String msg) {
-      super(msg);
-    }
-
-    public LockingException(String msg, Exception e) {
-      super(msg, e);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/DistributedLockImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/DistributedLockImpl.java b/commons/src/main/java/com/twitter/common/zookeeper/DistributedLockImpl.java
deleted file mode 100644
index 2d9ee63..0000000
--- a/commons/src/main/java/com/twitter/common/zookeeper/DistributedLockImpl.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.zookeeper;
-
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Ordering;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-
-import com.twitter.common.base.MorePreconditions;
-
-/**
- * Distributed locking via ZooKeeper. Assuming there are N clients that all try to acquire a lock,
- * the algorithm works as follows. Each host creates an ephemeral|sequential node, and requests a
- * list of children for the lock node. Due to the nature of sequential, all the ids are increasing
- * in order, therefore the client with the least ID according to natural ordering will hold the
- * lock. Every other client watches the id immediately preceding its own id and checks for the lock
- * in case of notification. The client holding the lock does the work and finally deletes the node,
- * thereby triggering the next client in line to acquire the lock. Deadlocks are possible but
- * avoided in most cases because if a client drops dead while holding the lock, the ZK session
- * should timeout and since the node is ephemeral, it will be removed in such a case. Deadlocks
- * could occur if the worker thread on a client hangs but the zk-client thread is still alive.
- * There could be an external monitor client that ensures that alerts are triggered if the least-id
- * ephemeral node is present past a time-out.
- * <p/>
- * Note: Locking attempts will fail in case session expires!
- * <p/>
- * Threading: the public methods are synchronized on this instance and hold that monitor while they
- * wait. Watch callbacks therefore never take this instance's monitor; they only report the outcome
- * of an attempt through {@code holdsLock} and the attempt's latch, and the waiting caller performs
- * any cleanup itself once it wakes up.
- * <p/>
- * An attempt that fails, times out or is interrupted removes its ephemeral node before the call
- * returns, so {@link #unlock()} is only meaningful after a successful acquisition.
- *
- * @author Florian Leibert
- */
-@ThreadSafe
-public class DistributedLockImpl implements DistributedLock {
-
-  private static final Logger LOG = Logger.getLogger(DistributedLockImpl.class.getName());
-
-  private final ZooKeeperClient zkClient;
-  private final String lockPath;
-  private final ImmutableList<ACL> acl;
-
-  // Written by the active LockWatcher, possibly on the ZooKeeper event thread, and read by the
-  // caller once the attempt's latch releases it.
-  private volatile boolean holdsLock = false;
-
-  // Per-attempt state; only touched while holding this instance's monitor.
-  private CountDownLatch syncPoint;
-  private String currentId;
-  private String currentNode;
-  private LockWatcher watcher;
-
-  /**
-   * Equivalent to {@link #DistributedLockImpl(ZooKeeperClient, String, Iterable)} with a default
-   * wide open {@code acl} ({@link ZooDefs.Ids#OPEN_ACL_UNSAFE}).
-   */
-  public DistributedLockImpl(ZooKeeperClient zkClient, String lockPath) {
-    this(zkClient, lockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE);
-  }
-
-  /**
-   * Creates a distributed lock using the given {@code zkClient} to coordinate locking.
-   *
-   * @param zkClient The ZooKeeper client to use.
-   * @param lockPath The path used to manage the lock under.
-   * @param acl The acl to apply to newly created lock nodes.
-   */
-  public DistributedLockImpl(ZooKeeperClient zkClient, String lockPath, Iterable<ACL> acl) {
-    this.zkClient = Preconditions.checkNotNull(zkClient);
-    this.lockPath = MorePreconditions.checkNotBlank(lockPath);
-    this.acl = ImmutableList.copyOf(acl);
-    this.syncPoint = new CountDownLatch(1);
-  }
-
-  /**
-   * Starts a new acquisition attempt: ensures the lock path exists, creates this client's
-   * ephemeral sequential member node and installs a fresh watcher bound to it.
-   */
-  private synchronized void prepare()
-      throws ZooKeeperClient.ZooKeeperConnectionException, InterruptedException, KeeperException {
-
-    ZooKeeperUtils.ensurePath(zkClient, acl, lockPath);
-    LOG.log(Level.FINE, "Working with locking path:" + lockPath);
-
-    // Every attempt waits on its own latch so a signal left over from an earlier attempt can
-    // never release it.
-    syncPoint = new CountDownLatch(1);
-
-    // Create an EPHEMERAL_SEQUENTIAL node.
-    currentNode =
-        zkClient.get().create(lockPath + "/member_", null, acl, CreateMode.EPHEMERAL_SEQUENTIAL);
-
-    // We only care about our actual id since we want to compare ourselves to siblings.
-    currentId = currentNode.substring(currentNode.lastIndexOf('/') + 1);
-    LOG.log(Level.FINE, "Received ID from zk:" + currentId);
-
-    // The watcher captures currentId and syncPoint, so it must be created last.
-    this.watcher = new LockWatcher();
-  }
-
-  /**
-   * Blocks until this client's member node is first in line.
-   *
-   * @throws LockingException if the lock is already held by this instance, the attempt failed
-   *     (for example because the session expired), the thread was interrupted, or ZooKeeper
-   *     could not be reached. In every case except "already held" the member node created for
-   *     the attempt is removed on a best-effort basis before the exception propagates.
-   */
-  @Override
-  public synchronized void lock() throws LockingException {
-    if (holdsLock) {
-      throw new LockingException("Error, already holding a lock. Call unlock first!");
-    }
-    try {
-      prepare();
-      watcher.checkForLock();
-      syncPoint.await();
-      if (!holdsLock) {
-        // The watcher reported a failure; it cannot clean up itself, so do it here.
-        cancelAttempt();
-        throw new LockingException("Error, couldn't acquire the lock!");
-      }
-    } catch (InterruptedException e) {
-      try {
-        // Clean up before restoring the flag: ZooKeeper calls fail fast on an interrupted thread.
-        cancelAttempt();
-      } finally {
-        Thread.currentThread().interrupt();
-      }
-      throw new LockingException("InterruptedException while trying to acquire lock!", e);
-    } catch (KeeperException e) {
-      // No need to clean up since the node wasn't created yet.
-      throw new LockingException("KeeperException while trying to acquire lock!", e);
-    } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-      // No need to clean up since the node wasn't created yet.
-      throw new LockingException("ZooKeeperConnectionException while trying to acquire lock", e);
-    }
-  }
-
-  /**
-   * Like {@link #lock()} but gives up after the supplied wait.
-   *
-   * @param timeout The maximum time to wait for the lock.
-   * @param unit The unit of {@code timeout}.
-   * @return {@code true} if the lock was acquired; {@code false} if the wait elapsed or the
-   *     thread was interrupted (the interrupt flag is restored). When {@code false} is returned
-   *     the attempt's member node has been removed and nothing remains queued.
-   * @throws LockingException if the lock is already held by this instance, the attempt failed,
-   *     or ZooKeeper could not be reached.
-   */
-  @Override
-  public synchronized boolean tryLock(long timeout, TimeUnit unit) {
-    if (holdsLock) {
-      throw new LockingException("Error, already holding a lock. Call unlock first!");
-    }
-    try {
-      prepare();
-      watcher.checkForLock();
-      boolean signalled = syncPoint.await(timeout, unit);
-      if (!signalled) {
-        // Withdraw from the queue; a node left behind could silently become the lock holder.
-        cancelAttempt();
-        return false;
-      }
-      if (!holdsLock) {
-        cancelAttempt();
-        throw new LockingException("Error, couldn't acquire the lock!");
-      }
-    } catch (InterruptedException e) {
-      try {
-        // Clean up before restoring the flag: ZooKeeper calls fail fast on an interrupted thread.
-        cancelAttempt();
-      } finally {
-        Thread.currentThread().interrupt();
-      }
-      return false;
-    } catch (KeeperException e) {
-      // No need to clean up since the node wasn't created yet.
-      throw new LockingException("KeeperException while trying to acquire lock!", e);
-    } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-      // No need to clean up since the node wasn't created yet.
-      throw new LockingException("ZooKeeperConnectionException while trying to acquire lock", e);
-    }
-    return true;
-  }
-
-  /**
-   * Releases the lock by deleting this client's member node.
-   *
-   * @throws LockingException if there is neither a held lock nor leftover attempt state to
-   *     release, or if the member node could not be deleted. After a failed delete the state is
-   *     kept so the call can be retried.
-   */
-  @Override
-  public synchronized void unlock() throws LockingException {
-    if (currentId == null) {
-      throw new LockingException("Error, neither attempting to lock nor holding a lock!");
-    }
-    if (!holdsLock) {
-      // Only reachable when an earlier cleanup failed part-way; finish withdrawing the attempt.
-      LOG.log(Level.INFO, "Not holding lock, aborting acquisition attempt!");
-    } else {
-      LOG.log(Level.INFO, "Cleaning up this locks ephemeral node.");
-    }
-    cleanup();
-  }
-
-  //TODO(Florian Leibert): Make sure this isn't a runtime exception. Put exceptions into the token?
-
-  /**
-   * Withdraws the current attempt. Only called on the thread that owns the attempt, which is no
-   * longer waiting on the latch, so there is nobody left to signal.
-   */
-  private synchronized void cancelAttempt() {
-    LOG.log(Level.INFO, "Cancelling lock attempt!");
-    cleanup();
-  }
-
-  /**
-   * Retires the active watcher, deletes this client's member node if there is one and resets the
-   * per-attempt state. Must be called with this instance's monitor held.
-   *
-   * @throws LockingException if the node could not be deleted; the state is left untouched in
-   *     that case so the deletion can be retried.
-   */
-  private void cleanup() {
-    LOG.info("Cleaning up!");
-
-    // Retire first: this waits for an in-flight callback and guarantees that no later callback
-    // can flip holdsLock once the state below has been reset.
-    if (watcher != null) {
-      watcher.retire();
-    }
-
-    if (currentNode == null) {
-      // The attempt ended before its node was created.
-      LOG.log(Level.WARNING, "Called cleanup but nothing to cleanup!");
-    } else {
-      try {
-        Stat stat = zkClient.get().exists(currentNode, false);
-        if (stat != null) {
-          zkClient.get().delete(currentNode, ZooKeeperUtils.ANY_VERSION);
-        } else {
-          LOG.log(Level.WARNING, "Called cleanup but nothing to cleanup!");
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new LockingException("Interrupted while deleting lock node " + currentNode, e);
-      } catch (KeeperException e) {
-        throw new LockingException("KeeperException while deleting lock node " + currentNode, e);
-      } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-        throw new LockingException(
-            "ZooKeeperConnectionException while deleting lock node " + currentNode, e);
-      }
-    }
-
-    holdsLock = false;
-    currentId = null;
-    currentNode = null;
-    watcher = null;
-  }
-
-  /**
-   * Tracks a single acquisition attempt. The member id and latch are captured when the watcher is
-   * created, so a callback that arrives late can never act on a newer attempt. All mutable state
-   * is guarded by the watcher's own monitor; the enclosing instance's monitor is never taken.
-   */
-  class LockWatcher implements Watcher {
-
-    private final String id = currentId;
-    private final CountDownLatch latch = syncPoint;
-    private String watchedNode;
-    private boolean retired;
-
-    /**
-     * Detaches this watcher from its attempt. ZooKeeper offers no way to unregister a watch, so
-     * callbacks that still arrive afterwards are ignored.
-     */
-    synchronized void retire() {
-      retired = true;
-    }
-
-    /**
-     * Publishes the outcome of the attempt and releases the waiting caller.
-     */
-    private void signal(boolean acquired) {
-      holdsLock = acquired;
-      latch.countDown();
-    }
-
-    /**
-     * Determines this client's position among the members. First in line means the lock is
-     * acquired; otherwise a watch is set on the immediate predecessor. Any failure is reported
-     * to the waiting caller rather than thrown.
-     */
-    public synchronized void checkForLock() {
-      MorePreconditions.checkNotBlank(id);
-      if (retired) {
-        return;
-      }
-
-      try {
-        List<String> candidates = zkClient.get().getChildren(lockPath, null);
-        ImmutableList<String> sortedMembers = Ordering.natural().immutableSortedCopy(candidates);
-
-        int memberIndex = sortedMembers.indexOf(id);
-
-        if (memberIndex < 0) {
-          // Our own node is gone (this also covers an empty member list), so the attempt is lost.
-          LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " +
-              "is no longer a member. Trying to cancel lock acquisition.", id));
-          signal(false);
-        } else if (memberIndex == 0) {
-          // If we hold the lock
-          signal(true);
-        } else {
-          final String nextLowestNode = sortedMembers.get(memberIndex - 1);
-          LOG.log(Level.INFO, String.format("Current LockWatcher with ephemeral node [%s], is " +
-              "waiting for [%s] to release lock.", id, nextLowestNode));
-
-          watchedNode = String.format("%s/%s", lockPath, nextLowestNode);
-          Stat stat = zkClient.get().exists(watchedNode, this);
-          if (stat == null) {
-            // The predecessor vanished between listing and watching; re-evaluate our position.
-            checkForLock();
-          }
-        }
-      } catch (InterruptedException e) {
-        LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " +
-            "got interrupted. Trying to cancel lock acquisition.", id), e);
-        // Keep the interruption visible to the caller's await(), which then cancels the attempt.
-        Thread.currentThread().interrupt();
-        signal(false);
-      } catch (KeeperException e) {
-        LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " +
-            "got a KeeperException. Trying to cancel lock acquisition.", id), e);
-        signal(false);
-      } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-        LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " +
-            "got a ConnectionException. Trying to cancel lock acquisition.", id), e);
-        signal(false);
-      }
-    }
-
-    @Override
-    public synchronized void process(WatchedEvent event) {
-      // this handles the case where we have aborted a lock and deleted ourselves but still have a
-      // watch on the nextLowestNode. This is a workaround since ZK doesn't support unsub.
-      if (retired) {
-        LOG.log(Level.INFO, "Ignoring call for node:" + watchedNode);
-        return;
-      }
-
-      //TODO(Florian Leibert): Pull this into the outer class.
-      // Session-state events carry no path, so they must be handled before the path filter.
-      if (event.getType() == Watcher.Event.EventType.None) {
-        switch (event.getState()) {
-          case SyncConnected:
-            // TODO(Florian Leibert): maybe we should just try to "fail-fast" in this case and abort.
-            LOG.info("Reconnected...");
-            break;
-          case Expired:
-            LOG.log(Level.WARNING, String.format("Current ZK session expired![%s]", id));
-            signal(false);
-            break;
-          default:
-            break;
-        }
-        return;
-      }
-
-      if (watchedNode == null || !watchedNode.equals(event.getPath())) {
-        LOG.log(Level.INFO, "Ignoring call for node:" + watchedNode);
-        return;
-      }
-
-      if (event.getType() == Event.EventType.NodeDeleted) {
-        checkForLock();
-      } else {
-        LOG.log(Level.WARNING, String.format("Unexpected ZK event: %s", event.getType().name()));
-      }
-    }
-  }
-}