You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by zm...@apache.org on 2015/08/25 20:19:18 UTC
[04/37] aurora git commit: Import of Twitter Commons.
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/stats/WindowedHistogramTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/stats/WindowedHistogramTest.java b/commons/src/test/java/com/twitter/common/stats/WindowedHistogramTest.java
new file mode 100644
index 0000000..45faf5d
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/stats/WindowedHistogramTest.java
@@ -0,0 +1,237 @@
+// =================================================================================================
+// Copyright 2013 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.stats;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+import org.junit.Test;
+
+import com.twitter.common.objectsize.ObjectSizeCalculator;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.testing.RealHistogram;
+import com.twitter.common.util.testing.FakeClock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import static com.twitter.common.stats.WindowedApproxHistogram.DEFAULT_MAX_MEMORY;
+
+/**
+ * Tests WindowedHistogram.
+ */
+public class WindowedHistogramTest {
+
+ @Test
+ public void testEmptyWinHistogram() {
+ WindowedApproxHistogram wh = new WindowedApproxHistogram();
+ assertEquals(0L, wh.getQuantile(0.0));
+ }
+
+ @Test
+ public void testWinHistogramWithEdgeCases() {
+ FakeClock clock = new FakeClock();
+ Amount<Long, Time> window = Amount.of(100L, Time.MILLISECONDS);
+ int slices = 10;
+ long sliceDuration = window.as(Time.NANOSECONDS) / slices;
+ WindowedApproxHistogram h =
+ new WindowedApproxHistogram(window, slices, DEFAULT_MAX_MEMORY, clock);
+
+ h.add(Long.MIN_VALUE);
+ clock.advance(Amount.of(2 * sliceDuration, Time.NANOSECONDS));
+ assertEquals(Long.MIN_VALUE, h.getQuantile(0.0));
+ assertEquals(Long.MIN_VALUE, h.getQuantile(0.5));
+ assertEquals(Long.MIN_VALUE, h.getQuantile(1.0));
+
+ h.add(Long.MAX_VALUE);
+ clock.advance(Amount.of(2 * sliceDuration, Time.NANOSECONDS));
+ assertEquals(Long.MIN_VALUE, h.getQuantile(0.0));
+ assertEquals(Long.MIN_VALUE, h.getQuantile(0.25));
+ assertEquals(Long.MAX_VALUE, h.getQuantile(0.75));
+ assertEquals(Long.MAX_VALUE, h.getQuantile(1.0));
+ }
+
+ @Test
+ public void testClearedWinHistogram() {
+ FakeClock clock = new FakeClock();
+ Amount<Long, Time> window = Amount.of(100L, Time.MILLISECONDS);
+ int slices = 10;
+ Amount<Long, Time> sliceDuration = Amount.of(
+ window.as(Time.NANOSECONDS) / slices, Time.NANOSECONDS);
+ WindowedHistogram<?> h = createFullHistogram(window, slices, clock);
+ long p0 = h.getQuantile(0.1);
+ long p50 = h.getQuantile(0.5);
+ long p90 = h.getQuantile(0.9);
+ assertFalse(0 == p0);
+ assertFalse(0 == p50);
+ assertFalse(0 == p90);
+
+ h.clear();
+
+ assertEquals(0, h.getQuantile(0.1));
+ assertEquals(0, h.getQuantile(0.5));
+ assertEquals(0, h.getQuantile(0.9));
+
+ // reload the histogram with the exact same values than before
+ fillHistogram(h, sliceDuration, slices, clock);
+
+ assertEquals(p0, h.getQuantile(0.1));
+ assertEquals(p50, h.getQuantile(0.5));
+ assertEquals(p90, h.getQuantile(0.9));
+ }
+
+ @Test
+ public void testSimpleWinHistogram() {
+ FakeClock clock = new FakeClock();
+ Amount<Long, Time> window = Amount.of(100L, Time.MILLISECONDS);
+ int slices = 10;
+ WindowedHistogram<?> wh = createFullHistogram(window, slices, clock);
+
+ // check that the global distribution is the aggregation of all underlying histograms
+ for (int i = 1; i <= slices; i++) {
+ double q = (double) i / slices;
+ assertEquals(i, wh.getQuantile(q), 1.0);
+ }
+
+ // advance in time an forget about old values
+ long sliceDuration = window.as(Time.NANOSECONDS) / slices;
+ clock.advance(Amount.of(sliceDuration, Time.NANOSECONDS));
+ for (int j = 0; j < 1000; j++) {
+ wh.add(11);
+ }
+ assertEquals(2, wh.getQuantile(0.05), 1.0);
+ assertEquals(11, wh.getQuantile(0.99), 1.0);
+ }
+
+ @Test
+ public void testWinHistogramWithGap() {
+ FakeClock clock = new FakeClock();
+ Amount<Long, Time> window = Amount.of(100L, Time.MILLISECONDS);
+ int slices = 10;
+ WindowedHistogram<?> wh = createFullHistogram(window, slices, clock);
+ // wh is a WindowedHistogram of 10 slices + the empty current with values from 1 to 10
+ // [1][2][3][4][5][6][7][8][9][10][.]
+ // ^
+
+ for (int j = 0; j < 1000; j++) {
+ wh.add(100);
+ }
+ // [1][2][3][4][5][6][7][8][9][10][100]
+ // ^
+ // quantiles are computed based on [1] -> [10]
+
+ clock.advance(Amount.of((slices - 1) * 100L / slices, Time.MILLISECONDS));
+ for (int j = 0; j < 1000; j++) {
+ wh.add(200);
+ }
+ // [1][2][3][4][5][6][7][8][200][10][100]
+ // ^
+ // quantiles are computed based on [10][100][1][2][3][4][5][6][7][8]
+ // and removing old ones [10][100][.][.][.][.][.][.][.][.]
+ // all the histograms between 100 and 200 are old and shouldn't matter in the computation of
+ // quantiles.
+ assertEquals(10L, wh.getQuantile(0.25), 1.0);
+ assertEquals(100L, wh.getQuantile(0.75), 1.0);
+
+ clock.advance(Amount.of(100L / slices, Time.MILLISECONDS));
+ // [1][2][3][4][5][6][7][8][200][10][100]
+ // ^
+ // quantiles are computed based on [100][1][2][3][4][5][6][7][8][200]
+ // and removing old ones [100][.][.][.][.][.][.][.][.][200]
+
+ assertEquals(100L, wh.getQuantile(0.25), 1.0);
+ assertEquals(200L, wh.getQuantile(0.75), 1.0);
+
+ // advance a lot in time, everything should be "forgotten"
+ clock.advance(Amount.of(500L, Time.MILLISECONDS));
+ assertEquals(0L, wh.getQuantile(0.5), 1.0);
+ }
+
+ @Test
+ public void testWinHistogramMemory() {
+ ImmutableList.Builder<Amount<Long, Data>> builder = ImmutableList.builder();
+ builder.add(Amount.of(8L, Data.KB));
+ builder.add(Amount.of(12L, Data.KB));
+ builder.add(Amount.of(16L, Data.KB));
+ builder.add(Amount.of(20L, Data.KB));
+ builder.add(Amount.of(24L, Data.KB));
+ builder.add(Amount.of(32L, Data.KB));
+ builder.add(Amount.of(64L, Data.KB));
+ builder.add(Amount.of(256L, Data.KB));
+ builder.add(Amount.of(1L, Data.MB));
+ builder.add(Amount.of(16L, Data.MB));
+ builder.add(Amount.of(32L, Data.MB));
+ List<Amount<Long, Data>> sizes = builder.build();
+
+ // large estimation of the memory used outside of buffers
+ long fixSize = Amount.of(4, Data.KB).as(Data.BYTES);
+
+ for (Amount<Long, Data> maxSize: sizes) {
+ WindowedApproxHistogram hist = new WindowedApproxHistogram(
+ Amount.of(60L, Time.SECONDS), 6, maxSize);
+ hist.add(1L);
+ hist.getQuantile(0.5);
+ long size = ObjectSizeCalculator.getObjectSize(hist);
+ // reverting CI JVM seems to have different memory consumption than mine
+ //assertTrue(size < fixSize + maxSize.as(Data.BYTES));
+ }
+ }
+
+ @Test
+ public void testWinHistogramAccuracy() {
+ FakeClock ticker = new FakeClock();
+ Amount<Long, Time> window = Amount.of(100L, Time.MILLISECONDS);
+ int slices = 10;
+ Amount<Long, Time> sliceDuration = Amount.of(
+ window.as(Time.NANOSECONDS) / slices, Time.NANOSECONDS);
+ WindowedHistogram<?> wh = createFullHistogram(window, slices, ticker);
+ RealHistogram rh = fillHistogram(new RealHistogram(), sliceDuration, slices, new FakeClock());
+
+ assertEquals(wh.getQuantile(0.5), rh.getQuantile(0.5));
+ assertEquals(wh.getQuantile(0.75), rh.getQuantile(0.75));
+ assertEquals(wh.getQuantile(0.9), rh.getQuantile(0.9));
+ assertEquals(wh.getQuantile(0.99), rh.getQuantile(0.99));
+ }
+
+ /**
+ * @return a WindowedHistogram with different value in each underlying Histogram
+ */
+ private WindowedHistogram<?> createFullHistogram(
+ Amount<Long, Time> duration, int slices, FakeClock clock) {
+ long sliceDuration = duration.as(Time.NANOSECONDS) / slices;
+ WindowedApproxHistogram wh = new WindowedApproxHistogram(duration, slices,
+ DEFAULT_MAX_MEMORY, clock);
+ clock.advance(Amount.of(1L, Time.NANOSECONDS));
+
+ return fillHistogram(wh, Amount.of(sliceDuration, Time.NANOSECONDS), slices, clock);
+ }
+
+ private <H extends Histogram> H fillHistogram(H h,
+ Amount<Long, Time> sliceDuration, int slices, FakeClock clock) {
+ for (int i = 1; i <= slices; i++) {
+ for (int j = 0; j < 1000; j++) {
+ h.add(i);
+ }
+ clock.advance(sliceDuration);
+ }
+ return h;
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/stats/WindowedStatsTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/stats/WindowedStatsTest.java b/commons/src/test/java/com/twitter/common/stats/WindowedStatsTest.java
new file mode 100644
index 0000000..ed821ea
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/stats/WindowedStatsTest.java
@@ -0,0 +1,189 @@
+package com.twitter.common.stats;
+
+import org.junit.Test;
+
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.WindowedStatistics;
+import com.twitter.common.util.testing.FakeClock;
+
+import static org.junit.Assert.assertEquals;
+
+public class WindowedStatsTest {
+ private Amount<Long, Time> window = Amount.of(1L, Time.MINUTES);
+ private int slices = 3;
+ private long sliceDuration = window.as(Time.NANOSECONDS) / slices;
+
+ @Test
+ public void testEmptyStats() {
+ FakeClock clock = new FakeClock();
+ WindowedStatistics ws = new WindowedStatistics(window, slices, clock);
+
+ assertEmpty(ws);
+ }
+
+ @Test
+ public void testStatsCorrectness() {
+ FakeClock clock = new FakeClock();
+ Statistics reference = new Statistics();
+ WindowedStatistics ws = new WindowedStatistics(window, slices, clock);
+
+ for (int i=0; i<1000; i++) {
+ reference.accumulate(i);
+ ws.accumulate(i);
+ }
+ clock.advance(Amount.of(1 + sliceDuration, Time.NANOSECONDS));
+ ws.refresh();
+
+ assertEquals(reference.max(), ws.max());
+ assertEquals(reference.min(), ws.min());
+ assertEquals(reference.populationSize(), ws.populationSize());
+ assertEquals(reference.sum(), ws.sum());
+ assertEquals(reference.range(), ws.range());
+ assertEquals(reference.mean(), ws.mean(), 0.01);
+ assertEquals(reference.variance(), ws.variance(), 0.01);
+ assertEquals(reference.standardDeviation(), ws.standardDeviation(), 0.01);
+
+ for (int i=0; i<1000; i++) {
+ long x = i + 500;
+ reference.accumulate(x);
+ ws.accumulate(x);
+ }
+ clock.advance(Amount.of(sliceDuration, Time.NANOSECONDS));
+ ws.refresh();
+
+ assertEquals(reference.max(), ws.max());
+ assertEquals(reference.min(), ws.min());
+ assertEquals(reference.populationSize(), ws.populationSize());
+ assertEquals(reference.sum(), ws.sum());
+ assertEquals(reference.range(), ws.range());
+ assertEquals(reference.mean(), ws.mean(), 0.01);
+ assertEquals(reference.variance(), ws.variance(), 0.01);
+ assertEquals(reference.standardDeviation(), ws.standardDeviation(), 0.01);
+
+ for (int i=0; i<1000; i++) {
+ long x = i * i;
+ reference.accumulate(x);
+ ws.accumulate(x);
+ }
+ clock.advance(Amount.of(sliceDuration, Time.NANOSECONDS));
+ ws.refresh();
+
+ assertEquals(reference.max(), ws.max());
+ assertEquals(reference.min(), ws.min());
+ assertEquals(reference.populationSize(), ws.populationSize());
+ assertEquals(reference.sum(), ws.sum());
+ assertEquals(reference.range(), ws.range());
+ assertEquals(reference.mean(), ws.mean(), 0.01);
+ assertEquals(reference.variance(), ws.variance(), 0.01);
+ assertEquals(reference.standardDeviation(), ws.standardDeviation(), 0.01);
+ }
+
+ @Test
+ public void testWindowStats() {
+ FakeClock clock = new FakeClock();
+ WindowedStatistics ws = new WindowedStatistics(window, slices, clock);
+ ws.accumulate(1L);
+ assertEmpty(ws);
+
+ clock.advance(Amount.of(1 + sliceDuration, Time.NANOSECONDS));
+ ws.refresh();
+
+ assertEquals(1L, ws.max());
+ assertEquals(1L, ws.min());
+ assertEquals(1L, ws.populationSize());
+ assertEquals(1L, ws.sum());
+ assertEquals(1.0, ws.mean(), 0.01);
+ assertEquals(0.0, ws.standardDeviation(), 0.01);
+
+ clock.advance(Amount.of(slices * sliceDuration, Time.NANOSECONDS));
+ ws.refresh();
+ assertEmpty(ws);
+ }
+
+ @Test
+ public void testCleaningOfExpiredWindows() {
+ FakeClock clock = new FakeClock();
+ WindowedStatistics ws = new WindowedStatistics(window, slices, clock);
+
+ long n = 1000L;
+ for (int i=0; i<n; i++) {
+ ws.accumulate(i);
+ }
+ assertEmpty(ws);
+
+ clock.advance(Amount.of(1 + sliceDuration, Time.NANOSECONDS));
+ ws.refresh();
+ assertEquals(n, ws.populationSize()); // this window is not empty
+
+ clock.advance(Amount.of(100 * sliceDuration, Time.NANOSECONDS));
+ ws.refresh();
+ assertEmpty(ws); // this window has been cleaned
+ }
+
+ @Test
+ public void testAddNewValueToFullWS() {
+ FakeClock clock = new FakeClock();
+ WindowedStatistics ws = new WindowedStatistics(window, slices, clock);
+
+ // AAAAA
+ // BBBBB
+ // CCCCC
+ // DDDDD
+ // | | | |
+ //---------------------------------> t
+ // t=0 t=1 t=2 t=3
+
+ // t=0 fill {D}
+ long n = 1000L;
+ for (int i=0; i<n; i++) {
+ ws.accumulate(i);
+ }
+ // read {A,B,C}, which should be empty
+ assertEmpty(ws);
+
+ clock.advance(Amount.of(1 + sliceDuration, Time.NANOSECONDS));
+ ws.refresh();
+ // t=1, read {B,C,D} which shouldn't be empty
+
+ assertEquals(n - 1L, ws.max());
+ assertEquals(0L, ws.min());
+ assertEquals(n, ws.populationSize());
+ assertEquals(n * (n - 1) / 2, ws.sum());
+ assertEquals((n - 1) / 2.0, ws.mean(), 0.01);
+
+ clock.advance(Amount.of(1 + sliceDuration, Time.NANOSECONDS));
+ ws.refresh();
+ // t=2, read {C,D,A} which shouldn't be empty as well
+
+ assertEquals(n - 1L, ws.max());
+ assertEquals(0L, ws.min());
+ assertEquals(n, ws.populationSize());
+ assertEquals(n * (n - 1) / 2, ws.sum());
+ assertEquals((n - 1) / 2.0, ws.mean(), 0.01);
+
+ clock.advance(Amount.of(1 + sliceDuration, Time.NANOSECONDS));
+ ws.refresh();
+ // t=3, read {D,A,B} which shouldn't be empty as well
+
+ assertEquals(n - 1L, ws.max());
+ assertEquals(0L, ws.min());
+ assertEquals(n, ws.populationSize());
+ assertEquals(n * (n - 1) / 2, ws.sum());
+ assertEquals((n - 1) / 2.0, ws.mean(), 0.01);
+
+ clock.advance(Amount.of(1 + sliceDuration, Time.NANOSECONDS));
+ ws.refresh();
+ // t=4, read {A,B,C} which must be empty (cleaned by the Windowed class)
+ assertEmpty(ws);
+ }
+
+ private void assertEmpty(WindowedStatistics ws) {
+ assertEquals(Long.MIN_VALUE, ws.max());
+ assertEquals(Long.MAX_VALUE, ws.min());
+ assertEquals(0L, ws.populationSize());
+ assertEquals(0L, ws.sum());
+ assertEquals(0.0, ws.mean(), 0.01);
+ assertEquals(0.0, ws.standardDeviation(), 0.01);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/stats/WindowedTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/stats/WindowedTest.java b/commons/src/test/java/com/twitter/common/stats/WindowedTest.java
new file mode 100644
index 0000000..17526ea
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/stats/WindowedTest.java
@@ -0,0 +1,113 @@
+// =================================================================================================
+// Copyright 2013 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.stats;
+
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+
+import org.junit.Test;
+
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Clock;
+import com.twitter.common.util.testing.FakeClock;
+
+import junit.framework.Assert;
+
+/**
+ * Test the Windowed abstract class by making a very simple implementation.
+ */
+public class WindowedTest {
+
+ private class WindowedBox extends Windowed<Integer[]> {
+ WindowedBox(Amount<Long, Time > window, int slices, Clock clock) {
+ super(Integer[].class, window, slices,
+ new Supplier<Integer[]>() {
+ @Override public Integer[] get() {
+ Integer[] box = new Integer[1];
+ box[0] = 0;
+ return box;
+ }
+ },
+ new Function<Integer[], Integer[]>() {
+ @Override public Integer[] apply(Integer[] xs) {
+ xs[0] = 0;
+ return xs;
+ }
+ }, clock);
+ }
+
+ void increment() {
+ getCurrent()[0] += 1;
+ }
+
+ int sum() {
+ int s = 0;
+ for (Integer[] box : getTenured()) {
+ s += box[0];
+ }
+ return s;
+ }
+ }
+
+ @Test
+ public void testWindowed() {
+ Amount<Long, Time > window = Amount.of(1L, Time.MINUTES);
+ int slices = 3;
+ Amount<Long, Time > delta = Amount.of(
+ Amount.of(1L, Time.MINUTES).as(Time.NANOSECONDS) / 3, Time.NANOSECONDS);
+ FakeClock clock = new FakeClock();
+ WindowedBox win = new WindowedBox(window, slices, clock);
+ // [0][0][0][0]
+ clock.advance(Amount.of(1L, Time.NANOSECONDS));
+
+ win.increment();
+ // [0][0][0][1]
+ Assert.assertEquals(0, win.sum());
+
+ clock.advance(delta);
+ win.increment();
+ win.increment();
+ Assert.assertEquals(1, win.sum());
+ // [0][0][1][2]
+
+ clock.advance(delta);
+ win.increment();
+ win.increment();
+ win.increment();
+ Assert.assertEquals(3, win.sum());
+ // [0][1][2][3]
+
+ clock.advance(delta);
+ win.increment();
+ win.increment();
+ win.increment();
+ win.increment();
+ Assert.assertEquals(6, win.sum());
+ // [1][2][3][4]
+
+ clock.advance(delta);
+ win.increment();
+ win.increment();
+ win.increment();
+ win.increment();
+ win.increment();
+ Assert.assertEquals(9, win.sum());
+ // [2][3][4][5]
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/testing/TearDownRegistryTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/testing/TearDownRegistryTest.java b/commons/src/test/java/com/twitter/common/testing/TearDownRegistryTest.java
new file mode 100644
index 0000000..56630fa
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/testing/TearDownRegistryTest.java
@@ -0,0 +1,49 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.testing;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.testing.junit4.TearDownTestCase;
+
+import org.junit.Test;
+
+import com.twitter.common.base.Command;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author John Sirois
+ */
+public class TearDownRegistryTest extends TearDownTestCase {
+
+ @Test
+ public void testTearDown() {
+ TearDownRegistry tearDownRegistry = new TearDownRegistry(this);
+ final AtomicBoolean actionExecuted = new AtomicBoolean(false);
+ tearDownRegistry.addAction(new Command() {
+ @Override public void execute() {
+ actionExecuted.set(true);
+ }
+ });
+
+ assertFalse(actionExecuted.get());
+ tearDown();
+ assertTrue(actionExecuted.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/testing/easymock/EasyMockTestTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/testing/easymock/EasyMockTestTest.java b/commons/src/test/java/com/twitter/common/testing/easymock/EasyMockTestTest.java
new file mode 100644
index 0000000..8546935
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/testing/easymock/EasyMockTestTest.java
@@ -0,0 +1,63 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.testing.easymock;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.collect.ImmutableList;
+
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author John Sirois
+ */
+public class EasyMockTestTest extends EasyMockTest {
+
+ @Test
+ public void testSimplyParametrizedMock() {
+ final AtomicBoolean ran = new AtomicBoolean(false);
+
+ Runnable runnable = createMock(new Clazz<Runnable>() { });
+ runnable.run();
+ expectLastCall().andAnswer(new IAnswer<Void>() {
+ @Override public Void answer() {
+ ran.set(true);
+ return null;
+ }
+ });
+ control.replay();
+
+ runnable.run();
+ assertTrue(ran.get());
+ }
+
+ @Test
+ public void testNestedParametrizedMock() {
+ List<List<String>> list = createMock(new Clazz<List<List<String>>>() { });
+ EasyMock.expect(list.get(0)).andReturn(ImmutableList.of("jake"));
+ control.replay();
+
+ assertEquals(ImmutableList.of("jake"), list.get(0));
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/testing/easymock/IterableEqualsTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/testing/easymock/IterableEqualsTest.java b/commons/src/test/java/com/twitter/common/testing/easymock/IterableEqualsTest.java
new file mode 100644
index 0000000..07a5aa9
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/testing/easymock/IterableEqualsTest.java
@@ -0,0 +1,53 @@
+package com.twitter.common.testing.easymock;
+
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+
+import static com.twitter.common.testing.easymock.IterableEquals.eqCollection;
+import static com.twitter.common.testing.easymock.IterableEquals.eqIterable;
+import static com.twitter.common.testing.easymock.IterableEquals.eqList;
+
+public class IterableEqualsTest extends EasyMockTest {
+ private static final List<Integer> TEST = ImmutableList.of(1, 2, 3, 2);
+ private static final String OK = "ok";
+ private Thing thing;
+
+ public interface Thing {
+ String testIterable(Iterable<Integer> input);
+ String testCollection(Collection<Integer> input);
+ String testList(List<Integer> input);
+ }
+
+ @Before
+ public void setUp() {
+ thing = createMock(Thing.class);
+ }
+
+ @Test
+ public void testIterableEquals() {
+ expect(thing.testIterable(eqIterable(TEST))).andReturn(OK);
+ control.replay();
+ thing.testIterable(ImmutableList.of(3, 2, 2, 1));
+ }
+
+ @Test
+ public void testCollectionEquals() {
+ expect(thing.testCollection(eqCollection(TEST))).andReturn(OK);
+ control.replay();
+ thing.testCollection(ImmutableList.of(3, 2, 2, 1));
+ }
+
+ @Test
+ public void testListEquals() {
+ expect(thing.testList(eqList(TEST))).andReturn(OK);
+ control.replay();
+ thing.testList(ImmutableList.of(3, 2, 2, 1));
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/testing/junit/rules/RetryTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/testing/junit/rules/RetryTest.java b/commons/src/test/java/com/twitter/common/testing/junit/rules/RetryTest.java
new file mode 100644
index 0000000..847347b
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/testing/junit/rules/RetryTest.java
@@ -0,0 +1,233 @@
+// =================================================================================================
+// Copyright 2015 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.testing.junit.rules;
+
+import java.io.IOException;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import javax.annotation.Nullable;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.MethodRule;
+import org.junit.runners.model.FrameworkMethod;
+import org.junit.runners.model.Statement;
+
+// SUPPRESS CHECKSTYLE:OFF IllegalThrows
+public class RetryTest {
+
+ public abstract static class RetryTrackingTestBase {
+ private static int tries;
+
+ @BeforeClass
+ public static void resetTries() {
+ tries = 0;
+ }
+
+ enum Result {
+ FAILURE() {
+ @Override void execute() throws Throwable {
+ Assert.fail("Simulated assertion failure.");
+ }
+ },
+ ERROR() {
+ @Override void execute() throws Throwable {
+ throw new IOException("Simulated unexpected error.");
+ }
+ },
+ SUCCESS() {
+ @Override void execute() throws Throwable {
+ Assert.assertTrue("Simulated successful assertion.", true);
+ }
+ };
+
+ abstract void execute() throws Throwable;
+ }
+
+ @Rule public Retry.Rule retry = new Retry.Rule();
+
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.METHOD)
+ @interface AssertRetries {
+ int expectedTries();
+ int expectedMaxRetries();
+ Result expectedResult();
+ }
+
+ @Rule
+ public MethodRule testRetries = new MethodRule() {
+ @Override
+ public Statement apply(final Statement statement, FrameworkMethod method, Object receiver) {
+ final AssertRetries assertRetries = method.getAnnotation(AssertRetries.class);
+ Assert.assertNotNull(assertRetries);
+ return new Statement() {
+ @Override public void evaluate() throws Throwable {
+ try {
+ statement.evaluate();
+ if (assertRetries.expectedResult() == Result.SUCCESS) {
+ Assert.assertEquals(assertRetries.expectedTries(), tries);
+ } else {
+ Assert.fail("Expected success, found " + assertRetries.expectedResult());
+ }
+ } catch (Retry.Rule.RetriedAssertionError e) {
+ if (assertRetries.expectedResult() == Result.FAILURE) {
+ Assert.assertEquals(assertRetries.expectedTries(), tries);
+ Assert.assertEquals(assertRetries.expectedMaxRetries(), e.getMaxRetries());
+ Assert.assertEquals(assertRetries.expectedTries(), e.getTryNumber());
+ } else {
+ Assert.fail("Expected failure, found " + assertRetries.expectedResult());
+ }
+ } catch (Retry.Rule.RetriedException e) {
+ if (assertRetries.expectedResult() == Result.ERROR) {
+ Assert.assertEquals(assertRetries.expectedTries(), tries);
+ Assert.assertEquals(assertRetries.expectedMaxRetries(), e.getMaxRetries());
+ Assert.assertEquals(assertRetries.expectedTries(), e.getTryNumber());
+ } else {
+ Assert.fail("Expected error, found " + assertRetries.expectedResult());
+ }
+ }
+ }
+ };
+ }
+ };
+
+ protected void doTest(int successfulTries) throws Throwable {
+ doTest(successfulTries, null);
+ }
+
+ protected void doTest(int successfulTries, @Nullable Result lastResult) throws Throwable {
+ tries++;
+ if (lastResult != null && tries > successfulTries) {
+ lastResult.execute();
+ }
+ }
+ }
+
+ public static class DefaultRetrySuccessTest extends RetryTrackingTestBase {
+ @Test
+ @Retry
+ @AssertRetries(expectedTries = 2, expectedMaxRetries = 1, expectedResult = Result.SUCCESS)
+ public void test() throws Throwable {
+ doTest(2);
+ }
+ }
+
+ public static class DefaultRetryFailFastTest extends RetryTrackingTestBase {
+ @Test
+ @Retry
+ @AssertRetries(expectedTries = 1, expectedMaxRetries = 1, expectedResult = Result.FAILURE)
+ public void test() throws Throwable {
+ doTest(0, Result.FAILURE);
+ }
+ }
+
+ public static class DefaultRetryFailLastTest extends RetryTrackingTestBase {
+ @Test
+ @Retry
+ @AssertRetries(expectedTries = 2, expectedMaxRetries = 1, expectedResult = Result.FAILURE)
+ public void test() throws Throwable {
+ doTest(1, Result.FAILURE);
+ }
+ }
+
+ public static class DefaultRetryErrorFastTest extends RetryTrackingTestBase {
+ @Test
+ @Retry
+ @AssertRetries(expectedTries = 1, expectedMaxRetries = 1, expectedResult = Result.ERROR)
+ public void test() throws Throwable {
+ doTest(0, Result.ERROR);
+ }
+ }
+
+ public static class DefaultRetryErrorLastTest extends RetryTrackingTestBase {
+ @Test
+ @Retry
+ @AssertRetries(expectedTries = 2, expectedMaxRetries = 1, expectedResult = Result.ERROR)
+ public void test() throws Throwable {
+ doTest(1, Result.ERROR);
+ }
+ }
+
+ public static class ZeroRetrySuccessTest extends RetryTrackingTestBase {
+ @Test
+ @Retry(times = 0)
+ @AssertRetries(expectedTries = 1, expectedMaxRetries = 0, expectedResult = Result.SUCCESS)
+ public void test() throws Throwable {
+ doTest(1, Result.SUCCESS);
+ }
+ }
+
+ public static class NegativeRetrySuccessTest extends RetryTrackingTestBase {
+ @Test
+ @Retry(times = -1)
+ @AssertRetries(expectedTries = 1, expectedMaxRetries = 0, expectedResult = Result.SUCCESS)
+ public void test() throws Throwable {
+ doTest(1, Result.SUCCESS);
+ }
+ }
+
+ public static class PositiveRetrySuccessTest extends RetryTrackingTestBase {
+ @Test
+ @Retry(times = 2)
+ @AssertRetries(expectedTries = 3, expectedMaxRetries = 2, expectedResult = Result.SUCCESS)
+ public void test() throws Throwable {
+ doTest(3, Result.SUCCESS);
+ }
+ }
+
+ public static class PositiveRetryFailFastTest extends RetryTrackingTestBase {
+ @Test
+ @Retry(times = 2)
+ @AssertRetries(expectedTries = 1, expectedMaxRetries = 2, expectedResult = Result.FAILURE)
+ public void test() throws Throwable {
+ doTest(0, Result.FAILURE);
+ }
+ }
+
+ public static class PositiveRetryFailLastTest extends RetryTrackingTestBase {
+ @Test
+ @Retry(times = 2)
+ @AssertRetries(expectedTries = 2, expectedMaxRetries = 2, expectedResult = Result.FAILURE)
+ public void test() throws Throwable {
+ doTest(1, Result.FAILURE);
+ }
+ }
+
+ public static class PositiveRetryErrorFastTest extends RetryTrackingTestBase {
+ @Test
+ @Retry(times = 2)
+ @AssertRetries(expectedTries = 1, expectedMaxRetries = 2, expectedResult = Result.ERROR)
+ public void test() throws Throwable {
+ doTest(0, Result.ERROR);
+ }
+ }
+
+ public static class PositiveRetryErrorLastTest extends RetryTrackingTestBase {
+ @Test
+ @Retry(times = 2)
+ @AssertRetries(expectedTries = 2, expectedMaxRetries = 2, expectedResult = Result.ERROR)
+ public void test() throws Throwable {
+ doTest(1, Result.ERROR);
+ }
+ }
+}
+// SUPPRESS CHECKSTYLE:ON IllegalThrows
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/thrift/ThriftConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/thrift/ThriftConnectionFactoryTest.java b/commons/src/test/java/com/twitter/common/thrift/ThriftConnectionFactoryTest.java
new file mode 100644
index 0000000..139d90e
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/thrift/ThriftConnectionFactoryTest.java
@@ -0,0 +1,143 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.thrift;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+import com.twitter.common.net.pool.Connection;
+import com.twitter.common.net.pool.ObjectPool;
+import com.twitter.common.thrift.testing.MockTSocket;
+
+/**
+ * @author John Sirois
+ */
+public class ThriftConnectionFactoryTest {
+
+ @Test
+ public void testPreconditions() {
+ try {
+ new ThriftConnectionFactory(null, 1, 1);
+ fail("a non-null host should be required");
+ } catch (NullPointerException e) {
+ // expected
+ }
+
+ try {
+ new ThriftConnectionFactory(" ", 1, 1);
+ fail("a non-blank host should be required");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+
+ try {
+ new ThriftConnectionFactory("localhost", 0, 1);
+ fail("a valid concrete remote port should be required");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+
+ try {
+ new ThriftConnectionFactory("localhost", 65536, 1);
+ fail("a valid port should be required");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+
+ try {
+ new ThriftConnectionFactory("localhost", 65535, 0);
+ fail("a non-zero value for maxConnections should be required");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testMaxConnections() throws TTransportException, IOException {
+ ThriftConnectionFactory thriftConnectionFactory = createConnectionFactory(2);
+
+ Connection<TTransport, InetSocketAddress> connection1 =
+ thriftConnectionFactory.create(ObjectPool.NO_TIMEOUT);
+ assertOpenConnection(connection1);
+
+ Connection<TTransport, InetSocketAddress> connection2 =
+ thriftConnectionFactory.create(ObjectPool.NO_TIMEOUT);
+ assertOpenConnection(connection2);
+ assertThat(connection1, not(sameInstance(connection2)));
+
+ assertNull("Should've reached maximum connections",
+ thriftConnectionFactory.create(ObjectPool.NO_TIMEOUT));
+
+ thriftConnectionFactory.destroy(connection1);
+ assertClosedConnection(connection1);
+
+ Connection<TTransport, InetSocketAddress> connection3 =
+ thriftConnectionFactory.create(ObjectPool.NO_TIMEOUT);
+ assertOpenConnection(connection3);
+ @SuppressWarnings("unchecked") // Needed because type information lost in vargs.
+ Matcher<Connection<TTransport, InetSocketAddress>> matcher =
+ allOf(not(sameInstance(connection1)), not(sameInstance(connection2)));
+ assertThat(connection3, matcher);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInactiveConnectionReturn() {
+ createConnectionFactory(1).destroy(new TTransportConnection(new MockTSocket(),
+ InetSocketAddress.createUnresolved(MockTSocket.HOST, MockTSocket.PORT)));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNullConnectionReturn() {
+ createConnectionFactory(1).destroy(null);
+ }
+
+ private void assertOpenConnection(Connection<TTransport, InetSocketAddress> connection) {
+ assertNotNull(connection);
+ assertTrue(connection.isValid());
+ assertTrue(connection.get().isOpen());
+ }
+
+ private void assertClosedConnection(Connection<TTransport, InetSocketAddress> connection) {
+ assertFalse(connection.isValid());
+ assertFalse(connection.get().isOpen());
+ }
+
+ private ThriftConnectionFactory createConnectionFactory(int maxConnections) {
+ return new ThriftConnectionFactory("foo", 1234, maxConnections) {
+ @Override TTransport createTransport(int timeoutMillis) throws TTransportException {
+ TTransport transport = new MockTSocket();
+ transport.open();
+ return transport;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/thrift/ThriftFactoryTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/thrift/ThriftFactoryTest.java b/commons/src/test/java/com/twitter/common/thrift/ThriftFactoryTest.java
new file mode 100644
index 0000000..d1fcf3d
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/thrift/ThriftFactoryTest.java
@@ -0,0 +1,245 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.thrift;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.testing.TearDown;
+import com.google.common.testing.junit4.TearDownTestCase;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.async.TAsyncClient;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.easymock.EasyMock;
+import org.easymock.IMocksControl;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.twitter.common.net.pool.DynamicHostSet;
+import com.twitter.common.thrift.ThriftFactoryTest.GoodService.AsyncIface;
+import com.twitter.thrift.ServiceInstance;
+
+/**
+ * @author John Sirois
+ */
+public class ThriftFactoryTest extends TearDownTestCase {
+
+ private static final Logger LOG = Logger.getLogger(ThriftFactoryTest.class.getName());
+ private IMocksControl control;
+
+ static class GoodService {
+ public interface Iface {
+ String doWork() throws TResourceExhaustedException;
+ }
+
+ public interface AsyncIface {
+ void doWork(AsyncMethodCallback<String> callback);
+ }
+
+ public static final String DONE = "done";
+
+ public static class Client implements Iface {
+ public Client(TProtocol protocol) {
+ assertNotNull(protocol);
+ }
+
+ @Override public String doWork() throws TResourceExhaustedException {
+ return DONE;
+ }
+ }
+
+ public static class AsyncClient extends TAsyncClient implements AsyncIface {
+ public AsyncClient(TProtocolFactory factory, TAsyncClientManager manager,
+ TNonblockingTransport transport) {
+ super(factory, manager, transport);
+ assertNotNull(factory);
+ assertNotNull(manager);
+ assertNotNull(transport);
+ }
+
+ @Override public void doWork(AsyncMethodCallback<String> callback) {
+ callback.onComplete(DONE);
+ }
+ }
+ }
+
+ static class BadService {
+ public interface Iface {
+ void doWork();
+ }
+ public interface AsyncIface {
+ void doWork(AsyncMethodCallback<Void> callback);
+ }
+
+ public static class Client implements Iface {
+ @Override public void doWork() {
+ throw new UnsupportedOperationException();
+ }
+ }
+ }
+
+ private ImmutableSet<InetSocketAddress> endpoints;
+
+ @Before
+ public void setUp() throws Exception {
+ control = EasyMock.createControl();
+ endpoints = ImmutableSet.of(new InetSocketAddress(5555));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testNullServiceInterface() {
+ ThriftFactory.create(null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBadServiceInterface() {
+ ThriftFactory.create(GoodService.class);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBadServiceImpl() throws ThriftFactory.ThriftFactoryException {
+ ThriftFactory.<BadService.Iface>create(BadService.Iface.class)
+ .build(endpoints);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBadAsyncServiceImpl() throws ThriftFactory.ThriftFactoryException {
+ ThriftFactory.<BadService.AsyncIface>create(BadService.AsyncIface.class)
+ .useFramedTransport(true)
+ .buildAsync(endpoints);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNoBackends() {
+ ThriftFactory.create(GoodService.Iface.class)
+ .build(ImmutableSet.<InetSocketAddress>of());
+ }
+
+ @Test
+ public void testCreate() throws Exception {
+ final AtomicReference<Socket> clientConnection = new AtomicReference<Socket>();
+ final CountDownLatch connected = new CountDownLatch(1);
+ final ServerSocket server = new ServerSocket(0);
+ Thread service = new Thread(new Runnable() {
+ @Override public void run() {
+ try {
+ clientConnection.set(server.accept());
+ } catch (IOException e) {
+ LOG.log(Level.WARNING, "Problem accepting a connection to thrift server", e);
+ } finally {
+ connected.countDown();
+ }
+ }
+ });
+ service.setDaemon(true);
+ service.start();
+
+ try {
+ final Thrift<GoodService.Iface> thrift = ThriftFactory.create(GoodService.Iface.class)
+ .withMaxConnectionsPerEndpoint(1)
+ .build(ImmutableSet.of(new InetSocketAddress(server.getLocalPort())));
+ addTearDown(new TearDown() {
+ @Override public void tearDown() {
+ thrift.close();
+ }
+ });
+
+ GoodService.Iface client = thrift.create();
+
+ assertEquals(GoodService.DONE, client.doWork());
+ } finally {
+ connected.await();
+ server.close();
+ }
+
+ Socket socket = clientConnection.get();
+ assertNotNull(socket);
+ socket.close();
+ }
+
+ @Test(expected = TResourceExhaustedException.class)
+ public void testCreateEmpty() throws Exception {
+ @SuppressWarnings("unchecked")
+ DynamicHostSet<ServiceInstance> emptyHostSet = control.createMock(DynamicHostSet.class);
+ final Thrift<GoodService.Iface> thrift = ThriftFactory.create(GoodService.Iface.class)
+ .withMaxConnectionsPerEndpoint(1)
+ .build(emptyHostSet);
+ addTearDown(new TearDown() {
+ @Override public void tearDown() {
+ thrift.close();
+ }
+ });
+ GoodService.Iface client = thrift.create();
+
+ // This should throw a TResourceExhaustedException
+ client.doWork();
+ }
+
+ @Test
+ public void testCreateAsync()
+ throws IOException, InterruptedException, ThriftFactory.ThriftFactoryException {
+ final String responseHolder[] = new String[] {null};
+ final CountDownLatch done = new CountDownLatch(1);
+ AsyncMethodCallback<String> callback = new AsyncMethodCallback<String>() {
+ @Override
+ public void onComplete(String response) {
+ responseHolder[0] = response;
+ done.countDown();
+ }
+
+ @Override
+ public void onError(Exception throwable) {
+ responseHolder[0] = throwable.toString();
+ done.countDown();
+ }
+ };
+
+ final Thrift<AsyncIface> thrift = ThriftFactory.create(GoodService.AsyncIface.class)
+ .withMaxConnectionsPerEndpoint(1)
+ .useFramedTransport(true)
+ .buildAsync(ImmutableSet.of(new InetSocketAddress(1234)));
+ addTearDown(new TearDown() {
+ @Override public void tearDown() {
+ thrift.close();
+ }
+ });
+ GoodService.AsyncIface client = thrift.builder()
+ .blocking()
+ .create();
+
+ client.doWork(callback);
+ assertTrue("wasn't called back in time, callback got " + responseHolder[0],
+ done.await(5000, TimeUnit.MILLISECONDS));
+ assertEquals(GoodService.DONE, responseHolder[0]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/thrift/ThriftTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/thrift/ThriftTest.java b/commons/src/test/java/com/twitter/common/thrift/ThriftTest.java
new file mode 100644
index 0000000..eea6b5b
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/thrift/ThriftTest.java
@@ -0,0 +1,934 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.thrift;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.base.Function;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+import org.easymock.IMocksControl;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.twitter.common.base.Command;
+import com.twitter.common.net.loadbalancing.LoadBalancer;
+import com.twitter.common.net.loadbalancing.RequestTracker;
+import com.twitter.common.net.pool.Connection;
+import com.twitter.common.net.pool.ObjectPool;
+import com.twitter.common.net.pool.ResourceExhaustedException;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stat;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.thrift.callers.RetryingCaller;
+import com.twitter.common.thrift.testing.MockTSocket;
+import com.twitter.common.util.concurrent.ForwardingExecutorService;
+
+import static org.easymock.EasyMock.and;
+import static org.easymock.EasyMock.anyLong;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.isA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * @author John Sirois
+ */
+public class ThriftTest {
+
+ private static final Amount<Long, Time> ASYNC_CONNECT_TIMEOUT = Amount.of(1L, Time.SECONDS);
+
+ public static class NotFoundException extends Exception {}
+
+ public interface TestService {
+ int calculateMass(String profileName) throws NotFoundException, TException;
+ }
+
+ public interface TestServiceAsync {
+ void calculateMass(String profileName, AsyncMethodCallback callback) throws TException;
+ }
+
+ private IMocksControl control;
+ private ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool;
+ private Function<TTransport, TestService> clientFactory;
+ private Function<TTransport, TestServiceAsync> asyncClientFactory;
+ private RequestTracker<InetSocketAddress> requestTracker;
+
+ private AsyncMethodCallback<Integer> callback;
+
+ @SuppressWarnings("unchecked")
+ @Before
+ public void setUp() throws Exception {
+ control = EasyMock.createControl();
+
+ this.connectionPool = control.createMock(ObjectPool.class);
+ this.clientFactory = control.createMock(Function.class);
+ this.asyncClientFactory = control.createMock(Function.class);
+ this.requestTracker = control.createMock(LoadBalancer.class);
+
+ this.callback = control.createMock(AsyncMethodCallback.class);
+ }
+
+ @After
+ public void after() {
+ Stats.flush();
+ }
+
+ @Test
+ public void testDoCallNoDeadline() throws Exception {
+ TestService testService = expectServiceCall(false);
+ expect(testService.calculateMass("jake")).andReturn(42);
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong());
+
+ Thrift<TestService> thrift = createThrift(expectUnusedExecutorService());
+
+ control.replay();
+
+ int userMass = thrift.builder().blocking().create().calculateMass("jake");
+
+ assertEquals(42, userMass);
+
+ assertRequestsTotal(thrift, 1);
+ assertErrorsTotal(thrift, 0);
+ assertReconnectsTotal(thrift, 0);
+ assertTimeoutsTotal(thrift, 0);
+
+ control.verify();
+ }
+
+ @Test
+ public void testDoCallAsync() throws Exception {
+ // Capture the callback that Thift has wrapped around our callback.
+ Capture<AsyncMethodCallback<Integer>> callbackCapture =
+ new Capture<AsyncMethodCallback<Integer>>();
+ expectAsyncServiceCall(false).calculateMass(eq("jake"), capture(callbackCapture));
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong());
+
+ // Verifies that our callback was called.
+ callback.onComplete(42);
+
+ Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService());
+
+ control.replay();
+
+ thrift.builder().withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create()
+ .calculateMass("jake", callback);
+
+ // Mimicks the async response from the server.
+ callbackCapture.getValue().onComplete(42);
+
+ assertRequestsTotal(thrift, 1);
+ assertErrorsTotal(thrift, 0);
+ assertReconnectsTotal(thrift, 0);
+ assertTimeoutsTotal(thrift, 0);
+
+ control.verify();
+ }
+
+ @Test
+ public void testDoCallServiceException() throws Exception {
+ TestService testService = expectServiceCall(true);
+ NotFoundException notFoundException = new NotFoundException();
+ expect(testService.calculateMass("jake")).andThrow(notFoundException);
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+ Thrift<TestService> thrift = createThrift(expectUnusedExecutorService());
+
+ control.replay();
+
+ try {
+ thrift.builder().blocking().create().calculateMass("jake");
+ fail("Expected service custom exception to bubble unmodified");
+ } catch (NotFoundException e) {
+ assertSame(notFoundException, e);
+ }
+
+ assertRequestsTotal(thrift, 1);
+ assertErrorsTotal(thrift, 1);
+ assertReconnectsTotal(thrift, 1);
+ assertTimeoutsTotal(thrift, 0);
+
+ control.verify();
+ }
+
+ @Test
+ public void testDoCallAsyncServiceException() throws Exception {
+ NotFoundException notFoundException = new NotFoundException();
+
+ // Capture the callback that Thift has wrapped around our callback.
+ Capture<AsyncMethodCallback<Integer>> callbackCapture =
+ new Capture<AsyncMethodCallback<Integer>>();
+ expectAsyncServiceCall(true).calculateMass(eq("jake"), capture(callbackCapture));
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+ // Verifies that our callback was called.
+ callback.onError(notFoundException);
+
+ Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService());
+
+ control.replay();
+
+ thrift.builder().withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create()
+ .calculateMass("jake", callback);
+
+ // Mimicks the async response from the server.
+ callbackCapture.getValue().onError(notFoundException);
+
+ assertRequestsTotal(thrift, 1);
+ assertErrorsTotal(thrift, 1);
+ assertReconnectsTotal(thrift, 1);
+ assertTimeoutsTotal(thrift, 0);
+
+ control.verify();
+ }
+
+ @Test
+ public void testDoCallThriftException() throws Exception {
+ Capture<TTransport> transportCapture = new Capture<TTransport>();
+ TestService testService = expectThriftError(transportCapture);
+ TTransportException tException = new TTransportException();
+ expect(testService.calculateMass("jake")).andThrow(tException);
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+ Thrift<TestService> thrift = createThrift(expectUnusedExecutorService());
+
+ control.replay();
+
+ try {
+ thrift.builder().blocking().create().calculateMass("jake");
+ fail("Expected thrift exception to bubble unmodified");
+ } catch (TException e) {
+ assertSame(tException, e);
+ }
+
+ assertRequestsTotal(thrift, 1);
+ assertErrorsTotal(thrift, 1);
+ assertReconnectsTotal(thrift, 1);
+ assertTimeoutsTotal(thrift, 0);
+
+ assertTrue(transportCapture.hasCaptured());
+ assertFalse("Expected the transport to be forcibly closed when a thrift error is encountered",
+ transportCapture.getValue().isOpen());
+
+ control.verify();
+ }
+
+ @Test
+ public void doCallAsyncThriftException() throws Exception {
+ TTransportException tException = new TTransportException();
+
+ expectAsyncServiceCall(true).calculateMass(eq("jake"), (AsyncMethodCallback) anyObject());
+ expectLastCall().andThrow(tException);
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+ Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService());
+
+ callback.onError(tException);
+
+ control.replay();
+
+ thrift.builder().withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create()
+ .calculateMass("jake", callback);
+
+ assertRequestsTotal(thrift, 1);
+ assertErrorsTotal(thrift, 1);
+ assertReconnectsTotal(thrift, 1);
+ assertTimeoutsTotal(thrift, 0);
+
+ control.verify();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDisallowsAsyncWithDeadline() {
+ Config config = Config.builder()
+ .withRequestTimeout(Amount.of(1L, Time.SECONDS))
+ .create();
+
+ new Thrift<TestServiceAsync>(config, connectionPool, requestTracker,
+ "foo", TestServiceAsync.class, asyncClientFactory, true, false).create();
+ }
+
+ @Test
+ public void testDoCallDeadlineMet() throws Exception {
+ TestService testService = expectServiceCall(false);
+ expect(testService.calculateMass("jake")).andReturn(42);
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong());
+
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ Thrift<TestService> thrift = createThrift(executorService);
+
+ control.replay();
+
+ int userMass = thrift.builder().withRequestTimeout(Amount.of(1L, Time.DAYS)).create()
+ .calculateMass("jake");
+
+ assertEquals(42, userMass);
+
+ assertRequestsTotal(thrift, 1);
+ assertErrorsTotal(thrift, 0);
+ assertReconnectsTotal(thrift, 0);
+ assertTimeoutsTotal(thrift, 0);
+
+ control.verify();
+ }
+
+ @Test
+ @Ignore("Flaky: https://trac.twitter.com/twttr/ticket/11474")
+ public void testDoCallDeadlineExpired() throws Exception {
+ TestService testService = expectServiceCall(true);
+
+ // Setup a way to verify the callable was cancelled by Thrift when timeout elapsed
+ final CountDownLatch remoteCallComplete = new CountDownLatch(1);
+ final CountDownLatch remoteCallStarted = new CountDownLatch(1);
+ final Command verifyCancelled = control.createMock(Command.class);
+ verifyCancelled.execute();
+ final Object block = new Object();
+ expect(testService.calculateMass("jake")).andAnswer(new IAnswer<Integer>() {
+ @Override public Integer answer() throws TException {
+ try {
+ synchronized (block) {
+ remoteCallStarted.countDown();
+ block.wait();
+ }
+ fail("Expected late work to be cancelled and interrupted");
+ } catch (InterruptedException e) {
+ verifyCancelled.execute();
+ } finally {
+ remoteCallComplete.countDown();
+ }
+ throw new TTransportException();
+ }
+ });
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.TIMEOUT), anyLong());
+
+ ExecutorService executorService =
+ new ForwardingExecutorService<ExecutorService>(Executors.newSingleThreadExecutor()) {
+ @Override public <T> Future<T> submit(Callable<T> task) {
+ Future<T> future = super.submit(task);
+
+ // make sure the task is started so we can verify it gets cancelled
+ try {
+ remoteCallStarted.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ return future;
+ }
+ };
+ Thrift<TestService> thrift = createThrift(executorService);
+
+ control.replay();
+
+ try {
+ thrift.builder().withRequestTimeout(Amount.of(1L, Time.NANOSECONDS)).create()
+ .calculateMass("jake");
+ fail("Expected a timeout");
+ } catch (TTimeoutException e) {
+ // expected
+ } finally {
+ remoteCallComplete.await();
+ }
+
+ assertRequestsTotal(thrift, 0);
+ assertErrorsTotal(thrift, 0);
+ assertReconnectsTotal(thrift, 0);
+ assertTimeoutsTotal(thrift, 1);
+
+ control.verify();
+ }
+
+ @Test
+ public void testRetriesNoProblems() throws Exception {
+ expect(expectServiceCall(false).calculateMass("jake")).andReturn(42);
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong());
+
+ Thrift<TestService> thrift = createThrift(expectUnusedExecutorService());
+
+ control.replay();
+
+ TestService testService = thrift.builder().blocking().withRetries(1).create();
+
+ assertEquals(42, testService.calculateMass("jake"));
+
+ assertRequestsTotal(thrift, 1);
+ assertErrorsTotal(thrift, 0);
+ assertReconnectsTotal(thrift, 0);
+ assertTimeoutsTotal(thrift, 0);
+
+ control.verify();
+ }
+
+ @Test
+ public void testAsyncRetriesNoProblems() throws Exception {
+ // Capture the callback that Thift has wrapped around our callback.
+ Capture<AsyncMethodCallback<Integer>> callbackCapture =
+ new Capture<AsyncMethodCallback<Integer>>();
+ expectAsyncServiceCall(false).calculateMass(eq("jake"), capture(callbackCapture));
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong());
+
+ // Verifies that our callback was called.
+ callback.onComplete(42);
+
+ Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService());
+
+ control.replay();
+
+ thrift.builder().withRetries(1).withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create()
+ .calculateMass("jake", callback);
+
+ // Mimicks the async response from the server.
+ callbackCapture.getValue().onComplete(42);
+
+ assertRequestsTotal(thrift, 1);
+ assertErrorsTotal(thrift, 0);
+ assertReconnectsTotal(thrift, 0);
+ assertTimeoutsTotal(thrift, 0);
+
+ control.verify();
+ }
+
+ @Test
+ public void testRetriesRecover() throws Exception {
+ // 1st call
+ expect(expectServiceCall(true).calculateMass("jake")).andThrow(new TTransportException());
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+ // 1st retry recovers
+ expect(expectServiceCall(false).calculateMass("jake")).andReturn(42);
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong());
+
+ Thrift<TestService> thrift = createThrift(expectUnusedExecutorService());
+
+ control.replay();
+
+ TestService testService = thrift.builder().blocking().withRetries(1).create();
+
+ assertEquals(42, testService.calculateMass("jake"));
+
+ assertRequestsTotal(thrift, 1);
+ assertErrorsTotal(thrift, 0);
+ assertReconnectsTotal(thrift, 0);
+ assertTimeoutsTotal(thrift, 0);
+
+ control.verify();
+ }
+
+ @Test
+ public void testAsyncRetriesRecover() throws Exception {
+ // Capture the callback that Thift has wrapped around our callback.
+ Capture<AsyncMethodCallback<Integer>> callbackCapture =
+ new Capture<AsyncMethodCallback<Integer>>();
+
+ // 1st call
+ expectAsyncServiceCall(true).calculateMass(eq("jake"), capture(callbackCapture));
+ expectLastCall().andThrow(new TTransportException());
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+ // 1st retry recovers
+ expectAsyncServiceRetry(false).calculateMass(eq("jake"), capture(callbackCapture));
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong());
+
+ // Verifies that our callback was called.
+ callback.onComplete(42);
+
+ Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService());
+
+ control.replay();
+
+ thrift.builder().withRetries(1).withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create()
+ .calculateMass("jake", callback);
+
+ // Mimicks the async response from the server.
+ callbackCapture.getValue().onComplete(42);
+
+ assertRequestsTotal(thrift, 1);
+ assertErrorsTotal(thrift, 0);
+ assertReconnectsTotal(thrift, 0);
+ assertTimeoutsTotal(thrift, 0);
+
+ control.verify();
+ }
+
+ @Test
+ public void testRetriesFailure() throws Exception {
+ // 1st call
+ expect(expectServiceCall(true).calculateMass("jake")).andThrow(new TTransportException());
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+ // 1st retry
+ expect(expectServiceCall(true).calculateMass("jake")).andThrow(new TTransportException());
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+ // 2nd retry
+ TTransportException finalRetryException = new TTransportException();
+ expect(expectServiceCall(true).calculateMass("jake")).andThrow(finalRetryException);
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+ Thrift<TestService> thrift = createThrift(expectUnusedExecutorService());
+
+ control.replay();
+
+ TestService testService = thrift.builder().blocking().withRetries(2).create();
+
+ try {
+ testService.calculateMass("jake");
+ fail("Expected an exception to be thrown since all retires failed");
+ } catch (TException e) {
+ assertSame(finalRetryException, e);
+ }
+
+ assertRequestsTotal(thrift, 1);
+ assertErrorsTotal(thrift, 1);
+ assertReconnectsTotal(thrift, 1);
+ assertTimeoutsTotal(thrift, 0);
+
+ control.verify();
+ }
+
+ @Test
+ public void testAsyncRetriesFailure() throws Exception {
+ // 1st call
+ Capture<AsyncMethodCallback<Integer>> callbackCapture1 =
+ new Capture<AsyncMethodCallback<Integer>>();
+ expectAsyncServiceCall(true).calculateMass(eq("jake"), capture(callbackCapture1));
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+ // 1st retry
+ Capture<AsyncMethodCallback<Integer>> callbackCapture2 =
+ new Capture<AsyncMethodCallback<Integer>>();
+ expectAsyncServiceRetry(true).calculateMass(eq("jake"), capture(callbackCapture2));
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+ // 2nd retry
+ Capture<AsyncMethodCallback<Integer>> callbackCapture3 =
+ new Capture<AsyncMethodCallback<Integer>>();
+ expectAsyncServiceRetry(true).calculateMass(eq("jake"), capture(callbackCapture3));
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+ // Verifies that our callback was called.
+ TTransportException returnedException = new TTransportException();
+ callback.onError(returnedException);
+
+ Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService());
+
+ control.replay();
+
+ thrift.builder().withRetries(2).withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create()
+ .calculateMass("jake", callback);
+
+ callbackCapture1.getValue().onError(new TTransportException());
+ callbackCapture2.getValue().onError(new IOException());
+ callbackCapture3.getValue().onError(returnedException);
+
+ assertRequestsTotal(thrift, 1);
+ assertErrorsTotal(thrift, 1);
+ assertReconnectsTotal(thrift, 1);
+ assertTimeoutsTotal(thrift, 0);
+
+ control.verify();
+ }
+
+ @Test
+ public void testRetrySelection() throws Exception {
+ expect(expectServiceCall(true).calculateMass("jake")).andThrow(new NotFoundException());
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+ // verify subclasses pass the retry filter
+ class HopelesslyLost extends NotFoundException {}
+ expect(expectServiceCall(true).calculateMass("jake")).andThrow(new HopelesslyLost());
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+ TTransportException nonRetryableException = new TTransportException();
+ expect(expectServiceCall(true).calculateMass("jake")).andThrow(nonRetryableException);
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+ Thrift<TestService> thrift = createThrift(expectUnusedExecutorService());
+
+ control.replay();
+
+ TestService testService =
+ thrift.builder().blocking().withRetries(2).retryOn(NotFoundException.class).create();
+
+ try {
+ testService.calculateMass("jake");
+ fail("Expected n exception to be thrown since all retires failed");
+ } catch (TException e) {
+ assertSame(nonRetryableException, e);
+ }
+
+ assertRequestsTotal(thrift, 1);
+ assertErrorsTotal(thrift, 1);
+ assertReconnectsTotal(thrift, 1);
+ assertTimeoutsTotal(thrift, 0);
+
+ control.verify();
+ }
+
+ @Test
+ public void testAsyncRetrySelection() throws Exception {
+ // verify subclasses pass the retry filter
+ class HopelesslyLost extends NotFoundException {}
+ Capture<AsyncMethodCallback<Integer>> callbackCapture1 =
+ new Capture<AsyncMethodCallback<Integer>>();
+ expectAsyncServiceCall(true).calculateMass(eq("jake"), capture(callbackCapture1));
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+ Capture<AsyncMethodCallback<Integer>> callbackCapture2 =
+ new Capture<AsyncMethodCallback<Integer>>();
+ expectAsyncServiceRetry(true).calculateMass(eq("jake"), capture(callbackCapture2));
+ requestTracker.requestResult(
+ (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong());
+
+ // Verifies that our callback was called.
+ TTransportException nonRetryableException = new TTransportException();
+ callback.onError(nonRetryableException);
+
+ Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService());
+
+ control.replay();
+
+ TestServiceAsync testService = thrift.builder()
+ .withRetries(2)
+ .retryOn(NotFoundException.class)
+ .withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create();
+
+ testService.calculateMass("jake", callback);
+ callbackCapture1.getValue().onError(new HopelesslyLost());
+ callbackCapture2.getValue().onError(nonRetryableException);
+
+ assertRequestsTotal(thrift, 1);
+ assertErrorsTotal(thrift, 1);
+ assertReconnectsTotal(thrift, 1);
+ assertTimeoutsTotal(thrift, 0);
+
+ control.verify();
+ }
+
+ @Test
+ public void testResourceExhausted() throws Exception {
+ expectConnectionPoolResourceExhausted(Config.DEFAULT_CONNECT_TIMEOUT);
+ Thrift<TestService> thrift = createThrift(expectUnusedExecutorService());
+
+ control.replay();
+
+ TestService testService = thrift.builder().blocking().create();
+
+ try {
+ testService.calculateMass("jake");
+ fail("Expected a TResourceExhaustedException.");
+ } catch (TResourceExhaustedException e) {
+ // Expected
+ }
+
+ control.verify();
+ }
+
+ @Test
+ public void testAsyncResourceExhausted() throws Exception {
+ expectConnectionPoolResourceExhausted(ASYNC_CONNECT_TIMEOUT);
+ Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService());
+
+ callback.onError(and(anyObject(), isA(TResourceExhaustedException.class)));
+
+ control.replay();
+
+ TestServiceAsync testService = thrift.builder().withConnectTimeout(ASYNC_CONNECT_TIMEOUT)
+ .create();
+
+ testService.calculateMass("jake", callback);
+
+ control.verify();
+ }
+
+ @Test
+ public void testAsyncDoesNotRetryResourceExhausted() throws Exception {
+ expect(connectionPool.get(ASYNC_CONNECT_TIMEOUT)).andThrow(
+ new ResourceExhaustedException("first"));
+
+ Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService());
+
+ callback.onError(and(anyObject(), isA(TResourceExhaustedException.class)));
+
+ control.replay();
+
+ thrift.builder().withRetries(1).withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create()
+ .calculateMass("jake", callback);
+
+ control.verify();
+ }
+
+ @Test
+ public void testConnectionPoolTimeout() throws Exception {
+ expectConnectionPoolTimeout(Config.DEFAULT_CONNECT_TIMEOUT);
+ Thrift<TestService> thrift = createThrift(expectUnusedExecutorService());
+
+ control.replay();
+
+ TestService testService =
+ thrift.builder().blocking().create();
+
+ try {
+ testService.calculateMass("jake");
+ fail("Expected a TTimeoutException.");
+ } catch (TTimeoutException e) {
+ // Expected
+ }
+
+ control.verify();
+ }
+
+ @Test
+ public void testDoCallDeadlineNoThreads() throws Exception {
+ control.replay();
+
+ ExecutorService executorService =
+ new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
+
+ Thrift<TestService> thrift = createThrift(executorService);
+
+ final TestService service =
+ thrift.builder().noRetries().withRequestTimeout(Amount.of(1L, Time.SECONDS)).create();
+
+ final CountDownLatch remoteCallComplete = new CountDownLatch(1);
+ final CountDownLatch remoteCallStarted = new CountDownLatch(1);
+
+ Future<Integer> result = executorService.submit(new Callable<Integer>() {
+ @Override public Integer call() throws Exception {
+ remoteCallStarted.countDown();
+ remoteCallComplete.await();
+ return service.calculateMass("jake");
+ }
+ });
+
+ remoteCallStarted.await();
+ try {
+ service.calculateMass("jake");
+ fail("Expected no available threads to trigger resource exhausted");
+ } catch (TResourceExhaustedException e) {
+ // expected
+ } finally {
+ remoteCallComplete.countDown();
+ }
+
+ try {
+ result.get();
+ fail("Expected no available threads to trigger resource exhausted");
+ } catch (ExecutionException e) {
+ assertEquals(TResourceExhaustedException.class, e.getCause().getClass());
+ }
+
+ control.verify();
+ }
+
+ private ExecutorService expectUnusedExecutorService() {
+ return control.createMock(ExecutorService.class);
+ }
+
+ private static final String STAT_REQUESTS = "requests_events";
+ private static final String STAT_ERRORS = "errors";
+ private static final String STAT_RECONNECTS = "reconnects";
+ private static final String STAT_TIMEOUTS = "timeouts";
+
+ private void assertRequestsTotal(Thrift<?> thrift, int total) {
+ assertRequestStatValue(STAT_REQUESTS, total);
+ }
+
+ private void assertErrorsTotal(Thrift<?> thrift, int total) {
+ assertRequestStatValue(STAT_ERRORS, total);
+ }
+
+ private void assertReconnectsTotal(Thrift<?> thrift, int total) {
+ assertRequestStatValue(STAT_RECONNECTS, total);
+ }
+
+ private void assertTimeoutsTotal(Thrift<?> thrift, int total) {
+ assertRequestStatValue(STAT_TIMEOUTS, total);
+ }
+
+ private void assertRequestStatValue(String statName, long expectedValue) {
+
+ Stat<Long> var = Stats.getVariable("foo_calculateMass_" + statName);
+
+ assertNotNull(var);
+ assertEquals(expectedValue, (long) var.read());
+ }
+
+ private Thrift<TestService> createThrift(ExecutorService executorService) {
+ return new Thrift<TestService>(executorService, connectionPool, requestTracker, "foo",
+ TestService.class, clientFactory, false, false);
+ }
+
+ private Thrift<TestServiceAsync> createAsyncThrift(ExecutorService executorService) {
+ return new Thrift<TestServiceAsync>(executorService, connectionPool, requestTracker, "foo",
+ TestServiceAsync.class, asyncClientFactory, true, false);
+ }
+
+ private TestService expectServiceCall(boolean withFailure)
+ throws ResourceExhaustedException, TimeoutException {
+ Connection<TTransport, InetSocketAddress> connection = expectConnectionPoolGet();
+ return expectServiceCall(connection, withFailure);
+ }
+
+ private TestServiceAsync expectAsyncServiceCall(boolean withFailure)
+ throws ResourceExhaustedException, TimeoutException {
+ return expectAsyncServiceCall(expectConnectionPoolGet(ASYNC_CONNECT_TIMEOUT), withFailure);
+ }
+
+ private TestServiceAsync expectAsyncServiceRetry(boolean withFailure)
+ throws ResourceExhaustedException, TimeoutException {
+ return expectAsyncServiceCall(
+ expectConnectionPoolGet(RetryingCaller.NONBLOCKING_TIMEOUT), withFailure);
+ }
+
+ private TestService expectThriftError(Capture<TTransport> transportCapture)
+ throws ResourceExhaustedException, TimeoutException {
+ Connection<TTransport, InetSocketAddress> connection = expectConnectionPoolGet();
+ return expectServiceCall(connection, transportCapture, true);
+ }
+
+ private Connection<TTransport, InetSocketAddress> expectConnectionPoolGet()
+ throws ResourceExhaustedException, TimeoutException {
+ Connection<TTransport, InetSocketAddress> connection = createConnection();
+ expect(connectionPool.get(Config.DEFAULT_CONNECT_TIMEOUT)).andReturn(connection);
+ return connection;
+ }
+
+ private Connection<TTransport, InetSocketAddress> expectConnectionPoolGet(
+ Amount<Long, Time> timeout) throws ResourceExhaustedException, TimeoutException {
+ Connection<TTransport, InetSocketAddress> connection = createConnection();
+ expect(connectionPool.get(timeout)).andReturn(connection);
+ return connection;
+ }
+
+ private void expectConnectionPoolResourceExhausted(Amount<Long, Time> timeout)
+ throws ResourceExhaustedException, TimeoutException {
+ expect(connectionPool.get(timeout)).andThrow(new ResourceExhaustedException(""));
+ }
+
+ private void expectConnectionPoolTimeout(Amount<Long, Time> timeout)
+ throws ResourceExhaustedException, TimeoutException {
+ expect(connectionPool.get(timeout)).andThrow(new TimeoutException());
+ }
+
+ private Connection<TTransport, InetSocketAddress> createConnection() {
+ return new TTransportConnection(new MockTSocket(),
+ InetSocketAddress.createUnresolved(MockTSocket.HOST, MockTSocket.PORT));
+ }
+
+ private TestService expectServiceCall(Connection<TTransport, InetSocketAddress> connection,
+ boolean withFailure) {
+ return expectServiceCall(connection, null, withFailure);
+ }
+
+ private TestServiceAsync expectAsyncServiceCall(
+ Connection<TTransport, InetSocketAddress> connection, boolean withFailure) {
+ return expectAsyncServiceCall(connection, null, withFailure);
+ }
+
+ private TestService expectServiceCall(Connection<TTransport, InetSocketAddress> connection,
+ Capture<TTransport> transportCapture, boolean withFailure) {
+
+ TestService testService = control.createMock(TestService.class);
+ if (connection != null) {
+ IExpectationSetters<TestService> expectApply = transportCapture == null
+ ? expect(clientFactory.apply(EasyMock.isA(TTransport.class)))
+ : expect(clientFactory.apply(EasyMock.capture(transportCapture)));
+ expectApply.andReturn(testService);
+
+ if (withFailure) {
+ connectionPool.remove(connection);
+ } else {
+ connectionPool.release(connection);
+ }
+ }
+ return testService;
+ }
+
+ private TestServiceAsync expectAsyncServiceCall(
+ Connection<TTransport, InetSocketAddress> connection,
+ Capture<TTransport> transportCapture, boolean withFailure) {
+
+ TestServiceAsync testService = control.createMock(TestServiceAsync.class);
+ if (connection != null) {
+ IExpectationSetters<TestServiceAsync> expectApply = transportCapture == null
+ ? expect(asyncClientFactory.apply(EasyMock.isA(TTransport.class)))
+ : expect(asyncClientFactory.apply(EasyMock.capture(transportCapture)));
+ expectApply.andReturn(testService);
+
+ if (withFailure) {
+ connectionPool.remove(connection);
+ } else {
+ connectionPool.release(connection);
+ }
+ }
+ return testService;
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/thrift/callers/AbstractCallerTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/thrift/callers/AbstractCallerTest.java b/commons/src/test/java/com/twitter/common/thrift/callers/AbstractCallerTest.java
new file mode 100644
index 0000000..cf55afe
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/thrift/callers/AbstractCallerTest.java
@@ -0,0 +1,59 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.thrift.callers;
+
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import org.junit.Before;
+
+import java.lang.reflect.Method;
+
+import static org.easymock.EasyMock.expect;
+
+/**
+ * Test framework for testing callers.
+ *
+ * @author William Farner
+ */
+public abstract class AbstractCallerTest extends EasyMockTest {
+ protected final Amount<Long, Time> CONNECT_TIMEOUT = Amount.of(1L, Time.HOURS);
+
+ protected Caller caller;
+
+ protected Method methodA;
+ protected Object[] argsA;
+
+ @Before
+ public final void callerSetUp() throws Exception {
+ caller = createMock(Caller.class);
+ methodA = Object.class.getMethod("toString");
+ argsA = new Object[] {};
+ }
+
+ protected String call(Caller caller) throws Throwable {
+ return (String) caller.call(methodA, argsA, null, CONNECT_TIMEOUT);
+ }
+
+ protected void expectCall(String returnValue) throws Throwable {
+ expect(caller.call(methodA, argsA, null, CONNECT_TIMEOUT)).andReturn(returnValue);
+ }
+
+ protected void expectCall(Throwable thrown) throws Throwable {
+ expect(caller.call(methodA, argsA, null, CONNECT_TIMEOUT)).andThrow(thrown);
+ }
+}