You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by zm...@apache.org on 2015/08/25 20:19:34 UTC
[20/37] aurora git commit: Import of Twitter Commons.
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/Stat.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/Stat.java b/commons/src/main/java/com/twitter/common/util/Stat.java
new file mode 100644
index 0000000..1f430eb
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/Stat.java
@@ -0,0 +1,360 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+/** ************************************************************************
+ ** Summize
+ ** This work protected by US Copyright Law and contains proprietary and
+ ** confidential trade secrets.
+ ** (c) Copyright 2007 Summize, ALL RIGHTS RESERVED.
+ ** ************************************************************************/
+package com.twitter.common.util;
+
+//***************************************************************
+//
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.text.NumberFormat;
+
+/**
+ * Collects basic running statistics over a series of numbers.
+ *
+ * <p>Numbers are added one at a time; the count, sum, mean, standard deviation, min and max can
+ * then be read back at any point. To reuse an instance, call {@link #clear()} to reset it.
+ *
+ * <p>Every method that reads or updates the accumulated state synchronizes on this instance.
+ */
+public class Stat implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  // Field names are part of the default Java serialized form; do not rename them.
+  private double _max = 0;
+  private double _min = Double.MAX_VALUE;
+  private long _number = 0;
+  private double _mean = 0;
+  private double _stdDev = 0;
+  private double _sum = 0;
+  private double _sumOfSq = 0;
+
+  /**
+   * Adds a number to the statistics collector.
+   * The value is widened to a double before being recorded.
+   *
+   * @param x number added to the statistics.
+   */
+  public void addNumber(int x) {
+    addNumber((double) x);
+  }
+
+  /**
+   * Adds a number to the statistics collector.
+   * The value is widened to a double before being recorded.
+   *
+   * @param x number added to the statistics.
+   */
+  public void addNumber(float x) {
+    addNumber((double) x);
+  }
+
+  /**
+   * Adds a number to the statistics collector, updating the min, max, sum, sum of squares and
+   * element count.
+   *
+   * @param x number added to the statistics.
+   */
+  public synchronized void addNumber(double x) {
+    // The first sample always becomes both min and max; comparing against the initial
+    // placeholder values would report a max of 0 for a series of negative numbers.
+    if (_number == 0 || _max < x) {
+      _max = x;
+    }
+    if (_number == 0 || _min > x) {
+      _min = x;
+    }
+
+    _sum += x;
+    _sumOfSq += (x * x);
+    _number++;
+  }
+
+  /**
+   * Resets all statistics counters to their initial values.
+   */
+  public synchronized void clear() {
+    _max = 0;
+    _min = Double.MAX_VALUE;
+    _number = 0;
+    _mean = 0;
+    _stdDev = 0;
+    _sum = 0;
+    _sumOfSq = 0;
+  }
+
+  /**
+   * Creates a string representation of the statistics collected so far.
+   * Equivalent to {@code toString(false)}. The output is pre-formatted and may not suit all
+   * needs; callers wanting a different layout should use the individual getters instead.
+   *
+   * @return formatted summary of the results.
+   */
+  @Override
+  public String toString() {
+    return toString(false);
+  }
+
+  /**
+   * Creates a string representation of the statistics collected so far.
+   *
+   * <p>With more than one element the count, max, min, mean, sum and standard deviation are
+   * listed; with exactly one element only the sum is printed (labelled {@code Number:}); with no
+   * elements the result is {@code "Number: N/A"}. When {@code percent} is true the count, max,
+   * min and mean (or the single value) are multiplied by 100 and suffixed with {@code %}; the sum
+   * and standard deviation are never scaled. Numbers are rendered with the default-locale
+   * {@link NumberFormat} limited to 4 fraction digits.
+   *
+   * @param percent format as percentages if set to true.
+   * @return formatted summary of the results.
+   */
+  public synchronized String toString(boolean percent) {
+    calculate();
+    NumberFormat nf = NumberFormat.getInstance();
+    nf.setMaximumFractionDigits(4);
+
+    if (_number > 1) {
+      StringBuilder results = new StringBuilder();
+      // The count is formatted as a long, exactly as before, rather than through the double path.
+      results.append("Number:")
+          .append(percent ? nf.format(_number * 100) + "%" : nf.format(_number));
+      results.append(" Max:").append(format(nf, _max, percent));
+      results.append(" Min:").append(format(nf, _min, percent));
+      results.append(" Mean:").append(format(nf, _mean, percent));
+      results.append(" Sum:").append(nf.format(_sum));
+      results.append(" STD:").append(nf.format(_stdDev));
+      return results.toString();
+    }
+    if (_number == 1) {
+      return "Number:" + format(nf, _sum, percent);
+    }
+    return "Number: N/A";
+  }
+
+  /** Formats a value either as-is or scaled by 100 with a trailing percent sign. */
+  private static String format(NumberFormat nf, double value, boolean percent) {
+    return percent ? nf.format(value * 100) + "%" : nf.format(value);
+  }
+
+  /** Refreshes the cached mean and standard deviation fields. */
+  private void calculate() {
+    getMean();
+    getStandardDev();
+  }
+
+  /**
+   * Gets the max data element added to the statistics object so far.
+   *
+   * @return maximum entry added so far, or 0 if nothing has been added.
+   */
+  public synchronized double getMax() {
+    return _max;
+  }
+
+  /**
+   * Gets the min data element added to the statistics object so far.
+   *
+   * @return minimum entry added so far, or {@link Double#MAX_VALUE} if nothing has been added.
+   */
+  public synchronized double getMin() {
+    return _min;
+  }
+
+  /**
+   * Gets the number of data elements added to the statistics object so far.
+   *
+   * @return number of entries added so far.
+   */
+  public synchronized long getNumberOfElements() {
+    return _number;
+  }
+
+  /**
+   * Gets the average (mean) of the data elements added to the statistics object so far.
+   * The value is recomputed from the sum and count whenever at least one element is present.
+   *
+   * @return mean of entries added so far.
+   */
+  public synchronized double getMean() {
+    if (_number > 0) {
+      _mean = _sum / _number;
+    }
+    return _mean;
+  }
+
+  /**
+   * Gets the ratio of the sum of elements divided by the number of elements added, times 100.
+   *
+   * @return mean of the entries added so far, expressed as a percentage.
+   */
+  public synchronized double getPercent() {
+    // Scale the returned value only: storing mean * 100 back into _mean would corrupt the
+    // cached mean that writeToDataOutput serializes.
+    return getMean() * 100;
+  }
+
+  /**
+   * Gets the sum of the data elements added to the statistics object so far.
+   *
+   * @return sum of entries added so far.
+   */
+  public synchronized double getSum() {
+    return _sum;
+  }
+
+  /**
+   * Gets the sum of the squares of the data elements added to the statistics object so far.
+   *
+   * @return sum of the squares of the entries added so far.
+   */
+  public synchronized double getSumOfSq() {
+    return _sumOfSq;
+  }
+
+  /**
+   * Gets the sample standard deviation (n - 1 denominator) of the data elements added to the
+   * statistics object so far. The value is only recomputed when more than one element is present.
+   *
+   * @return standard deviation of the entries added so far.
+   */
+  public synchronized double getStandardDev() {
+    if (_number > 1) {
+      double sumOfSquaredDeviations = _sumOfSq - ((_sum * _sum) / _number);
+      // Floating-point rounding can leave this marginally negative for (nearly) constant data,
+      // which would make sqrt return NaN; clamp at zero. A NaN input still propagates.
+      _stdDev = Math.sqrt(Math.max(0.0, sumOfSquaredDeviations) / (_number - 1));
+    }
+    return _stdDev;
+  }
+
+  /**
+   * Reads the data from the InputStream so it can be used to populate the current object's state.
+   *
+   * @param in java.io.InputStream to read from.
+   * @throws IOException if reading from the stream fails.
+   */
+  public void readFromDataInput(InputStream in) throws IOException {
+    DataInput di = new DataInputStream(in);
+    readFromDataInput(di);
+  }
+
+  /**
+   * Reads the data from the DataInput so it can be used to populate the current object's state.
+   * Fields are read in the order max, min, count, mean, standard deviation, sum, sum of squares,
+   * mirroring {@link #writeToDataOutput(DataOutput)}.
+   *
+   * @param in java.io.DataInput to read from.
+   * @throws IOException if reading from the input fails.
+   */
+  public synchronized void readFromDataInput(DataInput in) throws IOException {
+    _max = in.readDouble();
+    _min = in.readDouble();
+    _number = in.readLong();
+    _mean = in.readDouble();
+    _stdDev = in.readDouble();
+    _sum = in.readDouble();
+    _sumOfSq = in.readDouble();
+  }
+
+  /**
+   * Writes the data to the output stream so it can be streamed to another process, wire or
+   * storage medium in a format that another Stat object can read.
+   *
+   * @param out java.io.OutputStream to write to.
+   * @throws IOException if writing to the stream fails.
+   */
+  public void writeToDataOutput(OutputStream out) throws IOException {
+    DataOutput dout = new DataOutputStream(out);
+    writeToDataOutput(dout);
+  }
+
+  /**
+   * Writes the data to the data output object so it can be written to another process, wire or
+   * storage medium in a format that another Stat object can read.
+   * Fields are written in the order max, min, count, mean, standard deviation, sum, sum of
+   * squares. The mean and standard deviation are written as currently cached.
+   *
+   * @param out java.io.DataOutput to write to.
+   * @throws IOException if writing to the output fails.
+   */
+  public synchronized void writeToDataOutput(DataOutput out) throws IOException {
+    out.writeDouble(_max);
+    out.writeDouble(_min);
+    out.writeLong(_number);
+    out.writeDouble(_mean);
+    out.writeDouble(_stdDev);
+    out.writeDouble(_sum);
+    out.writeDouble(_sumOfSq);
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/StateMachine.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/StateMachine.java b/commons/src/main/java/com/twitter/common/util/StateMachine.java
new file mode 100644
index 0000000..4a559b5
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/StateMachine.java
@@ -0,0 +1,586 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.util;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.logging.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+import com.twitter.common.base.Closure;
+import com.twitter.common.base.Closures;
+import com.twitter.common.base.ExceptionalSupplier;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.twitter.common.base.MorePreconditions.checkNotBlank;
+
+/**
+ * Represents a state machine that is not necessarily a Finite State Machine.
+ * A transition is performed only when it has been registered as allowed; an attempt at any other
+ * transition either throws {@link IllegalStateTransitionException} or returns {@code false},
+ * depending on how the machine was configured via {@link Builder#throwOnBadTransition(boolean)}.
+ *
+ * @param <T> The type of objects that the caller uses to represent states.
+ *
+ * TODO(William Farner): Consider merging the stats-tracking ala PipelineStats into this.
+ */
+public class StateMachine<T> {
+  private static final Logger LOG = Logger.getLogger(StateMachine.class.getName());
+
+  private final String name;
+
+  // Stores mapping from states to the states that the machine is allowed to transition into.
+  private final Multimap<T, T> stateTransitions;
+
+  private final Closure<Transition<T>> transitionCallback;
+  private final boolean throwOnBadTransition;
+
+  // Volatile so that getState() can read it without taking a lock.
+  private volatile T currentState;
+  private final Lock readLock;
+  private final Lock writeLock;
+
+  private StateMachine(String name,
+      T initialState,
+      Multimap<T, T> stateTransitions,
+      Closure<Transition<T>> transitionCallback,
+      boolean throwOnBadTransition) {
+    this.name = name;
+    this.currentState = initialState;
+    this.stateTransitions = stateTransitions;
+    this.transitionCallback = transitionCallback;
+    this.throwOnBadTransition = throwOnBadTransition;
+
+    ReadWriteLock stateLock = new ReentrantReadWriteLock(true /* fair */);
+    readLock = stateLock.readLock();
+    writeLock = stateLock.writeLock();
+  }
+
+  /**
+   * Gets the name of this state machine.
+   *
+   * @return The state machine name.
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Fetches the state that the machine is currently in.
+   *
+   * @return Current state.
+   */
+  public T getState() {
+    return currentState;
+  }
+
+  /**
+   * Checks that the current state is the {@code expectedState} and throws if it is not.
+   *
+   * @param expectedState The expected state
+   * @throws IllegalStateException if the current state is not the {@code expectedState}.
+   */
+  public void checkState(T expectedState) {
+    checkState(ImmutableSet.of(expectedState));
+  }
+
+  /**
+   * Checks that the current state is one of the {@code allowedStates} and throws if it is not.
+   *
+   * @param allowedStates The allowed states.
+   * @throws IllegalArgumentException if {@code allowedStates} is empty.
+   * @throws IllegalStateException if the current state is not one of the {@code allowedStates}.
+   */
+  public void checkState(Set<T> allowedStates) {
+    checkNotNull(allowedStates);
+    checkArgument(!allowedStates.isEmpty(), "At least one possible state must be provided.");
+
+    readLock.lock();
+    try {
+      if (!allowedStates.contains(currentState)) {
+        throw new IllegalStateException(
+            String.format("In state %s, expected to be in %s.", currentState, allowedStates));
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Executes the supplied {@code work} if the state machine is in the {@code expectedState},
+   * postponing any concurrently requested {@link #transition(Object)} until after the execution of
+   * the work.
+   *
+   * <p>The read lock is held for the duration of {@code work}. NOTE(review): because the lock is a
+   * {@link ReentrantReadWriteLock}, calling {@link #transition(Object)} from within {@code work}
+   * on the same thread would presumably block forever (read-to-write upgrade is not supported) -
+   * confirm before relying on re-entrant transitions.
+   *
+   * @param expectedState The expected state the work should be performed in.
+   * @param work The work to perform in the {@code expectedState}.
+   * @param <O> The type returned by the unit of work.
+   * @param <E> The type of exception that may be thrown by the unit of work.
+   * @return The result of the unit of work if the current state is the {@code expectedState}.
+   * @throws IllegalStateException if the current state is not the {@code expectedState}.
+   * @throws E if the unit of work throws.
+   */
+  public <O, E extends Exception> O doInState(T expectedState, ExceptionalSupplier<O, E> work)
+      throws E {
+
+    checkNotNull(expectedState);
+    checkNotNull(work);
+
+    readLock.lock();
+    try {
+      checkState(expectedState);
+      return work.get();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Transitions the machine into state {@code nextState}.
+   *
+   * <p>The transition callback is executed after the lock has been released, for both allowed
+   * transitions and disallowed transitions that return {@code false}. It is not executed when this
+   * method throws.
+   *
+   * @param nextState The state to move into.
+   * @throws IllegalStateTransitionException If the state transition is not allowed and the machine
+   *     was configured to throw on bad transitions.
+   * @return {@code true} if the transition was allowed, {@code false} otherwise.
+   */
+  public boolean transition(T nextState) throws IllegalStateTransitionException {
+    boolean transitionAllowed = false;
+    T previousState;
+
+    writeLock.lock();
+    try {
+      // Capture the source state while holding the write lock; reading it before locking could
+      // hand the callback a state that another thread has already moved the machine out of.
+      previousState = currentState;
+      if (stateTransitions.containsEntry(previousState, nextState)) {
+        currentState = nextState;
+        transitionAllowed = true;
+      } else if (throwOnBadTransition) {
+        throw new IllegalStateTransitionException(
+            String.format("State transition from %s to %s is not allowed.", previousState,
+                nextState));
+      }
+    } finally {
+      writeLock.unlock();
+    }
+
+    transitionCallback.execute(new Transition<T>(previousState, nextState, transitionAllowed));
+    return transitionAllowed;
+  }
+
+  /**
+   * Thrown when a disallowed state transition is attempted on a machine configured to throw.
+   */
+  public static class IllegalStateTransitionException extends IllegalStateException {
+    public IllegalStateTransitionException(String msg) {
+      super(msg);
+    }
+  }
+
+  /**
+   * Convenience method to create a builder object.
+   *
+   * @param <T> Type of builder to create.
+   * @param name Name of the state machine to create a builder for.
+   * @return New builder.
+   */
+  public static <T> Builder<T> builder(String name) {
+    return new Builder<T>(name);
+  }
+
+  /**
+   * A state and its allowed transitions (if any) and (optional) callback.
+   *
+   * @param <T> State type.
+   */
+  public static class Rule<T> {
+    private final T from;
+    private final Set<T> to;
+    private final Closure<Transition<T>> callback;
+
+    private Rule(T from) {
+      this(from, ImmutableSet.<T>of());
+    }
+
+    private Rule(T from, Set<T> to) {
+      this(from, to, Closures.<Transition<T>>noop());
+    }
+
+    private Rule(T from, Set<T> to, Closure<Transition<T>> callback) {
+      this.from = checkNotNull(from);
+      this.to = checkNotNull(to);
+      this.callback = checkNotNull(callback);
+    }
+
+    /**
+     * Associates a callback to be triggered after any attempt to transition from this state is
+     * made.
+     *
+     * @param callback Callback to signal.
+     * @return A new rule that is identical to this rule, but with the provided
+     *     callback
+     */
+    public Rule<T> withCallback(Closure<Transition<T>> callback) {
+      return new Rule<T>(from, to, callback);
+    }
+
+    /**
+     * A helper class when building a transition rule, to define the allowed transitions.
+     *
+     * @param <T> State type.
+     */
+    public static class AllowedTransition<T> {
+      private final Rule<T> rule;
+
+      private AllowedTransition(Rule<T> rule) {
+        this.rule = rule;
+      }
+
+      /**
+       * Associates a single allowed transition with this state.
+       *
+       * @param state Allowed transition state.
+       * @return A new rule that is identical to the original, but only allowing a transition to
+       *     the provided state.
+       */
+      public Rule<T> to(T state) {
+        return new Rule<T>(rule.from, ImmutableSet.<T>of(state), rule.callback);
+      }
+
+      /**
+       * Associates multiple transitions with this state.
+       *
+       * @param state An allowed transition state.
+       * @param additionalStates Additional states that may be transitioned to.
+       * @return A new rule that is identical to the original, but only allowing a transition to
+       *     the provided states.
+       */
+      public Rule<T> to(T state, T... additionalStates) {
+        // Carry the rule's callback over, matching the single-state overload.
+        return new Rule<T>(
+            rule.from,
+            ImmutableSet.copyOf(Lists.asList(state, additionalStates)),
+            rule.callback);
+      }
+
+      /**
+       * Allows no transitions to be performed from this state.
+       *
+       * @return The original rule.
+       */
+      public Rule<T> noTransitions() {
+        return rule;
+      }
+    }
+
+    /**
+     * Creates a new transition rule.
+     *
+     * @param state State to create and associate transitions with.
+     * @param <T> State type.
+     * @return A new transition rule builder.
+     */
+    public static <T> AllowedTransition<T> from(T state) {
+      return new AllowedTransition<T>(new Rule<T>(state));
+    }
+  }
+
+  /**
+   * Builder to create a state machine.
+   *
+   * @param <T> State type.
+   */
+  public static class Builder<T> {
+    private final String name;
+    private T initialState;
+    private final Multimap<T, T> stateTransitions = HashMultimap.create();
+    private final List<Closure<Transition<T>>> transitionCallbacks = Lists.newArrayList();
+    private boolean throwOnBadTransition = true;
+
+    public Builder(String name) {
+      this.name = checkNotBlank(name);
+    }
+
+    /**
+     * Sets the initial state for the state machine.
+     *
+     * @param state Initial state.
+     * @return A reference to the builder.
+     */
+    public Builder<T> initialState(T state) {
+      checkNotNull(state);
+      initialState = state;
+      return this;
+    }
+
+    /**
+     * Adds a state and its allowed transitions.
+     *
+     * @param rule The state and transition rule to add.
+     * @return A reference to the builder.
+     */
+    public Builder<T> addState(Rule<T> rule) {
+      return addState(rule.callback, rule.from, rule.to);
+    }
+
+    /**
+     * Adds a state and its allowed transitions.
+     * At least one transition state must be added, it is not necessary to explicitly add states
+     * that have no allowed transitions (terminal states).
+     *
+     * @param callback Callback to notify of any transition attempted from the state.
+     * @param state State to add.
+     * @param transitionStates Allowed transitions from {@code state}.
+     * @return A reference to the builder.
+     */
+    public Builder<T> addState(Closure<Transition<T>> callback, T state,
+        Set<T> transitionStates) {
+      checkNotNull(callback);
+      checkNotNull(state);
+
+      Preconditions.checkArgument(Iterables.all(transitionStates, Predicates.notNull()));
+
+      stateTransitions.putAll(state, transitionStates);
+
+      // Register the callback so that it only fires for transitions leaving this state.
+      @SuppressWarnings("unchecked")
+      Predicate<Transition<T>> filter = Transition.from(state);
+      onTransition(filter, callback);
+      return this;
+    }
+
+    /**
+     * Varargs version of {@link #addState(com.twitter.common.base.Closure, Object, java.util.Set)}.
+     *
+     * @param callback Callback to notify of any transition attempted from the state.
+     * @param state State to add.
+     * @param transitionStates Allowed transitions from {@code state}.
+     * @return A reference to the builder.
+     */
+    public Builder<T> addState(Closure<Transition<T>> callback, T state,
+        T... transitionStates) {
+      Set<T> states = ImmutableSet.copyOf(transitionStates);
+      Preconditions.checkArgument(Iterables.all(states, Predicates.notNull()));
+
+      return addState(callback, state, states);
+    }
+
+    /**
+     * Adds a state and its allowed transitions.
+     * At least one transition state must be added, it is not necessary to explicitly add states
+     * that have no allowed transitions (terminal states).
+     *
+     * @param state State to add.
+     * @param transitionStates Allowed transitions from {@code state}.
+     * @return A reference to the builder.
+     */
+    public Builder<T> addState(T state, T... transitionStates) {
+      return addState(Closures.<Transition<T>>noop(), state, transitionStates);
+    }
+
+    private void onTransition(Predicate<Transition<T>> transitionFilter,
+        Closure<Transition<T>> handler) {
+      onAnyTransition(Closures.filter(transitionFilter, handler));
+    }
+
+    /**
+     * Adds a callback to be executed for every state transition, including invalid transitions
+     * that are attempted.
+     *
+     * @param handler Callback to notify of transition attempts.
+     * @return A reference to the builder.
+     */
+    public Builder<T> onAnyTransition(Closure<Transition<T>> handler) {
+      transitionCallbacks.add(handler);
+      return this;
+    }
+
+    /**
+     * Adds a log message for every state transition that is attempted.
+     *
+     * @return A reference to the builder.
+     */
+    public Builder<T> logTransitions() {
+      return onAnyTransition(new Closure<Transition<T>>() {
+        @Override public void execute(Transition<T> transition) {
+          LOG.info(name + " state machine transition " + transition);
+        }
+      });
+    }
+
+    /**
+     * Allows the caller to specify whether {@link IllegalStateTransitionException} should be thrown
+     * when a bad state transition is attempted (the default behavior).
+     *
+     * @param throwOnBadTransition Whether an exception should be thrown when a bad state transition
+     *     is attempted.
+     * @return A reference to the builder.
+     */
+    public Builder<T> throwOnBadTransition(boolean throwOnBadTransition) {
+      this.throwOnBadTransition = throwOnBadTransition;
+      return this;
+    }
+
+    /**
+     * Builds the state machine.
+     *
+     * @return A reference to the prepared state machine.
+     * @throws IllegalStateException if no initial state was specified.
+     * @throws IllegalArgumentException if no state transitions were specified.
+     */
+    public StateMachine<T> build() {
+      Preconditions.checkState(initialState != null, "Initial state must be specified.");
+      checkArgument(!stateTransitions.isEmpty(), "No state transitions were specified.");
+      // Hand the machine its own copy of the transition table so that further use of this builder
+      // cannot change the rules of an already-built machine.
+      // NOTE(review): whether Closures.combine snapshots the callback list is not visible here.
+      return new StateMachine<T>(name,
+          initialState,
+          HashMultimap.create(stateTransitions),
+          Closures.combine(transitionCallbacks),
+          throwOnBadTransition);
+    }
+  }
+
+  /**
+   * Representation of a state transition.
+   *
+   * @param <T> State type.
+   */
+  public static class Transition<T> {
+    private final T from;
+    private final T to;
+    private final boolean allowed;
+
+    public Transition(T from, T to, boolean allowed) {
+      this.from = checkNotNull(from);
+      this.to = checkNotNull(to);
+      this.allowed = allowed;
+    }
+
+    /** Returns a function extracting the 'from' state of a transition. */
+    private static <T> Function<Transition<T>, T> from() {
+      return new Function<Transition<T>, T>() {
+        @Override public T apply(Transition<T> transition) {
+          return transition.from;
+        }
+      };
+    }
+
+    /** Returns a function extracting the 'to' state of a transition. */
+    private static <T> Function<Transition<T>, T> to() {
+      return new Function<Transition<T>, T>() {
+        @Override public T apply(Transition<T> transition) {
+          return transition.to;
+        }
+      };
+    }
+
+    /** Builds a predicate matching transitions whose extracted side is one of {@code states}. */
+    private static <T> Predicate<Transition<T>> oneSideFilter(
+        Function<Transition<T>, T> extractor, final T... states) {
+      checkArgument(Iterables.all(Arrays.asList(states), Predicates.notNull()));
+
+      return Predicates.compose(Predicates.in(ImmutableSet.copyOf(states)), extractor);
+    }
+
+    /**
+     * Creates a predicate that returns {@code true} for transitions from the given states.
+     *
+     * @param states States to filter on.
+     * @param <T> State type.
+     * @return A from-state filter.
+     */
+    public static <T> Predicate<Transition<T>> from(final T... states) {
+      return oneSideFilter(Transition.<T>from(), states);
+    }
+
+    /**
+     * Creates a predicate that returns {@code true} for transitions to the given states.
+     *
+     * @param states States to filter on.
+     * @param <T> State type.
+     * @return A to-state filter.
+     */
+    public static <T> Predicate<Transition<T>> to(final T... states) {
+      return oneSideFilter(Transition.<T>to(), states);
+    }
+
+    /**
+     * Creates a predicate that returns {@code true} for a specific state transition.
+     *
+     * @param from From state.
+     * @param to To state.
+     * @param <T> State type.
+     * @return A state transition filter.
+     */
+    public static <T> Predicate<Transition<T>> transition(final T from, final T to) {
+      @SuppressWarnings("unchecked")
+      Predicate<Transition<T>> fromFilter = from(from);
+      @SuppressWarnings("unchecked")
+      Predicate<Transition<T>> toFilter = to(to);
+      return Predicates.and(fromFilter, toFilter);
+    }
+
+    public T getFrom() {
+      return from;
+    }
+
+    public T getTo() {
+      return to;
+    }
+
+    public boolean isAllowed() {
+      return allowed;
+    }
+
+    /**
+     * Checks whether this transition represents a state change, which means that the 'to' state is
+     * not equal to the 'from' state, and the transition is allowed.
+     *
+     * @return {@code true} if the state was changed, {@code false} otherwise.
+     */
+    public boolean isValidStateChange() {
+      return isAllowed() && !from.equals(to);
+    }
+
+    /**
+     * Compares the 'from' and 'to' states only; whether the transition was allowed does not take
+     * part in equality (nor in {@link #hashCode()}).
+     */
+    @Override
+    public boolean equals(Object o) {
+      if (o == this) {
+        return true;
+      }
+
+      if (!(o instanceof Transition)) {
+        return false;
+      }
+
+      Transition<?> other = (Transition<?>) o;
+      return from.equals(other.from) && to.equals(other.to);
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder()
+          .append(from)
+          .append(to)
+          .toHashCode();
+    }
+
+    @Override
+    public String toString() {
+      String str = from.toString() + " -> " + to.toString();
+      if (!isAllowed()) {
+        str += " (not allowed)";
+      }
+      return str;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/Timer.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/Timer.java b/commons/src/main/java/com/twitter/common/util/Timer.java
new file mode 100644
index 0000000..d944f03
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/Timer.java
@@ -0,0 +1,74 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.util;
+
+import com.twitter.common.base.Commands;
+import com.twitter.common.base.ExceptionalCommand;
+import com.twitter.common.base.ExceptionalSupplier;
+import com.twitter.common.stats.SlidingStats;
+
+/**
+ * Static helpers that time a unit of work and record the measurement in a stat.
+ *
+ * <p>TODO(John Sirois): consider instead:
+ * <T, E extends Exception> Pair<T, Long> doTimed(ExceptionalSupplier<T, E> timedWork) throws E
+ * or a subinterface of Command/Closure/Supplier/Function that exposes a timing method as other ways
+ * to factor in timing.
+ *
+ * @author John Sirois
+ */
+public final class Timer {
+
+  private Timer() {
+    // utility
+  }
+
+  /**
+   * Runs {@code timedWork} and records how long it took in {@code stat}.
+   * The command is adapted to a supplier and handed to the supplier-based overload.
+   *
+   * @param stat the stat to record the timing with
+   * @param timedWork the code to time
+   * @param <E> the type of exception {@code timedWork} may throw
+   * @throws E if {@code timedWork} throws
+   */
+  public static <E extends Exception> void doTimed(SlidingStats stat,
+      final ExceptionalCommand<E> timedWork) throws E {
+    doTimed(stat, Commands.asSupplier(timedWork));
+  }
+
+  /**
+   * Runs {@code timedWork} and records how long it took in {@code stat}.
+   * The measurement is accumulated whether the work completes normally or throws.
+   *
+   * @param stat the stat to record the timing with
+   * @param timedWork the code to time
+   * @param <T> the type of result {@code timedWork} returns
+   * @param <E> the type of exception {@code timedWork} may throw
+   * @return the result of {@code timedWork} if it completes normally
+   * @throws E if {@code timedWork} throws
+   */
+  public static <T, E extends Exception> T doTimed(SlidingStats stat,
+      ExceptionalSupplier<T, E> timedWork) throws E {
+    StartWatch watch = new StartWatch();
+    watch.start();
+    try {
+      T result = timedWork.get();
+      return result;
+    } finally {
+      // Stop and record in the finally block so failed work is still timed.
+      watch.stop();
+      stat.accumulate(watch.getTime());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/TruncatedBinaryBackoff.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/TruncatedBinaryBackoff.java b/commons/src/main/java/com/twitter/common/util/TruncatedBinaryBackoff.java
new file mode 100644
index 0000000..779e0be
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/TruncatedBinaryBackoff.java
@@ -0,0 +1,77 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.util;
+
+import com.google.common.base.Preconditions;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+/**
+ * A BackoffStrategy that implements truncated binary exponential backoff: each backoff is twice
+ * as long as the previous one, capped at a configured maximum.
+ */
+public class TruncatedBinaryBackoff implements BackoffStrategy {
+  private final long initialBackoffMs;
+  private final long maxBackoffIntervalMs;
+  private final boolean stopAtMax;
+
+  /**
+   * Creates a new TruncatedBinaryBackoff that starts by backing off for {@code initialBackoff}
+   * and then backs off for twice as long each time it is called until reaching
+   * {@code maxBackoff}, after which every backoff waits for that maximum amount of time.
+   * Whether {@code shouldContinue()} reports {@code false} once the maximum has been reached is
+   * controlled by {@code stopAtMax}.
+   *
+   * @param initialBackoff the initial amount of time to backoff, must have a positive value
+   * @param maxBackoff the maximum amount of time to backoff, must not be less than
+   *     {@code initialBackoff}
+   * @param stopAtMax whether shouldContinue() returns false when the max is reached
+   * @throws NullPointerException if either amount is {@code null}
+   * @throws IllegalArgumentException if the amounts violate the constraints above
+   */
+  public TruncatedBinaryBackoff(Amount<Long, Time> initialBackoff,
+      Amount<Long, Time> maxBackoff, boolean stopAtMax) {
+    Preconditions.checkNotNull(initialBackoff);
+    Preconditions.checkNotNull(maxBackoff);
+    Preconditions.checkArgument(initialBackoff.getValue() > 0);
+    Preconditions.checkArgument(maxBackoff.compareTo(initialBackoff) >= 0);
+    initialBackoffMs = initialBackoff.as(Time.MILLISECONDS);
+    maxBackoffIntervalMs = maxBackoff.as(Time.MILLISECONDS);
+    this.stopAtMax = stopAtMax;
+  }
+
+  /**
+   * Same as main constructor, but this will always return true from shouldContinue().
+   *
+   * @param initialBackoff the initial amount of time to backoff
+   * @param maxBackoff the maximum amount of time to backoff
+   */
+  public TruncatedBinaryBackoff(Amount<Long, Time> initialBackoff, Amount<Long, Time> maxBackoff) {
+    this(initialBackoff, maxBackoff, false);
+  }
+
+  /**
+   * Computes the next backoff from the previous one.
+   *
+   * @param lastBackoffMs the previous backoff in milliseconds, or {@code 0} if there was none;
+   *     must not be negative
+   * @return the initial backoff when {@code lastBackoffMs} is {@code 0}, otherwise twice
+   *     {@code lastBackoffMs} capped at the maximum backoff
+   * @throws IllegalArgumentException if {@code lastBackoffMs} is negative
+   */
+  @Override
+  public long calculateBackoffMs(long lastBackoffMs) {
+    Preconditions.checkArgument(lastBackoffMs >= 0);
+    if (lastBackoffMs == 0) {
+      return initialBackoffMs;
+    }
+
+    // Compare against half the cap instead of doubling first: lastBackoffMs * 2 wraps around to a
+    // negative value for inputs above Long.MAX_VALUE / 2, and Math.min would then return that
+    // negative number as the backoff. For every non-overflowing input the result is unchanged.
+    boolean doublingExceedsMax = lastBackoffMs > maxBackoffIntervalMs / 2;
+    return doublingExceedsMax ? maxBackoffIntervalMs : lastBackoffMs * 2;
+  }
+
+  /**
+   * Reports whether the caller should keep retrying.
+   *
+   * @param lastBackoffMs the previous backoff in milliseconds; must not be negative
+   * @return {@code false} only when this strategy was created with {@code stopAtMax} set and
+   *     {@code lastBackoffMs} has reached the maximum backoff; {@code true} otherwise
+   * @throws IllegalArgumentException if {@code lastBackoffMs} is negative
+   */
+  @Override
+  public boolean shouldContinue(long lastBackoffMs) {
+    Preconditions.checkArgument(lastBackoffMs >= 0);
+    boolean stop = stopAtMax && (lastBackoffMs >= maxBackoffIntervalMs);
+
+    return !stop;
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/caching/Cache.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/caching/Cache.java b/commons/src/main/java/com/twitter/common/util/caching/Cache.java
new file mode 100644
index 0000000..d37e601
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/caching/Cache.java
@@ -0,0 +1,49 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.util.caching;
+
+/**
+ * A minimal key/value cache contract. Callers are expected to only ever supply valid, non-null
+ * keys and values.
+ *
+ * @param <K> Type of the cache keys.
+ * @param <V> Type of the cached values.
+ *
+ * @author William Farner
+ */
+public interface Cache<K, V> {
+
+  /**
+   * Looks up the value stored under a key.
+   *
+   * @param key The key to look up, must not be {@code null}.
+   * @return The value cached for {@code key}, or {@code null} when the cache has no such entry.
+   */
+  V get(K key);
+
+  /**
+   * Adds a key-value pair to the cache.
+   *
+   * @param key The key to store under, must not be {@code null}.
+   * @param value The value to associate with {@code key}, must not be {@code null}.
+   */
+  void put(K key, V value);
+
+  /**
+   * Removes an entry from the cache.
+   *
+   * @param key Key of the entry to remove, must not be {@code null}.
+   */
+  void delete(K key);
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/caching/CachingMethodProxy.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/caching/CachingMethodProxy.java b/commons/src/main/java/com/twitter/common/util/caching/CachingMethodProxy.java
new file mode 100644
index 0000000..2a07a6e
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/caching/CachingMethodProxy.java
@@ -0,0 +1,265 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.util.caching;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A proxy class that handles caching of return values for method calls to a wrapped object.
+ *
+ * Example usage:
+ *
+ * Foo uncached = new Foo();
+ * CachingMethodProxy<Foo> methodProxy = CachingMethodProxy.proxyFor(uncached, Foo.class);
+ * Foo foo = methodProxy.getCachingProxy();
+ * methodProxy.cache(foo.doBar(), lruCache1)
+ * .cache(foo.doBaz(), lruCache2)
+ * .prepare();
+ *
+ * @author William Farner
+ */
+public class CachingMethodProxy<T> {
+
+ // Dummy return values to return when in recording state.
+ private static final Map<Class<?>, Object> EMPTY_RETURN_VALUES =
+ ImmutableMap.<Class<?>, Object>builder()
+ .put(Boolean.TYPE, Boolean.FALSE)
+ .put(Byte.TYPE, Byte.valueOf((byte) 0))
+ .put(Short.TYPE, Short.valueOf((short) 0))
+ .put(Character.TYPE, Character.valueOf((char)0))
+ .put(Integer.TYPE, Integer.valueOf(0))
+ .put(Long.TYPE, Long.valueOf(0))
+ .put(Float.TYPE, Float.valueOf(0))
+ .put(Double.TYPE, Double.valueOf(0))
+ .build();
+ private static final Map<Class<?>, Class<?>> AUTO_BOXING_MAP =
+ ImmutableMap.<Class<?>, Class<?>>builder()
+ .put(Boolean.TYPE, Boolean.class)
+ .put(Byte.TYPE, Byte.class)
+ .put(Short.TYPE, Short.class)
+ .put(Character.TYPE, Character.class)
+ .put(Integer.TYPE, Integer.class)
+ .put(Long.TYPE, Long.class)
+ .put(Float.TYPE, Float.class)
+ .put(Double.TYPE, Double.class)
+ .build();
+
+ // The uncached resource, whose method calls are deemed to be expensive and cacheable.
+ private final T uncached;
+
+ // The methods that are cached, and the caches themselves.
+ private final Map<Method, MethodCache> methodCaches = Maps.newHashMap();
+ private final Class<T> type;
+
+ private Method lastMethodCall = null;
+ private boolean recordMode = true;
+
+ /**
+ * Creates a new caching method proxy that will wrap an object and cache for the provided methods.
+ *
+ * @param uncached The uncached object that will be reverted to when a cache entry is not present.
+ */
+ private CachingMethodProxy(T uncached, Class<T> type) {
+ this.uncached = Preconditions.checkNotNull(uncached);
+ this.type = Preconditions.checkNotNull(type);
+ Preconditions.checkArgument(type.isInterface(), "The proxied type must be an interface.");
+ }
+
+ private static Object invokeMethod(Object subject, Method method, Object[] args)
+ throws Throwable {
+ try {
+ return method.invoke(subject, args);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException("Cannot access " + subject.getClass() + "." + method, e);
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ }
+
+ /**
+ * A cached method and its caching control structures.
+ *
+ * @param <K> Cache key type.
+ * @param <V> Cache value type, expected to match the return type of the method.
+ */
+ private static class MethodCache<K, V> {
+ private final Method method;
+ private final Cache<K, V> cache;
+ private final Function<Object[], K> keyBuilder;
+ private final Predicate<V> entryFilter;
+
+ MethodCache(Method method, Cache<K, V> cache, Function<Object[], K> keyBuilder,
+ Predicate<V> entryFilter) {
+ this.method = method;
+ this.cache = cache;
+ this.keyBuilder = keyBuilder;
+ this.entryFilter = entryFilter;
+ }
+
+ V doInvoke(Object uncached, Object[] args) throws Throwable {
+ K key = keyBuilder.apply(args);
+
+ V cachedValue = cache.get(key);
+
+ if (cachedValue != null) return cachedValue;
+
+ Object fetched = invokeMethod(uncached, method, args);
+
+ if (fetched == null) return null;
+
+ @SuppressWarnings("unchecked")
+ V typedValue = (V) fetched;
+
+ if (entryFilter.apply(typedValue)) cache.put(key, typedValue);
+
+ return typedValue;
+ }
+ }
+
+ /**
+ * Creates a new builder for the given type.
+ *
+ * @param uncached The uncached object that should be insulated by caching.
+ * @param type The interface that a proxy should be created for.
+ * @param <T> Type parameter to the proxied class.
+ * @return A new builder.
+ */
+ public static <T> CachingMethodProxy<T> proxyFor(T uncached, Class<T> type) {
+ return new CachingMethodProxy<T>(uncached, type);
+ }
+
+ @SuppressWarnings("unchecked")
+ public T getCachingProxy() {
+ return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class[] { type },
+ new InvocationHandler() {
+ @Override public Object invoke(Object proxy, Method method, Object[] args)
+ throws Throwable {
+ return doInvoke(method, args);
+ }
+ });
+ }
+
+ private Object doInvoke(Method method, Object[] args) throws Throwable {
+ return recordMode ? recordCall(method) : cacheRequest(method, args);
+ }
+
+ private Object recordCall(Method method) {
+ Preconditions.checkArgument(method.getReturnType() != Void.TYPE,
+ "Void return methods cannot be cached: " + method);
+ Preconditions.checkArgument(method.getParameterTypes().length > 0,
+ "Methods with zero arguments cannot be cached: " + method);
+ Preconditions.checkState(lastMethodCall == null,
+ "No cache instructions provided for call to: " + lastMethodCall);
+
+ lastMethodCall = method;
+
+ Class<?> returnType = method.getReturnType();
+ return returnType.isPrimitive() ? EMPTY_RETURN_VALUES.get(returnType) : null;
+ }
+
+ private Object cacheRequest(Method method, Object[] args) throws Throwable {
+ MethodCache cache = methodCaches.get(method);
+
+ // Check if we are caching for this method.
+ if (cache == null) return invokeMethod(uncached, method, args);
+
+ return cache.doInvoke(uncached, args);
+ }
+
+ /**
+ * Instructs the proxy that cache setup is complete, and the proxy instance should begin caching
+ * and delegating uncached calls. After this is called, any subsequent calls to any of the
+ * cache setup methods will result in an {@link IllegalStateException}.
+ */
+ public void prepare() {
+ Preconditions.checkState(!methodCaches.isEmpty(), "At least one method must be cached.");
+ Preconditions.checkState(recordMode, "prepare() may only be invoked once.");
+
+ recordMode = false;
+ }
+
+ public <V> CachingMethodProxy<T> cache(V value, Cache<List, V> cache) {
+ return cache(value, cache, Predicates.<V>alwaysTrue());
+ }
+
+ public <V> CachingMethodProxy<T> cache(V value, Cache<List, V> cache,
+ Predicate<V> valueFilter) {
+ return cache(value, cache, DEFAULT_KEY_BUILDER, valueFilter);
+ }
+
+ public <K, V> CachingMethodProxy<T> cache(V value, Cache<K, V> cache,
+ Function<Object[], K> keyBuilder) {
+ // Get the last method call and declare it the cached method.
+ return cache(value, cache, keyBuilder, Predicates.<V>alwaysTrue());
+ }
+
+ public <K, V> CachingMethodProxy<T> cache(V value, Cache<K, V> cache,
+ Function<Object[], K> keyBuilder, Predicate<V> valueFilter) {
+ Preconditions.checkNotNull(cache);
+ Preconditions.checkNotNull(keyBuilder);
+ Preconditions.checkNotNull(valueFilter);
+
+ Preconditions.checkState(recordMode, "Cache setup is not allowed after prepare() is called.");
+
+ // Get the last method call and declare it the cached method.
+ Preconditions.checkState(lastMethodCall != null, "No method call captured to be cached.");
+
+ Class<?> returnType = lastMethodCall.getReturnType();
+
+ Preconditions.checkArgument(returnType != Void.TYPE,
+ "Cannot cache results from void method: " + lastMethodCall);
+
+ if (returnType.isPrimitive()) {
+ // If a primitive type is returned, we need to make sure that the cache holds the boxed
+ // type for the primitive.
+ returnType = AUTO_BOXING_MAP.get(returnType);
+ }
+
+ // TODO(William Farner): Figure out a simple way to make this possible. Right now, since the proxy
+ // objects return null, we get a null here and can't check the type.
+ //Preconditions.checkArgument(value.getClass() == returnType,
+ // String.format("Cache value type '%s' does not match method return type '%s'",
+ // value.getClass(), lastMethodCall.getReturnType()));
+
+ methodCaches.put(lastMethodCall, new MethodCache<K, V>(lastMethodCall, cache, keyBuilder,
+ valueFilter));
+
+ lastMethodCall = null;
+
+ return this;
+ }
+
+ private static final Function<Object[], List> DEFAULT_KEY_BUILDER =
+ new Function<Object[], List>() {
+ @Override public List apply(Object[] args) {
+ return Arrays.asList(args);
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/caching/LRUCache.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/caching/LRUCache.java b/commons/src/main/java/com/twitter/common/util/caching/LRUCache.java
new file mode 100644
index 0000000..f84bec3
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/caching/LRUCache.java
@@ -0,0 +1,173 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.util.caching;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.twitter.common.base.Closure;
+import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.collections.Pair;
+import com.twitter.common.stats.Stats;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A cache with a fixed maximum size that evicts the least-recently used entry once that size is
+ * exceeded.
+ *
+ * <p>Instances are created through {@link #builder()}. Unless {@code makeSynchronized(false)} is
+ * requested on the builder, the backing map is wrapped with
+ * {@link Collections#synchronizedMap(java.util.Map)}; without that wrapper the backing map is a
+ * plain {@link LinkedHashMap} and is not safe for concurrent use.
+ *
+ * @param <K> Type of the cache keys.
+ * @param <V> Type of the cached values.
+ *
+ * @author William Farner
+ */
+public class LRUCache<K, V> implements Cache<K, V> {
+
+  private final Map<K, V> map;
+
+  private final AtomicLong accesses;
+  private final AtomicLong misses;
+
+  /**
+   * Creates a new bounded cache with the given load factor.
+   *
+   * @param name Unique name for this cache, used as the prefix of its exported stats.
+   * @param maxCapacity Maximum capacity for the cache, after which items will be evicted.
+   * @param loadFactor Load factor for the backing map.
+   * @param makeSynchronized Whether the underlying map should be synchronized.
+   * @param evictionListener Listener to be notified when an element is evicted, or {@code null} if
+   *    eviction notifications are not needed.
+   */
+  private LRUCache(final String name, final int maxCapacity, float loadFactor,
+      boolean makeSynchronized, final Closure<Pair<K, V>> evictionListener) {
+    Map<K, V> lruMap = new LinkedHashMap<K, V>(maxCapacity, loadFactor, true /* Access order. */) {
+      @Override public boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+        if (size() <= maxCapacity) {
+          return false;
+        }
+        if (evictionListener != null) {
+          evictionListener.execute(Pair.of(eldest.getKey(), eldest.getValue()));
+        }
+        return true;
+      }
+    };
+
+    map = makeSynchronized ? Collections.synchronizedMap(lruMap) : lruMap;
+
+    accesses = Stats.exportLong(name + "_lru_cache_accesses");
+    misses = Stats.exportLong(name + "_lru_cache_misses");
+  }
+
+  /**
+   * Starts building a new cache.
+   *
+   * @param <K> Type of the cache keys.
+   * @param <V> Type of the cached values.
+   * @return A builder populated with default settings.
+   */
+  public static <K, V> Builder<K, V> builder() {
+    return new Builder<K, V>();
+  }
+
+  /**
+   * Collects the settings for an {@link LRUCache}. Defaults: no name, a maximum size of 1000, a
+   * load factor of 0.75, a synchronized backing map and no eviction listener.
+   *
+   * @param <K> Type of the cache keys.
+   * @param <V> Type of the cached values.
+   */
+  public static class Builder<K, V> {
+    private String name = null;
+
+    private int maxSize = 1000;
+
+    // Sadly, LinkedHashMap doesn't expose this, so the default is pulled from the javadoc.
+    private float loadFactor = 0.75F;
+
+    private boolean makeSynchronized = true;
+
+    private Closure<Pair<K, V>> evictionListener = null;
+
+    /** Sets the cache name; blank names are rejected. */
+    public Builder<K, V> name(String name) {
+      this.name = MorePreconditions.checkNotBlank(name);
+      return this;
+    }
+
+    /** Sets the maximum number of entries, which must be positive. */
+    public Builder<K, V> maxSize(int maxSize) {
+      Preconditions.checkArgument(maxSize > 0);
+      this.maxSize = maxSize;
+      return this;
+    }
+
+    /** Sets the load factor passed to the backing map. */
+    public Builder<K, V> loadFactor(float loadFactor) {
+      this.loadFactor = loadFactor;
+      return this;
+    }
+
+    /** Sets whether the backing map is wrapped in a synchronized view. */
+    public Builder<K, V> makeSynchronized(boolean makeSynchronized) {
+      this.makeSynchronized = makeSynchronized;
+      return this;
+    }
+
+    /** Sets the listener notified with each evicted key/value pair; may be {@code null}. */
+    public Builder<K, V> evictionListener(Closure<Pair<K, V>> evictionListener) {
+      this.evictionListener = evictionListener;
+      return this;
+    }
+
+    /** Creates the cache from the current settings. */
+    public LRUCache<K, V> build() {
+      return new LRUCache<K, V>(name, maxSize, loadFactor, makeSynchronized, evictionListener);
+    }
+  }
+
+  /**
+   * Fetches a value, counting the access and, when nothing is found, the miss.
+   */
+  @Override
+  public V get(K key) {
+    accesses.incrementAndGet();
+    V cached = map.get(key);
+    if (cached == null) {
+      misses.incrementAndGet();
+    }
+    return cached;
+  }
+
+  @Override
+  public void put(K key, V value) {
+    map.put(key, value);
+  }
+
+  @Override
+  public void delete(K key) {
+    map.remove(key);
+  }
+
+  /** Returns the number of entries currently held. */
+  public int size() {
+    return map.size();
+  }
+
+  @Override
+  public String toString() {
+    return String.format("size: %d, accesses: %s, misses: %s",
+        map.size(),
+        accesses,
+        misses);
+  }
+
+  /**
+   * Returns an immutable snapshot of the cached values, taken while holding the monitor of the
+   * backing map reference.
+   */
+  public Collection<V> copyValues() {
+    synchronized(map) {
+      return ImmutableList.copyOf(map.values());
+    }
+  }
+
+  /** Returns the number of {@link #get} calls made so far. */
+  public long getAccesses() {
+    return accesses.longValue();
+  }
+
+  /** Returns the number of {@link #get} calls that found no value. */
+  public long getMisses() {
+    return misses.longValue();
+  }
+
+  /**
+   * Returns the fraction of accesses that were hits, or {@code 0} when there have been no
+   * accesses at all.
+   */
+  public double getHitRate() {
+    double total = accesses.longValue();
+    if (total == 0) {
+      return 0;
+    }
+    return (total - misses.longValue()) / total;
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/concurrent/BackingOffFutureTask.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/BackingOffFutureTask.java b/commons/src/main/java/com/twitter/common/util/concurrent/BackingOffFutureTask.java
new file mode 100644
index 0000000..9793276
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/concurrent/BackingOffFutureTask.java
@@ -0,0 +1,59 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.util.concurrent;
+
+import com.google.common.base.Preconditions;
+import com.twitter.common.util.BackoffStrategy;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link RetryingFutureTask} that will resubmit itself to a work queue with a backoff.
+ *
+ * @author William Farner
+ */
+public class BackingOffFutureTask extends RetryingFutureTask {
+ private final ScheduledExecutorService executor;
+ private final BackoffStrategy backoffStrategy;
+ private long backoffMs = 0;
+
+ /**
+ * Creates a new retrying future task that will execute a unit of work until successfully
+ * completed, or the retry limit has been reached.
+ *
+ * @param executor The executor service to resubmit the task to upon failure.
+ * @param callable The unit of work. The work is considered successful when {@code true} is
+ * returned. It may return {@code false} or throw an exception when
+ * unsueccessful.
+ * @param maxRetries The maximum number of times to retry the task.
+ * @param backoffStrategy Strategy to use for determining backoff duration.
+ */
+ public BackingOffFutureTask(ScheduledExecutorService executor, Callable<Boolean> callable,
+ int maxRetries, BackoffStrategy backoffStrategy) {
+ super(executor, callable, maxRetries);
+ this.executor = executor;
+ this.backoffStrategy = Preconditions.checkNotNull(backoffStrategy);
+ }
+
+ @Override
+ protected void retry() {
+ backoffMs = backoffStrategy.calculateBackoffMs(backoffMs);
+ executor.schedule(this, backoffMs, TimeUnit.MILLISECONDS);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/concurrent/ExceptionHandlingExecutorService.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/ExceptionHandlingExecutorService.java b/commons/src/main/java/com/twitter/common/util/concurrent/ExceptionHandlingExecutorService.java
new file mode 100644
index 0000000..056282c
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/concurrent/ExceptionHandlingExecutorService.java
@@ -0,0 +1,81 @@
+package com.twitter.common.util.concurrent;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Collections2;
+
+/**
+ * An executor service that delegates to another executor service, invoking an uncaught
+ * exception handler if any exceptions are thrown in submitted work.
+ *
+ * Every overridden method wraps the incoming task(s) via {@code TaskConverter} together with the
+ * handler supplier and then forwards the wrapped work to the superclass implementation.
+ *
+ * NOTE(review): {@code execute(Runnable)} is not overridden in this class, unlike in
+ * ExceptionHandlingScheduledExecutorService; presumably it is forwarded unwrapped by
+ * ForwardingExecutorService -- confirm that this is intended.
+ *
+ * @see MoreExecutors
+ */
+class ExceptionHandlingExecutorService extends ForwardingExecutorService<ExecutorService> {
+ // Supplies the handler that wrapped tasks are given for reporting failures.
+ private final Supplier<Thread.UncaughtExceptionHandler> handler;
+
+ /**
+ * Creates an executor service that forwards to {@code delegate}.
+ *
+ * @param delegate The executor service that work is forwarded to.
+ * @param handler Supplier of the handler for exceptions thrown from submitted work, must not be
+ * {@code null}.
+ */
+ ExceptionHandlingExecutorService(
+ ExecutorService delegate,
+ Supplier<Thread.UncaughtExceptionHandler> handler) {
+
+ super(delegate);
+ this.handler = Preconditions.checkNotNull(handler);
+ }
+
+ // Wraps the callable before forwarding the submission.
+ @Override
+ public <T> Future<T> submit(Callable<T> task) {
+ return super.submit(TaskConverter.alertingCallable(task, handler));
+ }
+
+ // Wraps the runnable before forwarding; the fixed result is passed through untouched.
+ @Override
+ public <T> Future<T> submit(Runnable task, T result) {
+ return super.submit(TaskConverter.alertingRunnable(task, handler), result);
+ }
+
+ // Wraps the runnable before forwarding the submission.
+ @Override
+ public Future<?> submit(Runnable task) {
+ return super.submit(TaskConverter.alertingRunnable(task, handler));
+ }
+
+ // Wraps the whole collection of callables before forwarding.
+ @Override
+ public <T> List<Future<T>> invokeAll(
+ Collection<? extends Callable<T>> tasks) throws InterruptedException {
+
+ return super.invokeAll(TaskConverter.alertingCallables(tasks, handler));
+ }
+
+ // Timed variant: wraps the callables and passes the timeout through unchanged.
+ @Override
+ public <T> List<Future<T>> invokeAll(
+ Collection<? extends Callable<T>> tasks,
+ long timeout,
+ TimeUnit unit) throws InterruptedException {
+
+ return super.invokeAll(TaskConverter.alertingCallables(tasks, handler), timeout, unit);
+ }
+
+ // Wraps the whole collection of callables before forwarding.
+ @Override
+ public <T> T invokeAny(
+ Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+
+ return super.invokeAny(TaskConverter.alertingCallables(tasks, handler));
+ }
+
+ // Timed variant: wraps the callables and passes the timeout through unchanged.
+ @Override
+ public <T> T invokeAny(
+ Collection<? extends Callable<T>> tasks,
+ long timeout,
+ TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+
+ return super.invokeAny(TaskConverter.alertingCallables(tasks, handler), timeout, unit);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/concurrent/ExceptionHandlingScheduledExecutorService.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/ExceptionHandlingScheduledExecutorService.java b/commons/src/main/java/com/twitter/common/util/concurrent/ExceptionHandlingScheduledExecutorService.java
new file mode 100644
index 0000000..2cd807d
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/concurrent/ExceptionHandlingScheduledExecutorService.java
@@ -0,0 +1,108 @@
+package com.twitter.common.util.concurrent;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Collections2;
+
+/**
+ * A scheduled executor service that delegates to another executor service, invoking an uncaught
+ * exception handler if any exceptions are thrown in submitted work.
+ *
+ * <p>Every method wraps the incoming task(s) via {@code TaskConverter} together with the handler
+ * supplier and then hands the wrapped work to the delegate.
+ *
+ * @see MoreExecutors
+ */
+class ExceptionHandlingScheduledExecutorService
+    extends ForwardingExecutorService<ScheduledExecutorService>
+    implements ScheduledExecutorService {
+  private final Supplier<Thread.UncaughtExceptionHandler> handler;
+
+  /**
+   * Construct a {@link ScheduledExecutorService} with a supplier of
+   * {@link Thread.UncaughtExceptionHandler} that handles exceptions thrown from submitted work.
+   *
+   * @param delegate The scheduled executor service that work is handed to.
+   * @param handler Supplier of the handler for exceptions thrown from submitted work, must not be
+   *     {@code null}.
+   * @throws NullPointerException if {@code handler} is {@code null}
+   */
+  ExceptionHandlingScheduledExecutorService(
+      ScheduledExecutorService delegate,
+      Supplier<Thread.UncaughtExceptionHandler> handler) {
+    super(delegate);
+    // Fail fast on a missing handler, as ExceptionHandlingExecutorService does, instead of
+    // deferring the failure to the first wrapped task. Preconditions is not imported in this
+    // file, so the check is written out by hand.
+    if (handler == null) {
+      throw new NullPointerException("handler");
+    }
+    this.handler = handler;
+  }
+
+  @Override
+  public ScheduledFuture<?> schedule(Runnable runnable, long delay, TimeUnit timeUnit) {
+    return delegate.schedule(TaskConverter.alertingRunnable(runnable, handler), delay, timeUnit);
+  }
+
+  @Override
+  public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit timeUnit) {
+    return delegate.schedule(TaskConverter.alertingCallable(callable, handler), delay, timeUnit);
+  }
+
+  @Override
+  public ScheduledFuture<?> scheduleAtFixedRate(
+      Runnable runnable, long initialDelay, long period, TimeUnit timeUnit) {
+    return delegate.scheduleAtFixedRate(
+        TaskConverter.alertingRunnable(runnable, handler), initialDelay, period, timeUnit);
+  }
+
+  @Override
+  public ScheduledFuture<?> scheduleWithFixedDelay(
+      Runnable runnable, long initialDelay, long delay, TimeUnit timeUnit) {
+    return delegate.scheduleWithFixedDelay(
+        TaskConverter.alertingRunnable(runnable, handler), initialDelay, delay, timeUnit);
+  }
+
+  @Override
+  public <T> Future<T> submit(Callable<T> task) {
+    return delegate.submit(TaskConverter.alertingCallable(task, handler));
+  }
+
+  @Override
+  public <T> Future<T> submit(Runnable task, T result) {
+    return delegate.submit(TaskConverter.alertingRunnable(task, handler), result);
+  }
+
+  @Override
+  public Future<?> submit(Runnable task) {
+    return delegate.submit(TaskConverter.alertingRunnable(task, handler));
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException {
+    return delegate.invokeAll(TaskConverter.alertingCallables(tasks, handler));
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(
+      Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+      throws InterruptedException {
+    return delegate.invokeAll(TaskConverter.alertingCallables(tasks, handler), timeout, unit);
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException, ExecutionException {
+    return delegate.invokeAny(TaskConverter.alertingCallables(tasks, handler));
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    return delegate.invokeAny(TaskConverter.alertingCallables(tasks, handler), timeout, unit);
+  }
+
+  @Override
+  public void execute(Runnable command) {
+    delegate.execute(TaskConverter.alertingRunnable(command, handler));
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/concurrent/ExecutorServiceShutdown.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/ExecutorServiceShutdown.java b/commons/src/main/java/com/twitter/common/util/concurrent/ExecutorServiceShutdown.java
new file mode 100644
index 0000000..0e85bad
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/concurrent/ExecutorServiceShutdown.java
@@ -0,0 +1,74 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.util.concurrent;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import com.google.common.base.Preconditions;
+
+import com.twitter.common.base.Command;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+/**
+ * An implementation of the graceful shutdown sequence recommended by {@link ExecutorService}.
+ *
+ * @author John Sirois
+ */
+public class ExecutorServiceShutdown implements Command {
+ private static final Logger LOG = Logger.getLogger(ExecutorServiceShutdown.class.getName());
+
+ private final ExecutorService executor;
+ private final Amount<Long, Time> gracePeriod;
+
+ /**
+ * Creates a new {@code ExecutorServiceShutdown} command that will try to gracefully shut down the
+ * given {@code executor} when executed. If the supplied grace period is less than or equal to
+ * zero the executor service will be asked to shut down but no waiting will be done after these
+ * requests.
+ *
+ * @param executor The executor service this command should shut down when executed.
+ * @param gracePeriod The maximum time to wait after a shutdown request before continuing to the
+ * next shutdown phase.
+ */
+ public ExecutorServiceShutdown(ExecutorService executor, Amount<Long, Time> gracePeriod) {
+ this.executor = Preconditions.checkNotNull(executor);
+ this.gracePeriod = Preconditions.checkNotNull(gracePeriod);
+ }
+
+ @Override
+ public void execute() {
+ executor.shutdown(); // Disable new tasks from being submitted.
+ try {
+ // Wait a while for existing tasks to terminate.
+ if (!executor.awaitTermination(gracePeriod.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)) {
+ executor.shutdownNow(); // Cancel currently executing tasks.
+ // Wait a while for tasks to respond to being cancelled.
+ if (!executor.awaitTermination(gracePeriod.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)) {
+ LOG.warning("Pool did not terminate");
+ }
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted.
+ executor.shutdownNow();
+ // Preserve interrupt status.
+ Thread.currentThread().interrupt();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/concurrent/ForwardingExecutorService.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/ForwardingExecutorService.java b/commons/src/main/java/com/twitter/common/util/concurrent/ForwardingExecutorService.java
new file mode 100644
index 0000000..8b73ab0
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/concurrent/ForwardingExecutorService.java
@@ -0,0 +1,104 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.util.concurrent;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * An executor service that forwards all calls to another executor service. Subclasses should
+ * override one or more methods to modify the behavior of the backing executor service as desired
+ * per the <a href="http://en.wikipedia.org/wiki/Decorator_pattern">decorator pattern</a>.
+ *
+ * @author John Sirois
+ */
+public class ForwardingExecutorService<T extends ExecutorService> implements ExecutorService {
+ protected final T delegate;
+
+ public ForwardingExecutorService(T delegate) {
+ Preconditions.checkNotNull(delegate);
+ this.delegate = delegate;
+ }
+
+ public void shutdown() {
+ delegate.shutdown();
+ }
+
+ public List<Runnable> shutdownNow() {
+ return delegate.shutdownNow();
+ }
+
+ public boolean isShutdown() {
+ return delegate.isShutdown();
+ }
+
+ public boolean isTerminated() {
+ return delegate.isTerminated();
+ }
+
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return delegate.awaitTermination(timeout, unit);
+ }
+
+ public <T> Future<T> submit(Callable<T> task) {
+ return delegate.submit(task);
+ }
+
+ public <T> Future<T> submit(Runnable task, T result) {
+ return delegate.submit(task, result);
+ }
+
+ public Future<?> submit(Runnable task) {
+ return delegate.submit(task);
+ }
+
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException {
+
+ return delegate.invokeAll(tasks);
+ }
+
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout,
+ TimeUnit unit) throws InterruptedException {
+
+ return delegate.invokeAll(tasks, timeout, unit);
+ }
+
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException, ExecutionException {
+
+ return delegate.invokeAny(tasks);
+ }
+
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+
+ return delegate.invokeAny(tasks, timeout, unit);
+ }
+
+ public void execute(Runnable command) {
+ delegate.execute(command);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/concurrent/MoreExecutors.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/MoreExecutors.java b/commons/src/main/java/com/twitter/common/util/concurrent/MoreExecutors.java
new file mode 100644
index 0000000..2591938
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/concurrent/MoreExecutors.java
@@ -0,0 +1,109 @@
+package com.twitter.common.util.concurrent;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+
+/**
+ * Utility class that provides factory functions to decorate
+ * {@link java.util.concurrent.ExecutorService}s.
+ */
+public final class MoreExecutors {
+  /**
+   * Supplies the uncaught exception handler of whichever thread calls {@code get()}. The
+   * supplier is stateless, so a single instance is shared by all decorated executors.
+   */
+  private static final Supplier<Thread.UncaughtExceptionHandler> CURRENT_THREAD_HANDLER =
+      new Supplier<Thread.UncaughtExceptionHandler>() {
+        @Override
+        public Thread.UncaughtExceptionHandler get() {
+          return Thread.currentThread().getUncaughtExceptionHandler();
+        }
+      };
+
+  private MoreExecutors() {
+    // utility
+  }
+
+  /**
+   * Returns a {@link ExecutorService} that passes uncaught exceptions to
+   * {@link java.lang.Thread.UncaughtExceptionHandler}.
+   * <p>
+   * This may be useful because {@link java.util.concurrent.ThreadPoolExecutor} and
+   * {@link java.util.concurrent.ScheduledThreadPoolExecutor} provide no built-in propagation of
+   * unchecked exceptions thrown from submitted work. Some users are surprised to find that
+   * even the default uncaught exception handler is not invoked.
+   *
+   * @param executorService delegate
+   * @param uncaughtExceptionHandler exception handler that will receive exceptions generated
+   *                                 from executor tasks.
+   * @return a decorated executor service
+   */
+  public static ExecutorService exceptionHandlingExecutor(
+      ExecutorService executorService,
+      Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+
+    Preconditions.checkNotNull(uncaughtExceptionHandler);
+    return new ExceptionHandlingExecutorService(
+        executorService, Suppliers.ofInstance(uncaughtExceptionHandler));
+  }
+
+  /**
+   * Returns a {@link ExecutorService} that passes uncaught exceptions to the handler returned by
+   * {@link Thread#getUncaughtExceptionHandler()} for the current thread, looked up each time the
+   * handler is needed rather than when the executor is decorated.
+   *
+   * @see MoreExecutors#exceptionHandlingExecutor(java.util.concurrent.ExecutorService,
+   *      Thread.UncaughtExceptionHandler)
+   * @param executorService delegate
+   * @return a decorated executor service
+   */
+  public static ExecutorService exceptionHandlingExecutor(ExecutorService executorService) {
+    return new ExceptionHandlingExecutorService(executorService, CURRENT_THREAD_HANDLER);
+  }
+
+  /**
+   * Returns a {@link ScheduledExecutorService} that passes uncaught exceptions to
+   * {@link java.lang.Thread.UncaughtExceptionHandler}.
+   * <p>
+   * This may be useful because {@link java.util.concurrent.ThreadPoolExecutor} and
+   * {@link java.util.concurrent.ScheduledThreadPoolExecutor} provide no built-in propagation of
+   * unchecked exceptions thrown from submitted work. Some users are surprised to find that
+   * even the default uncaught exception handler is not invoked.
+   *
+   * @param executorService delegate
+   * @param uncaughtExceptionHandler exception handler that will receive exceptions generated
+   *                                 from executor tasks.
+   * @return a decorated executor service
+   */
+  public static ScheduledExecutorService exceptionHandlingExecutor(
+      ScheduledExecutorService executorService,
+      Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+
+    Preconditions.checkNotNull(uncaughtExceptionHandler);
+    return new ExceptionHandlingScheduledExecutorService(
+        executorService,
+        Suppliers.ofInstance(uncaughtExceptionHandler));
+  }
+
+  /**
+   * Returns a {@link ScheduledExecutorService} that passes uncaught exceptions to the handler
+   * returned by {@link Thread#getUncaughtExceptionHandler()} for the current thread, looked up
+   * each time the handler is needed rather than when the executor is decorated.
+   *
+   * @see MoreExecutors#exceptionHandlingExecutor(java.util.concurrent.ScheduledExecutorService,
+   *      Thread.UncaughtExceptionHandler)
+   * @param executorService delegate
+   * @return a decorated executor service
+   */
+  public static ScheduledExecutorService exceptionHandlingExecutor(
+      ScheduledExecutorService executorService) {
+
+    return new ExceptionHandlingScheduledExecutorService(executorService, CURRENT_THREAD_HANDLER);
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/concurrent/RetryingFutureTask.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/RetryingFutureTask.java b/commons/src/main/java/com/twitter/common/util/concurrent/RetryingFutureTask.java
new file mode 100644
index 0000000..808f3a9
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/concurrent/RetryingFutureTask.java
@@ -0,0 +1,84 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.util.concurrent;
+
+import com.google.common.base.Preconditions;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.FutureTask;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A future task that supports retries by resubmitting itself to an {@link ExecutorService}.
+ * The future completes with {@code true} as soon as an attempt succeeds, and with {@code false}
+ * once the retry limit has been exhausted without a successful attempt.
+ *
+ * @author William Farner
+ */
+public class RetryingFutureTask extends FutureTask<Boolean> {
+  private static final Logger LOG = Logger.getLogger(RetryingFutureTask.class.getName());
+
+  protected final ExecutorService executor;
+  protected final int maxRetries;
+  protected int numRetries = 0;
+  protected final Callable<Boolean> callable;
+
+  /**
+   * Creates a new retrying future task that will execute a unit of work until successfully
+   * completed, or the retry limit has been reached.
+   *
+   * @param executor The executor service to resubmit the task to upon failure.
+   * @param callable The unit of work.  The work is considered successful when {@code true} is
+   *    returned.  It may return {@code false} or throw an exception when unsuccessful.
+   * @param maxRetries The maximum number of times to retry the task.
+   */
+  public RetryingFutureTask(ExecutorService executor, Callable<Boolean> callable, int maxRetries) {
+    super(callable);
+    this.callable = Preconditions.checkNotNull(callable);
+    this.executor = Preconditions.checkNotNull(executor);
+    this.maxRetries = maxRetries;
+  }
+
+  /**
+   * Invokes a retry of this task.
+   */
+  protected void retry() {
+    executor.execute(this);
+  }
+
+  /**
+   * Runs one attempt of the unit of work. A failed attempt (a {@code false} result or a thrown
+   * exception) triggers {@link #retry()} until {@code maxRetries} retries have been used, at
+   * which point the future is completed with {@code false}.
+   */
+  @Override
+  public void run() {
+    // A task that was cancelled or has already completed must not execute (or resubmit) again.
+    if (isDone()) {
+      return;
+    }
+
+    boolean success = false;
+    try {
+      success = callable.call();
+    } catch (Exception e) {
+      // An exception counts as a failed attempt; it is logged and the retry logic below applies.
+      LOG.log(Level.WARNING, "Exception while executing task.", e);
+    }
+
+    if (success) {
+      set(true);
+      return;
+    }
+
+    numRetries++;
+    if (numRetries > maxRetries) {
+      LOG.severe("Task did not complete after " + maxRetries + " retries, giving up.");
+      // Complete the future so callers blocked in get() are released instead of waiting forever.
+      set(false);
+    } else {
+      LOG.info("Task was not successful, resubmitting (num retries: " + numRetries + ")");
+      retry();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/util/concurrent/TaskConverter.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/concurrent/TaskConverter.java b/commons/src/main/java/com/twitter/common/util/concurrent/TaskConverter.java
new file mode 100644
index 0000000..9f1fcc2
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/util/concurrent/TaskConverter.java
@@ -0,0 +1,80 @@
+package com.twitter.common.util.concurrent;
+
+import java.util.Collection;
+import java.util.concurrent.Callable;
+
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Collections2;
+
+final class TaskConverter {
+  private TaskConverter() {
+    // utility
+  }
+
+  /**
+   * Wraps a {@link Runnable} so that anything thrown by the original is first handed to the
+   * supplied {@link Thread.UncaughtExceptionHandler} and then rethrown.
+   *
+   * @param runnable runnable to be wrapped
+   * @param handler supplier of the exception handler that receives throwables raised by the
+   *     runnable; it is consulted only when a failure occurs
+   * @return wrapped runnable
+   */
+  static Runnable alertingRunnable(
+      final Runnable runnable,
+      final Supplier<Thread.UncaughtExceptionHandler> handler) {
+
+    return new AlertingRunnable(runnable, handler);
+  }
+
+  /**
+   * Wraps a {@link java.util.concurrent.Callable} so that anything thrown by the original is
+   * first handed to the supplied {@link Thread.UncaughtExceptionHandler} and then rethrown.
+   *
+   * @param callable callable to be wrapped
+   * @param handler supplier of the exception handler that receives throwables raised by the
+   *     callable; it is consulted only when a failure occurs
+   * @return wrapped callable
+   */
+  static <V> Callable<V> alertingCallable(
+      final Callable<V> callable,
+      final Supplier<Thread.UncaughtExceptionHandler> handler) {
+
+    return new AlertingCallable<V>(callable, handler);
+  }
+
+  /*
+   * Applies #alertingCallable to each element of a collection of callables.
+   */
+  static <V> Collection<? extends Callable<V>> alertingCallables(
+      Collection<? extends Callable<V>> callables,
+      final Supplier<Thread.UncaughtExceptionHandler> handler) {
+
+    Function<Callable<V>, Callable<V>> wrapper = new Function<Callable<V>, Callable<V>>() {
+      @Override
+      public Callable<V> apply(Callable<V> callable) {
+        return alertingCallable(callable, handler);
+      }
+    };
+    return Collections2.transform(callables, wrapper);
+  }
+
+  /** Runnable decorator that reports failures of the wrapped task to an exception handler. */
+  private static final class AlertingRunnable implements Runnable {
+    private final Runnable task;
+    private final Supplier<Thread.UncaughtExceptionHandler> handlerSupplier;
+
+    AlertingRunnable(Runnable task, Supplier<Thread.UncaughtExceptionHandler> handlerSupplier) {
+      this.task = task;
+      this.handlerSupplier = handlerSupplier;
+    }
+
+    @Override
+    public void run() {
+      try {
+        task.run();
+      } catch (Throwable failure) {
+        // Notify on the thread that actually ran the task, then let the failure continue upward.
+        handlerSupplier.get().uncaughtException(Thread.currentThread(), failure);
+        throw Throwables.propagate(failure);
+      }
+    }
+  }
+
+  /** Callable decorator that reports failures of the wrapped task to an exception handler. */
+  private static final class AlertingCallable<V> implements Callable<V> {
+    private final Callable<V> task;
+    private final Supplier<Thread.UncaughtExceptionHandler> handlerSupplier;
+
+    AlertingCallable(Callable<V> task, Supplier<Thread.UncaughtExceptionHandler> handlerSupplier) {
+      this.task = task;
+      this.handlerSupplier = handlerSupplier;
+    }
+
+    @Override
+    public V call() throws Exception {
+      try {
+        return task.call();
+      } catch (Throwable failure) {
+        // Notify on the thread that actually ran the task, then let the failure continue upward.
+        handlerSupplier.get().uncaughtException(Thread.currentThread(), failure);
+        // NOTE(review): Throwables.propagate presumably wraps checked exceptions in a
+        // RuntimeException, which would change the cause callers see - confirm this is intended.
+        throw Throwables.propagate(failure);
+      }
+    }
+  }
+}