You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by zm...@apache.org on 2015/08/25 20:19:37 UTC
[23/37] aurora git commit: Import of Twitter Commons.
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/testing/easymock/EasyMockTest.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/testing/easymock/EasyMockTest.java b/commons/src/main/java/com/twitter/common/testing/easymock/EasyMockTest.java
new file mode 100644
index 0000000..8767433
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/testing/easymock/EasyMockTest.java
@@ -0,0 +1,124 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.testing.easymock;
+
+import java.lang.reflect.GenericArrayType;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.WildcardType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.reflect.TypeToken;
+import com.google.common.testing.TearDown;
+import com.google.common.testing.junit4.TearDownTestCase;
+
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IMocksControl;
+import org.junit.Before;
+
+import static org.easymock.EasyMock.createControl;
+
+/**
+ * Base class for EasyMock driven tests. Each test gets a fresh {@link IMocksControl control}
+ * before it runs, and that control is verified when the test is torn down.
+ *
+ * @author John Sirois
+ */
+public abstract class EasyMockTest extends TearDownTestCase {
+  protected IMocksControl control;
+
+  /**
+   * Installs a new EasyMock {@link #control} for the test and registers a tear down that
+   * {@link IMocksControl#verify() verifies} it.
+   */
+  @Before
+  public final void setupEasyMock() {
+    control = createControl();
+    TearDown verifyControl = new TearDown() {
+      @Override public void tearDown() {
+        control.verify();
+      }
+    };
+    addTearDown(verifyControl);
+  }
+
+  /**
+   * Creates a mock of the given type using this test's {@link #control}, so that it is
+   * {@link IMocksControl#verify() verified} in a tear down.
+   *
+   * @param type the class to mock; must not be null
+   * @return a mock bound to this test's control
+   */
+  public <T> T createMock(Class<T> type) {
+    return control.createMock(Preconditions.checkNotNull(type));
+  }
+
+  /**
+   * A type literal holder intended to be sub-classed anonymously. To capture the type of a
+   * {@code List<String>} you would use: {@code new Clazz<List<String>>() {}}
+   */
+  public abstract static class Clazz<T> extends TypeToken {
+    Class<T> rawType() {
+      @SuppressWarnings("unchecked")
+      Class<T> raw = (Class<T>) findRawType();
+      return raw;
+    }
+
+    /**
+     * Resolves the captured type to the class that can be handed to EasyMock, rejecting the
+     * kinds of types that cannot be mocked.
+     */
+    private Class<?> findRawType() {
+      Type captured = getType();
+      if (captured instanceof Class<?>) {
+        // Plain old class, nothing to unwrap.
+        return (Class<?>) captured;
+      }
+      if (captured instanceof ParameterizedType) {
+        // Nested type parameter: mock the raw class behind the parameterization.
+        return (Class<?>) ((ParameterizedType) captured).getRawType();
+      }
+      if (captured instanceof GenericArrayType) {
+        throw new IllegalStateException("cannot mock arrays, rejecting type: " + captured);
+      }
+      if (captured instanceof WildcardType) {
+        throw new IllegalStateException(
+            "wildcarded instantiations are not allowed in java, rejecting type: " + captured);
+      }
+      throw new IllegalArgumentException("Could not decode raw type for: " + captured);
+    }
+
+    /**
+     * Creates a stand-alone mock of the captured type via {@link EasyMock#createMock(Class)}.
+     */
+    public T createMock() {
+      Class<T> raw = rawType();
+      return EasyMock.createMock(raw);
+    }
+
+    /**
+     * Creates a mock of the captured type that is owned by the given control.
+     */
+    public T createMock(IMocksControl control) {
+      Class<T> raw = rawType();
+      return control.createMock(raw);
+    }
+  }
+
+  /**
+   * Creates a mock of a parameterized type using this test's {@link #control}, so that it is
+   * {@link IMocksControl#verify() verified} in a tear down.
+   *
+   * Lets callers mock generic types without sprinkling unchecked conversion warnings through
+   * their tests.
+   *
+   * @param type the captured type literal to mock; must not be null
+   * @return a mock bound to this test's control
+   */
+  public <T> T createMock(Clazz<T> type) {
+    Clazz<T> literal = Preconditions.checkNotNull(type);
+    return literal.createMock(control);
+  }
+
+  /**
+   * A type-inferring convenience factory for new captures.
+   */
+  public static <T> Capture<T> createCapture() {
+    Capture<T> capture = new Capture<T>();
+    return capture;
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/testing/easymock/IterableEquals.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/testing/easymock/IterableEquals.java b/commons/src/main/java/com/twitter/common/testing/easymock/IterableEquals.java
new file mode 100644
index 0000000..5197e91
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/testing/easymock/IterableEquals.java
@@ -0,0 +1,77 @@
+package com.twitter.common.testing.easymock;
+
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multiset;
+
+import org.easymock.IArgumentMatcher;
+
+import static org.easymock.EasyMock.reportMatcher;
+
+/**
+ * This EasyMock argument matcher tests Iterables for equality irrespective of order. Element
+ * multiplicity is significant: both sides are compared as multisets.
+ *
+ * @param <T> type argument for the Iterables being matched.
+ */
+public class IterableEquals<T> implements IArgumentMatcher {
+  private final Multiset<T> elements = HashMultiset.create();
+
+  /**
+   * Constructs an IterableEquals object that tests for equality against the specified expected
+   * Iterable.
+   *
+   * @param expected an Iterable containing the elements that are expected, in any order.
+   */
+  public IterableEquals(Iterable<T> expected) {
+    Iterables.addAll(elements, expected);
+  }
+
+  /**
+   * Returns {@code true} when {@code observed} is an Iterable holding exactly the expected
+   * elements (with the same counts), regardless of iteration order. Anything that is not an
+   * Iterable, including {@code null}, does not match.
+   */
+  @Override
+  public boolean matches(Object observed) {
+    if (observed instanceof Iterable<?>) {
+      Multiset<Object> observedElements = HashMultiset.create((Iterable<?>) observed);
+      return elements.equals(observedElements);
+    }
+    return false;
+  }
+
+  @Override
+  public void appendTo(StringBuffer buffer) {
+    buffer.append("eqIterable(").append(elements).append(")");
+  }
+
+  /**
+   * When used in EasyMock expectations, this matches an Iterable having the same elements in any
+   * order.
+   *
+   * @param in the elements the observed argument is expected to contain.
+   * @return null, to avoid a compile time error.
+   */
+  public static <T> Iterable<T> eqIterable(Iterable<T> in) {
+    // Parameterized instantiation: the raw type would only add an unchecked warning here.
+    reportMatcher(new IterableEquals<T>(in));
+    return null;
+  }
+
+  /**
+   * When used in EasyMock expectations, this matches a List having the same elements in any order.
+   *
+   * @param in the elements the observed argument is expected to contain.
+   * @return null, to avoid a compile time error.
+   */
+  public static <T> List<T> eqList(Iterable<T> in) {
+    reportMatcher(new IterableEquals<T>(in));
+    return null;
+  }
+
+  /**
+   * When used in EasyMock expectations, this matches a Collection having the same elements in any
+   * order.
+   *
+   * @param in the elements the observed argument is expected to contain.
+   * @return null, to avoid a compile time error.
+   */
+  public static <T> Collection<T> eqCollection(Iterable<T> in) {
+    reportMatcher(new IterableEquals<T>(in));
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/testing/junit/rules/Retry.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/testing/junit/rules/Retry.java b/commons/src/main/java/com/twitter/common/testing/junit/rules/Retry.java
new file mode 100644
index 0000000..5658ff6
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/testing/junit/rules/Retry.java
@@ -0,0 +1,161 @@
+// =================================================================================================
+// Copyright 2015 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.testing.junit.rules;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+
+import org.junit.rules.MethodRule;
+import org.junit.runners.model.FrameworkMethod;
+import org.junit.runners.model.Statement;
+
+/**
+ * A test method annotation useful for smoking out flaky behavior in tests.
+ *
+ * @see Retry.Rule RetryRule needed to enable this annotation in a test class.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface Retry {
+
+ /**
+ * The number of times to retry the test.
+ *
+ * When a {@link Retry.Rule} is installed and a test method is annotated for {@literal @Retry},
+ * it will be retried 0 to N times. If times is negative, it is treated as 0 and no retries are
+ * performed. If times is >= 1 then a successful execution of the annotated test method is
+ * retried until the 1st error, failure or otherwise up to {@code times} times.
+ */
+ int times() default 1;
+
+ /**
+ * Enables {@link Retry @Retry}able tests.
+ */
+ class Rule implements MethodRule {
+ private interface ThrowableFactory {
+ Throwable create(String message, Throwable cause);
+ }
+
+ private static Throwable annotate(
+ int tryNumber,
+ final int maxRetries,
+ Throwable cause,
+ String prefix,
+ ThrowableFactory throwableFactory) {
+
+ Throwable annotated =
+ throwableFactory.create(
+ String.format("%s on try %d of %d: %s", prefix, tryNumber, maxRetries + 1,
+ Objects.firstNonNull(cause.getMessage(), "")), cause);
+ annotated.setStackTrace(cause.getStackTrace());
+ return annotated;
+ }
+
+ static class RetriedAssertionError extends AssertionError {
+ private final int tryNumber;
+ private final int maxRetries;
+
+ RetriedAssertionError(int tryNumber, int maxRetries, String message, Throwable cause) {
+ // We do a manual initCause here to be compatible with the Java 1.6 AssertionError
+ // constructors.
+ super(message);
+ initCause(cause);
+
+ this.tryNumber = tryNumber;
+ this.maxRetries = maxRetries;
+ }
+
+ @VisibleForTesting
+ int getTryNumber() {
+ return tryNumber;
+ }
+
+ @VisibleForTesting
+ int getMaxRetries() {
+ return maxRetries;
+ }
+ }
+
+ private static Throwable annotate(final int tryNumber, final int maxRetries, AssertionError e) {
+ return annotate(tryNumber, maxRetries, e, "Failure", new ThrowableFactory() {
+ @Override public Throwable create(String message, Throwable cause) {
+ return new RetriedAssertionError(tryNumber, maxRetries, message, cause);
+ }
+ });
+ }
+
+ static class RetriedException extends Exception {
+ private final int tryNumber;
+ private final int maxRetries;
+
+ RetriedException(int tryNumber, int maxRetries, String message, Throwable cause) {
+ super(message, cause);
+ this.tryNumber = tryNumber;
+ this.maxRetries = maxRetries;
+ }
+
+ @VisibleForTesting
+ int getTryNumber() {
+ return tryNumber;
+ }
+
+ @VisibleForTesting
+ int getMaxRetries() {
+ return maxRetries;
+ }
+ }
+
+ private static Throwable annotate(final int tryNumber, final int maxRetries, Exception e) {
+ return annotate(tryNumber, maxRetries, e, "Error", new ThrowableFactory() {
+ @Override public Throwable create(String message, Throwable cause) {
+ return new RetriedException(tryNumber, maxRetries, message, cause);
+ }
+ });
+ }
+
+ @Override
+ public Statement apply(final Statement statement, FrameworkMethod method, Object receiver) {
+ Retry retry = method.getAnnotation(Retry.class);
+ if (retry == null || retry.times() <= 0) {
+ return statement;
+ } else {
+ final int times = retry.times();
+ return new Statement() {
+ @Override public void evaluate() throws Throwable {
+ for (int i = 0; i <= times; i++) {
+ try {
+ statement.evaluate();
+ } catch (AssertionError e) {
+ throw annotate(i + 1, times, e);
+ // We purposefully catch any non-assertion exceptions in order to tag the try count
+ // for erroring (as opposed to failing) tests.
+ // SUPPRESS CHECKSTYLE RegexpSinglelineJava
+ } catch (Exception e) {
+ throw annotate(i + 1, times, e);
+ }
+ }
+ }
+ };
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/testing/mockito/MockitoTest.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/testing/mockito/MockitoTest.java b/commons/src/main/java/com/twitter/common/testing/mockito/MockitoTest.java
new file mode 100644
index 0000000..a56bb2b
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/testing/mockito/MockitoTest.java
@@ -0,0 +1,34 @@
+// =================================================================================================
+// Copyright 2012 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.testing.mockito;
+
+import org.junit.Before;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * A base class for tests that use Mockito. Before each test, it initializes all the mocks
+ * declared in the class.
+ *
+ * <p>Subclasses only need to declare their annotated fields; the setup hook below is run by
+ * JUnit via {@link Before}.
+ */
+public abstract class MockitoTest {
+ /**
+ * Initializes all fields annotated with {@link org.mockito.Mock}.
+ *
+ * <p>Declared {@code final} so subclasses cannot override it and skip the initialization.
+ */
+ @Before
+ public final void initMockito() {
+ // Passes this test instance so the fields declared on the concrete subclass are processed.
+ MockitoAnnotations.initMocks(this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/thrift/Config.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/thrift/Config.java b/commons/src/main/java/com/twitter/common/thrift/Config.java
new file mode 100644
index 0000000..977489f
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/thrift/Config.java
@@ -0,0 +1,305 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.thrift;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.stats.StatsProvider;
+
+/**
+ * Represents the configuration for a thrift call. Use {@link #builder()} to create a new one
+ * or {@link #builder(Config)} to create a new config based on another config.
+ *
+ * <p>If a deadline is specified, it acts as a global timeout for each thrift call made.
+ * Obtaining connections, performing the remote call and executing retries are all expected to
+ * complete within this deadline. When the specified deadline is not met, an
+ * {@link TTimeoutException} will be thrown.
+ *
+ * <p>If max retries is specified as zero (never retry), then the list of retryable exceptions is
+ * ignored. It is only when max retries is greater than zero that the list of retryable
+ * exceptions is used to determine if a particular failed call should be retried.
+ *
+ * @author John Sirois
+ */
+public class Config {
+
+  /**
+   * Creates a builder for a new {@link Config}. Default values are as follows:
+   * <ul>
+   * <li>{@link #getRequestTimeout()} 0
+   * <li>{@link #getConnectTimeout()} 5 seconds
+   * <li>{@link #getMaxRetries()} 0
+   * <li>{@link #getRetryableExceptions()} []
+   * <li>{@link #isDebug()} false
+   * <li>{@link #enableStats()} true
+   * </ul>
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Creates a builder whose initial settings are copied from an existing config.
+   *
+   * @param config the configuration to use as a template; must not be null
+   */
+  public static Builder builder(Config config) {
+    Preconditions.checkNotNull(config);
+    return new Builder(config);
+  }
+
+  private static final Amount<Long, Time> DEADLINE_BLOCKING = Amount.of(0L, Time.MILLISECONDS);
+
+  @VisibleForTesting
+  static final Amount<Long, Time> DEFAULT_CONNECT_TIMEOUT = Amount.of(5L, Time.SECONDS);
+
+  private Amount<Long, Time> requestTimeout = DEADLINE_BLOCKING;
+  private Amount<Long, Time> connectTimeout = DEFAULT_CONNECT_TIMEOUT;
+  private int maxRetries;
+  private ImmutableSet<Class<? extends Exception>> retryableExceptions = ImmutableSet.of();
+  private boolean debug = false;
+  private boolean enableStats = true;
+  private StatsProvider statsProvider = Stats.STATS_PROVIDER;
+
+  private Config() {
+    // defaults
+  }
+
+  /**
+   * Copies every setting from {@code copyFrom}. The connect timeout and the stats flag are
+   * copied as well; leaving them out would silently reset them to their defaults whenever a
+   * builder is created from a template config.
+   */
+  private Config(Config copyFrom) {
+    requestTimeout = copyFrom.requestTimeout;
+    connectTimeout = copyFrom.connectTimeout;
+    maxRetries = copyFrom.maxRetries;
+    retryableExceptions = copyFrom.retryableExceptions;
+    debug = copyFrom.debug;
+    enableStats = copyFrom.enableStats;
+    statsProvider = copyFrom.statsProvider;
+  }
+
+  /**
+   * Returns the maximum time to wait for any thrift call to complete. A deadline of 0 means to
+   * wait forever.
+   */
+  public Amount<Long, Time> getRequestTimeout() {
+    return requestTimeout;
+  }
+
+  /**
+   * Returns the maximum time to wait for a connection to be established. A deadline of 0 means to
+   * wait forever.
+   */
+  public Amount<Long, Time> getConnectTimeout() {
+    return connectTimeout;
+  }
+
+  /**
+   * Returns the maximum number of retries to perform for each thrift call. A value of 0 means to
+   * never retry; in that case {@link #getRetryableExceptions()} is ignored.
+   */
+  public int getMaxRetries() {
+    return maxRetries;
+  }
+
+  /**
+   * Returns the set of exceptions to retry calls for. The set is empty by default and only
+   * matters when {@link #getMaxRetries()} is greater than 0.
+   */
+  public ImmutableSet<Class<? extends Exception>> getRetryableExceptions() {
+    return retryableExceptions;
+  }
+
+  /**
+   * Returns {@code true} if the client should log extra debugging information. Currently this
+   * includes method call arguments when RPCs fail with exceptions.
+   */
+  public boolean isDebug() {
+    return debug;
+  }
+
+  /**
+   * Returns {@code true} if the client should track request statistics.
+   */
+  public boolean enableStats() {
+    return enableStats;
+  }
+
+  /**
+   * Returns the stats provider to use to record Thrift client stats.
+   */
+  public StatsProvider getStatsProvider() {
+    return statsProvider;
+  }
+
+  // This was made public because it seems to be causing problems for scala users when it is not
+  // public.
+  public static abstract class AbstractBuilder<T extends AbstractBuilder> {
+    private final Config config;
+
+    AbstractBuilder() {
+      this.config = new Config();
+    }
+
+    AbstractBuilder(Config template) {
+      Preconditions.checkNotNull(template);
+      this.config = new Config(template);
+    }
+
+    /**
+     * Returns this builder typed as the concrete subclass so the fluent setters can be chained.
+     */
+    protected abstract T getThis();
+
+    // TODO(John Sirois): extra validation or design ... can currently do strange things like:
+    // builder.blocking().withDeadline(1, TimeUnit.MILLISECONDS)
+    // builder.noRetries().retryOn(TException.class)
+
+    /**
+     * Specifies that all calls be blocking calls with no inherent deadline. It may be the
+     * case that underlying transports will eventually deadline, but {@link Thrift} will not
+     * enforce a deadline.
+     */
+    public final T blocking() {
+      config.requestTimeout = DEADLINE_BLOCKING;
+      return getThis();
+    }
+
+    /**
+     * Specifies that all calls be subject to a global timeout. This deadline includes all call
+     * activities, including obtaining a free connection and any automatic retries.
+     *
+     * @param timeout the request timeout; must be non-null and non-negative
+     * @return A reference to the builder.
+     */
+    public final T withRequestTimeout(Amount<Long, Time> timeout) {
+      Preconditions.checkNotNull(timeout);
+      Preconditions.checkArgument(timeout.getValue() >= 0,
+          "A negative deadline is invalid: %s", timeout);
+      config.requestTimeout = timeout;
+      return getThis();
+    }
+
+    /**
+     * Assigns the timeout for all connections established with the blocking client.
+     * On an asynchronous client this timeout is only used for the connection pool lock
+     * acquisition on initial calls (not retries, @see withRetries). The actual network
+     * connection timeout for the asynchronous client is governed by socketTimeout.
+     *
+     * @param timeout Connection timeout; must be non-null and non-negative.
+     * @return A reference to the builder.
+     */
+    public final T withConnectTimeout(Amount<Long, Time> timeout) {
+      Preconditions.checkNotNull(timeout);
+      Preconditions.checkArgument(timeout.getValue() >= 0,
+          "A negative deadline is invalid: %s", timeout);
+      config.connectTimeout = timeout;
+      return getThis();
+    }
+
+    /**
+     * Specifies that no calls be automatically retried.
+     */
+    public final T noRetries() {
+      config.maxRetries = 0;
+      config.retryableExceptions = ImmutableSet.of();
+      return getThis();
+    }
+
+    /**
+     * Specifies that failing calls meeting {@link #retryOn retry} criteria be retried up to a
+     * maximum of {@code retries} times before failing. On an asynchronous client, these retries
+     * will be forced to be non-blocking, failing fast if they cannot immediately acquire the
+     * connection pool locks, so they only provide a best-effort retry strategy there.
+     *
+     * @param retries the maximum number of retries; must be non-negative
+     * @return A reference to the builder.
+     */
+    public final T withRetries(int retries) {
+      // Guava's Preconditions only substitutes %s placeholders; %d would be left in the message
+      // verbatim.
+      Preconditions.checkArgument(retries >= 0, "A negative retry count is invalid: %s", retries);
+      config.maxRetries = retries;
+      return getThis();
+    }
+
+    /**
+     * Specifies the set of exception classes that are to be considered retryable (if retries are
+     * enabled). Any exceptions thrown by the underlying thrift call will be considered retryable
+     * if they are an instance of any one of the specified exception classes. The set of exception
+     * classes must contain at least one exception class; null entries are dropped before that
+     * check. To specify no retries either use {@link #noRetries()} or pass zero to
+     * {@link #withRetries(int)}.
+     *
+     * @param retryableExceptions the exception classes to retry on; must be non-null and contain
+     *     at least one non-null class
+     * @return A reference to the builder.
+     */
+    public final T retryOn(Iterable<? extends Class<? extends Exception>> retryableExceptions) {
+      Preconditions.checkNotNull(retryableExceptions);
+      ImmutableSet<Class<? extends Exception>> classes =
+          ImmutableSet.copyOf(Iterables.filter(retryableExceptions, Predicates.notNull()));
+      Preconditions.checkArgument(!classes.isEmpty(),
+          "Must provide at least one retryable exception class");
+      config.retryableExceptions = classes;
+      return getThis();
+    }
+
+    /**
+     * Specifies the single exception class that is to be considered retryable (if retries are
+     * enabled). Any exceptions thrown by the underlying thrift call will be considered retryable
+     * if they are an instance of the specified exception class. This replaces any previously
+     * configured retryable exceptions. To specify no retries either use {@link #noRetries()} or
+     * pass zero to {@link #withRetries(int)}.
+     *
+     * @param exception the exception class to retry on; must not be null
+     * @return A reference to the builder.
+     */
+    public final T retryOn(Class<? extends Exception> exception) {
+      Preconditions.checkNotNull(exception);
+      config.retryableExceptions = ImmutableSet.<Class<? extends Exception>>of(exception);
+      return getThis();
+    }
+
+    /**
+     * When {@code debug == true}, specifies that extra debugging information should be logged.
+     */
+    public final T withDebug(boolean debug) {
+      config.debug = debug;
+      return getThis();
+    }
+
+    /**
+     * Disables stats collection on the client (enabled by default).
+     */
+    public T disableStats() {
+      config.enableStats = false;
+      return getThis();
+    }
+
+    /**
+     * Registers a custom stats provider to use to track various client stats.
+     *
+     * @param statsProvider the provider to record stats with; must not be null
+     */
+    public T withStatsProvider(StatsProvider statsProvider) {
+      config.statsProvider = Preconditions.checkNotNull(statsProvider);
+      return getThis();
+    }
+
+    /**
+     * Returns the config instance this builder has been populating.
+     */
+    protected final Config getConfig() {
+      return config;
+    }
+  }
+
+  /**
+   * The concrete builder handed out by {@link Config#builder()} and
+   * {@link Config#builder(Config)}.
+   */
+  public static final class Builder extends AbstractBuilder<Builder> {
+    private Builder() {
+      super();
+    }
+
+    private Builder(Config template) {
+      super(template);
+    }
+
+    @Override
+    protected Builder getThis() {
+      return this;
+    }
+
+    /**
+     * Returns the config populated by this builder.
+     */
+    public Config create() {
+      return getConfig();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/thrift/TResourceExhaustedException.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/thrift/TResourceExhaustedException.java b/commons/src/main/java/com/twitter/common/thrift/TResourceExhaustedException.java
new file mode 100644
index 0000000..fb9194d
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/thrift/TResourceExhaustedException.java
@@ -0,0 +1,42 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.thrift;
+
+import org.apache.thrift.TException;
+
+/**
+ * This exception is thrown when there are no available instances of a thrift backend
+ * service to serve requests.
+ *
+ * @author Adam Samet
+ */
+public class TResourceExhaustedException extends TException {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * @param message a description of the exhausted resource
+ */
+ public TResourceExhaustedException(String message) {
+ super(message);
+ }
+
+ /**
+ * @param cause the underlying problem that left no backend available
+ */
+ public TResourceExhaustedException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * @param message a description of the exhausted resource
+ * @param cause the underlying problem that left no backend available
+ */
+ public TResourceExhaustedException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/thrift/TTimeoutException.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/thrift/TTimeoutException.java b/commons/src/main/java/com/twitter/common/thrift/TTimeoutException.java
new file mode 100644
index 0000000..50020bd
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/thrift/TTimeoutException.java
@@ -0,0 +1,41 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.thrift;
+
+import org.apache.thrift.TException;
+
+/**
+ * This exception is thrown when accessing a thrift service resource times out.
+ *
+ * @author Adam Samet
+ */
+public class TTimeoutException extends TException {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * @param message a description of what timed out
+ */
+ public TTimeoutException(String message) {
+ super(message);
+ }
+
+ /**
+ * @param cause the underlying problem that surfaced as a timeout
+ */
+ public TTimeoutException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * @param message a description of what timed out
+ * @param cause the underlying problem that surfaced as a timeout
+ */
+ public TTimeoutException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/thrift/TTransportConnection.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/thrift/TTransportConnection.java b/commons/src/main/java/com/twitter/common/thrift/TTransportConnection.java
new file mode 100644
index 0000000..329c03f
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/thrift/TTransportConnection.java
@@ -0,0 +1,73 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.thrift;
+
+import com.google.common.base.Preconditions;
+import com.twitter.common.net.pool.Connection;
+import com.twitter.common.net.pool.ConnectionPool;
+import org.apache.thrift.transport.TTransport;
+
+import java.net.InetSocketAddress;
+
+/**
+ * A {@link ConnectionPool} compatible thrift connection that can work with any valid thrift
+ * transport.
+ *
+ * @author John Sirois
+ */
+public class TTransportConnection implements Connection<TTransport, InetSocketAddress> {
+
+  private final TTransport transport;
+  private final InetSocketAddress endpoint;
+
+  /**
+   * Pairs an existing transport with the endpoint it is associated with.
+   *
+   * @param transport the thrift transport returned by {@link #get()}; must not be null
+   * @param endpoint the address returned by {@link #getEndpoint()}; must not be null
+   */
+  public TTransportConnection(TTransport transport, InetSocketAddress endpoint) {
+    Preconditions.checkNotNull(transport);
+    Preconditions.checkNotNull(endpoint);
+    this.transport = transport;
+    this.endpoint = endpoint;
+  }
+
+  /**
+   * Returns the wrapped transport.
+   */
+  @Override
+  public TTransport get() {
+    return transport;
+  }
+
+  /**
+   * Returns the endpoint this connection was created for.
+   */
+  @Override
+  public InetSocketAddress getEndpoint() {
+    return endpoint;
+  }
+
+  /**
+   * Reports whether the underlying transport is still open. Closing the transport is the way to
+   * invalidate this connection.
+   *
+   * <p>TODO(John Sirois): it seems like an improper separation of concerns to have validity
+   * testing here and not also an invalidation method - correct or accept
+   */
+  @Override
+  public boolean isValid() {
+    return transport.isOpen();
+  }
+
+  /**
+   * Closes the underlying transport.
+   */
+  @Override
+  public void close() {
+    transport.close();
+  }
+
+  /**
+   * Returns the string form of the endpoint.
+   */
+  @Override
+  public String toString() {
+    return endpoint.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/thrift/Thrift.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/thrift/Thrift.java b/commons/src/main/java/com/twitter/common/thrift/Thrift.java
new file mode 100644
index 0000000..aecf251
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/thrift/Thrift.java
@@ -0,0 +1,393 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.thrift;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.net.loadbalancing.RequestTracker;
+import com.twitter.common.net.pool.Connection;
+import com.twitter.common.net.pool.ObjectPool;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.thrift.callers.Caller;
+import com.twitter.common.thrift.callers.DeadlineCaller;
+import com.twitter.common.thrift.callers.DebugCaller;
+import com.twitter.common.thrift.callers.RetryingCaller;
+import com.twitter.common.thrift.callers.StatTrackingCaller;
+import com.twitter.common.thrift.callers.ThriftCaller;
+
+/**
+ * A generic thrift client that handles reconnection in the case of protocol errors, automatic
+ * retries, call deadlines and call statistics tracking. This class aims for behavior compatible
+ * with the <a href="http://github.com/fauna/thrift_client">generic ruby thrift client</a>.
+ *
+ * <p>In order to enforce call deadlines for synchronous clients, this class uses an
+ * {@link java.util.concurrent.ExecutorService}. If a custom executor is supplied, it should throw
+ * a subclass of {@link RejectedExecutionException} to signal thread resource exhaustion, in which
+ * case the client will fail fast and propagate the event as a {@link TResourceExhaustedException}.
+ *
+ * TODO(William Farner): Before open sourcing, look into changing the current model of wrapped proxies
+ * to use a single proxy and wrapped functions for decorators.
+ *
+ * @author John Sirois
+ */
+public class Thrift<T> {
+
+ /**
+  * The thrift call configuration applied when a caller does not supply one.
+  *
+  * Specifies the following settings:
+  * <ul>
+  * <li>global call timeout: 1 second
+  * <li>call retries: 0
+  * <li>retryable exceptions: TTransportException (network exceptions including socket timeouts)
+  * <li>wait for connections: true
+  * <li>debug: false
+  * </ul>
+  */
+ public static final Config DEFAULT_CONFIG =
+     Config.builder()
+         .withRequestTimeout(Amount.of(1L, Time.SECONDS))
+         .noRetries()
+         // Only takes effect if maxRetries is later set non-zero.
+         .retryOn(TTransportException.class)
+         .create();
+
+ /**
+  * The thrift call configuration applied to an async client when a caller does not supply one.
+  * It starts from {@link #DEFAULT_CONFIG} and overrides the timeout and retryable exceptions.
+  *
+  * Specifies the following settings:
+  * <ul>
+  * <li>global call timeout: none
+  * <li>call retries: 0
+  * <li>retryable exceptions: IOException, TTransportException
+  * (network exceptions but not timeouts)
+  * <li>wait for connections: true
+  * <li>debug: false
+  * </ul>
+  */
+ @SuppressWarnings("unchecked")
+ public static final Config DEFAULT_ASYNC_CONFIG =
+     Config.builder(DEFAULT_CONFIG)
+         // A zero timeout means no deadline is enforced.
+         .withRequestTimeout(Amount.of(0L, Time.SECONDS))
+         .noRetries()
+         // Only takes effect if maxRetries is later set non-zero.
+         .retryOn(ImmutableSet.<Class<? extends Exception>>of(
+             IOException.class, TTransportException.class))
+         .create();
+
+ // Configuration used by create() and as the starting point for builder().
+ private final Config defaultConfig;
+
+ // Executor handed to the deadline decorator; shut down by close().
+ private final ExecutorService executorService;
+
+ // Source of thrift connections; closed by close().
+ private final ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool;
+
+ // Passed to every ThriftCaller created by this instance.
+ private final RequestTracker<InetSocketAddress> requestTracker;
+
+ // Identity of the remote service and the means to build concrete clients for it.
+ private final String serviceName;
+ private final Class<T> serviceInterface;
+ private final Function<TTransport, T> clientFactory;
+
+ // Whether generated clients use the asynchronous (callback based) calling convention.
+ private final boolean async;
+
+ // NOTE(review): stored from the constructor's ssl flag but never read within this class.
+ private final boolean withSsl;
+
+ /**
+  * Constructs a synchronous instance with SSL disabled that uses the {@link #DEFAULT_CONFIG}
+  * and a cached thread pool {@link ExecutorService}.
+  *
+  * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker, String, Class, Function,
+  *     boolean, boolean)
+  */
+ public Thrift(
+     ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool,
+     RequestTracker<InetSocketAddress> requestTracker,
+     String serviceName,
+     Class<T> serviceInterface,
+     Function<TTransport, T> clientFactory) {
+
+   this(DEFAULT_CONFIG, connectionPool, requestTracker, serviceName, serviceInterface,
+       clientFactory, false, false);
+ }
+
+ /**
+  * Constructs an instance with SSL disabled and a cached thread pool {@link ExecutorService}.
+  * The configuration is {@link #DEFAULT_ASYNC_CONFIG} when {@code async} is {@code true} and
+  * {@link #DEFAULT_CONFIG} otherwise.
+  *
+  * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker, String, Class, Function,
+  *     boolean, boolean)
+  */
+ public Thrift(
+     ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool,
+     RequestTracker<InetSocketAddress> requestTracker,
+     String serviceName,
+     Class<T> serviceInterface,
+     Function<TTransport, T> clientFactory,
+     boolean async) {
+
+   this(getConfig(async), connectionPool, requestTracker, serviceName,
+       serviceInterface, clientFactory, async, false);
+ }
+
+ /**
+  * Constructs an instance with a cached thread pool {@link ExecutorService}. The configuration
+  * is {@link #DEFAULT_ASYNC_CONFIG} when {@code async} is {@code true} and
+  * {@link #DEFAULT_CONFIG} otherwise.
+  *
+  * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker, String, Class, Function,
+  *     boolean, boolean)
+  */
+ public Thrift(
+     ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool,
+     RequestTracker<InetSocketAddress> requestTracker,
+     String serviceName,
+     Class<T> serviceInterface,
+     Function<TTransport, T> clientFactory,
+     boolean async,
+     boolean ssl) {
+
+   this(getConfig(async), connectionPool, requestTracker, serviceName,
+       serviceInterface, clientFactory, async, ssl);
+ }
+
+ /**
+  * Constructs an instance with the given configuration and a cached thread pool
+  * {@link ExecutorService} whose daemon threads are named after {@code serviceName}.
+  *
+  * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker, String, Class, Function,
+  *     boolean, boolean)
+  */
+ public Thrift(
+     Config config,
+     ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool,
+     RequestTracker<InetSocketAddress> requestTracker,
+     String serviceName,
+     Class<T> serviceInterface,
+     Function<TTransport, T> clientFactory,
+     boolean async,
+     boolean ssl) {
+
+   this(
+       config,
+       Executors.newCachedThreadPool(
+           new ThreadFactoryBuilder()
+               .setDaemon(true)
+               .setNameFormat("Thrift[" + serviceName + "][%d]")
+               .build()),
+       connectionPool,
+       requestTracker,
+       serviceName,
+       serviceInterface,
+       clientFactory,
+       async,
+       ssl);
+ }
+
+ /**
+  * Constructs an instance that runs deadline-bound calls on the supplied executor. The
+  * configuration is {@link #DEFAULT_ASYNC_CONFIG} when {@code async} is {@code true} and
+  * {@link #DEFAULT_CONFIG} otherwise.
+  *
+  * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker, String, Class, Function,
+  *     boolean, boolean)
+  */
+ public Thrift(
+     ExecutorService executorService,
+     ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool,
+     RequestTracker<InetSocketAddress> requestTracker,
+     String serviceName,
+     Class<T> serviceInterface,
+     Function<TTransport, T> clientFactory,
+     boolean async,
+     boolean ssl) {
+
+   this(getConfig(async), executorService, connectionPool, requestTracker, serviceName,
+       serviceInterface, clientFactory, async, ssl);
+ }
+
+ /**
+  * Selects the default configuration matching the requested calling convention.
+  *
+  * @param async whether the client is asynchronous
+  * @return {@link #DEFAULT_ASYNC_CONFIG} for async clients, otherwise {@link #DEFAULT_CONFIG}
+  */
+ private static Config getConfig(boolean async) {
+   if (async) {
+     return DEFAULT_ASYNC_CONFIG;
+   }
+   return DEFAULT_CONFIG;
+ }
+
+ /**
+  * Constructs a new Thrift factory for creating clients that make calls to a particular thrift
+  * service.
+  *
+  * <p>Note that the combination of {@code config} and {@code connectionPool} needs to be chosen
+  * with care depending on usage of the generated thrift clients. In particular, if configured
+  * to not wait for connections, the {@code connectionPool} ought to be warmed up with a set of
+  * connections or else be actively building connections in the background.
+  *
+  * <p>TODO(John Sirois): consider adding a method to ObjectPool that would allow Thrift to handle
+  * this case by pro-actively warming the pool.
+  *
+  * @param config the default configuration to use for all thrift calls; also the configuration all
+  *     {@link ClientBuilder}s start with
+  * @param executorService for invoking calls with a specified deadline
+  * @param connectionPool the source for thrift connections
+  * @param requestTracker the tracker passed to the {@link ThriftCaller} behind every client
+  * @param serviceName a /vars friendly name identifying the service clients will connect to;
+  *     must not be blank
+  * @param serviceInterface the thrift compiler generated interface class for the remote service
+  *     (Iface); must be an interface
+  * @param clientFactory a function that can generate a concrete thrift client for the given
+  *     {@code serviceInterface}
+  * @param async enable asynchronous API
+  * @param ssl enable TLS handshaking for Thrift calls (recorded by this class but not otherwise
+  *     consulted within it)
+  */
+ public Thrift(
+     Config config,
+     ExecutorService executorService,
+     ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool,
+     RequestTracker<InetSocketAddress> requestTracker,
+     String serviceName,
+     Class<T> serviceInterface,
+     Function<TTransport, T> clientFactory,
+     boolean async,
+     boolean ssl) {
+
+   // Arguments are validated in declaration order so the first invalid one is reported.
+   this.defaultConfig = Preconditions.checkNotNull(config);
+   this.executorService = Preconditions.checkNotNull(executorService);
+   this.connectionPool = Preconditions.checkNotNull(connectionPool);
+   this.requestTracker = Preconditions.checkNotNull(requestTracker);
+   this.serviceName = MorePreconditions.checkNotBlank(serviceName);
+   this.serviceInterface = checkServiceInterface(serviceInterface);
+   this.clientFactory = Preconditions.checkNotNull(clientFactory);
+
+   this.async = async;
+   this.withSsl = ssl;
+ }
+
+ /**
+  * Validates that the given class can serve as a thrift service interface.
+  *
+  * @param serviceInterface the candidate class; must be a non-null interface
+  * @return {@code serviceInterface}, unchanged
+  * @throws NullPointerException if {@code serviceInterface} is null
+  * @throws IllegalArgumentException if {@code serviceInterface} is not an interface
+  */
+ static <I> Class<I> checkServiceInterface(Class<I> serviceInterface) {
+   boolean isInterface = Preconditions.checkNotNull(serviceInterface).isInterface();
+   Preconditions.checkArgument(isInterface,
+       "%s must be a thrift service interface", serviceInterface);
+   return serviceInterface;
+ }
+
+ /**
+  * Closes the connection pool and then shuts down the executor service, preparing this thrift
+  * client for graceful shutdown. Any thrift client proxies returned from {@link #create()} will
+  * become invalid.
+  */
+ public void close() {
+   connectionPool.close();
+   executorService.shutdown();
+ }
+
+ /**
+  * A builder that lets call behavior be customized for a single Thrift client. When
+  * configuration calls conflict, the last call wins. So, for example, the following sequence
+  * would result in all calls being subject to a 5 second global deadline:
+  * <code>
+  * builder.blocking().withDeadline(5, TimeUnit.SECONDS).create()
+  * </code>
+  *
+  * @see Config
+  */
+ public final class ClientBuilder extends Config.AbstractBuilder<ClientBuilder> {
+
+   /**
+    * @param template the configuration this builder starts from
+    */
+   private ClientBuilder(Config template) {
+     super(template);
+   }
+
+   /**
+    * Returns this builder so the inherited configuration methods can be chained.
+    */
+   @Override
+   protected ClientBuilder getThis() {
+     return this;
+   }
+
+   /**
+    * Creates a new client using the configuration accumulated by this builder.
+    */
+   public T create() {
+     return createClient(getConfig());
+   }
+ }
+
+ /**
+  * Creates a new thrift client builder seeded with this Thrift instance's default configuration.
+  * This is useful for customizing a client for a particular thrift call that makes sense to treat
+  * differently from the rest of the calls to a given service.
+  *
+  * @return a builder starting from the default configuration
+  */
+ public ClientBuilder builder() {
+   return builder(defaultConfig);
+ }
+
+ /**
+  * Creates a new thrift client builder seeded with the given configuration.
+  * This is useful for customizing a client for a particular thrift call that makes sense to treat
+  * differently from the rest of the calls to a given service.
+  *
+  * @param config the configuration the builder starts from; must not be null
+  * @return a builder starting from {@code config}
+  */
+ public ClientBuilder builder(Config config) {
+   return new ClientBuilder(Preconditions.checkNotNull(config));
+ }
+
+ /**
+  * Creates a new client using this Thrift instance's default configuration.
+  *
+  * @return a client proxy implementing the service interface
+  */
+ public T create() {
+   return createClient(defaultConfig);
+ }
+
+ /**
+  * Builds a dynamic proxy for the service interface whose calls are routed through a chain of
+  * {@link Caller} decorators chosen by {@code config}.
+  *
+  * <p>The chain is assembled innermost first: the {@link ThriftCaller}, then (each optional)
+  * the retry, deadline, debug and stats decorators. The proxy invokes the outermost one.
+  *
+  * @param config the call configuration to apply to the new client
+  * @return a proxy implementing the service interface
+  * @throws IllegalArgumentException if a request timeout is configured for an async client
+  */
+ private T createClient(Config config) {
+   StatsProvider stats = config.getStatsProvider();
+   boolean debugEnabled = config.isDebug();
+
+   // lease/call/[invalidate]/release
+   Caller chain = new ThriftCaller<T>(connectionPool, requestTracker, clientFactory,
+       config.getConnectTimeout(), debugEnabled);
+
+   // [retry]
+   if (config.getMaxRetries() > 0) {
+     chain = new RetryingCaller(chain, async, stats, serviceName,
+         config.getMaxRetries(), config.getRetryableExceptions(), debugEnabled);
+   }
+
+   // [deadline]
+   if (config.getRequestTimeout().getValue() > 0) {
+     Preconditions.checkArgument(!async,
+         "Request deadlines may not be used with an asynchronous client.");
+
+     chain = new DeadlineCaller(chain, async, executorService, config.getRequestTimeout());
+   }
+
+   // [debug]
+   if (debugEnabled) {
+     chain = new DebugCaller(chain, async);
+   }
+
+   // stats
+   if (config.enableStats()) {
+     chain = new StatTrackingCaller(chain, async, stats, serviceName);
+   }
+
+   final Caller entryPoint = chain;
+
+   final InvocationHandler handler = new InvocationHandler() {
+     @Override
+     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+       AsyncMethodCallback callback = null;
+       Object[] callArgs = args;
+       if (async && args != null) {
+         // Async calls carry their callback as the trailing argument; strip it off so the
+         // remaining arguments are passed separately from the callback.
+         List<Object> remaining = Lists.newArrayList(args);
+         callback = extractCallback(remaining);
+         callArgs = remaining.toArray();
+       }
+
+       return entryPoint.call(method, callArgs, callback, null);
+     }
+   };
+
+   @SuppressWarnings("unchecked")
+   T client = (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(),
+       new Class<?>[] {serviceInterface}, handler);
+   return client;
+ }
+
+ /**
+  * Removes and returns the trailing {@link AsyncMethodCallback} from an argument list.
+  *
+  * @param args Argument list to remove the callback from; modified in place.
+  * @return The callback extracted from {@code args}.
+  * @throws IllegalArgumentException if {@code args} is empty or its last element is not an
+  *     {@link AsyncMethodCallback}
+  */
+ private static AsyncMethodCallback extractCallback(List<Object> args) {
+   // TODO(William Farner): Check all interface methods when building the Thrift client
+   // and verify that last arguments are all callbacks...this saves us from checking
+   // each time.
+
+   Preconditions.checkArgument(!args.isEmpty());
+
+   // The callback, if present, must be the final argument.
+   int lastIndex = args.size() - 1;
+   Preconditions.checkArgument(args.get(lastIndex) instanceof AsyncMethodCallback,
+       "Last argument of an async thrift call is expected to be of type AsyncMethodCallback.");
+
+   return (AsyncMethodCallback) args.remove(lastIndex);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/thrift/ThriftConnectionFactory.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/thrift/ThriftConnectionFactory.java b/commons/src/main/java/com/twitter/common/thrift/ThriftConnectionFactory.java
new file mode 100644
index 0000000..a1b79b0
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/thrift/ThriftConnectionFactory.java
@@ -0,0 +1,369 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.thrift;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.twitter.common.base.Closure;
+import com.twitter.common.base.Closures;
+import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.net.pool.Connection;
+import com.twitter.common.net.pool.ConnectionFactory;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+
+/**
+ * A connection factory for thrift transport connections to a given host. This connection factory
+ * is lazy and will only create a configured maximum number of active connections - where a
+ * {@link ConnectionFactory#create(com.twitter.common.quantity.Amount) created} connection that has
+ * not been {@link #destroy destroyed} is considered active.
+ *
+ * @author John Sirois
+ */
+public class ThriftConnectionFactory
+ implements ConnectionFactory<Connection<TTransport, InetSocketAddress>> {
+
+ /**
+  * The kinds of thrift transport this factory can create.
+  */
+ public enum TransportType {
+   BLOCKING, FRAMED, NONBLOCKING;
+
+   /**
+    * Maps the framed/nonblocking flags onto a transport type.
+    *
+    * <p>Async clients implicitly use a framed transport, requiring the server they connect to to
+    * do the same. This prevents specifying a nonblocking client without a framed transport, since
+    * that is not compatible with thrift and would simply cause the client to blow up when making
+    * a request. Instead, you must explicitly say useFramedTransport(true) for any buildAsync().
+    *
+    * @param framedTransport whether a framed transport was requested
+    * @param nonblocking whether a nonblocking transport was requested
+    * @return the matching transport type
+    * @throws IllegalArgumentException if {@code nonblocking} is set without
+    *     {@code framedTransport}
+    */
+   public static TransportType get(boolean framedTransport, boolean nonblocking) {
+     if (!nonblocking) {
+       return framedTransport ? FRAMED : BLOCKING;
+     }
+
+     Preconditions.checkArgument(framedTransport,
+         "nonblocking client requires a server running framed transport");
+     return NONBLOCKING;
+   }
+ }
+
+ /**
+  * Converts a host and port into an unresolved socket address.
+  *
+  * @param host the host name; must not be blank
+  * @param port the port; must be positive
+  * @return an unresolved address for {@code host:port}
+  */
+ private static InetSocketAddress asEndpoint(String host, int port) {
+   MorePreconditions.checkNotBlank(host);
+   boolean positivePort = port > 0;
+   Preconditions.checkArgument(positivePort);
+   return InetSocketAddress.createUnresolved(host, port);
+ }
+
+ // Immutable configuration, assigned exactly once by the primary constructor. Declared final so
+ // the values are safely visible to every thread that uses the factory.
+ private final InetSocketAddress endpoint;
+ private final int maxConnections;
+ private final TransportType transportType;
+ // May be null, in which case no separate i/o timeout is applied after connecting.
+ private final Amount<Long, Time> socketTimeout;
+ private final Closure<Connection<TTransport, InetSocketAddress>> postCreateCallback;
+ private final boolean sslTransport;
+
+ // Identity-based set of connections created and not yet destroyed. Only touched while holding
+ // activeConnectionsWriteLock.
+ private final Set<Connection<TTransport, InetSocketAddress>> activeConnections =
+     Sets.newSetFromMap(
+         Maps.<Connection<TTransport, InetSocketAddress>, Boolean>newIdentityHashMap());
+
+ // Copy of activeConnections.size() that mightCreate() can read without taking the lock.
+ private volatile int lastActiveConnectionsSize = 0;
+
+ // Fair lock guarding activeConnections.
+ private final Lock activeConnectionsWriteLock = new ReentrantLock(true);
+
+ /**
+  * Creates a thrift connection factory that uses a plain blocking socket (non-framed transport).
+  * This is the same as calling {@link #ThriftConnectionFactory(String, int, int, TransportType)}
+  * with {@link TransportType#BLOCKING}.
+  *
+  * @param host Host to connect to.
+  * @param port Port to connect on.
+  * @param maxConnections Maximum number of connections for this host:port.
+  */
+ public ThriftConnectionFactory(String host, int port, int maxConnections) {
+   this(host, port, maxConnections, TransportType.BLOCKING);
+ }
+
+ /**
+  * Creates a thrift connection factory for a blocking client.
+  * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used,
+  * otherwise a raw {@link TSocket} will be used.
+  *
+  * @param host Host to connect to.
+  * @param port Port to connect on.
+  * @param maxConnections Maximum number of connections for this host:port.
+  * @param framedTransport Whether to use framed or blocking transport.
+  */
+ public ThriftConnectionFactory(
+     String host, int port, int maxConnections, boolean framedTransport) {
+
+   this(asEndpoint(host, port), maxConnections, TransportType.get(framedTransport, false));
+ }
+
+ /**
+  * Creates a thrift connection factory for a blocking client.
+  * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used,
+  * otherwise a raw {@link TSocket} will be used.
+  *
+  * @param endpoint Endpoint to connect to.
+  * @param maxConnections Maximum number of connections for this endpoint.
+  * @param framedTransport Whether to use framed or blocking transport.
+  */
+ public ThriftConnectionFactory(
+     InetSocketAddress endpoint, int maxConnections, boolean framedTransport) {
+
+   this(endpoint, maxConnections, TransportType.get(framedTransport, false));
+ }
+
+ /**
+  * Creates a thrift connection factory with no separate socket timeout.
+  * A {@link TransportType#FRAMED} type uses {@link TFramedTransport} over a {@link TSocket},
+  * {@link TransportType#BLOCKING} uses a raw {@link TSocket} and
+  * {@link TransportType#NONBLOCKING} uses a {@link TNonblockingSocket}.
+  * Timeouts are ignored when nonblocking transport is used.
+  *
+  * @param host Host to connect to.
+  * @param port Port to connect on.
+  * @param maxConnections Maximum number of connections for this host:port.
+  * @param transportType Whether to use normal blocking, framed blocking, or non-blocking
+  *     (implicitly framed) transport.
+  */
+ public ThriftConnectionFactory(
+     String host, int port, int maxConnections, TransportType transportType) {
+   this(host, port, maxConnections, transportType, null);
+ }
+
+ /**
+  * Creates a thrift connection factory.
+  * A {@link TransportType#FRAMED} type uses {@link TFramedTransport} over a {@link TSocket},
+  * {@link TransportType#BLOCKING} uses a raw {@link TSocket} and
+  * {@link TransportType#NONBLOCKING} uses a {@link TNonblockingSocket}.
+  * Timeouts are ignored when nonblocking transport is used.
+  *
+  * @param host Host to connect to.
+  * @param port Port to connect on.
+  * @param maxConnections Maximum number of connections for this host:port.
+  * @param transportType Whether to use normal blocking, framed blocking, or non-blocking
+  *     (implicitly framed) transport.
+  * @param socketTimeout timeout on thrift i/o operations, or null to default to connectTimeout
+  *     of the blocking client.
+  */
+ public ThriftConnectionFactory(
+     String host,
+     int port,
+     int maxConnections,
+     TransportType transportType,
+     Amount<Long, Time> socketTimeout) {
+   this(asEndpoint(host, port), maxConnections, transportType, socketTimeout);
+ }
+
+ /**
+  * Creates a thrift connection factory with no separate socket timeout.
+  *
+  * @param endpoint Endpoint to connect to.
+  * @param maxConnections Maximum number of connections for this endpoint.
+  * @param transportType Whether to use normal blocking, framed blocking, or non-blocking
+  *     (implicitly framed) transport.
+  */
+ public ThriftConnectionFactory(
+     InetSocketAddress endpoint, int maxConnections, TransportType transportType) {
+   this(endpoint, maxConnections, transportType, null);
+ }
+
+ /**
+  * Creates a thrift connection factory with SSL disabled and a no-op post-create callback.
+  * A {@link TransportType#FRAMED} type uses {@link TFramedTransport} over a {@link TSocket},
+  * {@link TransportType#BLOCKING} uses a raw {@link TSocket} and
+  * {@link TransportType#NONBLOCKING} uses a {@link TNonblockingSocket}.
+  * Timeouts are ignored when nonblocking transport is used.
+  *
+  * @param endpoint Endpoint to connect to.
+  * @param maxConnections Maximum number of connections for this endpoint.
+  * @param transportType Whether to use normal blocking, framed blocking, or non-blocking
+  *     (implicitly framed) transport.
+  * @param socketTimeout timeout on thrift i/o operations, or null to default to connectTimeout
+  *     of the blocking client.
+  */
+ public ThriftConnectionFactory(
+     InetSocketAddress endpoint,
+     int maxConnections,
+     TransportType transportType,
+     Amount<Long, Time> socketTimeout) {
+   this(endpoint, maxConnections, transportType, socketTimeout,
+       Closures.<Connection<TTransport, InetSocketAddress>>noop(), false);
+ }
+
+ /**
+  * Creates a thrift connection factory; every other constructor delegates here.
+  *
+  * @param endpoint Endpoint to connect to; must not be null.
+  * @param maxConnections Maximum number of active connections; must be at least 1.
+  * @param transportType Whether to use normal blocking, framed blocking, or non-blocking
+  *     (implicitly framed) transport; must not be null.
+  * @param socketTimeout timeout on thrift i/o operations, or null to leave the connect timeout
+  *     in place; must not be negative.
+  * @param postCreateCallback invoked with each newly created connection before it is returned;
+  *     must not be null.
+  * @param sslTransport whether blocking transports should be created over an SSL socket.
+  * @throws IllegalArgumentException if {@code maxConnections} is less than 1 or
+  *     {@code socketTimeout} is negative
+  * @throws NullPointerException if {@code endpoint}, {@code transportType} or
+  *     {@code postCreateCallback} is null
+  */
+ public ThriftConnectionFactory(InetSocketAddress endpoint, int maxConnections,
+     TransportType transportType, Amount<Long, Time> socketTimeout,
+     Closure<Connection<TTransport, InetSocketAddress>> postCreateCallback,
+     boolean sslTransport) {
+   Preconditions.checkArgument(maxConnections > 0, "maxConnections must be at least 1");
+   if (socketTimeout != null) {
+     Preconditions.checkArgument(socketTimeout.as(Time.MILLISECONDS) >= 0);
+   }
+
+   this.endpoint = Preconditions.checkNotNull(endpoint);
+   this.maxConnections = maxConnections;
+   // Fail fast: a null type could never yield a connection and would otherwise surface as a
+   // NullPointerException from the switch in createTransport.
+   this.transportType = Preconditions.checkNotNull(transportType);
+   this.socketTimeout = socketTimeout;
+   this.postCreateCallback = Preconditions.checkNotNull(postCreateCallback);
+   this.sslTransport = sslTransport;
+ }
+
+ /**
+  * Reports, without taking the lock, whether the last recorded number of active connections is
+  * below the configured maximum.
+  */
+ @Override
+ public boolean mightCreate() {
+   int activeSnapshot = lastActiveConnectionsSize;
+   return activeSnapshot < maxConnections;
+ }
+
+ /**
+  * Creates a new connection, waiting up to {@code timeout} to acquire the factory lock and
+  * passing whatever time remains on to the connection attempt. A zero timeout waits for the
+  * lock indefinitely.
+  *
+  * <p>FIXME: shouldn't this throw TimeoutException instead of returning null
+  * in the timeout cases as per the ConnectionFactory.create javadoc?
+  *
+  * @param timeout the maximum time to spend creating the connection; must not be null
+  * @return the new connection, or null if the lock could not be acquired in time, the calling
+  *     thread was interrupted while waiting, the maximum number of connections is already
+  *     active, or no transport could be created in the remaining time
+  * @throws TTransportException if the transport could not be opened
+  * @throws IOException if the underlying socket could not be created
+  */
+ @Override
+ public Connection<TTransport, InetSocketAddress> create(Amount<Long, Time> timeout)
+     throws TTransportException, IOException {
+
+   Preconditions.checkNotNull(timeout);
+   if (timeout.getValue() == 0) {
+     return create();
+   }
+
+   long timeoutNs = timeout.as(Time.NANOSECONDS);
+   long startNs = System.nanoTime();
+
+   boolean locked;
+   try {
+     locked = activeConnectionsWriteLock.tryLock(timeoutNs, TimeUnit.NANOSECONDS);
+   } catch (InterruptedException e) {
+     // Restore the interrupt status so callers further up the stack can still observe it.
+     Thread.currentThread().interrupt();
+     return null;
+   }
+   if (!locked) {
+     return null;
+   }
+
+   try {
+     if (!willCreateSafe()) {
+       return null;
+     }
+
+     // Deduct the time spent waiting for the lock from the connection budget.
+     long remainingNs = timeoutNs - (System.nanoTime() - startNs);
+     long remainingMs = TimeUnit.NANOSECONDS.toMillis(remainingNs);
+
+     // Clamp rather than cast blindly so very long timeouts cannot wrap to a negative int.
+     return createConnection((int) Math.min(remainingMs, Integer.MAX_VALUE));
+   } finally {
+     activeConnectionsWriteLock.unlock();
+   }
+ }
+
+ /**
+  * Creates a connection with a zero transport timeout, waiting for the factory lock for as long
+  * as it takes.
+  *
+  * @return the new connection, or null if the maximum number of connections is already active
+  *     or no transport could be created
+  */
+ private Connection<TTransport, InetSocketAddress> create()
+     throws TTransportException, IOException {
+   activeConnectionsWriteLock.lock();
+   try {
+     return willCreateSafe() ? createConnection(0) : null;
+   } finally {
+     activeConnectionsWriteLock.unlock();
+   }
+ }
+
+ /**
+  * Creates a transport, wraps it in a connection, runs the post-create callback and records the
+  * connection as active. Both callers invoke this while holding the factory lock.
+  *
+  * @param timeoutMillis the timeout to hand to {@link #createTransport(int)}
+  * @return the new connection, or null if no transport could be created
+  * @throws TTransportException if the transport could not be opened
+  * @throws IOException if the underlying socket could not be created
+  */
+ private Connection<TTransport, InetSocketAddress> createConnection(int timeoutMillis)
+     throws TTransportException, IOException {
+   TTransport transport = createTransport(timeoutMillis);
+   if (transport == null) {
+     return null;
+   }
+
+   Connection<TTransport, InetSocketAddress> connection =
+       new TTransportConnection(transport, endpoint);
+
+   boolean registered = false;
+   try {
+     postCreateCallback.execute(connection);
+     activeConnections.add(connection);
+     lastActiveConnectionsSize = activeConnections.size();
+     registered = true;
+   } finally {
+     if (!registered) {
+       // The callback failed, so the connection will never reach a caller who could destroy
+       // it; close the transport here instead of leaking it.
+       transport.close();
+     }
+   }
+   return connection;
+ }
+
+ /**
+  * Reports whether another connection may be created. Reads the active connection set directly,
+  * so callers hold the factory lock.
+  */
+ private boolean willCreateSafe() {
+   int activeCount = activeConnections.size();
+   return activeCount < maxConnections;
+ }
+
+ /**
+  * Creates a transport to the endpoint according to the configured transport type.
+  *
+  * <p>BLOCKING and FRAMED transports require a positive {@code timeoutMillis}, which is handed
+  * to the socket at construction; once opened, the configured socket timeout (if any) replaces
+  * it. NONBLOCKING transports ignore {@code timeoutMillis}.
+  *
+  * <p>NOTE(review): when SSL is enabled the result is always a plain {@link TSocket} around an
+  * SSL socket. On that path the FRAMED type and the configured socket timeout are not applied,
+  * and SSL is not applied at all for NONBLOCKING. Confirm whether this is intended.
+  *
+  * @param timeoutMillis the connect timeout in milliseconds
+  * @return the new transport, or null if a blocking or framed transport was requested with a
+  *     non-positive timeout
+  * @throws TTransportException if the transport could not be opened
+  * @throws IOException if the SSL or non-blocking socket could not be created
+  */
+ @VisibleForTesting
+ TTransport createTransport(int timeoutMillis) throws TTransportException, IOException {
+   TSocket socket = null;
+   if (transportType != TransportType.NONBLOCKING) {
+     // can't do a nonblocking create on a blocking transport
+     if (timeoutMillis <= 0) {
+       return null;
+     }
+
+     if (sslTransport) {
+       return createSslTransport(timeoutMillis);
+     }
+     socket = new TSocket(endpoint.getHostName(), endpoint.getPort(), timeoutMillis);
+   }
+
+   try {
+     switch (transportType) {
+       case BLOCKING:
+         socket.open();
+         setSocketTimeout(socket);
+         return socket;
+       case FRAMED:
+         TFramedTransport transport = new TFramedTransport(socket);
+         transport.open();
+         setSocketTimeout(socket);
+         return transport;
+       case NONBLOCKING:
+         try {
+           return new TNonblockingSocket(endpoint.getHostName(), endpoint.getPort());
+         } catch (IOException e) {
+           throw new IOException("Failed to create non-blocking transport to " + endpoint, e);
+         }
+     }
+   } catch (TTransportException e) {
+     throw new TTransportException("Failed to create transport to " + endpoint, e);
+   }
+
+   throw new IllegalArgumentException("unknown transport type " + transportType);
+ }
+
+ /**
+  * Connects an SSL socket to the endpoint and wraps it in a {@link TSocket}, closing the socket
+  * again if it cannot be configured or wrapped.
+  */
+ private TTransport createSslTransport(int timeoutMillis)
+     throws TTransportException, IOException {
+   SSLSocketFactory factory = (SSLSocketFactory) SSLSocketFactory.getDefault();
+   SSLSocket sslSocket =
+       (SSLSocket) factory.createSocket(endpoint.getHostName(), endpoint.getPort());
+
+   boolean wrapped = false;
+   try {
+     sslSocket.setSoTimeout(timeoutMillis);
+     TTransport transport = new TSocket(sslSocket);
+     wrapped = true;
+     return transport;
+   } finally {
+     if (!wrapped) {
+       try {
+         sslSocket.close();
+       } catch (IOException ignored) {
+         // Best effort: the original failure is the one worth reporting.
+       }
+     }
+   }
+ }
+
+ /**
+  * Applies the configured socket timeout, in milliseconds, to the given socket. Does nothing
+  * when no socket timeout was configured.
+  */
+ private void setSocketTimeout(TSocket socket) {
+   if (socketTimeout == null) {
+     return;
+   }
+   socket.setTimeout(socketTimeout.as(Time.MILLISECONDS).intValue());
+ }
+
+ /**
+  * Stops tracking the given connection and then closes it.
+  *
+  * @param connection a connection previously returned by this factory
+  * @throws IllegalArgumentException if {@code connection} is not an active connection of this
+  *     factory
+  */
+ @Override
+ public void destroy(Connection<TTransport, InetSocketAddress> connection) {
+   activeConnectionsWriteLock.lock();
+   try {
+     Preconditions.checkArgument(activeConnections.remove(connection),
+         "connection %s not created by this factory", connection);
+     lastActiveConnectionsSize = activeConnections.size();
+   } finally {
+     activeConnectionsWriteLock.unlock();
+   }
+
+   // Closing happens after the lock is released, so for a very short time there may be more
+   // open connections than maxConnections.
+   connection.close();
+ }
+
+ /**
+  * Returns the simple class name followed by the endpoint in square brackets.
+  */
+ @Override
+ public String toString() {
+   String simpleName = getClass().getSimpleName();
+   return String.format("%s[%s]", simpleName, endpoint);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/thrift/ThriftException.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/thrift/ThriftException.java b/commons/src/main/java/com/twitter/common/thrift/ThriftException.java
new file mode 100644
index 0000000..b8e5949
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/thrift/ThriftException.java
@@ -0,0 +1,29 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.thrift;
+
+/**
+ * Checked exception used to wrap failures caught during thrift calls.
+ */
+public class ThriftException extends Exception {
+
+  /**
+   * Creates an exception with a description and the failure that caused it.
+   *
+   * @param message a description of what went wrong
+   * @param t the underlying failure
+   */
+  public ThriftException(String message, Throwable t) {
+    super(message, t);
+  }
+
+  /**
+   * Creates an exception described only by a message.
+   *
+   * @param message a description of what went wrong
+   */
+  public ThriftException(String message) {
+    super(message);
+  }
+}