You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/09 20:13:53 UTC

[1/2] incubator-beam git commit: Revert Backoff Classes to Fix Worker Dependency Error

Repository: incubator-beam
Updated Branches:
  refs/heads/master e9326c8b1 -> 82ebfd487


Revert Backoff Classes to Fix Worker Dependency Error


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/51c73a29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/51c73a29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/51c73a29

Branch: refs/heads/master
Commit: 51c73a29f53f0be621bcd42c2b2845cff0ef960d
Parents: e9326c8
Author: Mark Liu <ma...@markliu-macbookpro.roam.corp.google.com>
Authored: Fri Sep 9 10:25:12 2016 -0700
Committer: Mark Liu <ma...@markliu-macbookpro.roam.corp.google.com>
Committed: Fri Sep 9 10:25:12 2016 -0700

----------------------------------------------------------------------
 ...AttemptAndTimeBoundedExponentialBackOff.java | 173 +++++++++++++++
 .../util/AttemptBoundedExponentialBackOff.java  |  86 ++++++++
 ...mptAndTimeBoundedExponentialBackOffTest.java | 213 +++++++++++++++++++
 .../AttemptBoundedExponentialBackOffTest.java   |  85 ++++++++
 4 files changed, 557 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51c73a29/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java
new file mode 100644
index 0000000..3fe2918
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.NanoClock;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Extension of {@link AttemptBoundedExponentialBackOff} that bounds the total time spent backing
+ * off as well as the number of attempts. Acts exactly like AttemptBoundedExponentialBackOff until
+ * the time bound, measured from construction or the last timer reset, has expired; from then on
+ * it always returns BackOff.STOP. Calling reset() resets both the timer and the number of
+ * attempts, unless a ResetPolicy restricting this (ResetPolicy.ATTEMPTS or ResetPolicy.TIMER) is
+ * passed to the constructor.
+ *
+ * <p>Implementation is not thread-safe.
+ */
+@Deprecated
+public class AttemptAndTimeBoundedExponentialBackOff extends AttemptBoundedExponentialBackOff {
+  private long endTimeMillis;
+  private long maximumTotalWaitTimeMillis;
+  private ResetPolicy resetPolicy;
+  private final NanoClock nanoClock;
+  // NanoClock.SYSTEM has a max elapsed time of 292 years or 2^63 ns.  NOTE(review): the limit
+  // below is 2^53 ms, not ns: it is compared against maximumTotalWaitTimeMillis.
+  private static final long MAX_ELAPSED_TIME_MILLIS = 1L << 53;
+
+  /**
+   * A ResetPolicy controls the behavior of this BackOff when reset() is called.  By default, both
+   * the number of attempts and the time bound for the BackOff are reset, but an alternative
+   * ResetPolicy may be set to only reset one of these two.
+   */
+  public static enum ResetPolicy {
+    ALL,
+    ATTEMPTS,
+    TIMER
+  }
+
+  /**
+   * Constructs an instance of AttemptAndTimeBoundedExponentialBackOff with ResetPolicy.ALL.
+   *
+   * @param maximumNumberOfAttempts The maximum number of attempts it will make.
+   * @param initialIntervalMillis The original interval to wait between attempts in milliseconds.
+   * @param maximumTotalWaitTimeMillis The maximum total time that this object will
+   *    allow more attempts in milliseconds.
+   */
+  public AttemptAndTimeBoundedExponentialBackOff(
+      int maximumNumberOfAttempts, long initialIntervalMillis, long maximumTotalWaitTimeMillis) {
+    this(
+        maximumNumberOfAttempts,
+        initialIntervalMillis,
+        maximumTotalWaitTimeMillis,
+        ResetPolicy.ALL,
+        NanoClock.SYSTEM);
+  }
+
+  /**
+   * Constructs an instance of AttemptAndTimeBoundedExponentialBackOff using NanoClock.SYSTEM.
+   *
+   * @param maximumNumberOfAttempts The maximum number of attempts it will make.
+   * @param initialIntervalMillis The original interval to wait between attempts in milliseconds.
+   * @param maximumTotalWaitTimeMillis The maximum total time that this object will
+   *    allow more attempts in milliseconds.
+   * @param resetPolicy The ResetPolicy specifying the properties of this BackOff that are subject
+   *    to being reset.
+   */
+  public AttemptAndTimeBoundedExponentialBackOff(
+      int maximumNumberOfAttempts,
+      long initialIntervalMillis,
+      long maximumTotalWaitTimeMillis,
+      ResetPolicy resetPolicy) {
+    this(
+        maximumNumberOfAttempts,
+        initialIntervalMillis,
+        maximumTotalWaitTimeMillis,
+        resetPolicy,
+        NanoClock.SYSTEM);
+  }
+
+  /**
+   * Constructs an instance of AttemptAndTimeBoundedExponentialBackOff.
+   *
+   * @param maximumNumberOfAttempts The maximum number of attempts it will make.
+   * @param initialIntervalMillis The original interval to wait between attempts in milliseconds.
+   * @param maximumTotalWaitTimeMillis The maximum total time that this object will
+   *    allow more attempts in milliseconds; must be positive and below 2^53.
+   * @param resetPolicy The ResetPolicy specifying the properties of this BackOff that are subject
+   *    to being reset; may not be null.
+   * @param nanoClock clock used to measure the time that has passed; may not be null.
+   */
+  public AttemptAndTimeBoundedExponentialBackOff(
+      int maximumNumberOfAttempts,
+      long initialIntervalMillis,
+      long maximumTotalWaitTimeMillis,
+      ResetPolicy resetPolicy,
+      NanoClock nanoClock) {
+    super(maximumNumberOfAttempts, initialIntervalMillis);
+    checkArgument(
+        maximumTotalWaitTimeMillis > 0, "Maximum total wait time must be greater than zero.");
+    checkArgument(
+        maximumTotalWaitTimeMillis < MAX_ELAPSED_TIME_MILLIS,
+        "Maximum total wait time must be less than " + MAX_ELAPSED_TIME_MILLIS + " milliseconds");
+    checkArgument(resetPolicy != null, "resetPolicy may not be null");
+    checkArgument(nanoClock != null, "nanoClock may not be null");
+    this.maximumTotalWaitTimeMillis = maximumTotalWaitTimeMillis;
+    this.resetPolicy = resetPolicy;
+    this.nanoClock = nanoClock;
+    // Start the time bound now.  We cannot simply call reset() here because, under
+    // ResetPolicy.ATTEMPTS, reset() deliberately leaves the time bound untouched.
+    endTimeMillis = getTimeMillis() + maximumTotalWaitTimeMillis;
+  }
+
+  @Override
+  @SuppressFBWarnings(value = "UR_UNINIT_READ_CALLED_FROM_SUPER_CONSTRUCTOR",
+      justification = "Explicitly handled in implementation.")
+  public void reset() {
+    // reset() is called in the constructor of the parent class before resetPolicy and nanoClock are
+    // set.  In this case, we call the parent class's reset() method and return.
+    if (resetPolicy == null) {
+      super.reset();
+      return;
+    }
+    // Reset the number of attempts.
+    if (resetPolicy == ResetPolicy.ALL || resetPolicy == ResetPolicy.ATTEMPTS) {
+      super.reset();
+    }
+    // Reset the time bound.
+    if (resetPolicy == ResetPolicy.ALL || resetPolicy == ResetPolicy.TIMER) {
+      endTimeMillis = getTimeMillis() + maximumTotalWaitTimeMillis;
+    }
+  }
+
+  public void setEndtimeMillis(long endTimeMillis) {
+    this.endTimeMillis = endTimeMillis; // overwrites the current time bound directly
+  }
+
+  @Override
+  public long nextBackOffMillis() {
+    if (atMaxAttempts()) {
+      return BackOff.STOP;
+    }
+    long backoff = Math.min(super.nextBackOffMillis(), endTimeMillis - getTimeMillis());
+    return (backoff > 0 ? backoff : BackOff.STOP); // backoff was capped at the time remaining
+  }
+
+  private long getTimeMillis() {
+    return TimeUnit.NANOSECONDS.toMillis(nanoClock.nanoTime()); // truncates to whole ms
+  }
+
+  @Override
+  public boolean atMaxAttempts() {
+    return super.atMaxAttempts() || getTimeMillis() >= endTimeMillis; // or the time is up
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51c73a29/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOff.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOff.java
new file mode 100644
index 0000000..8f6f854
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOff.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.client.util.BackOff;
+
+
+/**
+ * Implementation of {@link BackOff} that increases the back off period for each retry attempt
+ * using a randomization function that grows exponentially.
+ *
+ * <p>Example: The initial interval is .5 seconds and the maximum number of retries is 10.
+ * For 10 tries the sequence will be (approximate values in seconds):
+ *
+ * <pre>
+ * retry#      retry_interval     randomized_interval
+ * 1             0.5                [0.25,   0.75]
+ * 2             0.75               [0.375,  1.125]
+ * 3             1.125              [0.562,  1.687]
+ * 4             1.687              [0.8435, 2.53]
+ * 5             2.53               [1.265,  3.795]
+ * 6             3.795              [1.897,  5.692]
+ * 7             5.692              [2.846,  8.538]
+ * 8             8.538              [4.269, 12.807]
+ * 9            12.807              [6.403, 19.210]
+ * 10           {@link BackOff#STOP}
+ * </pre>
+ *
+ * <p>Implementation is not thread-safe.
+ */
+@Deprecated
+public class AttemptBoundedExponentialBackOff implements BackOff {
+  public static final double DEFAULT_MULTIPLIER = 1.5; // interval growth factor per attempt
+  public static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5; // jitter as interval fraction
+  private final int maximumNumberOfAttempts;
+  private final long initialIntervalMillis;
+  private int currentAttempt;
+
+  public AttemptBoundedExponentialBackOff(int maximumNumberOfAttempts, long initialIntervalMillis) {
+    checkArgument(maximumNumberOfAttempts > 0,
+        "Maximum number of attempts must be greater than zero.");
+    checkArgument(initialIntervalMillis > 0, "Initial interval must be greater than zero.");
+    this.maximumNumberOfAttempts = maximumNumberOfAttempts;
+    this.initialIntervalMillis = initialIntervalMillis;
+    reset(); // NOTE: overridable method, so a subclass override runs before its fields are set
+  }
+
+  @Override
+  public void reset() {
+    currentAttempt = 1; // attempts are counted from 1
+  }
+
+  @Override
+  public long nextBackOffMillis() {
+    if (currentAttempt >= maximumNumberOfAttempts) { // so at most (maximum - 1) waits are issued
+      return BackOff.STOP;
+    }
+    double currentIntervalMillis = initialIntervalMillis
+        * Math.pow(DEFAULT_MULTIPLIER, currentAttempt - 1);
+    double randomOffset = (Math.random() * 2 - 1)
+        * DEFAULT_RANDOMIZATION_FACTOR * currentIntervalMillis; // in [-0.5, 0.5) * interval
+    currentAttempt += 1;
+    return Math.round(currentIntervalMillis + randomOffset);
+  }
+
+  public boolean atMaxAttempts() {
+    return currentAttempt >= maximumNumberOfAttempts; // same condition that yields STOP above
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51c73a29/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOffTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOffTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOffTest.java
new file mode 100644
index 0000000..f1d7371
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOffTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.client.util.BackOff;
+import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link AttemptAndTimeBoundedExponentialBackOff}. */
+@RunWith(JUnit4.class)
+@SuppressWarnings("deprecation") // test of deprecated class
+public class AttemptAndTimeBoundedExponentialBackOffTest {
+  @Rule public ExpectedException exception = ExpectedException.none();
+  @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
+
+  @Test
+  public void testUsingInvalidInitialInterval() throws Exception {
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage("Initial interval must be greater than zero.");
+    new AttemptAndTimeBoundedExponentialBackOff(10, 0L, 1000L);
+  }
+
+  @Test
+  public void testUsingInvalidTimeInterval() throws Exception {
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage("Maximum total wait time must be greater than zero.");
+    new AttemptAndTimeBoundedExponentialBackOff(10, 2L, 0L);
+  }
+
+  @Test
+  public void testUsingInvalidMaximumNumberOfRetries() throws Exception {
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage("Maximum number of attempts must be greater than zero.");
+    new AttemptAndTimeBoundedExponentialBackOff(-1, 10L, 1000L);
+  }
+
+  @Test
+  public void testThatFixedNumberOfAttemptsExits() throws Exception {
+    BackOff backOff =
+        new AttemptAndTimeBoundedExponentialBackOff(
+            3,
+            500L,
+            1000L,
+            AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL,
+            fastClock);
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+    assertEquals(BackOff.STOP, backOff.nextBackOffMillis()); // 3 attempts => 2 waits, then STOP
+  }
+
+  @Test
+  public void testThatResettingAllowsReuse() throws Exception {
+    AttemptBoundedExponentialBackOff backOff =
+        new AttemptAndTimeBoundedExponentialBackOff(
+            3,
+            500,
+            1000L,
+            AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL,
+            fastClock);
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+    backOff.reset(); // ALL: attempts restart from the first interval
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+
+    backOff =
+        new AttemptAndTimeBoundedExponentialBackOff(
+            30,
+            500,
+            1000L,
+            AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL,
+            fastClock);
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+    fastClock.sleep(2000L); // step the test clock past the 1000 ms time bound
+    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+    backOff.reset(); // ALL: the time bound restarts as well
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+  }
+
+  @Test
+  public void testThatResettingAttemptsAllowsReuse() throws Exception {
+    AttemptBoundedExponentialBackOff backOff =
+        new AttemptAndTimeBoundedExponentialBackOff(
+            3,
+            500,
+            1000,
+            AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS,
+            fastClock);
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+    backOff.reset(); // ATTEMPTS: the attempt counter restarts
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+  }
+
+  @Test
+  public void testThatResettingAttemptsDoesNotAllowsReuse() throws Exception {
+    AttemptBoundedExponentialBackOff backOff =
+        new AttemptAndTimeBoundedExponentialBackOff(
+            30,
+            500,
+            1000L,
+            AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS,
+            fastClock);
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+    fastClock.sleep(2000L); // step the test clock past the 1000 ms time bound
+    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+    backOff.reset(); // ATTEMPTS: the expired time bound is left untouched
+    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+  }
+
+  @Test
+  public void testThatResettingTimerAllowsReuse() throws Exception {
+    AttemptBoundedExponentialBackOff backOff =
+        new AttemptAndTimeBoundedExponentialBackOff(
+            30,
+            500,
+            1000L,
+            AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.TIMER,
+            fastClock);
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+    fastClock.sleep(2000L); // step the test clock past the 1000 ms time bound
+    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+    backOff.reset(); // TIMER: attempts keep counting, so the 3rd and 4th intervals follow
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(561L), lessThan(1688L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(843L), lessThan(2531L)));
+  }
+
+  @Test
+  public void testThatResettingTimerDoesNotAllowReuse() throws Exception {
+    AttemptBoundedExponentialBackOff backOff =
+        new AttemptAndTimeBoundedExponentialBackOff(
+            3,
+            500,
+            1000L,
+            AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.TIMER,
+            fastClock);
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+    backOff.reset(); // TIMER: the exhausted attempt counter is left untouched
+    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+  }
+
+  @Test
+  public void testTimeBound() throws Exception {
+    AttemptBoundedExponentialBackOff backOff =
+        new AttemptAndTimeBoundedExponentialBackOff(
+            3, 500L, 5L, AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL, fastClock);
+    assertEquals(backOff.nextBackOffMillis(), 5L); // NOTE(review): expected/actual are swapped
+  }
+
+  @Test
+  public void testAtMaxAttempts() throws Exception {
+    AttemptBoundedExponentialBackOff backOff =
+        new AttemptAndTimeBoundedExponentialBackOff(
+            3,
+            500L,
+            1000L,
+            AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL,
+            fastClock);
+    assertFalse(backOff.atMaxAttempts());
+    backOff.nextBackOffMillis();
+    assertFalse(backOff.atMaxAttempts());
+    backOff.nextBackOffMillis();
+    assertTrue(backOff.atMaxAttempts());
+    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+  }
+
+  @Test
+  public void testAtMaxTime() throws Exception {
+    AttemptBoundedExponentialBackOff backOff =
+        new AttemptAndTimeBoundedExponentialBackOff(
+            3, 500L, 1L, AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL, fastClock);
+    fastClock.sleep(2); // step the test clock past the 1 ms time bound
+    assertTrue(backOff.atMaxAttempts()); // true because of elapsed time, not attempt count
+    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51c73a29/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOffTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOffTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOffTest.java
new file mode 100644
index 0000000..44e435e
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOffTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.client.util.BackOff;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link AttemptBoundedExponentialBackOff}. */
+@RunWith(JUnit4.class)
+@SuppressWarnings("deprecation") // test of deprecated class
+public class AttemptBoundedExponentialBackOffTest {
+  @Rule public ExpectedException exception = ExpectedException.none();
+
+  @Test
+  public void testUsingInvalidInitialInterval() throws Exception {
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage("Initial interval must be greater than zero.");
+    new AttemptBoundedExponentialBackOff(10, 0L);
+  }
+
+  @Test
+  public void testUsingInvalidMaximumNumberOfRetries() throws Exception {
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage("Maximum number of attempts must be greater than zero.");
+    new AttemptBoundedExponentialBackOff(-1, 10L);
+  }
+
+  @Test
+  public void testThatFixedNumberOfAttemptsExits() throws Exception {
+    BackOff backOff = new AttemptBoundedExponentialBackOff(3, 500); // 3 attempts => 2 waits
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+  }
+
+  @Test
+  public void testThatResettingAllowsReuse() throws Exception {
+    BackOff backOff = new AttemptBoundedExponentialBackOff(3, 500);
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+    backOff.reset(); // the attempt counter restarts, so the same sequence repeats
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L)));
+    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+  }
+
+  @Test
+  public void testAtMaxAttempts() throws Exception {
+    AttemptBoundedExponentialBackOff backOff = new AttemptBoundedExponentialBackOff(3, 500);
+    assertFalse(backOff.atMaxAttempts());
+    backOff.nextBackOffMillis();
+    assertFalse(backOff.atMaxAttempts());
+    backOff.nextBackOffMillis();
+    assertTrue(backOff.atMaxAttempts()); // both waits consumed; only STOP remains
+    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+  }
+}


[2/2] incubator-beam git commit: Closes #938

Posted by dh...@apache.org.
Closes #938


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/82ebfd48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/82ebfd48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/82ebfd48

Branch: refs/heads/master
Commit: 82ebfd487eb6af129971c6e1b61218bcc0c89f9b
Parents: e9326c8 51c73a2
Author: Dan Halperin <dh...@google.com>
Authored: Fri Sep 9 13:13:35 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Sep 9 13:13:35 2016 -0700

----------------------------------------------------------------------
 ...AttemptAndTimeBoundedExponentialBackOff.java | 173 +++++++++++++++
 .../util/AttemptBoundedExponentialBackOff.java  |  86 ++++++++
 ...mptAndTimeBoundedExponentialBackOffTest.java | 213 +++++++++++++++++++
 .../AttemptBoundedExponentialBackOffTest.java   |  85 ++++++++
 4 files changed, 557 insertions(+)
----------------------------------------------------------------------