You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by zm...@apache.org on 2015/08/26 23:00:26 UTC
[36/51] [partial] aurora git commit: Move packages from
com.twitter.common to org.apache.aurora.common
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/callers/RetryingCaller.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/thrift/callers/RetryingCaller.java b/commons/src/main/java/com/twitter/common/thrift/callers/RetryingCaller.java
deleted file mode 100644
index 3a30b58..0000000
--- a/commons/src/main/java/com/twitter/common/thrift/callers/RetryingCaller.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.thrift.callers;
-
-import java.lang.reflect.Method;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Predicate;
-import com.google.common.base.Throwables;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.thrift.TResourceExhaustedException;
-
-/**
-* A caller that will retry calls to the wrapped caller.
-*
-* @author William Farner
-*/
-public class RetryingCaller extends CallerDecorator {
- private static final Logger LOG = Logger.getLogger(RetryingCaller.class.getName());
-
- @VisibleForTesting
- public static final Amount<Long, Time> NONBLOCKING_TIMEOUT = Amount.of(-1L, Time.MILLISECONDS);
-
- private final StatsProvider statsProvider;
- private final String serviceName;
- private final int retries;
- private final ImmutableSet<Class<? extends Exception>> retryableExceptions;
- private final boolean debug;
-
- /**
- * Creates a new retrying caller. The retrying caller will attempt to call invoked methods on the
- * underlying caller at most {@code retries} times. A retry will be performed only when one of
- * the {@code retryableExceptions} is caught.
- *
- * @param decoratedCall The caller to decorate with retries.
- * @param async Whether the caller is asynchronous.
- * @param statsProvider The stat provider to export retry statistics through.
- * @param serviceName The service name that calls are being invoked on.
- * @param retries The maximum number of retries to perform.
- * @param retryableExceptions The exceptions that can be retried.
- * @param debug Whether to include debugging information when retries are being performed.
- */
- public RetryingCaller(Caller decoratedCall, boolean async, StatsProvider statsProvider,
- String serviceName, int retries, ImmutableSet<Class<? extends Exception>> retryableExceptions,
- boolean debug) {
- super(decoratedCall, async);
- this.statsProvider = statsProvider;
- this.serviceName = serviceName;
- this.retries = retries;
- this.retryableExceptions = retryableExceptions;
- this.debug = debug;
- }
-
- private final LoadingCache<Method, AtomicLong> stats =
- CacheBuilder.newBuilder().build(new CacheLoader<Method, AtomicLong>() {
- @Override public AtomicLong load(Method method) {
- // Thrift does not support overloads - so just the name disambiguates all calls.
- return statsProvider.makeCounter(serviceName + "_" + method.getName() + "_retries");
- }
- });
-
- @Override public Object call(final Method method, final Object[] args,
- @Nullable final AsyncMethodCallback callback,
- @Nullable final Amount<Long, Time> connectTimeoutOverride) throws Throwable {
- final AtomicLong retryCounter = stats.get(method);
- final AtomicInteger attempts = new AtomicInteger();
- final List<Throwable> exceptions = Lists.newArrayList();
-
- final ResultCapture capture = new ResultCapture() {
- @Override public void success() {
- // No-op.
- }
-
- @Override public boolean fail(Throwable t) {
- if (!isRetryable(t)) {
- if (debug) {
- LOG.warning(String.format(
- "Call failed with un-retryable exception of [%s]: %s, previous exceptions: %s",
- t.getClass().getName(), t.getMessage(), combineStackTraces(exceptions)));
- }
-
- return true;
- } else if (attempts.get() >= retries) {
- exceptions.add(t);
-
- if (debug) {
- LOG.warning(String.format("Retried %d times, last error: %s, exceptions: %s",
- attempts.get(), t, combineStackTraces(exceptions)));
- }
-
- return true;
- } else {
- exceptions.add(t);
-
- if (isAsync() && attempts.incrementAndGet() <= retries) {
- try {
- retryCounter.incrementAndGet();
- // override connect timeout in ThriftCaller to prevent blocking for a connection
- // for async retries (since this is within the callback in the selector thread)
- invoke(method, args, callback, this, NONBLOCKING_TIMEOUT);
- } catch (Throwable throwable) {
- return fail(throwable);
- }
- }
-
- return false;
- }
- }
- };
-
- boolean continueLoop;
- do {
- try {
- // If this is an async call, the looping will be handled within the capture.
- return invoke(method, args, callback, capture, connectTimeoutOverride);
- } catch (Throwable t) {
- if (!isRetryable(t)) {
- Throwable propagated = t;
-
- if (!exceptions.isEmpty() && (t instanceof TResourceExhaustedException)) {
- // If we've been trucking along through retries that have had remote call failures
- // and we suddenly can't immediately get a connection on the next retry, throw the
- // previous remote call failure - the idea here is that the remote call failure is
- // more interesting than a transient inability to get an immediate connection.
- propagated = exceptions.remove(exceptions.size() - 1);
- }
-
- if (isAsync()) {
- callback.onError((Exception) propagated);
- } else {
- throw propagated;
- }
- }
- }
-
- continueLoop = !isAsync() && attempts.incrementAndGet() <= retries;
- if (continueLoop) retryCounter.incrementAndGet();
- } while (continueLoop);
-
- Throwable lastRetriedException = Iterables.getLast(exceptions);
- if (debug) {
- if (!exceptions.isEmpty()) {
- LOG.warning(
- String.format("Retried %d times, last error: %s, previous exceptions: %s",
- attempts.get(), lastRetriedException, combineStackTraces(exceptions)));
- } else {
- LOG.warning(
- String.format("Retried 1 time, last error: %s", lastRetriedException));
- }
- }
-
- if (!isAsync()) throw lastRetriedException;
- return null;
- }
-
- private boolean isRetryable(Throwable throwable) {
- return isRetryable.getUnchecked(throwable.getClass());
- }
-
- private final LoadingCache<Class<? extends Throwable>, Boolean> isRetryable =
- CacheBuilder.newBuilder().build(new CacheLoader<Class<? extends Throwable>, Boolean>() {
- @Override public Boolean load(Class<? extends Throwable> exceptionClass) {
- return isRetryable(exceptionClass);
- }
- });
-
- private boolean isRetryable(final Class<? extends Throwable> exceptionClass) {
- if (retryableExceptions.contains(exceptionClass)) {
- return true;
- }
- return Iterables.any(retryableExceptions, new Predicate<Class<? extends Exception>>() {
- @Override public boolean apply(Class<? extends Exception> retryableExceptionClass) {
- return retryableExceptionClass.isAssignableFrom(exceptionClass);
- }
- });
- }
-
- private static final Joiner STACK_TRACE_JOINER = Joiner.on('\n');
-
- private static String combineStackTraces(List<Throwable> exceptions) {
- if (exceptions.isEmpty()) {
- return "none";
- } else {
- return STACK_TRACE_JOINER.join(Iterables.transform(exceptions,
- new Function<Throwable, String>() {
- private int index = 1;
- @Override public String apply(Throwable exception) {
- return String.format("[%d] %s",
- index++, Throwables.getStackTraceAsString(exception));
- }
- }));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/callers/StatTrackingCaller.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/thrift/callers/StatTrackingCaller.java b/commons/src/main/java/com/twitter/common/thrift/callers/StatTrackingCaller.java
deleted file mode 100644
index 083a748..0000000
--- a/commons/src/main/java/com/twitter/common/thrift/callers/StatTrackingCaller.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.thrift.callers;
-
-import java.lang.reflect.Method;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import javax.annotation.Nullable;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.stats.StatsProvider.RequestTimer;
-import com.twitter.common.thrift.TResourceExhaustedException;
-import com.twitter.common.thrift.TTimeoutException;
-
-/**
- * A caller that exports statistics about calls made to the wrapped caller.
- *
- * @author William Farner
- */
-public class StatTrackingCaller extends CallerDecorator {
-
- private final StatsProvider statsProvider;
- private final String serviceName;
-
- private final LoadingCache<Method, RequestTimer> stats =
- CacheBuilder.newBuilder().build(new CacheLoader<Method, RequestTimer>() {
- @Override public RequestTimer load(Method method) {
- // Thrift does not support overloads - so just the name disambiguates all calls.
- return statsProvider.makeRequestTimer(serviceName + "_" + method.getName());
- }
- });
-
- /**
- * Creates a new stat tracking caller, which will export stats to the given {@link StatsProvider}.
- *
- * @param decoratedCaller The caller to decorate with a deadline.
- * @param async Whether the caller is asynchronous.
- * @param statsProvider The stat provider to export statistics to.
- * @param serviceName The name of the service that methods are being called on.
- */
- public StatTrackingCaller(Caller decoratedCaller, boolean async, StatsProvider statsProvider,
- String serviceName) {
- super(decoratedCaller, async);
-
- this.statsProvider = statsProvider;
- this.serviceName = serviceName;
- }
-
- @Override
- public Object call(Method method, Object[] args, @Nullable AsyncMethodCallback callback,
- @Nullable Amount<Long, Time> connectTimeoutOverride) throws Throwable {
- final RequestTimer requestStats = stats.get(method);
- final long startTime = System.nanoTime();
-
- ResultCapture capture = new ResultCapture() {
- @Override public void success() {
- requestStats.requestComplete(TimeUnit.NANOSECONDS.toMicros(
- System.nanoTime() - startTime));
- }
-
- @Override public boolean fail(Throwable t) {
- // TODO(John Sirois): the ruby client reconnects for timeouts too - this provides a natural
- // backoff mechanism - consider how to plumb something similar.
- if (t instanceof TTimeoutException || t instanceof TimeoutException) {
- requestStats.incTimeouts();
- return true;
- }
-
- // TODO(John Sirois): consider ditching reconnects since its nearly redundant with errors as
- // it stands.
- if (!(t instanceof TResourceExhaustedException)) {
- requestStats.incReconnects();
- }
- // TODO(John Sirois): provide more detailed stats: track counts for distinct exceptions types,
- // track retries-per-method, etc...
- requestStats.incErrors();
- return true;
- }
- };
-
- return invoke(method, args, callback, capture, connectTimeoutOverride);
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/callers/ThriftCaller.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/thrift/callers/ThriftCaller.java b/commons/src/main/java/com/twitter/common/thrift/callers/ThriftCaller.java
deleted file mode 100644
index 24a10b0..0000000
--- a/commons/src/main/java/com/twitter/common/thrift/callers/ThriftCaller.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.thrift.callers;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import com.twitter.common.net.pool.Connection;
-import com.twitter.common.net.pool.ObjectPool;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.net.pool.ResourceExhaustedException;
-import com.twitter.common.thrift.TResourceExhaustedException;
-import com.twitter.common.thrift.TTimeoutException;
-import com.twitter.common.net.loadbalancing.RequestTracker;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.transport.TTransport;
-
-import javax.annotation.Nullable;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.concurrent.TimeoutException;
-import java.util.logging.Logger;
-
-/**
- * A caller that issues calls to a target that is assumed to be a client to a thrift service.
- *
- * @author William Farner
- */
-public class ThriftCaller<T> implements Caller {
- private static final Logger LOG = Logger.getLogger(ThriftCaller.class.getName());
-
- private final ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool;
- private final RequestTracker<InetSocketAddress> requestTracker;
- private final Function<TTransport, T> clientFactory;
- private final Amount<Long, Time> timeout;
- private final boolean debug;
-
- /**
- * Creates a new thrift caller.
- *
- * @param connectionPool The connection pool to use.
- * @param requestTracker The request tracker to nofify of request results.
- * @param clientFactory Factory to use for building client object instances.
- * @param timeout The timeout to use when requesting objects from the connection pool.
- * @param debug Whether to use the caller in debug mode.
- */
- public ThriftCaller(ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool,
- RequestTracker<InetSocketAddress> requestTracker, Function<TTransport, T> clientFactory,
- Amount<Long, Time> timeout, boolean debug) {
-
- this.connectionPool = connectionPool;
- this.requestTracker = requestTracker;
- this.clientFactory = clientFactory;
- this.timeout = timeout;
- this.debug = debug;
- }
-
- @Override
- public Object call(Method method, Object[] args, @Nullable AsyncMethodCallback callback,
- @Nullable Amount<Long, Time> connectTimeoutOverride) throws Throwable {
-
- final Connection<TTransport, InetSocketAddress> connection = getConnection(connectTimeoutOverride);
- final long startNanos = System.nanoTime();
-
- ResultCapture capture = new ResultCapture() {
- @Override public void success() {
- try {
- requestTracker.requestResult(connection.getEndpoint(),
- RequestTracker.RequestResult.SUCCESS, System.nanoTime() - startNanos);
- } finally {
- connectionPool.release(connection);
- }
- }
-
- @Override public boolean fail(Throwable t) {
- if (debug) {
- LOG.warning(String.format("Call to endpoint: %s failed: %s", connection, t));
- }
-
- try {
- requestTracker.requestResult(connection.getEndpoint(),
- RequestTracker.RequestResult.FAILED, System.nanoTime() - startNanos);
- } finally {
- connectionPool.remove(connection);
- }
- return true;
- }
- };
-
- return invokeMethod(clientFactory.apply(connection.get()), method, args, callback, capture);
- }
-
- private static Object invokeMethod(Object target, Method method, Object[] args,
- AsyncMethodCallback callback, final ResultCapture capture) throws Throwable {
-
- // Swap the wrapped callback out for ours.
- if (callback != null) {
- callback = new WrappedMethodCallback(callback, capture);
-
- List<Object> argsList = Lists.newArrayList(args);
- argsList.add(callback);
- args = argsList.toArray();
- }
-
- try {
- Object result = method.invoke(target, args);
- if (callback == null) capture.success();
-
- return result;
- } catch (InvocationTargetException t) {
- // We allow this one to go to both sync and async captures.
- if (callback != null) {
- callback.onError((Exception) t.getCause());
- return null;
- } else {
- capture.fail(t.getCause());
- throw t.getCause();
- }
- }
- }
-
- private Connection<TTransport, InetSocketAddress> getConnection(
- Amount<Long, Time> connectTimeoutOverride)
- throws TResourceExhaustedException, TTimeoutException {
- try {
- Connection<TTransport, InetSocketAddress> connection;
- if (connectTimeoutOverride != null) {
- connection = connectionPool.get(connectTimeoutOverride);
- } else {
- connection = (timeout.getValue() > 0)
- ? connectionPool.get(timeout) : connectionPool.get();
- }
-
- if (connection == null) {
- throw new TResourceExhaustedException("no connection was available");
- }
- return connection;
- } catch (ResourceExhaustedException e) {
- throw new TResourceExhaustedException(e);
- } catch (TimeoutException e) {
- throw new TTimeoutException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredNonblockingServerSocket.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredNonblockingServerSocket.java b/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredNonblockingServerSocket.java
deleted file mode 100644
index 5c4d841..0000000
--- a/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredNonblockingServerSocket.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.thrift.monitoring;
-
-import com.google.common.base.Preconditions;
-import com.twitter.common.net.monitoring.ConnectionMonitor;
-import org.apache.thrift.transport.TNonblockingServerSocket;
-import org.apache.thrift.transport.TNonblockingSocket;
-import org.apache.thrift.transport.TTransportException;
-
-import java.net.InetSocketAddress;
-
-/**
- * Extension of TNonblockingServerSocket that allows for tracking of connected clients.
- *
- * <p>Connection tracking is not implemented yet: accepted sockets are currently returned
- * unmodified and the monitor is only stored.
- *
- * @author William Farner
- */
-public class TMonitoredNonblockingServerSocket extends TNonblockingServerSocket {
-  private final ConnectionMonitor monitor;
-
-  /**
-   * Creates a monitored server socket listening on {@code port}.
-   *
-   * @param port The port to listen on.
-   * @param monitor The connection monitor; must not be null.
-   */
-  public TMonitoredNonblockingServerSocket(int port, ConnectionMonitor monitor)
-      throws TTransportException {
-    super(port);
-    this.monitor = Preconditions.checkNotNull(monitor);
-  }
-
-  /**
-   * Creates a monitored server socket listening on {@code port} with a client timeout.
-   *
-   * @param port The port to listen on.
-   * @param clientTimeout The client timeout handed to the parent socket.
-   * @param monitor The connection monitor; must not be null.
-   */
-  public TMonitoredNonblockingServerSocket(int port, int clientTimeout, ConnectionMonitor monitor)
-      throws TTransportException {
-    super(port, clientTimeout);
-    this.monitor = Preconditions.checkNotNull(monitor);
-  }
-
-  /**
-   * Creates a monitored server socket bound to {@code bindAddr}.
-   *
-   * @param bindAddr The address to bind to.
-   * @param monitor The connection monitor; must not be null.
-   */
-  public TMonitoredNonblockingServerSocket(InetSocketAddress bindAddr, ConnectionMonitor monitor)
-      throws TTransportException {
-    super(bindAddr);
-    this.monitor = Preconditions.checkNotNull(monitor);
-  }
-
-  /**
-   * Creates a monitored server socket bound to {@code bindAddr} with a client timeout.
-   *
-   * @param bindAddr The address to bind to.
-   * @param clientTimeout The client timeout handed to the parent socket.
-   * @param monitor The connection monitor; must not be null.
-   */
-  public TMonitoredNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout,
-      ConnectionMonitor monitor) throws TTransportException {
-    super(bindAddr, clientTimeout);
-    this.monitor = Preconditions.checkNotNull(monitor);
-  }
-
-  /**
-   * Accepts a connection.  Currently delegates straight to the parent implementation.
-   */
-  @Override
-  protected TNonblockingSocket acceptImpl() throws TTransportException {
-    // TODO(William Farner): Finish implementing...may require an object proxy.
-    // Unfinished sketch of the intended behavior:
-    //
-    //   final TNonblockingSocket socket = super.acceptImpl();
-    //
-    //   TNonblockingSocket wrappedSocket = new TNonblockingSocket(socket.get) {
-    //     @Override public void close() {
-    //       super.close();
-    //       monitor.disconnected(this);
-    //     }
-    //   };
-    //
-    //   monitor.connected(wrappedSocket, socket.getSocket().getInetAddress());
-    //
-    //   return wrappedSocket;
-    return super.acceptImpl();
-  }
-
-  /**
-   * Closes the server socket by delegating to the parent implementation.
-   */
-  @Override
-  public void close() {
-    super.close();
-  }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredProcessor.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredProcessor.java b/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredProcessor.java
deleted file mode 100644
index bf0c6d3..0000000
--- a/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredProcessor.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.thrift.monitoring;
-
-import com.google.common.base.Preconditions;
-import com.twitter.common.net.loadbalancing.RequestTracker;
-import org.apache.thrift.TException;
-import org.apache.thrift.TProcessor;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TSocket;
-
-import java.net.InetSocketAddress;
-
-import static com.twitter.common.net.loadbalancing.RequestTracker.RequestResult.*;
-
-/**
- * A TProcessor that joins a wrapped TProcessor with a monitor.
- *
- * <p>Every processed request is reported to the monitor together with the client address,
- * the outcome and the elapsed time in nanoseconds.
- *
- * @author William Farner
- */
-public class TMonitoredProcessor implements TProcessor {
-  private final TProcessor wrapped;
-  private final TMonitoredServerSocket monitoredServerSocket;
-  private final RequestTracker<InetSocketAddress> monitor;
-
-  /**
-   * Creates a monitored processor.
-   *
-   * @param wrapped The processor that handles the requests; must not be null.
-   * @param monitoredServerSocket The server socket used to look up the address of the client
-   *     behind a transport; must not be null.
-   * @param monitor The tracker to report request results to; must not be null.
-   */
-  public TMonitoredProcessor(TProcessor wrapped, TMonitoredServerSocket monitoredServerSocket,
-      RequestTracker<InetSocketAddress> monitor) {
-    this.wrapped = Preconditions.checkNotNull(wrapped);
-    this.monitoredServerSocket = Preconditions.checkNotNull(monitoredServerSocket);
-    this.monitor = Preconditions.checkNotNull(monitor);
-  }
-
-  /**
-   * Processes a request with the wrapped processor and reports the result to the monitor.
-   *
-   * <p>A request counts as successful only when the wrapped processor returns normally; any
-   * throwable it raises is reported as a failure and then propagates unchanged.
-   *
-   * @param in The input protocol; its transport is cast to a {@link TSocket}.
-   * @param out The output protocol.
-   * @return Whatever the wrapped processor returns.
-   * @throws TException If the wrapped processor throws it.
-   * @throws IllegalStateException If no address is known for the transport of {@code in}.
-   */
-  @Override
-  public boolean process(TProtocol in, TProtocol out) throws TException {
-    long startNanos = System.nanoTime();
-    // Flipped only after a normal return, so unchecked exceptions and errors are also recorded
-    // as failures instead of being reported as successes.
-    boolean succeeded = false;
-    try {
-      boolean result = wrapped.process(in, out);
-      succeeded = true;
-      return result;
-    } finally {
-      InetSocketAddress address = monitoredServerSocket.getAddress((TSocket) in.getTransport());
-      Preconditions.checkState(address != null,
-          "Address unknown for transport " + in.getTransport());
-
-      monitor.requestResult(address, succeeded ? SUCCESS : FAILED,
-          System.nanoTime() - startNanos);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredServerSocket.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredServerSocket.java b/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredServerSocket.java
deleted file mode 100644
index 38b3c73..0000000
--- a/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredServerSocket.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.thrift.monitoring;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.twitter.common.net.monitoring.ConnectionMonitor;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransportException;
-
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Extension of TServerSocket that allows for tracking of connected clients.
- *
- * @author William Farner
- */
-public class TMonitoredServerSocket extends TServerSocket {
- private ConnectionMonitor<InetSocketAddress> monitor;
-
- public TMonitoredServerSocket(ServerSocket serverSocket,
- ConnectionMonitor<InetSocketAddress> monitor) {
- super(serverSocket);
- this.monitor = Preconditions.checkNotNull(monitor);
- }
-
- public TMonitoredServerSocket(ServerSocket serverSocket, int clientTimeout,
- ConnectionMonitor<InetSocketAddress> monitor) {
- super(serverSocket, clientTimeout);
- this.monitor = Preconditions.checkNotNull(monitor);
- }
-
- public TMonitoredServerSocket(int port, ConnectionMonitor<InetSocketAddress> monitor)
- throws TTransportException {
- super(port);
- this.monitor = Preconditions.checkNotNull(monitor);
- }
-
- public TMonitoredServerSocket(int port, int clientTimeout,
- ConnectionMonitor<InetSocketAddress> monitor) throws TTransportException {
- super(port, clientTimeout);
- this.monitor = Preconditions.checkNotNull(monitor);
- }
-
- public TMonitoredServerSocket(InetSocketAddress bindAddr,
- ConnectionMonitor<InetSocketAddress> monitor) throws TTransportException {
- super(bindAddr);
- this.monitor = Preconditions.checkNotNull(monitor);
- }
-
- public TMonitoredServerSocket(InetSocketAddress bindAddr, int clientTimeout,
- ConnectionMonitor<InetSocketAddress> monitor) throws TTransportException {
- super(bindAddr, clientTimeout);
- this.monitor = Preconditions.checkNotNull(monitor);
- }
-
- private final Map<TSocket, InetSocketAddress> addressMap =
- Collections.synchronizedMap(Maps.<TSocket, InetSocketAddress>newHashMap());
-
- public InetSocketAddress getAddress(TSocket socket) {
- return addressMap.get(socket);
- }
-
- @Override
- protected TSocket acceptImpl() throws TTransportException {
- final TSocket socket = super.acceptImpl();
- final InetSocketAddress remoteAddress =
- (InetSocketAddress) socket.getSocket().getRemoteSocketAddress();
-
- TSocket monitoredSocket = new TSocket(socket.getSocket()) {
- boolean closed = false;
-
- @Override public void close() {
- try {
- super.close();
- } finally {
- if (!closed) {
- monitor.released(remoteAddress);
- addressMap.remove(this);
- }
- closed = true;
- }
- }
- };
-
- addressMap.put(monitoredSocket, remoteAddress);
-
- monitor.connected(remoteAddress);
- return monitoredSocket;
- }
-
- @Override
- public void close() {
- super.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/testing/MockTSocket.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/thrift/testing/MockTSocket.java b/commons/src/main/java/com/twitter/common/thrift/testing/MockTSocket.java
deleted file mode 100644
index 330403b..0000000
--- a/commons/src/main/java/com/twitter/common/thrift/testing/MockTSocket.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.thrift.testing;
-
-import org.apache.thrift.transport.TSocket;
-
-/**
- * A {@link TSocket} for tests that only tracks whether it has been opened, using a boolean
- * flag as its open state.
- *
- * @author William Farner
- */
-public class MockTSocket extends TSocket {
-  public static final String HOST = "dummyHost";
-  public static final int PORT = 1000;
-
-  private boolean connected = false;
-
-  /**
-   * Creates a mock socket addressed at {@link #HOST} and {@link #PORT}.
-   */
-  public MockTSocket() {
-    super(HOST, PORT);
-  }
-
-  /**
-   * Marks the socket as open.
-   */
-  @Override
-  public void open() {
-    connected = true;
-    // TODO(William Farner): Allow for failure injection here by throwing TTransportException.
-  }
-
-  /**
-   * Reports whether {@link #open()} has been called more recently than {@link #close()}.
-   */
-  @Override
-  public boolean isOpen() {
-    return connected;
-  }
-
-  /**
-   * Marks the socket as closed.
-   */
-  @Override
-  public void close() {
-    connected = false;
-  }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/testing/TestThriftTypes.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/thrift/testing/TestThriftTypes.java b/commons/src/main/java/com/twitter/common/thrift/testing/TestThriftTypes.java
deleted file mode 100644
index 5225060..0000000
--- a/commons/src/main/java/com/twitter/common/thrift/testing/TestThriftTypes.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.thrift.testing;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TBaseHelper;
-import org.apache.thrift.TException;
-import org.apache.thrift.TFieldIdEnum;
-import org.apache.thrift.protocol.TField;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.protocol.TStruct;
-import org.apache.thrift.protocol.TType;
-
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * Hand-written stand-ins for thrift types, meant to be used from tests.
- *
- * @author John Sirois
- */
-public class TestThriftTypes {
-
-  /**
-   * Field descriptor used by {@link Struct}. Every instance registers itself in a static
-   * id-to-field table when it is constructed.
-   */
-  public static class Field implements TFieldIdEnum {
-    // Declared ahead of the constants below on purpose: the Field constructor writes into this
-    // map, so it has to be initialized before NAME and VALUE are created.
-    private static final Map<Short, Field> FIELDS_BY_ID = Maps.newHashMap();
-
-    public static final Field NAME = new Field((short) 0, "name");
-    public static final Field VALUE = new Field((short) 1, "value");
-
-    private final short fieldId;
-    private final String fieldName;
-
-    private Field(short fieldId, String fieldName) {
-      this.fieldId = fieldId;
-      this.fieldName = fieldName;
-      FIELDS_BY_ID.put(fieldId, this);
-    }
-
-    /**
-     * Finds the field registered under the given id.
-     *
-     * @param id the field id; it is narrowed to a {@code short} before the lookup
-     * @return the registered field
-     * @throws IllegalArgumentException if no field is registered under the id
-     */
-    public static Field forId(int id) {
-      Field match = FIELDS_BY_ID.get((short) id);
-      Preconditions.checkArgument(match != null, "No Field with id: %s", id);
-      return match;
-    }
-
-    @Override
-    public short getThriftFieldId() {
-      return fieldId;
-    }
-
-    @Override
-    public String getFieldName() {
-      return fieldName;
-    }
-  }
-
-  /**
-   * A struct that keeps its values in a map keyed by {@link Field}. All values are written to and
-   * read from the wire as strings.
-   */
-  public static class Struct implements TBase<Struct, Field> {
-    private final Map<Field, Object> fields = Maps.newHashMap();
-
-    /** Creates a struct with no fields set. */
-    public Struct() {}
-
-    /**
-     * Creates a struct with both fields stored, whatever their values are.
-     *
-     * @param name value stored under {@link Field#NAME}
-     * @param value value stored under {@link Field#VALUE}
-     */
-    public Struct(String name, String value) {
-      fields.put(Field.NAME, name);
-      fields.put(Field.VALUE, value);
-    }
-
-    /**
-     * @return the value stored under {@link Field#NAME}, or {@code null} if there is none
-     */
-    public String getName() {
-      // Casting null yields null, so no separate null check is needed.
-      return (String) getFieldValue(Field.NAME);
-    }
-
-    /**
-     * @return the value stored under {@link Field#VALUE}, or {@code null} if there is none
-     */
-    public String getValue() {
-      return (String) getFieldValue(Field.VALUE);
-    }
-
-    /**
-     * Reads fields until the stop marker, storing each one as a string under the {@link Field}
-     * registered for its id.
-     */
-    @Override
-    public void read(TProtocol tProtocol) throws TException {
-      tProtocol.readStructBegin();
-      for (TField header = tProtocol.readFieldBegin();
-          header.type != TType.STOP;
-          header = tProtocol.readFieldBegin()) {
-        // Resolve the field before consuming its value, so an unknown id fails first.
-        Field target = fieldForId(header.id);
-        fields.put(target, tProtocol.readString());
-        tProtocol.readFieldEnd();
-      }
-      tProtocol.readStructEnd();
-    }
-
-    /**
-     * Writes every stored entry as a string field, followed by the stop marker. A stored
-     * {@code null} value fails here because {@code toString()} is called on it.
-     */
-    @Override
-    public void write(TProtocol tProtocol) throws TException {
-      tProtocol.writeStructBegin(new TStruct("Field"));
-      for (Entry<Field, Object> mapping : fields.entrySet()) {
-        Field key = mapping.getKey();
-        TField header = new TField(key.getFieldName(), TType.STRING, key.getThriftFieldId());
-        tProtocol.writeFieldBegin(header);
-        Object payload = mapping.getValue();
-        tProtocol.writeString(payload.toString());
-        tProtocol.writeFieldEnd();
-      }
-      tProtocol.writeFieldStop();
-      tProtocol.writeStructEnd();
-    }
-
-    @Override
-    public boolean isSet(Field field) {
-      return fields.containsKey(field);
-    }
-
-    @Override
-    public Object getFieldValue(Field field) {
-      return fields.get(field);
-    }
-
-    @Override
-    public void setFieldValue(Field field, Object o) {
-      fields.put(field, o);
-    }
-
-    /**
-     * Copies the field map into a new struct. The values themselves are shared with the copy,
-     * not cloned.
-     */
-    @Override
-    public TBase<Struct, Field> deepCopy() {
-      Struct duplicate = new Struct();
-      duplicate.fields.putAll(fields);
-      return duplicate;
-    }
-
-    /**
-     * Orders structs by class name when the classes differ, then by number of stored fields,
-     * then entry by entry: first on whether {@code other} has the field, then on its value.
-     */
-    @Override
-    public int compareTo(Struct other) {
-      Class<?> mine = getClass();
-      Class<?> theirs = other.getClass();
-      if (!mine.equals(theirs)) {
-        return mine.getName().compareTo(theirs.getName());
-      }
-
-      int bySize = Integer.valueOf(fields.size()).compareTo(other.fields.size());
-      if (bySize != 0) {
-        return bySize;
-      }
-
-      for (Map.Entry<Field, Object> mapping : fields.entrySet()) {
-        Field key = mapping.getKey();
-
-        int byPresence = Boolean.TRUE.compareTo(other.isSet(key));
-        if (byPresence != 0) {
-          return byPresence;
-        }
-
-        int byValue = TBaseHelper.compareTo(mapping.getValue(), other.getFieldValue(key));
-        if (byValue != 0) {
-          return byValue;
-        }
-      }
-      return 0;
-    }
-
-    @Override
-    public void clear() {
-      fields.clear();
-    }
-
-    @Override
-    public Field fieldForId(int fieldId) {
-      return Field.forId(fieldId);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/BackoffDecider.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/BackoffDecider.java b/commons/src/main/java/com/twitter/common/util/BackoffDecider.java
deleted file mode 100644
index 117970a..0000000
--- a/commons/src/main/java/com/twitter/common/util/BackoffDecider.java
+++ /dev/null
@@ -1,663 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.util;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.twitter.common.base.MorePreconditions;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.stats.StatsProvider;
-
-import javax.annotation.Nullable;
-import java.util.Deque;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
-
-/**
- * Handles logic for deciding whether to back off from calls to a backend.
- *
- * This works by offering a guard method {@link #shouldBackOff()}, which instructs the caller
- * whether they should avoid making the call. The backoff logic will maintain statistics about
- * the failure rate, and push into a backoff state (silent period) when the failure rate exceeds
- * the configured threshold. At the end of the quiet period, a recovery state will be entered,
- * during which the decider will allow traffic to ramp back up to full capacity.
- *
- * The expected use case looks something like this:
- *
- * <pre>
- * void sendRequestGuarded() {
- * if (!decider.shouldBackOff()) {
- * boolean success = sendRequestUnguarded();
- * if (success) {
- * decider.addSuccess();
- * } else {
- * decider.addFailure();
- * }
- * }
- * }
- * </pre>
- *
- * {@link #shouldBackOff()}, {@link #transitionToBackOff(double, boolean)} and the recording of
- * results are {@code synchronized} on this instance. When a decider group is configured, the
- * state of the other deciders in the group is read without taking their locks.
- *
- * @author William Farner
- */
-public class BackoffDecider {
- private static final Logger LOG = Logger.getLogger(BackoffDecider.class.getName());
-
- // The group that this decider is a part of.
- private final Iterable<BackoffDecider> deciderGroup;
-
- private final TimedStateMachine stateMachine;
-
- private final String name;
-
- private final double toleratedFailureRate;
-
- @VisibleForTesting final RequestWindow requests;
-
- // Used to calculate backoff durations when in backoff state.
- private final BackoffStrategy strategy;
-
- // Fixed recovery duration, or null to reuse the duration of the state being left.
- private final Amount<Long, Time> recoveryPeriod;
- // Duration of the most recent backoff period; it is handed to the strategy to compute the next
- // one and is reset to 0 when the recovery period expires.
- private long previousBackoffPeriodNs = 0;
-
- // Used for random selection during recovery period.
- private final Random random;
-
- private final Clock clock;
- // Counts the number of times shouldBackOff() told the caller to back off.
- private final AtomicLong backoffs;
- private final RecoveryType recoveryType;
-
- /**
- * Different types of recovery mechanisms to use after exiting the backoff state.
- */
- public static enum RecoveryType {
- // Randomly allows traffic to flow through, with a linearly-ascending probability.
- RANDOM_LINEAR,
- // Allows full traffic capacity to flow during the recovery period.
- FULL_CAPACITY
- }
-
- /**
- * Validates the configuration and wires up the request window, the state machine and the
- * "name_backoffs" counter. Instances are created through {@link Builder#build()}.
- *
- * The tolerated failure rate must be in the range [0, 1); seed size, request window and bucket
- * count must be positive; the recovery period must be either null or positive.
- */
- private BackoffDecider(String name, int seedSize, double toleratedFailureRate,
- @Nullable Iterable<BackoffDecider> deciderGroup, BackoffStrategy strategy,
- @Nullable Amount<Long, Time> recoveryPeriod,
- long requestWindowNs, int numBuckets, RecoveryType recoveryType, StatsProvider statsProvider,
- Random random, Clock clock) {
- MorePreconditions.checkNotBlank(name);
- Preconditions.checkArgument(seedSize > 0);
- Preconditions.checkArgument(toleratedFailureRate >= 0 && toleratedFailureRate < 1.0);
- Preconditions.checkNotNull(strategy);
- Preconditions.checkArgument(recoveryPeriod == null || recoveryPeriod.getValue() > 0);
- Preconditions.checkArgument(requestWindowNs > 0);
- Preconditions.checkArgument(numBuckets > 0);
- Preconditions.checkNotNull(recoveryType);
- Preconditions.checkNotNull(statsProvider);
- Preconditions.checkNotNull(random);
- Preconditions.checkNotNull(clock);
-
- this.name = name;
- this.toleratedFailureRate = toleratedFailureRate;
- this.deciderGroup = deciderGroup;
- this.strategy = strategy;
- this.recoveryPeriod = recoveryPeriod;
- this.recoveryType = recoveryType;
-
- this.random = random;
- this.clock = clock;
-
- this.backoffs = statsProvider.makeCounter(name + "_backoffs");
- this.requests = new RequestWindow(requestWindowNs, numBuckets, seedSize);
-
- this.stateMachine = new TimedStateMachine(name);
- }
-
- /**
- * Checks whether the caller should back off and if not then returns immediately; otherwise the
- * method blocks until it is safe for the caller to proceed without backing off further based on
- * all data available at the time of this call.
- *
- * The value returned is the wait that was requested, not a measurement of the time that
- * actually elapsed; {@link Object#wait(long)} is allowed to return before the timeout.
- *
- * @return the amount of time in milliseconds spent awaiting backoff, or 0 if no wait occurred
- * @throws InterruptedException if the calling thread was interrupted while backing off
- */
- public long awaitBackoff() throws InterruptedException {
- if (shouldBackOff()) {
- long backoffTimeMs = stateMachine.getStateRemainingMs();
-
- if (backoffTimeMs > 0) {
- // Wait without holding any external locks.
- Object waitCondition = new Object();
- synchronized (waitCondition) {
- waitCondition.wait(backoffTimeMs);
- }
- return backoffTimeMs;
- }
- }
- return 0;
- }
-
- /**
- * Checks whether this decider instructs the caller that it should back off from the associated
- * backend. This is determined based on the response history for the backend as well as the
- * backoff state of the decider group (if configured).
- *
- * Calling this method may advance the state machine, and every {@code true} result computed at
- * the end of the method increments the backoff counter.
- *
- * @return {@code true} if the decider is in backoff mode, otherwise {@code false}.
- */
- @SuppressWarnings("fallthrough")
- public synchronized boolean shouldBackOff() {
-
- boolean preventRequest;
- switch (stateMachine.getState()) {
- case NORMAL:
- preventRequest = false;
- break;
-
- case BACKOFF:
- if (deciderGroup != null && allOthersBackingOff()) {
- LOG.info("Backends in group with " + name + " down, forcing back up.");
- stateMachine.transitionUnbounded(State.FORCED_NORMAL);
- return false;
- } else if (stateMachine.isStateExpired()) {
- long recoveryPeriodNs = recoveryPeriod == null ? stateMachine.getStateDurationNs()
- : recoveryPeriod.as(Time.NANOSECONDS);
-
- // The silent period has expired, move to recovery state (and drop to its case block).
- stateMachine.transition(State.RECOVERY, recoveryPeriodNs);
- LOG.info(String.format("%s recovering for %s ms", name,
- Amount.of(recoveryPeriodNs, Time.NANOSECONDS).as(Time.MILLISECONDS)));
- } else {
- preventRequest = true;
- break;
- }
- // Intentional fallthrough: only reached right after the transition to RECOVERY above.
-
- case RECOVERY:
- if (deciderGroup != null && allOthersBackingOff()) {
- return false;
- } else if (stateMachine.isStateExpired()) {
- // We have reached the end of the recovery period, return to normal.
- stateMachine.transitionUnbounded(State.NORMAL);
- previousBackoffPeriodNs = 0;
- preventRequest = false;
- } else {
- switch (recoveryType) {
- case RANDOM_LINEAR:
- // In the recovery period, allow request rate to return linearly to the full load.
- preventRequest = random.nextDouble() > stateMachine.getStateFractionComplete();
- break;
- case FULL_CAPACITY:
- preventRequest = false;
- break;
- default:
- throw new IllegalStateException("Unhandled recovery type " + recoveryType);
- }
- }
-
- break;
-
- case FORCED_NORMAL:
- // No null check on deciderGroup here: within this class FORCED_NORMAL is only entered from
- // the BACKOFF branch above, which requires a non-null group.
- if (!allOthersBackingOff()) {
- // We were in forced normal state, but at least one other backend is up, try recovering.
- stateMachine.transition(State.RECOVERY, stateMachine.getStateDurationNs());
- preventRequest = false;
- } else {
- // NOTE(review): every other decider is still backing off, yet the request is blocked
- // here, which reads oddly against the FORCED_NORMAL description - confirm intended.
- preventRequest = true;
- }
-
- break;
-
- default:
- LOG.severe("Unrecognized state: " + stateMachine.getState());
- preventRequest = false;
- }
-
- if (preventRequest) {
- backoffs.incrementAndGet();
- }
- return preventRequest;
- }
-
- /**
- * Reports whether every other decider in the group is in BACKOFF or FORCED_NORMAL state.
- * Returns {@code true} when the group contains no decider other than this one. Must only be
- * called when {@code deciderGroup} is non-null.
- */
- private boolean allOthersBackingOff() {
- // Search for another decider that is not backing off.
- for (BackoffDecider decider : deciderGroup) {
- State deciderState = decider.stateMachine.getState();
- boolean inBackoffState = deciderState == State.BACKOFF || deciderState == State.FORCED_NORMAL;
- if ((decider != this) && !inBackoffState) {
- return false;
- }
- }
-
- return true;
- }
-
- /**
- * Records a failed request to the backend.
- */
- public void addFailure() {
- addResult(false);
- }
-
- /**
- * Records a successful request to the backend.
- */
- public void addSuccess() {
- addResult(true);
- }
-
- /**
- * Transitions the state to BACKOFF and logs a message appropriately if it is doing so because of high fail rate
- * or by force. The backoff duration is computed by the strategy from the previous backoff
- * duration, and is remembered for the next call.
- *
- * @param failRate rate of request failures on this host.
- * @param force if {@code true}, forces the transition to BACKOFF. Typically used in cases when the host
- * was not found to be alive by LiveHostChecker.
- */
- public synchronized void transitionToBackOff(double failRate, boolean force) {
- long prevBackoffMs = Amount.of(previousBackoffPeriodNs, Time.NANOSECONDS)
- .as(Time.MILLISECONDS);
-
- long backoffPeriodNs = Amount.of(strategy.calculateBackoffMs(prevBackoffMs), Time.MILLISECONDS)
- .as(Time.NANOSECONDS);
- // The force flag only selects the log message; the transition below is the same either way.
- if (!force) {
- LOG.info(String.format("%s failure rate at %g, backing off for %s ms", name,failRate,
- Amount.of(backoffPeriodNs, Time.NANOSECONDS).as(Time.MILLISECONDS)));
- } else {
- LOG.info(String.format("%s forced to back off for %s ms", name,
- Amount.of(backoffPeriodNs, Time.NANOSECONDS).as(Time.MILLISECONDS)));
- }
- stateMachine.transition(State.BACKOFF, backoffPeriodNs);
- previousBackoffPeriodNs = backoffPeriodNs;
- }
-
- /**
- * Records one request outcome in the request window and, if the window is seeded and the
- * failure rate exceeds the tolerated rate, moves the decider into BACKOFF. Results reported
- * while already in BACKOFF are dropped.
- *
- * @param success whether the request succeeded
- */
- @SuppressWarnings("fallthrough")
- private synchronized void addResult(boolean success) {
- // Disallow statistics updating if we are in backoff state.
- if (stateMachine.getState() == State.BACKOFF) {
- return;
- }
-
- requests.addResult(success);
- double failRate = requests.getFailureRate();
- boolean highFailRate = requests.isSeeded() && (failRate > toleratedFailureRate);
-
- switch (stateMachine.getState()) {
- case NORMAL:
- if (!highFailRate) {
- // No-op.
- break;
- } else {
- // Artificially move into recovery state (by falling through) with a zero-duration
- // time window, to trigger the initial backoff period.
- stateMachine.setStateDurationNs(0);
- }
-
- case RECOVERY:
- if (highFailRate) {
- // We were trying to recover, and the failure rate is still too high. Go back to
- // backoff state for a longer duration.
- requests.reset();
-
- // transition the state machine to BACKOFF state, due to high fail rate.
- transitionToBackOff(failRate, false);
- } else {
- // Do nothing. We only exit the recovery state by expiration.
- }
- break;
-
- case FORCED_NORMAL:
- if (!highFailRate) {
- stateMachine.transition(State.RECOVERY, stateMachine.getStateDurationNs());
- }
- break;
-
- case BACKOFF:
- // Not reachable through the early return at the top of this method; kept as a guard.
- throw new IllegalStateException("Backoff state may only be exited by expiration.");
- }
- }
-
- /**
- * Creates a builder object.
- *
- * @param name Name for the backoff decider to build.
- * @return A builder.
- */
- public static Builder builder(String name) {
- return new Builder(name);
- }
-
- /**
- * Builder class to configure a BackoffDecider.
- *
- * The builder allows for customization of many different parameters to the BackoffDecider, while
- * defining defaults wherever possible. The following defaults are used:
- *
- * <ul>
- * <li> seed size - The number of requests to accumulate before a backoff will be considered.
- * 100
- *
- * <li> tolerated failure rate - Maximum failure rate before backing off.
- * 0.5
- *
- * <li> decider group - Group this decider is a part of, to prevent complete backend failure.
- * null (disabled)
- *
- * <li> strategy - Used to calculate subsequent backoff durations.
- * TruncatedBinaryBackoff, initial 100 ms, max 10s
- *
- * <li> recovery period - Fixed recovery period while ramping traffic back to full capacity.
- * null (use last backoff period)
- *
- * <li> request window - Duration of the sliding window of requests to track statistics for.
- * 10 seconds
- *
- * <li> num buckets - The number of time slices within the request window, for stat expiration.
- * The sliding request window advances in intervals of request window / num buckets.
- * 100
- *
- * <li> recovery type - Defines behavior during the recovery period, and how traffic is permitted.
- * random linear
- *
- * <li> stat provider - The stats provider to export statistics to.
- * Stats.STATS_PROVIDER
- * </ul>
- *
- */
- public static class Builder {
- private String name;
- private int seedSize = 100;
- private double toleratedFailureRate = 0.5;
- private Set<BackoffDecider> deciderGroup = null;
- private BackoffStrategy strategy = new TruncatedBinaryBackoff(
- Amount.of(100L, Time.MILLISECONDS), Amount.of(10L, Time.SECONDS));
- private Amount<Long, Time> recoveryPeriod = null;
- private long requestWindowNs = Amount.of(10L, Time.SECONDS).as(Time.NANOSECONDS);
- private int numBuckets = 100;
- private RecoveryType recoveryType = RecoveryType.RANDOM_LINEAR;
- private StatsProvider statsProvider = Stats.STATS_PROVIDER;
- private Random random = Random.Util.newDefaultRandom();
- private Clock clock = Clock.SYSTEM_CLOCK;
-
- Builder(String name) {
- this.name = name;
- }
-
- /**
- * Sets the number of requests that must be accumulated before the error rate will be
- * calculated. This improves the genesis problem where the first few requests are errors,
- * causing flapping in and out of backoff state.
- *
- * @param seedSize Request seed size.
- * @return A reference to the builder.
- */
- public Builder withSeedSize(int seedSize) {
- this.seedSize = seedSize;
- return this;
- }
-
- /**
- * Sets the tolerated failure rate for the decider. If the rate is exceeded for the time
- * window, the decider begins backing off.
- *
- * @param toleratedRate The tolerated failure rate (from 0.0 inclusive to 1.0 exclusive).
- * @return A reference to the builder.
- */
- public Builder withTolerateFailureRate(double toleratedRate) {
- this.toleratedFailureRate = toleratedRate;
- return this;
- }
-
- /**
- * Makes the decider a part of a group. When a decider is a part of a group, it will monitor
- * the other deciders to ensure that all deciders do not back off at once.
- *
- * @param deciderGroup Group to make this decider a part of. More deciders may be added to the
- * group after this call is made.
- * @return A reference to the builder.
- */
- public Builder groupWith(Set<BackoffDecider> deciderGroup) {
- this.deciderGroup = deciderGroup;
- return this;
- }
-
- /**
- * Overrides the default backoff strategy.
- *
- * @param strategy Backoff strategy to use.
- * @return A reference to the builder.
- */
- public Builder withStrategy(BackoffStrategy strategy) {
- this.strategy = strategy;
- return this;
- }
-
- /**
- * Overrides the default recovery period behavior. By default, the recovery period is equal
- * to the previous backoff period (which is equivalent to setting the recovery period to null
- * here). A non-null value here will assign a fixed recovery period.
- *
- * @param recoveryPeriod Fixed recovery period.
- * @return A reference to the builder.
- */
- public Builder withRecoveryPeriod(@Nullable Amount<Long, Time> recoveryPeriod) {
- this.recoveryPeriod = recoveryPeriod;
- return this;
- }
-
- /**
- * Sets the time window over which to analyze failures. Beyond the time window, request history
- * is discarded (and ignored).
- *
- * @param requestWindow The analysis time window.
- * @return A reference to the builder.
- */
- public Builder withRequestWindow(Amount<Long, Time> requestWindow) {
- this.requestWindowNs = requestWindow.as(Time.NANOSECONDS);
- return this;
- }
-
- /**
- * Sets the number of time slices that the decider will use to partition aggregate statistics.
- *
- * @param numBuckets Bucket count.
- * @return A reference to the builder.
- */
- public Builder withBucketCount(int numBuckets) {
- this.numBuckets = numBuckets;
- return this;
- }
-
- /**
- * Sets the recovery mechanism to use when in the recovery period.
- *
- * @param recoveryType The recovery mechanism to use.
- * @return A reference to the builder.
- */
- public Builder withRecoveryType(RecoveryType recoveryType) {
- this.recoveryType = recoveryType;
- return this;
- }
-
- /**
- * Sets the stats provider that statistics should be exported to.
- *
- * @param statsProvider Stats provider to use.
- * @return A reference to the builder.
- */
- public Builder withStatsProvider(StatsProvider statsProvider) {
- this.statsProvider = statsProvider;
- return this;
- }
-
- // Replaces the source of randomness used during RANDOM_LINEAR recovery.
- @VisibleForTesting public Builder withRandom(Random random) {
- this.random = random;
- return this;
- }
-
- // Replaces the clock used for all time measurements.
- @VisibleForTesting public Builder withClock(Clock clock) {
- this.clock = clock;
- return this;
- }
-
- /**
- * Gets a reference to the built decider object. If a decider group was configured, the new
- * decider is added to that group before it is returned.
- * @return A decider object.
- */
- public BackoffDecider build() {
- BackoffDecider decider = new BackoffDecider(name, seedSize, toleratedFailureRate,
- deciderGroup, strategy, recoveryPeriod, requestWindowNs, numBuckets, recoveryType,
- statsProvider, random, clock);
- if (deciderGroup != null) deciderGroup.add(decider);
- return decider;
- }
- }
-
- // Request and failure counts for one bucket of the request window, stamped with the time at
- // which the bucket was created.
- private class TimeSlice {
- int requestCount = 0;
- int failureCount = 0;
- final long bucketStartNs;
-
- public TimeSlice() {
- bucketStartNs = clock.nowNanos();
- }
- }
-
- // Sliding window of request outcomes, split into TimeSlice buckets. The newest bucket sits at
- // the head of the deque and the oldest at the tail.
- class RequestWindow {
- // These store the sum of the respective fields contained within buckets. Doing so removes the
- // need to accumulate the counts within the buckets every time the backoff state is
- // recalculated.
- @VisibleForTesting long totalRequests = 0;
- @VisibleForTesting long totalFailures = 0;
-
- private final long durationNs;
- private final long bucketLengthNs;
- private final int seedSize;
-
- // Stores aggregate request/failure counts for time slices.
- private final Deque<TimeSlice> buckets = Lists.newLinkedList();
-
- RequestWindow(long durationNs, int bucketCount, int seedSize) {
- this.durationNs = durationNs;
- this.bucketLengthNs = durationNs / bucketCount;
- buckets.addFirst(new TimeSlice());
- this.seedSize = seedSize;
- }
-
- // Discards all history and starts over with a single empty bucket.
- void reset() {
- totalRequests = 0;
- totalFailures = 0;
- buckets.clear();
- buckets.addFirst(new TimeSlice());
- }
-
- // Counts one request (and one failure, if unsuccessful) against the newest bucket and the
- // running totals.
- void addResult(boolean success) {
- maybeShuffleBuckets();
- buckets.peekFirst().requestCount++;
- totalRequests++;
-
- if (!success) {
- buckets.peekFirst().failureCount++;
- totalFailures++;
- }
- }
-
- // Opens a new bucket once the newest one is at least bucketLengthNs old. Buckets that started
- // more than durationNs ago are evicted, and subtracted from the totals, only at that point.
- void maybeShuffleBuckets() {
- // Check if the first bucket is still relevant.
- if (clock.nowNanos() - buckets.peekFirst().bucketStartNs >= bucketLengthNs) {
-
- // Remove old buckets.
- while (!buckets.isEmpty()
- && buckets.peekLast().bucketStartNs < clock.nowNanos() - durationNs) {
- TimeSlice removed = buckets.removeLast();
- totalRequests -= removed.requestCount;
- totalFailures -= removed.failureCount;
- }
-
- buckets.addFirst(new TimeSlice());
- }
- }
-
- // True once at least seedSize requests are inside the window.
- boolean isSeeded() {
- return totalRequests >= seedSize;
- }
-
- // Fraction of requests in the window that failed; 0 when the window is empty.
- double getFailureRate() {
- return totalRequests == 0 ? 0 : ((double) totalFailures) / totalRequests;
- }
- }
-
- private static enum State {
- NORMAL, // All requests are being permitted.
- BACKOFF, // Quiet period while waiting for backend to recover/improve.
- RECOVERY, // Ramping period where an ascending fraction of requests is being permitted.
- FORCED_NORMAL // All other backends in the group are backing off, so this one is forced normal.
- }
- // Wraps a StateMachine over State and additionally tracks the end time and the duration of the
- // current state. Only transition(State, long) updates both timing fields.
- private class TimedStateMachine {
- final StateMachine<State> stateMachine;
-
- private long stateEndNs;
- private long stateDurationNs;
-
- TimedStateMachine(String name) {
- stateMachine = StateMachine.<State>builder(name + "_backoff_state_machine")
- .addState(State.NORMAL, State.BACKOFF, State.FORCED_NORMAL)
- .addState(State.BACKOFF, State.RECOVERY, State.FORCED_NORMAL)
- .addState(State.RECOVERY, State.NORMAL, State.BACKOFF, State.FORCED_NORMAL)
- .addState(State.FORCED_NORMAL, State.RECOVERY)
- .initialState(State.NORMAL)
- .build();
- }
-
- State getState() {
- return stateMachine.getState();
- }
-
- // Changes state but leaves stateEndNs and stateDurationNs at their previous values.
- void transitionUnbounded(State state) {
- stateMachine.transition(state);
- }
-
- // Changes state and starts a new timed period of the given length from the current time.
- void transition(State state, long durationNs) {
- transitionUnbounded(state);
- this.stateEndNs = clock.nowNanos() + durationNs;
- this.stateDurationNs = durationNs;
- }
-
- long getStateDurationNs() {
- return stateDurationNs;
- }
-
- long getStateDurationMs() {
- return Amount.of(stateDurationNs, Time.NANOSECONDS).as(Time.MILLISECONDS);
- }
-
- // Overwrites the recorded duration only; stateEndNs is not adjusted.
- void setStateDurationNs(long stateDurationNs) {
- this.stateDurationNs = stateDurationNs;
- }
-
- // Negative once the end time has passed.
- long getStateRemainingNs() {
- return stateEndNs - clock.nowNanos();
- }
-
- long getStateRemainingMs() {
- return Amount.of(getStateRemainingNs(), Time.NANOSECONDS).as(Time.MILLISECONDS);
- }
-
- // NOTE(review): floating-point division by stateDurationNs, so a zero duration yields NaN or
- // an infinity rather than an exception - confirm callers tolerate that.
- double getStateFractionComplete() {
- return 1.0 - ((double) getStateRemainingNs()) / stateDurationNs;
- }
-
- // Strict comparison: the state is not expired at the exact end instant.
- // NOTE(review): if Clock.nowNanos() is backed by System.nanoTime(), a difference-based
- // comparison would be the overflow-safe form - confirm against the Clock implementation.
- boolean isStateExpired() {
- return clock.nowNanos() > stateEndNs;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/BackoffHelper.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/BackoffHelper.java b/commons/src/main/java/com/twitter/common/util/BackoffHelper.java
deleted file mode 100644
index 7a2023a..0000000
--- a/commons/src/main/java/com/twitter/common/util/BackoffHelper.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.util;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.twitter.common.base.ExceptionalSupplier;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-
-import java.util.logging.Logger;
-
-/**
- * A utility for dealing with backoffs of retryable actions.
- *
- * <p>A task is attempted once immediately. While it keeps producing no result, the helper waits
- * for the period computed by the configured {@link BackoffStrategy} and tries again, until the
- * task produces a result or the strategy reports that backing off should not continue.
- *
- * <p>TODO(John Sirois): investigate synergies with BackoffDecider.
- *
- * @author John Sirois
- */
-public class BackoffHelper {
-  private static final Logger LOG = Logger.getLogger(BackoffHelper.class.getName());
-
-  private static final Amount<Long,Time> DEFAULT_INITIAL_BACKOFF = Amount.of(1L, Time.SECONDS);
-  private static final Amount<Long,Time> DEFAULT_MAX_BACKOFF = Amount.of(1L, Time.MINUTES);
-
-  private final Clock clock;
-  private final BackoffStrategy backoffStrategy;
-
-  /**
-   * Creates a new BackoffHelper that uses truncated binary backoff starting at a 1 second backoff
-   * and maxing out at a 1 minute backoff.
-   */
-  public BackoffHelper() {
-    this(DEFAULT_INITIAL_BACKOFF, DEFAULT_MAX_BACKOFF);
-  }
-
-  /**
-   * Creates a new BackoffHelper that uses truncated binary backoff starting at the given
-   * {@code initialBackoff} and maxing out at the given {@code maxBackoff}.
-   *
-   * @param initialBackoff the initial amount of time to back off
-   * @param maxBackoff the maximum amount of time to back off
-   */
-  public BackoffHelper(Amount<Long, Time> initialBackoff, Amount<Long, Time> maxBackoff) {
-    this(new TruncatedBinaryBackoff(initialBackoff, maxBackoff));
-  }
-
-  /**
-   * Creates a new BackoffHelper that uses truncated binary backoff starting at the given
-   * {@code initialBackoff} and maxing out at the given {@code maxBackoff}. This will either:
-   * <ul>
-   *   <li>{@code stopAtMax == true} : throw {@code BackoffStoppedException} when maxBackoff is
-   *       reached</li>
-   *   <li>{@code stopAtMax == false} : continue backing off with maxBackoff</li>
-   * </ul>
-   *
-   * @param initialBackoff the initial amount of time to back off
-   * @param maxBackoff the maximum amount of time to back off
-   * @param stopAtMax if true, this will throw {@code BackoffStoppedException} when the max
-   *     backoff is reached
-   */
-  public BackoffHelper(Amount<Long, Time> initialBackoff, Amount<Long, Time> maxBackoff,
-      boolean stopAtMax) {
-    this(new TruncatedBinaryBackoff(initialBackoff, maxBackoff, stopAtMax));
-  }
-
-  /**
-   * Creates a BackoffHelper that uses the given {@code backoffStrategy} to calculate backoffs
-   * between retries.
-   *
-   * @param backoffStrategy the backoff strategy to use
-   */
-  public BackoffHelper(BackoffStrategy backoffStrategy) {
-    this(Clock.SYSTEM_CLOCK, backoffStrategy);
-  }
-
-  @VisibleForTesting BackoffHelper(Clock clock, BackoffStrategy backoffStrategy) {
-    this.clock = Preconditions.checkNotNull(clock);
-    this.backoffStrategy = Preconditions.checkNotNull(backoffStrategy);
-  }
-
-  /**
-   * Executes the given task using the configured backoff strategy until the task succeeds as
-   * indicated by returning {@code true}. Both {@code false} and {@code null} count as failure
-   * and lead to a retry.
-   *
-   * @param task the retryable task to execute until success
-   * @throws InterruptedException if interrupted while waiting for the task to execute successfully
-   * @throws BackoffStoppedException if the backoff stopped unsuccessfully
-   * @throws E if the task throws
-   */
-  public <E extends Exception> void doUntilSuccess(final ExceptionalSupplier<Boolean, E> task)
-      throws InterruptedException, BackoffStoppedException, E {
-    // Adapt to doUntilResult, which retries on null: map every non-TRUE outcome to null.
-    doUntilResult(new ExceptionalSupplier<Boolean, E>() {
-      @Override public Boolean get() throws E {
-        Boolean result = task.get();
-        return Boolean.TRUE.equals(result) ? result : null;
-      }
-    });
-  }
-
-  /**
-   * Executes the given task using the configured backoff strategy until the task succeeds as
-   * indicated by returning a non-null value. The first attempt is made immediately, without any
-   * wait.
-   *
-   * @param task the retryable task to execute until success
-   * @return the result of the successfully executed task
-   * @throws InterruptedException if interrupted while waiting for the task to execute successfully
-   * @throws BackoffStoppedException if the backoff stopped unsuccessfully
-   * @throws E if the task throws
-   */
-  public <T, E extends Exception> T doUntilResult(ExceptionalSupplier<T, E> task)
-      throws InterruptedException, BackoffStoppedException, E {
-    T result = task.get(); // give an immediate try
-    return (result != null) ? result : retryWork(task);
-  }
-
-  /**
-   * Repeats {@code work} until it yields a non-null value, waiting between attempts. Before each
-   * wait the strategy is asked whether to continue; the previous wait (0 at first) is fed back
-   * into the strategy to compute the next one.
-   *
-   * @throws BackoffStoppedException if the strategy stops before the work yields a value
-   */
-  private <T, E extends Exception> T retryWork(ExceptionalSupplier<T, E> work)
-      throws E, InterruptedException, BackoffStoppedException {
-    long currentBackoffMs = 0;
-    while (backoffStrategy.shouldContinue(currentBackoffMs)) {
-      currentBackoffMs = backoffStrategy.calculateBackoffMs(currentBackoffMs);
-      LOG.fine("Operation failed, backing off for " + currentBackoffMs + "ms");
-      clock.waitFor(currentBackoffMs);
-
-      T result = work.get();
-      if (result != null) {
-        return result;
-      }
-    }
-    // Constant message: passed as-is rather than through String.format with no arguments.
-    throw new BackoffStoppedException("Backoff stopped without succeeding.");
-  }
-
-  /**
-   * Thrown when the backoff strategy reports that retrying should stop before the task has
-   * produced a result.
-   */
-  public static class BackoffStoppedException extends RuntimeException {
-    public BackoffStoppedException(String msg) {
-      super(msg);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/BackoffStrategy.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/BackoffStrategy.java b/commons/src/main/java/com/twitter/common/util/BackoffStrategy.java
deleted file mode 100644
index a90d3f6..0000000
--- a/commons/src/main/java/com/twitter/common/util/BackoffStrategy.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.util;
-
-/**
- * Encapsulates a strategy for backing off from an operation that repeatedly fails.
- */
-public interface BackoffStrategy {
-
- /**
- * Calculates the amount of time to backoff from an operation.
- *
- * @param lastBackoffMs the last used backoff in milliseconds where 0 signifies no backoff has
- * been performed yet
- * @return the amount of time in milliseconds to back off before retrying the operation
- */
- long calculateBackoffMs(long lastBackoffMs);
-
- /**
- * Returns whether to continue backing off.
- *
- * @param lastBackoffMs the last used backoff in milliseconds
- * @return whether to continue backing off
- */
- boolean shouldContinue(long lastBackoffMs);
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/BuildInfo.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/BuildInfo.java b/commons/src/main/java/com/twitter/common/util/BuildInfo.java
deleted file mode 100644
index 7fd1c4c..0000000
--- a/commons/src/main/java/com/twitter/common/util/BuildInfo.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.util;
-
-import java.io.InputStream;
-import java.util.Properties;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import com.twitter.common.base.MorePreconditions;
-
-/**
- * Handles loading of a build properties file, and provides keys to look up known values in the
- * properties.
- */
-public class BuildInfo {
-
- private static final Logger LOG = Logger.getLogger(BuildInfo.class.getName());
-
- private static final String DEFAULT_BUILD_PROPERTIES_PATH = "build.properties";
-
- private final String resourcePath;
-
- private Properties properties = null;
-
- /**
- * Creates a build info container that will use the default properties file path.
- */
- public BuildInfo() {
- this(DEFAULT_BUILD_PROPERTIES_PATH);
- }
-
- /**
- * Creates a build info container, reading from the given path.
- *
- * @param resourcePath The resource path to read build properties from.
- */
- public BuildInfo(String resourcePath) {
- this.resourcePath = MorePreconditions.checkNotBlank(resourcePath);
- }
-
- @VisibleForTesting
- public BuildInfo(Properties properties) {
- this.resourcePath = null;
- this.properties = properties;
- }
-
- private void fetchProperties() {
- properties = new Properties();
- LOG.info("Fetching build properties from " + resourcePath);
- InputStream in = ClassLoader.getSystemResourceAsStream(resourcePath);
- if (in == null) {
- LOG.warning("Failed to fetch build properties from " + resourcePath);
- return;
- }
-
- try {
- properties.load(in);
- } catch (Exception e) {
- LOG.log(Level.WARNING, "Failed to load properties file " + resourcePath, e);
- }
- }
-
- /**
- * Fetches the properties stored in the resource location.
- *
- * @return The loaded properties, or a default properties object if there was a problem loading
- * the specified properties resource.
- */
- public Properties getProperties() {
- if (properties == null) fetchProperties();
- return properties;
- }
-
- /**
- * Values of keys that are expected to exist in the loaded properties file.
- */
- public enum Key {
- PATH("build.path"),
- USER("build.user.name"),
- MACHINE("build.machine"),
- DATE("build.date"),
- TIME("build.time"),
- TIMESTAMP("build.timestamp"),
- GIT_TAG("build.git.tag"),
- GIT_REVISION("build.git.revision"),
- GIT_REVISION_NUMBER("build.git.revision.number"),
- GIT_BRANCHNAME("build.git.branchname");
-
- public final String value;
- private Key(String value) {
- this.value = value;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/Clock.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/Clock.java b/commons/src/main/java/com/twitter/common/util/Clock.java
deleted file mode 100644
index f23c349..0000000
--- a/commons/src/main/java/com/twitter/common/util/Clock.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.util;
-
-import java.io.Serializable;
-
-/**
- * An abstraction of the system clock.
- *
- * @author John Sirois
- */
-public interface Clock {
-
- /**
- * A clock that returns the the actual time reported by the system.
- * This clock is guaranteed to be serializable.
- */
- Clock SYSTEM_CLOCK = new SerializableClock() {
- @Override public long nowMillis() {
- return System.currentTimeMillis();
- }
- @Override public long nowNanos() {
- return System.nanoTime();
- }
- @Override public void waitFor(long millis) throws InterruptedException {
- Thread.sleep(millis);
- }
- };
-
- /**
- * Returns the current time in milliseconds since the epoch.
- *
- * @return The current time in milliseconds since the epoch.
- * @see System#currentTimeMillis()
- */
- long nowMillis();
-
- /**
- * Returns the current time in nanoseconds. Should be used only for relative timing.
- * See {@code System.nanoTime()} for tips on using the value returned here.
- *
- * @return A measure of the current time in nanoseconds.
- * @see System#nanoTime()
- */
- long nowNanos();
-
- /**
- * Waits for the given amount of time to pass on this clock before returning.
- *
- * @param millis the amount of time to wait in milliseconds
- * @throws InterruptedException if this wait was interrupted
- */
- void waitFor(long millis) throws InterruptedException;
-}
-
-/**
- * A typedef to support anonymous {@link Clock} implementations that are also {@link Serializable}.
- */
-interface SerializableClock extends Clock, Serializable { }
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/CommandExecutor.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/CommandExecutor.java b/commons/src/main/java/com/twitter/common/util/CommandExecutor.java
deleted file mode 100644
index ad44524..0000000
--- a/commons/src/main/java/com/twitter/common/util/CommandExecutor.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.util;
-
-import com.twitter.common.base.ExceptionalCommand;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-
-/**
- * Asynchronous executor of enqueued tasks in a rate limited manner.
- *
- * @author Srinivasan Rajagopal
- */
-public interface CommandExecutor {
-
- /**
- * Enqueue a task to be executed with retry semantics defined.
- *
- * @param name Human readable name for this task.
- * @param task task to execute.
- * @param exceptionClass Concrete exception type.
- * @param maxTries num of tries in case of failure.
- * @param retryDelay interval between retries in case of failure.
- */
- <E extends Exception> void execute(
- String name,
- ExceptionalCommand<E> task,
- Class<E> exceptionClass,
- int maxTries,
- Amount<Long, Time> retryDelay);
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/DateUtils.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/DateUtils.java b/commons/src/main/java/com/twitter/common/util/DateUtils.java
deleted file mode 100644
index 0f9f950..0000000
--- a/commons/src/main/java/com/twitter/common/util/DateUtils.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.util;
-
-import java.util.Calendar;
-import java.util.Date;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Utilities for working with java {@link Date}s.
- *
- * @author John Sirois
- */
-public final class DateUtils {
-
- public static Date now() {
- return new Date();
- }
-
- public static long toUnixTime(Date date) {
- return toUnixTime(date.getTime());
- }
-
- public static long nowUnixTime() {
- return toUnixTime(System.currentTimeMillis());
- }
-
- public static long toUnixTime(long millisSinceEpoch) {
- return TimeUnit.MILLISECONDS.toSeconds(millisSinceEpoch);
- }
-
- public static Date ago(int calendarField, int amount) {
- return ago(now(), calendarField, amount);
- }
-
- public static Date ago(Date referenceDate, int calendarField, int amount) {
- Calendar calendar = Calendar.getInstance();
- calendar.setTime(referenceDate);
- calendar.add(calendarField, -1 * amount);
- return calendar.getTime();
- }
-
- private DateUtils() {
- // utility
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/FileUtils.java b/commons/src/main/java/com/twitter/common/util/FileUtils.java
deleted file mode 100644
index 0951662..0000000
--- a/commons/src/main/java/com/twitter/common/util/FileUtils.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.util;
-
-import java.io.File;
-
-/**
- * Utilities for working with Files
- *
- * @author Florian Leibert
- */
-public final class FileUtils {
-
- private FileUtils() {
- }
-
- /**
- * recursively deletes the path and all it's content and returns true if it succeeds
- * Note that the content could be partially deleted and the method return false
- *
- * @param path the path to delete
- * @return true if the path was deleted
- */
- public static boolean forceDeletePath(File path) {
- if (path == null) {
- return false;
- }
- if (path.exists() && path.isDirectory()) {
- File[] files = path.listFiles();
- for (File file : files) {
- if (file.isDirectory()) {
- forceDeletePath(file);
- } else {
- file.delete();
- }
- }
- }
- return path.delete();
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/LowResClock.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/LowResClock.java b/commons/src/main/java/com/twitter/common/util/LowResClock.java
deleted file mode 100644
index ad26bee..0000000
--- a/commons/src/main/java/com/twitter/common/util/LowResClock.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.util;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import java.io.Closeable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-
-/**
- * Low resolution implementation of a {@link com.twitter.common.util.Clock},
- * optimized for fast reads at the expense of precision.
- * It works by caching the result of the system clock for a
- * {@code resolution} amount of time.
- */
-public class LowResClock implements Clock, Closeable {
- private static final ScheduledExecutorService GLOBAL_SCHEDULER =
- Executors.newScheduledThreadPool(1, new ThreadFactory() {
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, "LowResClock");
- t.setDaemon(true);
- return t;
- }
- });
-
- private volatile long time;
- private final ScheduledFuture<?> updaterHandler;
- private final Clock underlying;
-
- @VisibleForTesting
- LowResClock(Amount<Long, Time> resolution, ScheduledExecutorService executor, Clock clock) {
- long sleepTimeMs = resolution.as(Time.MILLISECONDS);
- Preconditions.checkArgument(sleepTimeMs > 0);
- underlying = clock;
- Runnable ticker = new Runnable() {
- @Override public void run() {
- time = underlying.nowMillis();
- }
- };
-
- // Ensure the constructing thread sees a LowResClock with a valid (low-res) time by executing a
- // blocking call now.
- ticker.run();
-
- updaterHandler =
- executor.scheduleAtFixedRate(ticker, sleepTimeMs, sleepTimeMs, TimeUnit.MILLISECONDS);
- }
-
-
- /**
- * Construct a LowResClock which wraps the system clock.
- * This constructor will also schedule a periodic task responsible for
- * updating the time every {@code resolution}.
- */
- public LowResClock(Amount<Long, Time> resolution) {
- this(resolution, GLOBAL_SCHEDULER, Clock.SYSTEM_CLOCK);
- }
-
- /**
- * Terminate the underlying updater task.
- * Any subsequent usage of the clock will throw an {@link IllegalStateException}.
- */
- public void close() {
- updaterHandler.cancel(true);
- }
-
- @Override
- public long nowMillis() {
- checkNotClosed();
- return time;
- }
-
- @Override
- public long nowNanos() {
- return nowMillis() * 1000 * 1000;
- }
-
- @Override
- public void waitFor(long millis) throws InterruptedException {
- checkNotClosed();
- underlying.waitFor(millis);
- }
-
- private void checkNotClosed() {
- if (updaterHandler.isCancelled()) {
- throw new IllegalStateException("LowResClock invoked after being closed!");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/ParsingUtil.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/util/ParsingUtil.java b/commons/src/main/java/com/twitter/common/util/ParsingUtil.java
deleted file mode 100644
index d84975a..0000000
--- a/commons/src/main/java/com/twitter/common/util/ParsingUtil.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.util;
-
-import com.google.common.base.Preconditions;
-
-import com.twitter.common.collections.Pair;
-
-/**
- * Common methods for parsing configs.
- *
- * @author John Sirois
- */
-public class ParsingUtil {
- /**
- * Parses a string as a range between one integer and another. The integers must be separated by
- * a hypen character (space padding is acceptable). Additionally, the first integer
- * (left-hand side) must be less than or equal to the second (right-hand side).
- *
- * @param rangeString The string to parse as an integer range.
- * @return A pair of the parsed integers.
- */
- public static Pair<Integer, Integer> parseRange(String rangeString) {
- if (rangeString == null) return null;
-
- String[] startEnd = rangeString.split("-");
- Preconditions.checkState(
- startEnd.length == 2, "Shard range format: start-end (e.g. 1-4)");
- int start;
- int end;
- try {
- start = Integer.parseInt(startEnd[0].trim());
- end = Integer.parseInt(startEnd[1].trim());
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException("Failed to parse shard range.", e);
- }
-
- Preconditions.checkState(
- start <= end, "The left-hand side of a shard range must be <= the right-hand side.");
- return Pair.of(start, end);
- }
-}