You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/09 20:13:53 UTC
[1/2] incubator-beam git commit: Revert Backoff Classes to Fix Worker
Dependency Error
Repository: incubator-beam
Updated Branches:
refs/heads/master e9326c8b1 -> 82ebfd487
Revert Backoff Classes to Fix Worker Dependency Error
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/51c73a29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/51c73a29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/51c73a29
Branch: refs/heads/master
Commit: 51c73a29f53f0be621bcd42c2b2845cff0ef960d
Parents: e9326c8
Author: Mark Liu <ma...@markliu-macbookpro.roam.corp.google.com>
Authored: Fri Sep 9 10:25:12 2016 -0700
Committer: Mark Liu <ma...@markliu-macbookpro.roam.corp.google.com>
Committed: Fri Sep 9 10:25:12 2016 -0700
----------------------------------------------------------------------
...AttemptAndTimeBoundedExponentialBackOff.java | 173 +++++++++++++++
.../util/AttemptBoundedExponentialBackOff.java | 86 ++++++++
...mptAndTimeBoundedExponentialBackOffTest.java | 213 +++++++++++++++++++
.../AttemptBoundedExponentialBackOffTest.java | 85 ++++++++
4 files changed, 557 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51c73a29/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java
new file mode 100644
index 0000000..3fe2918
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.NanoClock;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Extension of {@link AttemptBoundedExponentialBackOff} that bounds the total time that the backoff
+ * is happening as well as the amount of retries. Acts exactly as a AttemptBoundedExponentialBackOff
+ * unless the time interval has expired since the object was created. At this point, it will always
+ * return BackOff.STOP. Calling reset() resets both the timer and the number of retry attempts,
+ * unless a custom ResetPolicy (ResetPolicy.ATTEMPTS or ResetPolicy.TIMER) is passed to the
+ * constructor.
+ *
+ * <p>Implementation is not thread-safe.
+ */
+@Deprecated
+public class AttemptAndTimeBoundedExponentialBackOff extends AttemptBoundedExponentialBackOff {
+ private long endTimeMillis;
+ private long maximumTotalWaitTimeMillis;
+ private ResetPolicy resetPolicy;
+ private final NanoClock nanoClock;
+ // NanoClock.SYSTEM has a max elapsed time of 292 years or 2^63 ns. Here, we choose 2^53 ns as
+ // a smaller but still huge limit.
+ private static final long MAX_ELAPSED_TIME_MILLIS = 1L << 53;
+
+ /**
+ * A ResetPolicy controls the behavior of this BackOff when reset() is called. By default, both
+ * the number of attempts and the time bound for the BackOff are reset, but an alternative
+ * ResetPolicy may be set to only reset one of these two.
+ */
+ public static enum ResetPolicy {
+ ALL,
+ ATTEMPTS,
+ TIMER
+ }
+
+ /**
+ * Constructs an instance of AttemptAndTimeBoundedExponentialBackoff.
+ *
+ * @param maximumNumberOfAttempts The maximum number of attempts it will make.
+ * @param initialIntervalMillis The original interval to wait between attempts in milliseconds.
+ * @param maximumTotalWaitTimeMillis The maximum total time that this object will
+ * allow more attempts in milliseconds.
+ */
+ public AttemptAndTimeBoundedExponentialBackOff(
+ int maximumNumberOfAttempts, long initialIntervalMillis, long maximumTotalWaitTimeMillis) {
+ this(
+ maximumNumberOfAttempts,
+ initialIntervalMillis,
+ maximumTotalWaitTimeMillis,
+ ResetPolicy.ALL,
+ NanoClock.SYSTEM);
+ }
+
+ /**
+ * Constructs an instance of AttemptAndTimeBoundedExponentialBackoff.
+ *
+ * @param maximumNumberOfAttempts The maximum number of attempts it will make.
+ * @param initialIntervalMillis The original interval to wait between attempts in milliseconds.
+ * @param maximumTotalWaitTimeMillis The maximum total time that this object will
+ * allow more attempts in milliseconds.
+ * @param resetPolicy The ResetPolicy specifying the properties of this BackOff that are subject
+ * to being reset.
+ */
+ public AttemptAndTimeBoundedExponentialBackOff(
+ int maximumNumberOfAttempts,
+ long initialIntervalMillis,
+ long maximumTotalWaitTimeMillis,
+ ResetPolicy resetPolicy) {
+ this(
+ maximumNumberOfAttempts,
+ initialIntervalMillis,
+ maximumTotalWaitTimeMillis,
+ resetPolicy,
+ NanoClock.SYSTEM);
+ }
+
+ /**
+ * Constructs an instance of AttemptAndTimeBoundedExponentialBackoff.
+ *
+ * @param maximumNumberOfAttempts The maximum number of attempts it will make.
+ * @param initialIntervalMillis The original interval to wait between attempts in milliseconds.
+ * @param maximumTotalWaitTimeMillis The maximum total time that this object will
+ * allow more attempts in milliseconds.
+ * @param resetPolicy The ResetPolicy specifying the properties of this BackOff that are subject
+ * to being reset.
+ * @param nanoClock clock used to measure the time that has passed.
+ */
+ public AttemptAndTimeBoundedExponentialBackOff(
+ int maximumNumberOfAttempts,
+ long initialIntervalMillis,
+ long maximumTotalWaitTimeMillis,
+ ResetPolicy resetPolicy,
+ NanoClock nanoClock) {
+ super(maximumNumberOfAttempts, initialIntervalMillis);
+ checkArgument(
+ maximumTotalWaitTimeMillis > 0, "Maximum total wait time must be greater than zero.");
+ checkArgument(
+ maximumTotalWaitTimeMillis < MAX_ELAPSED_TIME_MILLIS,
+ "Maximum total wait time must be less than " + MAX_ELAPSED_TIME_MILLIS + " milliseconds");
+ checkArgument(resetPolicy != null, "resetPolicy may not be null");
+ checkArgument(nanoClock != null, "nanoClock may not be null");
+ this.maximumTotalWaitTimeMillis = maximumTotalWaitTimeMillis;
+ this.resetPolicy = resetPolicy;
+ this.nanoClock = nanoClock;
+ // Set the end time for this BackOff. Note that we cannot simply call reset() here since the
+ // resetPolicy may not be set to reset the time bound.
+ endTimeMillis = getTimeMillis() + maximumTotalWaitTimeMillis;
+ }
+
+ @Override
+ @SuppressFBWarnings(value = "UR_UNINIT_READ_CALLED_FROM_SUPER_CONSTRUCTOR",
+ justification = "Explicitly handled in implementation.")
+ public void reset() {
+ // reset() is called in the constructor of the parent class before resetPolicy and nanoClock are
+ // set. In this case, we call the parent class's reset() method and return.
+ if (resetPolicy == null) {
+ super.reset();
+ return;
+ }
+ // Reset the number of attempts.
+ if (resetPolicy == ResetPolicy.ALL || resetPolicy == ResetPolicy.ATTEMPTS) {
+ super.reset();
+ }
+ // Reset the time bound.
+ if (resetPolicy == ResetPolicy.ALL || resetPolicy == ResetPolicy.TIMER) {
+ endTimeMillis = getTimeMillis() + maximumTotalWaitTimeMillis;
+ }
+ }
+
+ public void setEndtimeMillis(long endTimeMillis) {
+ this.endTimeMillis = endTimeMillis;
+ }
+
+ @Override
+ public long nextBackOffMillis() {
+ if (atMaxAttempts()) {
+ return BackOff.STOP;
+ }
+ long backoff = Math.min(super.nextBackOffMillis(), endTimeMillis - getTimeMillis());
+ return (backoff > 0 ? backoff : BackOff.STOP);
+ }
+
+ private long getTimeMillis() {
+ return TimeUnit.NANOSECONDS.toMillis(nanoClock.nanoTime());
+ }
+
+ @Override
+ public boolean atMaxAttempts() {
+ return super.atMaxAttempts() || getTimeMillis() >= endTimeMillis;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51c73a29/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOff.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOff.java
new file mode 100644
index 0000000..8f6f854
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOff.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.client.util.BackOff;
+
+
+/**
+ * Implementation of {@link BackOff} that increases the back off period for each retry attempt
+ * using a randomization function that grows exponentially.
+ *
+ * <p>Example: The initial interval is .5 seconds and the maximum number of retries is 10.
+ * For 10 tries the sequence will be (values in seconds):
+ *
+ * <pre>
+ * retry# retry_interval randomized_interval
+ * 1 0.5 [0.25, 0.75]
+ * 2 0.75 [0.375, 1.125]
+ * 3 1.125 [0.562, 1.687]
+ * 4 1.687 [0.8435, 2.53]
+ * 5 2.53 [1.265, 3.795]
+ * 6 3.795 [1.897, 5.692]
+ * 7 5.692 [2.846, 8.538]
+ * 8 8.538 [4.269, 12.807]
+ * 9 12.807 [6.403, 19.210]
+ * 10 {@link BackOff#STOP}
+ * </pre>
+ *
+ * <p>Implementation is not thread-safe.
+ */
+@Deprecated
+public class AttemptBoundedExponentialBackOff implements BackOff {
+ public static final double DEFAULT_MULTIPLIER = 1.5;
+ public static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5;
+ private final int maximumNumberOfAttempts;
+ private final long initialIntervalMillis;
+ private int currentAttempt;
+
+ public AttemptBoundedExponentialBackOff(int maximumNumberOfAttempts, long initialIntervalMillis) {
+ checkArgument(maximumNumberOfAttempts > 0,
+ "Maximum number of attempts must be greater than zero.");
+ checkArgument(initialIntervalMillis > 0, "Initial interval must be greater than zero.");
+ this.maximumNumberOfAttempts = maximumNumberOfAttempts;
+ this.initialIntervalMillis = initialIntervalMillis;
+ reset();
+ }
+
+ @Override
+ public void reset() {
+ currentAttempt = 1;
+ }
+
+ @Override
+ public long nextBackOffMillis() {
+ if (currentAttempt >= maximumNumberOfAttempts) {
+ return BackOff.STOP;
+ }
+ double currentIntervalMillis = initialIntervalMillis
+ * Math.pow(DEFAULT_MULTIPLIER, currentAttempt - 1);
+ double randomOffset = (Math.random() * 2 - 1)
+ * DEFAULT_RANDOMIZATION_FACTOR * currentIntervalMillis;
+ currentAttempt += 1;
+ return Math.round(currentIntervalMillis + randomOffset);
+ }
+
+ public boolean atMaxAttempts() {
+ return currentAttempt >= maximumNumberOfAttempts;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51c73a29/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOffTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOffTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOffTest.java
new file mode 100644
index 0000000..f1d7371
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOffTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.client.util.BackOff;
+import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link AttemptAndTimeBoundedExponentialBackOff}. */
+@RunWith(JUnit4.class)
+@SuppressWarnings("deprecation") // test of deprecated class
+public class AttemptAndTimeBoundedExponentialBackOffTest {
+ @Rule public ExpectedException exception = ExpectedException.none();
+ @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
+
+ @Test
+ public void testUsingInvalidInitialInterval() throws Exception {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Initial interval must be greater than zero.");
+ new AttemptAndTimeBoundedExponentialBackOff(10, 0L, 1000L);
+ }
+
+ @Test
+ public void testUsingInvalidTimeInterval() throws Exception {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Maximum total wait time must be greater than zero.");
+ new AttemptAndTimeBoundedExponentialBackOff(10, 2L, 0L);
+ }
+
+ @Test
+ public void testUsingInvalidMaximumNumberOfRetries() throws Exception {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Maximum number of attempts must be greater than zero.");
+ new AttemptAndTimeBoundedExponentialBackOff(-1, 10L, 1000L);
+ }
+
+ @Test
+ public void testThatFixedNumberOfAttemptsExits() throws Exception {
+ BackOff backOff =
+ new AttemptAndTimeBoundedExponentialBackOff(
+ 3,
+ 500L,
+ 1000L,
+ AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL,
+ fastClock);
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+ assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+ }
+
+ @Test
+ public void testThatResettingAllowsReuse() throws Exception {
+ AttemptBoundedExponentialBackOff backOff =
+ new AttemptAndTimeBoundedExponentialBackOff(
+ 3,
+ 500,
+ 1000L,
+ AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL,
+ fastClock);
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+ assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+ backOff.reset();
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+ assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+
+ backOff =
+ new AttemptAndTimeBoundedExponentialBackOff(
+ 30,
+ 500,
+ 1000L,
+ AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL,
+ fastClock);
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+ fastClock.sleep(2000L);
+ assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+ backOff.reset();
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+ }
+
+ @Test
+ public void testThatResettingAttemptsAllowsReuse() throws Exception {
+ AttemptBoundedExponentialBackOff backOff =
+ new AttemptAndTimeBoundedExponentialBackOff(
+ 3,
+ 500,
+ 1000,
+ AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS,
+ fastClock);
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+ assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+ backOff.reset();
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+ assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+ }
+
+ @Test
+ public void testThatResettingAttemptsDoesNotAllowsReuse() throws Exception {
+ AttemptBoundedExponentialBackOff backOff =
+ new AttemptAndTimeBoundedExponentialBackOff(
+ 30,
+ 500,
+ 1000L,
+ AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS,
+ fastClock);
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+ fastClock.sleep(2000L);
+ assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+ backOff.reset();
+ assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+ }
+
+ @Test
+ public void testThatResettingTimerAllowsReuse() throws Exception {
+ AttemptBoundedExponentialBackOff backOff =
+ new AttemptAndTimeBoundedExponentialBackOff(
+ 30,
+ 500,
+ 1000L,
+ AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.TIMER,
+ fastClock);
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+ fastClock.sleep(2000L);
+ assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+ backOff.reset();
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(561L), lessThan(1688L)));
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(843L), lessThan(2531L)));
+ }
+
+ @Test
+ public void testThatResettingTimerDoesNotAllowReuse() throws Exception {
+ AttemptBoundedExponentialBackOff backOff =
+ new AttemptAndTimeBoundedExponentialBackOff(
+ 3,
+ 500,
+ 1000L,
+ AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.TIMER,
+ fastClock);
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+ assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+ backOff.reset();
+ assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+ }
+
+ @Test
+ public void testTimeBound() throws Exception {
+ AttemptBoundedExponentialBackOff backOff =
+ new AttemptAndTimeBoundedExponentialBackOff(
+ 3, 500L, 5L, AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL, fastClock);
+ assertEquals(backOff.nextBackOffMillis(), 5L);
+ }
+
+ @Test
+ public void testAtMaxAttempts() throws Exception {
+ AttemptBoundedExponentialBackOff backOff =
+ new AttemptAndTimeBoundedExponentialBackOff(
+ 3,
+ 500L,
+ 1000L,
+ AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL,
+ fastClock);
+ assertFalse(backOff.atMaxAttempts());
+ backOff.nextBackOffMillis();
+ assertFalse(backOff.atMaxAttempts());
+ backOff.nextBackOffMillis();
+ assertTrue(backOff.atMaxAttempts());
+ assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+ }
+
+ @Test
+ public void testAtMaxTime() throws Exception {
+ AttemptBoundedExponentialBackOff backOff =
+ new AttemptAndTimeBoundedExponentialBackOff(
+ 3, 500L, 1L, AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL, fastClock);
+ fastClock.sleep(2);
+ assertTrue(backOff.atMaxAttempts());
+ assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51c73a29/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOffTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOffTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOffTest.java
new file mode 100644
index 0000000..44e435e
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOffTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.client.util.BackOff;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link AttemptBoundedExponentialBackOff}. */
+@RunWith(JUnit4.class)
+@SuppressWarnings("deprecation") // test of deprecated class
+public class AttemptBoundedExponentialBackOffTest {
+ @Rule public ExpectedException exception = ExpectedException.none();
+
+ @Test
+ public void testUsingInvalidInitialInterval() throws Exception {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Initial interval must be greater than zero.");
+ new AttemptBoundedExponentialBackOff(10, 0L);
+ }
+
+ @Test
+ public void testUsingInvalidMaximumNumberOfRetries() throws Exception {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Maximum number of attempts must be greater than zero.");
+ new AttemptBoundedExponentialBackOff(-1, 10L);
+ }
+
+ @Test
+ public void testThatFixedNumberOfAttemptsExits() throws Exception {
+ BackOff backOff = new AttemptBoundedExponentialBackOff(3, 500);
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+ assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+ }
+
+ @Test
+ public void testThatResettingAllowsReuse() throws Exception {
+ BackOff backOff = new AttemptBoundedExponentialBackOff(3, 500);
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+ assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+ backOff.reset();
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+ assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+ assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+ }
+
+ @Test
+ public void testAtMaxAttempts() throws Exception {
+ AttemptBoundedExponentialBackOff backOff = new AttemptBoundedExponentialBackOff(3, 500);
+ assertFalse(backOff.atMaxAttempts());
+ backOff.nextBackOffMillis();
+ assertFalse(backOff.atMaxAttempts());
+ backOff.nextBackOffMillis();
+ assertTrue(backOff.atMaxAttempts());
+ assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+ }
+}
[2/2] incubator-beam git commit: Closes #938
Posted by dh...@apache.org.
Closes #938
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/82ebfd48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/82ebfd48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/82ebfd48
Branch: refs/heads/master
Commit: 82ebfd487eb6af129971c6e1b61218bcc0c89f9b
Parents: e9326c8 51c73a2
Author: Dan Halperin <dh...@google.com>
Authored: Fri Sep 9 13:13:35 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Sep 9 13:13:35 2016 -0700
----------------------------------------------------------------------
...AttemptAndTimeBoundedExponentialBackOff.java | 173 +++++++++++++++
.../util/AttemptBoundedExponentialBackOff.java | 86 ++++++++
...mptAndTimeBoundedExponentialBackOffTest.java | 213 +++++++++++++++++++
.../AttemptBoundedExponentialBackOffTest.java | 85 ++++++++
4 files changed, 557 insertions(+)
----------------------------------------------------------------------