You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by zm...@apache.org on 2015/08/25 20:19:18 UTC

[04/37] aurora git commit: Import of Twitter Commons.

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/stats/WindowedHistogramTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/stats/WindowedHistogramTest.java b/commons/src/test/java/com/twitter/common/stats/WindowedHistogramTest.java
new file mode 100644
index 0000000..45faf5d
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/stats/WindowedHistogramTest.java
@@ -0,0 +1,237 @@
+// =================================================================================================
+// Copyright 2013 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.stats;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+import org.junit.Test;
+
+import com.twitter.common.objectsize.ObjectSizeCalculator;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.testing.RealHistogram;
+import com.twitter.common.util.testing.FakeClock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import static com.twitter.common.stats.WindowedApproxHistogram.DEFAULT_MAX_MEMORY;
+
+/**
+ * Tests WindowedHistogram.
+ */
+public class WindowedHistogramTest {
+
+  @Test
+  public void testEmptyWinHistogram() {
+    WindowedApproxHistogram wh = new WindowedApproxHistogram();
+    assertEquals(0L, wh.getQuantile(0.0));
+  }
+
+  @Test
+  public void testWinHistogramWithEdgeCases() {
+    FakeClock clock = new FakeClock();
+    Amount<Long, Time> window = Amount.of(100L, Time.MILLISECONDS);
+    int slices = 10;
+    long sliceDuration = window.as(Time.NANOSECONDS) / slices;
+    WindowedApproxHistogram h =
+        new WindowedApproxHistogram(window, slices, DEFAULT_MAX_MEMORY, clock);
+
+    h.add(Long.MIN_VALUE);
+    clock.advance(Amount.of(2 * sliceDuration, Time.NANOSECONDS));
+    assertEquals(Long.MIN_VALUE, h.getQuantile(0.0));
+    assertEquals(Long.MIN_VALUE, h.getQuantile(0.5));
+    assertEquals(Long.MIN_VALUE, h.getQuantile(1.0));
+
+    h.add(Long.MAX_VALUE);
+    clock.advance(Amount.of(2 * sliceDuration, Time.NANOSECONDS));
+    assertEquals(Long.MIN_VALUE, h.getQuantile(0.0));
+    assertEquals(Long.MIN_VALUE, h.getQuantile(0.25));
+    assertEquals(Long.MAX_VALUE, h.getQuantile(0.75));
+    assertEquals(Long.MAX_VALUE, h.getQuantile(1.0));
+  }
+
+  @Test
+  public void testClearedWinHistogram() {
+    FakeClock clock = new FakeClock();
+    Amount<Long, Time> window = Amount.of(100L, Time.MILLISECONDS);
+    int slices = 10;
+    Amount<Long, Time> sliceDuration = Amount.of(
+        window.as(Time.NANOSECONDS) / slices, Time.NANOSECONDS);
+    WindowedHistogram<?> h = createFullHistogram(window, slices, clock);
+    long p0 = h.getQuantile(0.1);
+    long p50 = h.getQuantile(0.5);
+    long p90 = h.getQuantile(0.9);
+    assertFalse(0 == p0);
+    assertFalse(0 == p50);
+    assertFalse(0 == p90);
+
+    h.clear();
+
+    assertEquals(0, h.getQuantile(0.1));
+    assertEquals(0, h.getQuantile(0.5));
+    assertEquals(0, h.getQuantile(0.9));
+
+    // reload the histogram with the exact same values than before
+    fillHistogram(h, sliceDuration, slices, clock);
+
+    assertEquals(p0, h.getQuantile(0.1));
+    assertEquals(p50, h.getQuantile(0.5));
+    assertEquals(p90, h.getQuantile(0.9));
+  }
+
+  @Test
+  public void testSimpleWinHistogram() {
+    FakeClock clock = new FakeClock();
+    Amount<Long, Time> window = Amount.of(100L, Time.MILLISECONDS);
+    int slices = 10;
+    WindowedHistogram<?> wh = createFullHistogram(window, slices, clock);
+
+    // check that the global distribution is the aggregation of all underlying histograms
+    for (int i = 1; i <= slices; i++) {
+      double q = (double) i / slices;
+      assertEquals(i, wh.getQuantile(q), 1.0);
+    }
+
+    // advance in time an forget about old values
+    long sliceDuration = window.as(Time.NANOSECONDS) / slices;
+    clock.advance(Amount.of(sliceDuration, Time.NANOSECONDS));
+    for (int j = 0; j < 1000; j++) {
+      wh.add(11);
+    }
+    assertEquals(2, wh.getQuantile(0.05), 1.0);
+    assertEquals(11, wh.getQuantile(0.99), 1.0);
+  }
+
+  @Test
+  public void testWinHistogramWithGap() {
+    FakeClock clock = new FakeClock();
+    Amount<Long, Time> window = Amount.of(100L, Time.MILLISECONDS);
+    int slices = 10;
+    WindowedHistogram<?> wh = createFullHistogram(window, slices, clock);
+    // wh is a WindowedHistogram of 10 slices + the empty current with values from 1 to 10
+    // [1][2][3][4][5][6][7][8][9][10][.]
+    //                                 ^
+
+    for (int j = 0; j < 1000; j++) {
+      wh.add(100);
+    }
+    // [1][2][3][4][5][6][7][8][9][10][100]
+    //                                  ^
+    // quantiles are computed based on [1] -> [10]
+
+    clock.advance(Amount.of((slices - 1) * 100L / slices, Time.MILLISECONDS));
+    for (int j = 0; j < 1000; j++) {
+      wh.add(200);
+    }
+    // [1][2][3][4][5][6][7][8][200][10][100]
+    //                           ^
+    // quantiles are computed based on [10][100][1][2][3][4][5][6][7][8]
+    // and removing old ones           [10][100][.][.][.][.][.][.][.][.]
+    // all the histograms between 100 and 200 are old and shouldn't matter in the computation of
+    // quantiles.
+    assertEquals(10L, wh.getQuantile(0.25), 1.0);
+    assertEquals(100L, wh.getQuantile(0.75), 1.0);
+
+    clock.advance(Amount.of(100L / slices, Time.MILLISECONDS));
+    // [1][2][3][4][5][6][7][8][200][10][100]
+    //                               ^
+    // quantiles are computed based on [100][1][2][3][4][5][6][7][8][200]
+    // and removing old ones           [100][.][.][.][.][.][.][.][.][200]
+
+    assertEquals(100L, wh.getQuantile(0.25), 1.0);
+    assertEquals(200L, wh.getQuantile(0.75), 1.0);
+
+    // advance a lot in time, everything should be "forgotten"
+    clock.advance(Amount.of(500L, Time.MILLISECONDS));
+    assertEquals(0L, wh.getQuantile(0.5), 1.0);
+  }
+
+  @Test
+  public void testWinHistogramMemory() {
+    ImmutableList.Builder<Amount<Long, Data>> builder = ImmutableList.builder();
+    builder.add(Amount.of(8L, Data.KB));
+    builder.add(Amount.of(12L, Data.KB));
+    builder.add(Amount.of(16L, Data.KB));
+    builder.add(Amount.of(20L, Data.KB));
+    builder.add(Amount.of(24L, Data.KB));
+    builder.add(Amount.of(32L, Data.KB));
+    builder.add(Amount.of(64L, Data.KB));
+    builder.add(Amount.of(256L, Data.KB));
+    builder.add(Amount.of(1L, Data.MB));
+    builder.add(Amount.of(16L, Data.MB));
+    builder.add(Amount.of(32L, Data.MB));
+    List<Amount<Long, Data>> sizes = builder.build();
+
+    // large estimation of the memory used outside of buffers
+    long fixSize = Amount.of(4, Data.KB).as(Data.BYTES);
+
+    for (Amount<Long, Data> maxSize: sizes) {
+      WindowedApproxHistogram hist = new WindowedApproxHistogram(
+          Amount.of(60L, Time.SECONDS), 6, maxSize);
+      hist.add(1L);
+      hist.getQuantile(0.5);
+      long size = ObjectSizeCalculator.getObjectSize(hist);
+      // reverting CI JVM seems to have different memory consumption than mine
+      //assertTrue(size < fixSize + maxSize.as(Data.BYTES));
+    }
+  }
+
+  @Test
+  public void testWinHistogramAccuracy() {
+    FakeClock ticker = new FakeClock();
+    Amount<Long, Time> window = Amount.of(100L, Time.MILLISECONDS);
+    int slices = 10;
+    Amount<Long, Time> sliceDuration = Amount.of(
+        window.as(Time.NANOSECONDS) / slices, Time.NANOSECONDS);
+    WindowedHistogram<?> wh = createFullHistogram(window, slices, ticker);
+    RealHistogram rh = fillHistogram(new RealHistogram(), sliceDuration, slices, new FakeClock());
+
+    assertEquals(wh.getQuantile(0.5), rh.getQuantile(0.5));
+    assertEquals(wh.getQuantile(0.75), rh.getQuantile(0.75));
+    assertEquals(wh.getQuantile(0.9), rh.getQuantile(0.9));
+    assertEquals(wh.getQuantile(0.99), rh.getQuantile(0.99));
+  }
+
+  /**
+   * @return a WindowedHistogram with different value in each underlying Histogram
+   */
+  private WindowedHistogram<?> createFullHistogram(
+      Amount<Long, Time> duration, int slices, FakeClock clock) {
+    long sliceDuration = duration.as(Time.NANOSECONDS) / slices;
+    WindowedApproxHistogram wh = new WindowedApproxHistogram(duration, slices,
+        DEFAULT_MAX_MEMORY, clock);
+    clock.advance(Amount.of(1L, Time.NANOSECONDS));
+
+    return fillHistogram(wh, Amount.of(sliceDuration, Time.NANOSECONDS), slices, clock);
+  }
+
+  private <H extends Histogram> H fillHistogram(H h,
+      Amount<Long, Time> sliceDuration, int slices, FakeClock clock) {
+    for (int i = 1; i <= slices; i++) {
+      for (int j = 0; j < 1000; j++) {
+        h.add(i);
+      }
+      clock.advance(sliceDuration);
+    }
+    return h;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/stats/WindowedStatsTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/stats/WindowedStatsTest.java b/commons/src/test/java/com/twitter/common/stats/WindowedStatsTest.java
new file mode 100644
index 0000000..ed821ea
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/stats/WindowedStatsTest.java
@@ -0,0 +1,189 @@
+package com.twitter.common.stats;
+
+import org.junit.Test;
+
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.WindowedStatistics;
+import com.twitter.common.util.testing.FakeClock;
+
+import static org.junit.Assert.assertEquals;
+
+public class WindowedStatsTest {
+  private Amount<Long, Time> window = Amount.of(1L, Time.MINUTES);
+  private int slices = 3;
+  private long sliceDuration = window.as(Time.NANOSECONDS) / slices;
+
+  @Test
+  public void testEmptyStats() {
+    FakeClock clock = new FakeClock();
+    WindowedStatistics ws = new WindowedStatistics(window, slices, clock);
+
+    assertEmpty(ws);
+  }
+
+  @Test
+  public void testStatsCorrectness() {
+    FakeClock clock = new FakeClock();
+    Statistics reference = new Statistics();
+    WindowedStatistics ws = new WindowedStatistics(window, slices, clock);
+
+    for (int i=0; i<1000; i++) {
+      reference.accumulate(i);
+      ws.accumulate(i);
+    }
+    clock.advance(Amount.of(1 + sliceDuration, Time.NANOSECONDS));
+    ws.refresh();
+
+    assertEquals(reference.max(), ws.max());
+    assertEquals(reference.min(), ws.min());
+    assertEquals(reference.populationSize(), ws.populationSize());
+    assertEquals(reference.sum(), ws.sum());
+    assertEquals(reference.range(), ws.range());
+    assertEquals(reference.mean(), ws.mean(), 0.01);
+    assertEquals(reference.variance(), ws.variance(), 0.01);
+    assertEquals(reference.standardDeviation(), ws.standardDeviation(), 0.01);
+
+    for (int i=0; i<1000; i++) {
+      long x = i + 500;
+      reference.accumulate(x);
+      ws.accumulate(x);
+    }
+    clock.advance(Amount.of(sliceDuration, Time.NANOSECONDS));
+    ws.refresh();
+
+    assertEquals(reference.max(), ws.max());
+    assertEquals(reference.min(), ws.min());
+    assertEquals(reference.populationSize(), ws.populationSize());
+    assertEquals(reference.sum(), ws.sum());
+    assertEquals(reference.range(), ws.range());
+    assertEquals(reference.mean(), ws.mean(), 0.01);
+    assertEquals(reference.variance(), ws.variance(), 0.01);
+    assertEquals(reference.standardDeviation(), ws.standardDeviation(), 0.01);
+
+    for (int i=0; i<1000; i++) {
+      long x = i * i;
+      reference.accumulate(x);
+      ws.accumulate(x);
+    }
+    clock.advance(Amount.of(sliceDuration, Time.NANOSECONDS));
+    ws.refresh();
+
+    assertEquals(reference.max(), ws.max());
+    assertEquals(reference.min(), ws.min());
+    assertEquals(reference.populationSize(), ws.populationSize());
+    assertEquals(reference.sum(), ws.sum());
+    assertEquals(reference.range(), ws.range());
+    assertEquals(reference.mean(), ws.mean(), 0.01);
+    assertEquals(reference.variance(), ws.variance(), 0.01);
+    assertEquals(reference.standardDeviation(), ws.standardDeviation(), 0.01);
+  }
+
+  @Test
+  public void testWindowStats() {
+    FakeClock clock = new FakeClock();
+    WindowedStatistics ws = new WindowedStatistics(window, slices, clock);
+    ws.accumulate(1L);
+    assertEmpty(ws);
+
+    clock.advance(Amount.of(1 + sliceDuration, Time.NANOSECONDS));
+    ws.refresh();
+
+    assertEquals(1L, ws.max());
+    assertEquals(1L, ws.min());
+    assertEquals(1L, ws.populationSize());
+    assertEquals(1L, ws.sum());
+    assertEquals(1.0, ws.mean(), 0.01);
+    assertEquals(0.0, ws.standardDeviation(), 0.01);
+
+    clock.advance(Amount.of(slices * sliceDuration, Time.NANOSECONDS));
+    ws.refresh();
+    assertEmpty(ws);
+  }
+
+  @Test
+  public void testCleaningOfExpiredWindows() {
+    FakeClock clock = new FakeClock();
+    WindowedStatistics ws = new WindowedStatistics(window, slices, clock);
+
+    long n = 1000L;
+    for (int i=0; i<n; i++) {
+      ws.accumulate(i);
+    }
+    assertEmpty(ws);
+
+    clock.advance(Amount.of(1 + sliceDuration, Time.NANOSECONDS));
+    ws.refresh();
+    assertEquals(n, ws.populationSize()); // this window is not empty
+
+    clock.advance(Amount.of(100 * sliceDuration, Time.NANOSECONDS));
+    ws.refresh();
+    assertEmpty(ws); // this window has been cleaned
+  }
+
+  @Test
+  public void testAddNewValueToFullWS() {
+    FakeClock clock = new FakeClock();
+    WindowedStatistics ws = new WindowedStatistics(window, slices, clock);
+
+    // AAAAA
+    //      BBBBB
+    //           CCCCC
+    //                DDDDD
+    //                |    |    |    |
+    //---------------------------------> t
+    //                t=0  t=1  t=2  t=3
+
+    // t=0 fill {D}
+    long n = 1000L;
+    for (int i=0; i<n; i++) {
+      ws.accumulate(i);
+    }
+    // read {A,B,C}, which should be empty
+    assertEmpty(ws);
+
+    clock.advance(Amount.of(1 + sliceDuration, Time.NANOSECONDS));
+    ws.refresh();
+    // t=1, read {B,C,D} which shouldn't be empty
+
+    assertEquals(n - 1L, ws.max());
+    assertEquals(0L, ws.min());
+    assertEquals(n, ws.populationSize());
+    assertEquals(n * (n - 1) / 2, ws.sum());
+    assertEquals((n - 1) / 2.0, ws.mean(), 0.01);
+
+    clock.advance(Amount.of(1 + sliceDuration, Time.NANOSECONDS));
+    ws.refresh();
+    // t=2, read {C,D,A} which shouldn't be empty as well
+
+    assertEquals(n - 1L, ws.max());
+    assertEquals(0L, ws.min());
+    assertEquals(n, ws.populationSize());
+    assertEquals(n * (n - 1) / 2, ws.sum());
+    assertEquals((n - 1) / 2.0, ws.mean(), 0.01);
+
+    clock.advance(Amount.of(1 + sliceDuration, Time.NANOSECONDS));
+    ws.refresh();
+    // t=3, read {D,A,B} which shouldn't be empty as well
+
+    assertEquals(n - 1L, ws.max());
+    assertEquals(0L, ws.min());
+    assertEquals(n, ws.populationSize());
+    assertEquals(n * (n - 1) / 2, ws.sum());
+    assertEquals((n - 1) / 2.0, ws.mean(), 0.01);
+
+    clock.advance(Amount.of(1 + sliceDuration, Time.NANOSECONDS));
+    ws.refresh();
+    // t=4, read {A,B,C} which must be empty (cleaned by the Windowed class)
+    assertEmpty(ws);
+  }
+
+  private void assertEmpty(WindowedStatistics ws) {
+    assertEquals(Long.MIN_VALUE, ws.max());
+    assertEquals(Long.MAX_VALUE, ws.min());
+    assertEquals(0L, ws.populationSize());
+    assertEquals(0L, ws.sum());
+    assertEquals(0.0, ws.mean(), 0.01);
+    assertEquals(0.0, ws.standardDeviation(), 0.01);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/stats/WindowedTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/stats/WindowedTest.java b/commons/src/test/java/com/twitter/common/stats/WindowedTest.java
new file mode 100644
index 0000000..17526ea
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/stats/WindowedTest.java
@@ -0,0 +1,113 @@
+// =================================================================================================
+// Copyright 2013 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.stats;
+
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+
+import org.junit.Test;
+
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Clock;
+import com.twitter.common.util.testing.FakeClock;
+
+import junit.framework.Assert;
+
+/**
+ * Test the Windowed abstract class by making a very simple implementation.
+ */
+public class WindowedTest {
+
+  private class WindowedBox extends Windowed<Integer[]> {
+    WindowedBox(Amount<Long, Time > window, int slices, Clock clock) {
+      super(Integer[].class, window, slices,
+          new Supplier<Integer[]>() {
+            @Override public Integer[] get() {
+              Integer[] box = new Integer[1];
+              box[0] = 0;
+              return box;
+            }
+          },
+          new Function<Integer[], Integer[]>() {
+            @Override public Integer[] apply(Integer[] xs) {
+              xs[0] = 0;
+              return xs;
+            }
+          }, clock);
+    }
+
+    void increment() {
+      getCurrent()[0] += 1;
+    }
+
+    int sum() {
+      int s = 0;
+      for (Integer[] box : getTenured()) {
+        s += box[0];
+      }
+      return s;
+    }
+  }
+
+  @Test
+  public void testWindowed() {
+    Amount<Long, Time > window = Amount.of(1L, Time.MINUTES);
+    int slices = 3;
+    Amount<Long, Time > delta = Amount.of(
+        Amount.of(1L, Time.MINUTES).as(Time.NANOSECONDS) / 3, Time.NANOSECONDS);
+    FakeClock clock = new FakeClock();
+    WindowedBox win = new WindowedBox(window, slices, clock);
+    // [0][0][0][0]
+    clock.advance(Amount.of(1L, Time.NANOSECONDS));
+
+    win.increment();
+    // [0][0][0][1]
+    Assert.assertEquals(0, win.sum());
+
+    clock.advance(delta);
+    win.increment();
+    win.increment();
+    Assert.assertEquals(1, win.sum());
+    // [0][0][1][2]
+
+    clock.advance(delta);
+    win.increment();
+    win.increment();
+    win.increment();
+    Assert.assertEquals(3, win.sum());
+    // [0][1][2][3]
+
+    clock.advance(delta);
+    win.increment();
+    win.increment();
+    win.increment();
+    win.increment();
+    Assert.assertEquals(6, win.sum());
+    // [1][2][3][4]
+
+    clock.advance(delta);
+    win.increment();
+    win.increment();
+    win.increment();
+    win.increment();
+    win.increment();
+    Assert.assertEquals(9, win.sum());
+    // [2][3][4][5]
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/testing/TearDownRegistryTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/testing/TearDownRegistryTest.java b/commons/src/test/java/com/twitter/common/testing/TearDownRegistryTest.java
new file mode 100644
index 0000000..56630fa
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/testing/TearDownRegistryTest.java
@@ -0,0 +1,49 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.testing;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.testing.junit4.TearDownTestCase;
+
+import org.junit.Test;
+
+import com.twitter.common.base.Command;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author John Sirois
+ */
+public class TearDownRegistryTest extends TearDownTestCase {
+
+  @Test
+  public void testTearDown() {
+    TearDownRegistry tearDownRegistry = new TearDownRegistry(this);
+    final AtomicBoolean actionExecuted = new AtomicBoolean(false);
+    tearDownRegistry.addAction(new Command() {
+      @Override public void execute() {
+        actionExecuted.set(true);
+      }
+    });
+
+    assertFalse(actionExecuted.get());
+    tearDown();
+    assertTrue(actionExecuted.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/testing/easymock/EasyMockTestTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/testing/easymock/EasyMockTestTest.java b/commons/src/test/java/com/twitter/common/testing/easymock/EasyMockTestTest.java
new file mode 100644
index 0000000..8546935
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/testing/easymock/EasyMockTestTest.java
@@ -0,0 +1,63 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.testing.easymock;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.collect.ImmutableList;
+
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author John Sirois
+ */
+public class EasyMockTestTest extends EasyMockTest {
+
+  @Test
+  public void testSimplyParametrizedMock() {
+    final AtomicBoolean ran = new AtomicBoolean(false);
+
+    Runnable runnable = createMock(new Clazz<Runnable>() { });
+    runnable.run();
+    expectLastCall().andAnswer(new IAnswer<Void>() {
+      @Override public Void answer() {
+        ran.set(true);
+        return null;
+      }
+    });
+    control.replay();
+
+    runnable.run();
+    assertTrue(ran.get());
+  }
+
+  @Test
+  public void testNestedParametrizedMock() {
+    List<List<String>> list = createMock(new Clazz<List<List<String>>>() { });
+    EasyMock.expect(list.get(0)).andReturn(ImmutableList.of("jake"));
+    control.replay();
+
+    assertEquals(ImmutableList.of("jake"), list.get(0));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/testing/easymock/IterableEqualsTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/testing/easymock/IterableEqualsTest.java b/commons/src/test/java/com/twitter/common/testing/easymock/IterableEqualsTest.java
new file mode 100644
index 0000000..07a5aa9
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/testing/easymock/IterableEqualsTest.java
@@ -0,0 +1,53 @@
+package com.twitter.common.testing.easymock;
+
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+
+import static com.twitter.common.testing.easymock.IterableEquals.eqCollection;
+import static com.twitter.common.testing.easymock.IterableEquals.eqIterable;
+import static com.twitter.common.testing.easymock.IterableEquals.eqList;
+
+public class IterableEqualsTest extends EasyMockTest {
+  private static final List<Integer> TEST = ImmutableList.of(1, 2, 3, 2);
+  private static final String OK = "ok";
+  private Thing thing;
+
+  public interface Thing {
+    String testIterable(Iterable<Integer> input);
+    String testCollection(Collection<Integer> input);
+    String testList(List<Integer> input);
+  }
+
+  @Before
+  public void setUp() {
+    thing = createMock(Thing.class);
+  }
+
+  @Test
+  public void testIterableEquals() {
+    expect(thing.testIterable(eqIterable(TEST))).andReturn(OK);
+    control.replay();
+    thing.testIterable(ImmutableList.of(3, 2, 2, 1));
+  }
+
+  @Test
+  public void testCollectionEquals() {
+    expect(thing.testCollection(eqCollection(TEST))).andReturn(OK);
+    control.replay();
+    thing.testCollection(ImmutableList.of(3, 2, 2, 1));
+  }
+
+  @Test
+  public void testListEquals() {
+    expect(thing.testList(eqList(TEST))).andReturn(OK);
+    control.replay();
+    thing.testList(ImmutableList.of(3, 2, 2, 1));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/testing/junit/rules/RetryTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/testing/junit/rules/RetryTest.java b/commons/src/test/java/com/twitter/common/testing/junit/rules/RetryTest.java
new file mode 100644
index 0000000..847347b
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/testing/junit/rules/RetryTest.java
@@ -0,0 +1,233 @@
+// =================================================================================================
+// Copyright 2015 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.testing.junit.rules;
+
+import java.io.IOException;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import javax.annotation.Nullable;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.MethodRule;
+import org.junit.runners.model.FrameworkMethod;
+import org.junit.runners.model.Statement;
+
+// SUPPRESS CHECKSTYLE:OFF IllegalThrows
+public class RetryTest {
+
+  public abstract static class RetryTrackingTestBase {
+    private static int tries;
+
+    @BeforeClass
+    public static void resetTries() {
+      tries = 0;
+    }
+
+    enum Result {
+      FAILURE() {
+        @Override void execute() throws Throwable {
+          Assert.fail("Simulated assertion failure.");
+        }
+      },
+      ERROR() {
+        @Override void execute() throws Throwable {
+          throw new IOException("Simulated unexpected error.");
+        }
+      },
+      SUCCESS() {
+        @Override void execute() throws Throwable {
+          Assert.assertTrue("Simulated successful assertion.", true);
+        }
+      };
+
+      abstract void execute() throws Throwable;
+    }
+
+    @Rule public Retry.Rule retry = new Retry.Rule();
+
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.METHOD)
+    @interface AssertRetries {
+      int expectedTries();
+      int expectedMaxRetries();
+      Result expectedResult();
+    }
+
+    @Rule
+    public MethodRule testRetries = new MethodRule() {
+      @Override
+      public Statement apply(final Statement statement, FrameworkMethod method, Object receiver) {
+        final AssertRetries assertRetries = method.getAnnotation(AssertRetries.class);
+        Assert.assertNotNull(assertRetries);
+        return new Statement() {
+          @Override public void evaluate() throws Throwable {
+            try {
+              statement.evaluate();
+              if (assertRetries.expectedResult() == Result.SUCCESS) {
+                Assert.assertEquals(assertRetries.expectedTries(), tries);
+              } else {
+                Assert.fail("Expected success, found " + assertRetries.expectedResult());
+              }
+            } catch (Retry.Rule.RetriedAssertionError e) {
+              if (assertRetries.expectedResult() == Result.FAILURE) {
+                Assert.assertEquals(assertRetries.expectedTries(), tries);
+                Assert.assertEquals(assertRetries.expectedMaxRetries(), e.getMaxRetries());
+                Assert.assertEquals(assertRetries.expectedTries(), e.getTryNumber());
+              } else {
+                Assert.fail("Expected failure, found " + assertRetries.expectedResult());
+              }
+            } catch (Retry.Rule.RetriedException e) {
+              if (assertRetries.expectedResult() == Result.ERROR) {
+                Assert.assertEquals(assertRetries.expectedTries(), tries);
+                Assert.assertEquals(assertRetries.expectedMaxRetries(), e.getMaxRetries());
+                Assert.assertEquals(assertRetries.expectedTries(), e.getTryNumber());
+              } else {
+                Assert.fail("Expected error, found " + assertRetries.expectedResult());
+              }
+            }
+          }
+        };
+      }
+    };
+
+    protected void doTest(int successfulTries) throws Throwable {
+      doTest(successfulTries, null);
+    }
+
+    protected void doTest(int successfulTries, @Nullable Result lastResult) throws Throwable {
+      tries++;
+      if (lastResult != null && tries > successfulTries) {
+        lastResult.execute();
+      }
+    }
+  }
+
+  public static class DefaultRetrySuccessTest extends RetryTrackingTestBase {
+    @Test
+    @Retry
+    @AssertRetries(expectedTries = 2, expectedMaxRetries = 1, expectedResult = Result.SUCCESS)
+    public void test() throws Throwable {
+      doTest(2);
+    }
+  }
+
+  public static class DefaultRetryFailFastTest extends RetryTrackingTestBase {
+    @Test
+    @Retry
+    @AssertRetries(expectedTries = 1, expectedMaxRetries = 1, expectedResult = Result.FAILURE)
+    public void test() throws Throwable {
+      doTest(0, Result.FAILURE);
+    }
+  }
+
+  public static class DefaultRetryFailLastTest extends RetryTrackingTestBase {
+    @Test
+    @Retry
+    @AssertRetries(expectedTries = 2, expectedMaxRetries = 1, expectedResult = Result.FAILURE)
+    public void test() throws Throwable {
+      doTest(1, Result.FAILURE);
+    }
+  }
+
+  public static class DefaultRetryErrorFastTest extends RetryTrackingTestBase {
+    @Test
+    @Retry
+    @AssertRetries(expectedTries = 1, expectedMaxRetries = 1, expectedResult = Result.ERROR)
+    public void test() throws Throwable {
+      doTest(0, Result.ERROR);
+    }
+  }
+
+  public static class DefaultRetryErrorLastTest extends RetryTrackingTestBase {
+    @Test
+    @Retry
+    @AssertRetries(expectedTries = 2, expectedMaxRetries = 1, expectedResult = Result.ERROR)
+    public void test() throws Throwable {
+      doTest(1, Result.ERROR);
+    }
+  }
+
+  public static class ZeroRetrySuccessTest extends RetryTrackingTestBase {
+    @Test
+    @Retry(times = 0)
+    @AssertRetries(expectedTries = 1, expectedMaxRetries = 0, expectedResult = Result.SUCCESS)
+    public void test() throws Throwable {
+      doTest(1, Result.SUCCESS);
+    }
+  }
+
+  public static class NegativeRetrySuccessTest extends RetryTrackingTestBase {
+    @Test
+    @Retry(times = -1)
+    @AssertRetries(expectedTries = 1, expectedMaxRetries = 0, expectedResult = Result.SUCCESS)
+    public void test() throws Throwable {
+      doTest(1, Result.SUCCESS);
+    }
+  }
+
+  public static class PositiveRetrySuccessTest extends RetryTrackingTestBase {
+    @Test
+    @Retry(times = 2)
+    @AssertRetries(expectedTries = 3, expectedMaxRetries = 2, expectedResult = Result.SUCCESS)
+    public void test() throws Throwable {
+      doTest(3, Result.SUCCESS);
+    }
+  }
+
+  public static class PositiveRetryFailFastTest extends RetryTrackingTestBase {
+    @Test
+    @Retry(times = 2)
+    @AssertRetries(expectedTries = 1, expectedMaxRetries = 2, expectedResult = Result.FAILURE)
+    public void test() throws Throwable {
+      doTest(0, Result.FAILURE);
+    }
+  }
+
+  public static class PositiveRetryFailLastTest extends RetryTrackingTestBase {
+    @Test
+    @Retry(times = 2)
+    @AssertRetries(expectedTries = 2, expectedMaxRetries = 2, expectedResult = Result.FAILURE)
+    public void test() throws Throwable {
+      doTest(1, Result.FAILURE);
+    }
+  }
+
+  public static class PositiveRetryErrorFastTest extends RetryTrackingTestBase {
+    @Test
+    @Retry(times = 2)
+    @AssertRetries(expectedTries = 1, expectedMaxRetries = 2, expectedResult = Result.ERROR)
+    public void test() throws Throwable {
+      doTest(0, Result.ERROR);
+    }
+  }
+
+  public static class PositiveRetryErrorLastTest extends RetryTrackingTestBase {
+    @Test
+    @Retry(times = 2)
+    @AssertRetries(expectedTries = 2, expectedMaxRetries = 2, expectedResult = Result.ERROR)
+    public void test() throws Throwable {
+      doTest(1, Result.ERROR);
+    }
+  }
+}
+// SUPPRESS CHECKSTYLE:ON IllegalThrows

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/thrift/ThriftConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/thrift/ThriftConnectionFactoryTest.java b/commons/src/test/java/com/twitter/common/thrift/ThriftConnectionFactoryTest.java
new file mode 100644
index 0000000..139d90e
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/thrift/ThriftConnectionFactoryTest.java
@@ -0,0 +1,143 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.thrift;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+import com.twitter.common.net.pool.Connection;
+import com.twitter.common.net.pool.ObjectPool;
+import com.twitter.common.thrift.testing.MockTSocket;
+
+/**
+ * @author John Sirois
+ */
+public class ThriftConnectionFactoryTest {
+
+  @Test
+  public void testPreconditions() {
+    try {
+      new ThriftConnectionFactory(null, 1, 1);
+      fail("a non-null host should be required");
+    } catch (NullPointerException e) {
+      // expected
+    }
+
+    try {
+      new ThriftConnectionFactory(" ", 1, 1);
+      fail("a non-blank host should be required");
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+
+    try {
+      new ThriftConnectionFactory("localhost", 0, 1);
+      fail("a valid concrete remote port should be required");
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+
+    try {
+      new ThriftConnectionFactory("localhost", 65536, 1);
+      fail("a valid port should be required");
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+
+    try {
+      new ThriftConnectionFactory("localhost", 65535, 0);
+      fail("a non-zero value for maxConnections should be required");
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testMaxConnections() throws TTransportException, IOException {
+    ThriftConnectionFactory thriftConnectionFactory = createConnectionFactory(2);
+
+    Connection<TTransport, InetSocketAddress> connection1 =
+        thriftConnectionFactory.create(ObjectPool.NO_TIMEOUT);
+    assertOpenConnection(connection1);
+
+    Connection<TTransport, InetSocketAddress> connection2 =
+        thriftConnectionFactory.create(ObjectPool.NO_TIMEOUT);
+    assertOpenConnection(connection2);
+    assertThat(connection1, not(sameInstance(connection2)));
+
+    assertNull("Should've reached maximum connections",
+        thriftConnectionFactory.create(ObjectPool.NO_TIMEOUT));
+
+    thriftConnectionFactory.destroy(connection1);
+    assertClosedConnection(connection1);
+
+    Connection<TTransport, InetSocketAddress> connection3 =
+        thriftConnectionFactory.create(ObjectPool.NO_TIMEOUT);
+    assertOpenConnection(connection3);
+    @SuppressWarnings("unchecked") // Needed because type information lost in vargs.
+    Matcher<Connection<TTransport, InetSocketAddress>> matcher =
+      allOf(not(sameInstance(connection1)), not(sameInstance(connection2)));
+    assertThat(connection3, matcher);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInactiveConnectionReturn() {
+    createConnectionFactory(1).destroy(new TTransportConnection(new MockTSocket(),
+        InetSocketAddress.createUnresolved(MockTSocket.HOST, MockTSocket.PORT)));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNullConnectionReturn() {
+    createConnectionFactory(1).destroy(null);
+  }
+
+  private void assertOpenConnection(Connection<TTransport, InetSocketAddress> connection) {
+    assertNotNull(connection);
+    assertTrue(connection.isValid());
+    assertTrue(connection.get().isOpen());
+  }
+
+  private void assertClosedConnection(Connection<TTransport, InetSocketAddress> connection) {
+    assertFalse(connection.isValid());
+    assertFalse(connection.get().isOpen());
+  }
+
+  private ThriftConnectionFactory createConnectionFactory(int maxConnections) {
+    return new ThriftConnectionFactory("foo", 1234, maxConnections) {
+      @Override TTransport createTransport(int timeoutMillis) throws TTransportException {
+        TTransport transport = new MockTSocket();
+        transport.open();
+        return transport;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/thrift/ThriftFactoryTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/thrift/ThriftFactoryTest.java b/commons/src/test/java/com/twitter/common/thrift/ThriftFactoryTest.java
new file mode 100644
index 0000000..d1fcf3d
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/thrift/ThriftFactoryTest.java
@@ -0,0 +1,245 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.thrift;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.testing.TearDown;
+import com.google.common.testing.junit4.TearDownTestCase;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.async.TAsyncClient;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.easymock.EasyMock;
+import org.easymock.IMocksControl;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.twitter.common.net.pool.DynamicHostSet;
+import com.twitter.common.thrift.ThriftFactoryTest.GoodService.AsyncIface;
+import com.twitter.thrift.ServiceInstance;
+
+/**
+ * @author John Sirois
+ */
+public class ThriftFactoryTest extends TearDownTestCase {
+
+  private static final Logger LOG = Logger.getLogger(ThriftFactoryTest.class.getName());
+  private IMocksControl control;
+
+  static class GoodService {
+    public interface Iface {
+      String doWork() throws TResourceExhaustedException;
+    }
+
+    public interface AsyncIface {
+      void doWork(AsyncMethodCallback<String> callback);
+    }
+
+    public static final String DONE = "done";
+
+    public static class Client implements Iface {
+      public Client(TProtocol protocol) {
+        assertNotNull(protocol);
+      }
+
+      @Override public String doWork() throws TResourceExhaustedException {
+        return DONE;
+      }
+    }
+
+    public static class AsyncClient extends TAsyncClient implements AsyncIface {
+      public AsyncClient(TProtocolFactory factory, TAsyncClientManager manager,
+          TNonblockingTransport transport) {
+        super(factory, manager, transport);
+        assertNotNull(factory);
+        assertNotNull(manager);
+        assertNotNull(transport);
+      }
+
+      @Override public void doWork(AsyncMethodCallback<String> callback) {
+        callback.onComplete(DONE);
+      }
+    }
+  }
+
+  static class BadService {
+    public interface Iface {
+      void doWork();
+    }
+    public interface AsyncIface {
+      void doWork(AsyncMethodCallback<Void> callback);
+    }
+
+    public static class Client implements Iface {
+      @Override public void doWork() {
+        throw new UnsupportedOperationException();
+      }
+    }
+  }
+
+  private ImmutableSet<InetSocketAddress> endpoints;
+
+  @Before
+  public void setUp() throws Exception {
+    control = EasyMock.createControl();
+    endpoints = ImmutableSet.of(new InetSocketAddress(5555));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testNullServiceInterface() {
+    ThriftFactory.create(null);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBadServiceInterface() {
+    ThriftFactory.create(GoodService.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBadServiceImpl() throws ThriftFactory.ThriftFactoryException {
+    ThriftFactory.<BadService.Iface>create(BadService.Iface.class)
+        .build(endpoints);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBadAsyncServiceImpl() throws ThriftFactory.ThriftFactoryException {
+    ThriftFactory.<BadService.AsyncIface>create(BadService.AsyncIface.class)
+        .useFramedTransport(true)
+        .buildAsync(endpoints);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNoBackends() {
+    ThriftFactory.create(GoodService.Iface.class)
+        .build(ImmutableSet.<InetSocketAddress>of());
+  }
+
+  @Test
+  public void testCreate() throws Exception {
+    final AtomicReference<Socket> clientConnection = new AtomicReference<Socket>();
+    final CountDownLatch connected = new CountDownLatch(1);
+    final ServerSocket server = new ServerSocket(0);
+    Thread service = new Thread(new Runnable() {
+      @Override public void run() {
+        try {
+          clientConnection.set(server.accept());
+        } catch (IOException e) {
+          LOG.log(Level.WARNING, "Problem accepting a connection to thrift server", e);
+        } finally {
+          connected.countDown();
+        }
+      }
+    });
+    service.setDaemon(true);
+    service.start();
+
+    try {
+      final Thrift<GoodService.Iface> thrift = ThriftFactory.create(GoodService.Iface.class)
+          .withMaxConnectionsPerEndpoint(1)
+          .build(ImmutableSet.of(new InetSocketAddress(server.getLocalPort())));
+      addTearDown(new TearDown() {
+        @Override public void tearDown() {
+          thrift.close();
+        }
+      });
+
+      GoodService.Iface client = thrift.create();
+
+      assertEquals(GoodService.DONE, client.doWork());
+    } finally {
+      connected.await();
+      server.close();
+    }
+
+    Socket socket = clientConnection.get();
+    assertNotNull(socket);
+    socket.close();
+  }
+
+  @Test(expected = TResourceExhaustedException.class)
+  public void testCreateEmpty() throws Exception {
+    @SuppressWarnings("unchecked")
+    DynamicHostSet<ServiceInstance> emptyHostSet = control.createMock(DynamicHostSet.class);
+    final Thrift<GoodService.Iface> thrift = ThriftFactory.create(GoodService.Iface.class)
+        .withMaxConnectionsPerEndpoint(1)
+        .build(emptyHostSet);
+    addTearDown(new TearDown() {
+      @Override public void tearDown() {
+        thrift.close();
+      }
+    });
+    GoodService.Iface client = thrift.create();
+
+    // This should throw a TResourceExhaustedException
+    client.doWork();
+  }
+
+  @Test
+  public void testCreateAsync()
+      throws IOException, InterruptedException, ThriftFactory.ThriftFactoryException {
+    final String responseHolder[] = new String[] {null};
+    final CountDownLatch done = new CountDownLatch(1);
+    AsyncMethodCallback<String> callback = new AsyncMethodCallback<String>() {
+      @Override
+      public void onComplete(String response) {
+        responseHolder[0] = response;
+        done.countDown();
+      }
+
+      @Override
+      public void onError(Exception throwable) {
+        responseHolder[0] = throwable.toString();
+        done.countDown();
+      }
+    };
+
+    final Thrift<AsyncIface> thrift = ThriftFactory.create(GoodService.AsyncIface.class)
+        .withMaxConnectionsPerEndpoint(1)
+        .useFramedTransport(true)
+        .buildAsync(ImmutableSet.of(new InetSocketAddress(1234)));
+    addTearDown(new TearDown() {
+      @Override public void tearDown() {
+        thrift.close();
+      }
+    });
+    GoodService.AsyncIface client = thrift.builder()
+        .blocking()
+        .create();
+
+    client.doWork(callback);
+    assertTrue("wasn't called back in time, callback got " + responseHolder[0],
+        done.await(5000, TimeUnit.MILLISECONDS));
+    assertEquals(GoodService.DONE, responseHolder[0]);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/thrift/ThriftTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/thrift/ThriftTest.java b/commons/src/test/java/com/twitter/common/thrift/ThriftTest.java
new file mode 100644
index 0000000..eea6b5b
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/thrift/ThriftTest.java
@@ -0,0 +1,934 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.thrift;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.base.Function;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+import org.easymock.IMocksControl;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.twitter.common.base.Command;
+import com.twitter.common.net.loadbalancing.LoadBalancer;
+import com.twitter.common.net.loadbalancing.RequestTracker;
+import com.twitter.common.net.pool.Connection;
+import com.twitter.common.net.pool.ObjectPool;
+import com.twitter.common.net.pool.ResourceExhaustedException;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stat;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.thrift.callers.RetryingCaller;
+import com.twitter.common.thrift.testing.MockTSocket;
+import com.twitter.common.util.concurrent.ForwardingExecutorService;
+
+import static org.easymock.EasyMock.and;
+import static org.easymock.EasyMock.anyLong;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.isA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * @author John Sirois
+ */
+public class ThriftTest {
+
+  private static final Amount<Long, Time> ASYNC_CONNECT_TIMEOUT = Amount.of(1L, Time.SECONDS);
+
+  public static class NotFoundException extends Exception {}
+
+  public interface TestService {
+    int calculateMass(String profileName) throws NotFoundException, TException;
+  }
+
+  public interface TestServiceAsync {
+    void calculateMass(String profileName, AsyncMethodCallback callback) throws TException;
+  }
+
+  private IMocksControl control;
+  private ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool;
+  private Function<TTransport, TestService> clientFactory;
+  private Function<TTransport, TestServiceAsync> asyncClientFactory;
+  private RequestTracker<InetSocketAddress> requestTracker;
+
+  private AsyncMethodCallback<Integer> callback;
+
+  @SuppressWarnings("unchecked")
+  @Before
+  public void setUp() throws Exception {
+    control = EasyMock.createControl();
+
+    this.connectionPool = control.createMock(ObjectPool.class);
+    this.clientFactory = control.createMock(Function.class);
+    this.asyncClientFactory = control.createMock(Function.class);
+    this.requestTracker = control.createMock(LoadBalancer.class);
+
+    this.callback = control.createMock(AsyncMethodCallback.class);
+  }
+
+  @After
+  public void after() {
+    Stats.flush();
+  }
+
+  @Test
+  public void testDoCallNoDeadline() throws Exception {
+    TestService testService = expectServiceCall(false);
+    expect(testService.calculateMass("jake")).andReturn(42);
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong());
+
+    Thrift<TestService> thrift = createThrift(expectUnusedExecutorService());
+
+    control.replay();
+
+    int userMass = thrift.builder().blocking().create().calculateMass("jake");
+
+    assertEquals(42, userMass);
+
+    assertRequestsTotal(thrift, 1);
+    assertErrorsTotal(thrift, 0);
+    assertReconnectsTotal(thrift, 0);
+    assertTimeoutsTotal(thrift, 0);
+
+    control.verify();
+  }
+
+  @Test
+  public void testDoCallAsync() throws Exception {
+    // Capture the callback that Thift has wrapped around our callback.
+    Capture<AsyncMethodCallback<Integer>> callbackCapture =
+        new Capture<AsyncMethodCallback<Integer>>();
+    expectAsyncServiceCall(false).calculateMass(eq("jake"), capture(callbackCapture));
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong());
+
+    // Verifies that our callback was called.
+    callback.onComplete(42);
+
+    Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService());
+
+    control.replay();
+
+    thrift.builder().withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create()
+        .calculateMass("jake", callback);
+
+    // Mimicks the async response from the server.
+    callbackCapture.getValue().onComplete(42);
+
+    assertRequestsTotal(thrift, 1);
+    assertErrorsTotal(thrift, 0);
+    assertReconnectsTotal(thrift, 0);
+    assertTimeoutsTotal(thrift, 0);
+
+    control.verify();
+  }
+
+  @Test
+  public void testDoCallServiceException() throws Exception {
+    TestService testService = expectServiceCall(true);
+    NotFoundException notFoundException = new NotFoundException();
+    expect(testService.calculateMass("jake")).andThrow(notFoundException);
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+    Thrift<TestService> thrift = createThrift(expectUnusedExecutorService());
+
+    control.replay();
+
+    try {
+      thrift.builder().blocking().create().calculateMass("jake");
+      fail("Expected service custom exception to bubble unmodified");
+    } catch (NotFoundException e) {
+      assertSame(notFoundException, e);
+    }
+
+    assertRequestsTotal(thrift, 1);
+    assertErrorsTotal(thrift, 1);
+    assertReconnectsTotal(thrift, 1);
+    assertTimeoutsTotal(thrift, 0);
+
+    control.verify();
+  }
+
+  @Test
+  public void testDoCallAsyncServiceException() throws Exception {
+    NotFoundException notFoundException = new NotFoundException();
+
+    // Capture the callback that Thift has wrapped around our callback.
+    Capture<AsyncMethodCallback<Integer>> callbackCapture =
+        new Capture<AsyncMethodCallback<Integer>>();
+    expectAsyncServiceCall(true).calculateMass(eq("jake"), capture(callbackCapture));
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+    // Verifies that our callback was called.
+    callback.onError(notFoundException);
+
+    Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService());
+
+    control.replay();
+
+    thrift.builder().withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create()
+        .calculateMass("jake", callback);
+
+    // Mimicks the async response from the server.
+    callbackCapture.getValue().onError(notFoundException);
+
+    assertRequestsTotal(thrift, 1);
+    assertErrorsTotal(thrift, 1);
+    assertReconnectsTotal(thrift, 1);
+    assertTimeoutsTotal(thrift, 0);
+
+    control.verify();
+  }
+
+  @Test
+  public void testDoCallThriftException() throws Exception {
+    Capture<TTransport> transportCapture = new Capture<TTransport>();
+    TestService testService = expectThriftError(transportCapture);
+    TTransportException tException = new TTransportException();
+    expect(testService.calculateMass("jake")).andThrow(tException);
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+    Thrift<TestService> thrift = createThrift(expectUnusedExecutorService());
+
+    control.replay();
+
+    try {
+      thrift.builder().blocking().create().calculateMass("jake");
+      fail("Expected thrift exception to bubble unmodified");
+    } catch (TException e) {
+      assertSame(tException, e);
+    }
+
+    assertRequestsTotal(thrift, 1);
+    assertErrorsTotal(thrift, 1);
+    assertReconnectsTotal(thrift, 1);
+    assertTimeoutsTotal(thrift, 0);
+
+    assertTrue(transportCapture.hasCaptured());
+    assertFalse("Expected the transport to be forcibly closed when a thrift error is encountered",
+        transportCapture.getValue().isOpen());
+
+    control.verify();
+  }
+
+  @Test
+  public void doCallAsyncThriftException() throws Exception {
+    TTransportException tException = new TTransportException();
+
+    expectAsyncServiceCall(true).calculateMass(eq("jake"), (AsyncMethodCallback) anyObject());
+    expectLastCall().andThrow(tException);
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+    Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService());
+
+    callback.onError(tException);
+
+    control.replay();
+
+    thrift.builder().withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create()
+        .calculateMass("jake", callback);
+
+    assertRequestsTotal(thrift, 1);
+    assertErrorsTotal(thrift, 1);
+    assertReconnectsTotal(thrift, 1);
+    assertTimeoutsTotal(thrift, 0);
+
+    control.verify();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testDisallowsAsyncWithDeadline() {
+    Config config = Config.builder()
+        .withRequestTimeout(Amount.of(1L, Time.SECONDS))
+        .create();
+
+    new Thrift<TestServiceAsync>(config, connectionPool, requestTracker,
+      "foo", TestServiceAsync.class, asyncClientFactory, true, false).create();
+  }
+
+  @Test
+  public void testDoCallDeadlineMet() throws Exception {
+    TestService testService = expectServiceCall(false);
+    expect(testService.calculateMass("jake")).andReturn(42);
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong());
+
+    ExecutorService executorService = Executors.newSingleThreadExecutor();
+    Thrift<TestService> thrift = createThrift(executorService);
+
+    control.replay();
+
+    int userMass = thrift.builder().withRequestTimeout(Amount.of(1L, Time.DAYS)).create()
+        .calculateMass("jake");
+
+    assertEquals(42, userMass);
+
+    assertRequestsTotal(thrift, 1);
+    assertErrorsTotal(thrift, 0);
+    assertReconnectsTotal(thrift, 0);
+    assertTimeoutsTotal(thrift, 0);
+
+    control.verify();
+  }
+
+  @Test
+  @Ignore("Flaky: https://trac.twitter.com/twttr/ticket/11474")
+  public void testDoCallDeadlineExpired() throws Exception {
+    TestService testService = expectServiceCall(true);
+
+    // Setup a way to verify the callable was cancelled by Thrift when timeout elapsed
+    final CountDownLatch remoteCallComplete = new CountDownLatch(1);
+    final CountDownLatch remoteCallStarted = new CountDownLatch(1);
+    final Command verifyCancelled = control.createMock(Command.class);
+    verifyCancelled.execute();
+    final Object block = new Object();
+    expect(testService.calculateMass("jake")).andAnswer(new IAnswer<Integer>() {
+      @Override public Integer answer() throws TException {
+        try {
+          synchronized (block) {
+            remoteCallStarted.countDown();
+            block.wait();
+          }
+          fail("Expected late work to be cancelled and interrupted");
+        } catch (InterruptedException e) {
+          verifyCancelled.execute();
+        } finally {
+          remoteCallComplete.countDown();
+        }
+        throw new TTransportException();
+      }
+    });
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.TIMEOUT), anyLong());
+
+    ExecutorService executorService =
+        new ForwardingExecutorService<ExecutorService>(Executors.newSingleThreadExecutor()) {
+          @Override public <T> Future<T> submit(Callable<T> task) {
+            Future<T> future = super.submit(task);
+
+            // make sure the task is started so we can verify it gets cancelled
+            try {
+              remoteCallStarted.await();
+            } catch (InterruptedException e) {
+              throw new RuntimeException(e);
+            }
+
+            return future;
+          }
+        };
+    Thrift<TestService> thrift = createThrift(executorService);
+
+    control.replay();
+
+    try {
+      thrift.builder().withRequestTimeout(Amount.of(1L, Time.NANOSECONDS)).create()
+          .calculateMass("jake");
+      fail("Expected a timeout");
+    } catch (TTimeoutException e) {
+      // expected
+    } finally {
+      remoteCallComplete.await();
+    }
+
+    assertRequestsTotal(thrift, 0);
+    assertErrorsTotal(thrift, 0);
+    assertReconnectsTotal(thrift, 0);
+    assertTimeoutsTotal(thrift, 1);
+
+    control.verify();
+  }
+
+  @Test
+  public void testRetriesNoProblems() throws Exception {
+    expect(expectServiceCall(false).calculateMass("jake")).andReturn(42);
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong());
+
+    Thrift<TestService> thrift = createThrift(expectUnusedExecutorService());
+
+    control.replay();
+
+    TestService testService = thrift.builder().blocking().withRetries(1).create();
+
+    assertEquals(42, testService.calculateMass("jake"));
+
+    assertRequestsTotal(thrift, 1);
+    assertErrorsTotal(thrift, 0);
+    assertReconnectsTotal(thrift, 0);
+    assertTimeoutsTotal(thrift, 0);
+
+    control.verify();
+  }
+
+  @Test
+  public void testAsyncRetriesNoProblems() throws Exception {
+    // Capture the callback that Thift has wrapped around our callback.
+    Capture<AsyncMethodCallback<Integer>> callbackCapture =
+        new Capture<AsyncMethodCallback<Integer>>();
+    expectAsyncServiceCall(false).calculateMass(eq("jake"), capture(callbackCapture));
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong());
+
+    // Verifies that our callback was called.
+    callback.onComplete(42);
+
+    Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService());
+
+    control.replay();
+
+    thrift.builder().withRetries(1).withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create()
+        .calculateMass("jake", callback);
+
+    // Mimicks the async response from the server.
+    callbackCapture.getValue().onComplete(42);
+
+    assertRequestsTotal(thrift, 1);
+    assertErrorsTotal(thrift, 0);
+    assertReconnectsTotal(thrift, 0);
+    assertTimeoutsTotal(thrift, 0);
+
+    control.verify();
+  }
+
+  @Test
+  public void testRetriesRecover() throws Exception {
+    // 1st call
+    expect(expectServiceCall(true).calculateMass("jake")).andThrow(new TTransportException());
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+    // 1st retry recovers
+    expect(expectServiceCall(false).calculateMass("jake")).andReturn(42);
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong());
+
+    Thrift<TestService> thrift = createThrift(expectUnusedExecutorService());
+
+    control.replay();
+
+    TestService testService = thrift.builder().blocking().withRetries(1).create();
+
+    assertEquals(42, testService.calculateMass("jake"));
+
+    assertRequestsTotal(thrift, 1);
+    assertErrorsTotal(thrift, 0);
+    assertReconnectsTotal(thrift, 0);
+    assertTimeoutsTotal(thrift, 0);
+
+    control.verify();
+  }
+
+  @Test
+  public void testAsyncRetriesRecover() throws Exception {
+    // Capture the callback that Thift has wrapped around our callback.
+    Capture<AsyncMethodCallback<Integer>> callbackCapture =
+        new Capture<AsyncMethodCallback<Integer>>();
+
+    // 1st call
+    expectAsyncServiceCall(true).calculateMass(eq("jake"), capture(callbackCapture));
+    expectLastCall().andThrow(new TTransportException());
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+    // 1st retry recovers
+    expectAsyncServiceRetry(false).calculateMass(eq("jake"), capture(callbackCapture));
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong());
+
+    // Verifies that our callback was called.
+    callback.onComplete(42);
+
+    Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService());
+
+    control.replay();
+
+    thrift.builder().withRetries(1).withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create()
+        .calculateMass("jake", callback);
+
+    // Mimicks the async response from the server.
+    callbackCapture.getValue().onComplete(42);
+
+    assertRequestsTotal(thrift, 1);
+    assertErrorsTotal(thrift, 0);
+    assertReconnectsTotal(thrift, 0);
+    assertTimeoutsTotal(thrift, 0);
+
+    control.verify();
+  }
+
+  @Test
+  public void testRetriesFailure() throws Exception {
+    // 1st call
+    expect(expectServiceCall(true).calculateMass("jake")).andThrow(new TTransportException());
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+    // 1st retry
+    expect(expectServiceCall(true).calculateMass("jake")).andThrow(new TTransportException());
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+    // 2nd retry
+    TTransportException finalRetryException = new TTransportException();
+    expect(expectServiceCall(true).calculateMass("jake")).andThrow(finalRetryException);
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+    Thrift<TestService> thrift = createThrift(expectUnusedExecutorService());
+
+    control.replay();
+
+    TestService testService = thrift.builder().blocking().withRetries(2).create();
+
+    try {
+      testService.calculateMass("jake");
+      fail("Expected an exception to be thrown since all retires failed");
+    } catch (TException e) {
+      assertSame(finalRetryException, e);
+    }
+
+    assertRequestsTotal(thrift, 1);
+    assertErrorsTotal(thrift, 1);
+    assertReconnectsTotal(thrift, 1);
+    assertTimeoutsTotal(thrift, 0);
+
+    control.verify();
+  }
+
+  @Test
+  public void testAsyncRetriesFailure() throws Exception {
+    // 1st call
+    Capture<AsyncMethodCallback<Integer>> callbackCapture1 =
+        new Capture<AsyncMethodCallback<Integer>>();
+    expectAsyncServiceCall(true).calculateMass(eq("jake"), capture(callbackCapture1));
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+    // 1st retry
+    Capture<AsyncMethodCallback<Integer>> callbackCapture2 =
+        new Capture<AsyncMethodCallback<Integer>>();
+    expectAsyncServiceRetry(true).calculateMass(eq("jake"), capture(callbackCapture2));
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+    // 2nd retry
+    Capture<AsyncMethodCallback<Integer>> callbackCapture3 =
+        new Capture<AsyncMethodCallback<Integer>>();
+    expectAsyncServiceRetry(true).calculateMass(eq("jake"), capture(callbackCapture3));
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+    // Verifies that our callback was called.
+    TTransportException returnedException = new TTransportException();
+    callback.onError(returnedException);
+
+    Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService());
+
+    control.replay();
+
+    thrift.builder().withRetries(2).withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create()
+        .calculateMass("jake", callback);
+
+    callbackCapture1.getValue().onError(new TTransportException());
+    callbackCapture2.getValue().onError(new IOException());
+    callbackCapture3.getValue().onError(returnedException);
+
+    assertRequestsTotal(thrift, 1);
+    assertErrorsTotal(thrift, 1);
+    assertReconnectsTotal(thrift, 1);
+    assertTimeoutsTotal(thrift, 0);
+
+    control.verify();
+  }
+
+  @Test
+  public void testRetrySelection() throws Exception {
+    expect(expectServiceCall(true).calculateMass("jake")).andThrow(new NotFoundException());
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+    // verify subclasses pass the retry filter
+    class HopelesslyLost extends NotFoundException {}
+    expect(expectServiceCall(true).calculateMass("jake")).andThrow(new HopelesslyLost());
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+    TTransportException nonRetryableException = new TTransportException();
+    expect(expectServiceCall(true).calculateMass("jake")).andThrow(nonRetryableException);
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+    Thrift<TestService> thrift = createThrift(expectUnusedExecutorService());
+
+    control.replay();
+
+    TestService testService =
+        thrift.builder().blocking().withRetries(2).retryOn(NotFoundException.class).create();
+
+    try {
+      testService.calculateMass("jake");
+      fail("Expected n exception to be thrown since all retires failed");
+    } catch (TException e) {
+      assertSame(nonRetryableException, e);
+    }
+
+    assertRequestsTotal(thrift, 1);
+    assertErrorsTotal(thrift, 1);
+    assertReconnectsTotal(thrift, 1);
+    assertTimeoutsTotal(thrift, 0);
+
+    control.verify();
+  }
+
+  @Test
+  public void testAsyncRetrySelection() throws Exception {
+    // verify subclasses pass the retry filter
+    class HopelesslyLost extends NotFoundException {}
+    Capture<AsyncMethodCallback<Integer>> callbackCapture1 =
+        new Capture<AsyncMethodCallback<Integer>>();
+    expectAsyncServiceCall(true).calculateMass(eq("jake"), capture(callbackCapture1));
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+    Capture<AsyncMethodCallback<Integer>> callbackCapture2 =
+        new Capture<AsyncMethodCallback<Integer>>();
+    expectAsyncServiceRetry(true).calculateMass(eq("jake"), capture(callbackCapture2));
+    requestTracker.requestResult(
+        (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+    // Verifies that our callback was called.
+    TTransportException nonRetryableException = new TTransportException();
+    callback.onError(nonRetryableException);
+
+    Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService());
+
+    control.replay();
+
+    TestServiceAsync testService = thrift.builder()
+        .withRetries(2)
+        .retryOn(NotFoundException.class)
+        .withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create();
+
+    testService.calculateMass("jake", callback);
+    callbackCapture1.getValue().onError(new HopelesslyLost());
+    callbackCapture2.getValue().onError(nonRetryableException);
+
+    assertRequestsTotal(thrift, 1);
+    assertErrorsTotal(thrift, 1);
+    assertReconnectsTotal(thrift, 1);
+    assertTimeoutsTotal(thrift, 0);
+
+    control.verify();
+  }
+
+  @Test
+  public void testResourceExhausted() throws Exception {
+    expectConnectionPoolResourceExhausted(Config.DEFAULT_CONNECT_TIMEOUT);
+    Thrift<TestService> thrift = createThrift(expectUnusedExecutorService());
+
+    control.replay();
+
+    TestService testService = thrift.builder().blocking().create();
+
+    try {
+      testService.calculateMass("jake");
+      fail("Expected a TResourceExhaustedException.");
+    } catch (TResourceExhaustedException e) {
+      // Expected
+    }
+
+    control.verify();
+  }
+
+  @Test
+  public void testAsyncResourceExhausted() throws Exception {
+    expectConnectionPoolResourceExhausted(ASYNC_CONNECT_TIMEOUT);
+    Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService());
+
+    callback.onError(and(anyObject(), isA(TResourceExhaustedException.class)));
+
+    control.replay();
+
+    TestServiceAsync testService = thrift.builder().withConnectTimeout(ASYNC_CONNECT_TIMEOUT)
+        .create();
+
+    testService.calculateMass("jake", callback);
+
+    control.verify();
+  }
+
+  @Test
+  public void testAsyncDoesNotRetryResourceExhausted() throws Exception {
+    expect(connectionPool.get(ASYNC_CONNECT_TIMEOUT)).andThrow(
+        new ResourceExhaustedException("first"));
+
+    Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService());
+
+    callback.onError(and(anyObject(), isA(TResourceExhaustedException.class)));
+
+    control.replay();
+
+    thrift.builder().withRetries(1).withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create()
+        .calculateMass("jake", callback);
+
+    control.verify();
+  }
+
+  @Test
+  public void testConnectionPoolTimeout() throws Exception {
+    expectConnectionPoolTimeout(Config.DEFAULT_CONNECT_TIMEOUT);
+    Thrift<TestService> thrift = createThrift(expectUnusedExecutorService());
+
+    control.replay();
+
+    TestService testService =
+        thrift.builder().blocking().create();
+
+    try {
+      testService.calculateMass("jake");
+      fail("Expected a TTimeoutException.");
+    } catch (TTimeoutException e) {
+      // Expected
+    }
+
+    control.verify();
+  }
+
+  @Test
+  public void testDoCallDeadlineNoThreads() throws Exception {
+    control.replay();
+
+    ExecutorService executorService =
+        new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
+
+    Thrift<TestService> thrift = createThrift(executorService);
+
+    final TestService service =
+          thrift.builder().noRetries().withRequestTimeout(Amount.of(1L, Time.SECONDS)).create();
+
+    final CountDownLatch remoteCallComplete = new CountDownLatch(1);
+    final CountDownLatch remoteCallStarted = new CountDownLatch(1);
+
+    Future<Integer> result = executorService.submit(new Callable<Integer>() {
+      @Override public Integer call() throws Exception {
+        remoteCallStarted.countDown();
+        remoteCallComplete.await();
+        return service.calculateMass("jake");
+      }
+    });
+
+    remoteCallStarted.await();
+    try {
+      service.calculateMass("jake");
+      fail("Expected no available threads to trigger resource exhausted");
+    } catch (TResourceExhaustedException e) {
+      // expected
+    } finally {
+      remoteCallComplete.countDown();
+    }
+
+    try {
+      result.get();
+      fail("Expected no available threads to trigger resource exhausted");
+    } catch (ExecutionException e) {
+      assertEquals(TResourceExhaustedException.class, e.getCause().getClass());
+    }
+
+    control.verify();
+  }
+
+  private ExecutorService expectUnusedExecutorService() {
+    return control.createMock(ExecutorService.class);
+  }
+
+  private static final String STAT_REQUESTS = "requests_events";
+  private static final String STAT_ERRORS = "errors";
+  private static final String STAT_RECONNECTS = "reconnects";
+  private static final String STAT_TIMEOUTS = "timeouts";
+
+  private void assertRequestsTotal(Thrift<?> thrift, int total) {
+    assertRequestStatValue(STAT_REQUESTS, total);
+  }
+
+  private void assertErrorsTotal(Thrift<?> thrift, int total) {
+    assertRequestStatValue(STAT_ERRORS, total);
+  }
+
+  private void assertReconnectsTotal(Thrift<?> thrift, int total) {
+    assertRequestStatValue(STAT_RECONNECTS, total);
+  }
+
+  private void assertTimeoutsTotal(Thrift<?> thrift, int total) {
+    assertRequestStatValue(STAT_TIMEOUTS, total);
+  }
+
+  private void assertRequestStatValue(String statName, long expectedValue) {
+
+    Stat<Long> var = Stats.getVariable("foo_calculateMass_" + statName);
+
+    assertNotNull(var);
+    assertEquals(expectedValue, (long) var.read());
+  }
+
+  private Thrift<TestService> createThrift(ExecutorService executorService) {
+    return new Thrift<TestService>(executorService, connectionPool, requestTracker, "foo",
+        TestService.class, clientFactory, false, false);
+  }
+
+  private Thrift<TestServiceAsync> createAsyncThrift(ExecutorService executorService) {
+    return new Thrift<TestServiceAsync>(executorService, connectionPool, requestTracker, "foo",
+        TestServiceAsync.class, asyncClientFactory, true, false);
+  }
+
+  private TestService expectServiceCall(boolean withFailure)
+      throws ResourceExhaustedException, TimeoutException {
+    Connection<TTransport, InetSocketAddress> connection = expectConnectionPoolGet();
+    return expectServiceCall(connection, withFailure);
+  }
+
+  private TestServiceAsync expectAsyncServiceCall(boolean withFailure)
+      throws ResourceExhaustedException, TimeoutException {
+    return expectAsyncServiceCall(expectConnectionPoolGet(ASYNC_CONNECT_TIMEOUT), withFailure);
+  }
+
+  private TestServiceAsync expectAsyncServiceRetry(boolean withFailure)
+      throws ResourceExhaustedException, TimeoutException {
+    return expectAsyncServiceCall(
+        expectConnectionPoolGet(RetryingCaller.NONBLOCKING_TIMEOUT), withFailure);
+  }
+
+  private TestService expectThriftError(Capture<TTransport> transportCapture)
+      throws ResourceExhaustedException, TimeoutException {
+    Connection<TTransport, InetSocketAddress> connection = expectConnectionPoolGet();
+    return expectServiceCall(connection, transportCapture, true);
+  }
+
+  private Connection<TTransport, InetSocketAddress> expectConnectionPoolGet()
+      throws ResourceExhaustedException, TimeoutException {
+    Connection<TTransport, InetSocketAddress> connection = createConnection();
+    expect(connectionPool.get(Config.DEFAULT_CONNECT_TIMEOUT)).andReturn(connection);
+    return connection;
+  }
+
+  private Connection<TTransport, InetSocketAddress> expectConnectionPoolGet(
+      Amount<Long, Time> timeout) throws ResourceExhaustedException, TimeoutException {
+    Connection<TTransport, InetSocketAddress> connection = createConnection();
+    expect(connectionPool.get(timeout)).andReturn(connection);
+    return connection;
+  }
+
+  private void expectConnectionPoolResourceExhausted(Amount<Long, Time> timeout)
+      throws ResourceExhaustedException, TimeoutException {
+    expect(connectionPool.get(timeout)).andThrow(new ResourceExhaustedException(""));
+  }
+
+  private void expectConnectionPoolTimeout(Amount<Long, Time> timeout)
+      throws ResourceExhaustedException, TimeoutException {
+    expect(connectionPool.get(timeout)).andThrow(new TimeoutException());
+  }
+
+  private Connection<TTransport, InetSocketAddress> createConnection() {
+    return new TTransportConnection(new MockTSocket(),
+        InetSocketAddress.createUnresolved(MockTSocket.HOST, MockTSocket.PORT));
+  }
+
+  private TestService expectServiceCall(Connection<TTransport, InetSocketAddress> connection,
+      boolean withFailure) {
+    return expectServiceCall(connection, null, withFailure);
+  }
+
+  private TestServiceAsync expectAsyncServiceCall(
+      Connection<TTransport, InetSocketAddress> connection, boolean withFailure) {
+    return expectAsyncServiceCall(connection, null, withFailure);
+  }
+
+  private TestService expectServiceCall(Connection<TTransport, InetSocketAddress> connection,
+      Capture<TTransport> transportCapture, boolean withFailure) {
+
+    TestService testService = control.createMock(TestService.class);
+    if (connection != null) {
+      IExpectationSetters<TestService> expectApply = transportCapture == null
+          ? expect(clientFactory.apply(EasyMock.isA(TTransport.class)))
+          : expect(clientFactory.apply(EasyMock.capture(transportCapture)));
+      expectApply.andReturn(testService);
+
+      if (withFailure) {
+        connectionPool.remove(connection);
+      } else {
+        connectionPool.release(connection);
+      }
+    }
+    return testService;
+  }
+
+  private TestServiceAsync expectAsyncServiceCall(
+      Connection<TTransport, InetSocketAddress> connection,
+      Capture<TTransport> transportCapture, boolean withFailure) {
+
+    TestServiceAsync testService = control.createMock(TestServiceAsync.class);
+    if (connection != null) {
+      IExpectationSetters<TestServiceAsync> expectApply = transportCapture == null
+          ? expect(asyncClientFactory.apply(EasyMock.isA(TTransport.class)))
+          : expect(asyncClientFactory.apply(EasyMock.capture(transportCapture)));
+      expectApply.andReturn(testService);
+
+      if (withFailure) {
+        connectionPool.remove(connection);
+      } else {
+        connectionPool.release(connection);
+      }
+    }
+    return testService;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/thrift/callers/AbstractCallerTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/thrift/callers/AbstractCallerTest.java b/commons/src/test/java/com/twitter/common/thrift/callers/AbstractCallerTest.java
new file mode 100644
index 0000000..cf55afe
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/thrift/callers/AbstractCallerTest.java
@@ -0,0 +1,59 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.thrift.callers;
+
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import org.junit.Before;
+
+import java.lang.reflect.Method;
+
+import static org.easymock.EasyMock.expect;
+
+/**
+ * Test framework for testing callers.
+ *
+ * @author William Farner
+ */
+public abstract class AbstractCallerTest extends EasyMockTest {
+  protected final Amount<Long, Time> CONNECT_TIMEOUT = Amount.of(1L, Time.HOURS);
+
+  protected Caller caller;
+
+  protected Method methodA;
+  protected Object[] argsA;
+
+  @Before
+  public final void callerSetUp() throws Exception {
+    caller = createMock(Caller.class);
+    methodA = Object.class.getMethod("toString");
+    argsA = new Object[] {};
+  }
+
+  protected String call(Caller caller) throws Throwable {
+    return (String) caller.call(methodA, argsA, null, CONNECT_TIMEOUT);
+  }
+
+  protected void expectCall(String returnValue) throws Throwable {
+    expect(caller.call(methodA, argsA, null, CONNECT_TIMEOUT)).andReturn(returnValue);
+  }
+
+  protected void expectCall(Throwable thrown) throws Throwable {
+    expect(caller.call(methodA, argsA, null, CONNECT_TIMEOUT)).andThrow(thrown);
+  }
+}