You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/01/29 03:09:07 UTC
[james-project] 07/13: JAMES-3494 Remove RetryWithAsyncCallback
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5d7afcb8debc4f1d00b444be0cab05a647f0b824
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Mon Jan 25 15:07:55 2021 +0100
JAMES-3494 Remove RetryWithAsyncCallback
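Now that Reactor is updated, we have an async pre-retry hook (RetryBackoffSpec#doBeforeRetryAsync), so the custom RetryWithAsyncCallback copy is no longer needed.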
---
.../blob/objectstorage/aws/S3BlobStoreDAO.java | 25 +-
.../java/reactor/retry/RetryWithAsyncCallback.java | 270 --------------
.../test/java/reactor/retry/RetryTestUtils.java | 122 -------
.../reactor/retry/RetryWithAsyncCallbackTest.java | 391 ---------------------
4 files changed, 15 insertions(+), 793 deletions(-)
diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
index fa00d0c..37f2810 100644
--- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
+++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
@@ -55,8 +55,7 @@ import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.pool.InstrumentedPool;
import reactor.pool.PoolBuilder;
-import reactor.retry.Retry;
-import reactor.retry.RetryWithAsyncCallback;
+import reactor.util.retry.RetryBackoffSpec;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.BytesWrapper;
@@ -250,14 +249,20 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
.then();
}
- private Retry<Object> createBucketOnRetry(BucketName bucketName) {
- return RetryWithAsyncCallback.onlyIf(retryContext -> retryContext.exception() instanceof NoSuchBucketException)
- .exponentialBackoff(FIRST_BACK_OFF, FOREVER)
- .withBackoffScheduler(Schedulers.elastic())
- .retryMax(MAX_RETRIES)
- .onRetryWithMono(retryContext -> clientPool.withPoolable(client -> Mono
- .fromFuture(client.createBucket(builder -> builder.bucket(bucketName.asString())))
- .onErrorResume(BucketAlreadyOwnedByYouException.class, e -> Mono.empty())).next());
+ private RetryBackoffSpec createBucketOnRetry(BucketName bucketName) {
+ return RetryBackoffSpec.backoff(MAX_RETRIES, FIRST_BACK_OFF)
+ .maxAttempts(MAX_RETRIES)
+ .doBeforeRetryAsync(retrySignal -> {
+ if (retrySignal.failure() instanceof NoSuchBucketException) {
+ return clientPool.withPoolable(client -> Mono
+ .fromFuture(client.createBucket(builder -> builder.bucket(bucketName.asString())))
+ .onErrorResume(BucketAlreadyOwnedByYouException.class, e -> Mono.empty()))
+ .next()
+ .then();
+ } else {
+ return Mono.error(retrySignal.failure());
+ }
+ });
}
@Override
diff --git a/server/container/util/src/main/java/reactor/retry/RetryWithAsyncCallback.java b/server/container/util/src/main/java/reactor/retry/RetryWithAsyncCallback.java
deleted file mode 100644
index 33220bc..0000000
--- a/server/container/util/src/main/java/reactor/retry/RetryWithAsyncCallback.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package reactor.retry;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.Predicate;
-
-import org.reactivestreams.Publisher;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Scheduler;
-import reactor.util.Logger;
-import reactor.util.Loggers;
-
-/**
- * This class is a copy of reactor.retry.DefaultRetry.
- * Its goal is to provide a way to execute an async action before retrying.
- * To do so it provides a retryWithMono method which is the async equivalent of the synchronous method doOnRetry.
- *
- * This is a temporary solution as this new requirement has been exposed in an issue in the reactor project.
- * see : https://github.com/reactor/reactor-addons/issues/220
- *
- */
-public class RetryWithAsyncCallback<T> extends AbstractRetry<T, Throwable> implements Retry<T> {
-
- static final Logger log = Loggers.getLogger(RetryWithAsyncCallback.class);
- static final Consumer<? super RetryContext<?>> NOOP_ON_RETRY = r -> { };
- static final Function<? super RetryContext<?>, Mono<?>> NOOP_ON_RETRY_MONO = r -> Mono.empty();
-
- /**
- * Returns a retry function that retries any exception, once.
- * More constraints may be added using {@link #retryMax(long)} or {@link #timeout(Duration)}.
- *
- * @return retry function that retries on any exception
- */
- public static <T> RetryWithAsyncCallback<T> any() {
- return RetryWithAsyncCallback.<T>create(context -> true);
- }
-
- /**
- * Returns a retry function that retries errors resulting from any of the
- * specified exceptions, once.
- * More constraints may be added using {@link #retryMax(long)}
- * or {@link #timeout(Duration)}.
- *
- * @param retriableExceptions Exceptions that may be retried
- * @return retry function that retries indefinitely, only for specified exceptions
- */
- @SafeVarargs
- public static <T> RetryWithAsyncCallback<T> anyOf(Class<? extends Throwable>... retriableExceptions) {
- Predicate<? super RetryContext<T>> predicate = context -> {
- Throwable exception = context.exception();
- if (exception == null) {
- return true;
- }
- for (Class<? extends Throwable> clazz : retriableExceptions) {
- if (clazz.isInstance(exception)) {
- return true;
- }
- }
- return false;
- };
- return RetryWithAsyncCallback.<T>create(predicate);
- }
-
- /**
- * Returns a retry function that retries errors resulting from all exceptions except
- * the specified non-retriable exceptions, once.
- * More constraints may be added using
- * {@link #retryMax(long)} or {@link #timeout(Duration)}.
- *
- * @param nonRetriableExceptions exceptions that may not be retried
- * @return retry function that retries all exceptions except the specified non-retriable exceptions.
- */
- @SafeVarargs
- public static <T> RetryWithAsyncCallback<T> allBut(final Class<? extends Throwable>... nonRetriableExceptions) {
- Predicate<? super RetryContext<T>> predicate = context -> {
- Throwable exception = context.exception();
- if (exception == null) {
- return true;
- }
- for (Class<? extends Throwable> clazz : nonRetriableExceptions) {
- if (clazz.isInstance(exception)) {
- return false;
- }
- }
- return true;
- };
- return RetryWithAsyncCallback.<T>create(predicate);
- }
-
- /**
- * Retry function that retries only if the predicate returns true, with no limit to
- * the number of attempts.
- * @param predicate Predicate that determines if next retry is performed
- * @return Retry function with predicate
- */
- public static <T> RetryWithAsyncCallback<T> onlyIf(Predicate<? super RetryContext<T>> predicate) {
- return RetryWithAsyncCallback.create(predicate).retryMax(Long.MAX_VALUE);
- }
-
- public static <T> RetryWithAsyncCallback<T> create(Predicate<? super RetryContext<T>> retryPredicate) {
- return new RetryWithAsyncCallback<T>(retryPredicate,
- Long.MAX_VALUE,
- null,
- Backoff.zero(),
- Jitter.noJitter(),
- null,
- NOOP_ON_RETRY,
- NOOP_ON_RETRY_MONO,
- (T) null);
- }
-
- final Predicate<? super RetryContext<T>> retryPredicate;
- final Consumer<? super RetryContext<T>> onRetry;
- final Function<? super RetryContext<T>, Mono<?>> onRetryMono;
-
- RetryWithAsyncCallback(Predicate<? super RetryContext<T>> retryPredicate,
- long maxIterations,
- Duration timeout,
- Backoff backoff,
- Jitter jitter,
- Scheduler backoffScheduler,
- final Consumer<? super RetryContext<T>> onRetry,
- Function<? super RetryContext<T>, Mono<?>> onRetryMono,
- T applicationContext) {
- super(maxIterations, timeout, backoff, jitter, backoffScheduler, applicationContext);
- this.retryPredicate = retryPredicate;
- this.onRetry = onRetry;
- this.onRetryMono = onRetryMono;
- }
-
- @Override
- public RetryWithAsyncCallback<T> fixedBackoff(Duration backoffInterval) {
- return backoff(Backoff.fixed(backoffInterval));
- }
-
- @Override
- public RetryWithAsyncCallback<T> noBackoff() {
- return backoff(Backoff.zero());
- }
-
- @Override
- public RetryWithAsyncCallback<T> exponentialBackoff(Duration firstBackoff, Duration maxBackoff) {
- return backoff(Backoff.exponential(firstBackoff, maxBackoff, 2, false));
- }
-
- @Override
- public RetryWithAsyncCallback<T> exponentialBackoffWithJitter(Duration firstBackoff, Duration maxBackoff) {
- return backoff(Backoff.exponential(firstBackoff, maxBackoff, 2, false)).jitter(Jitter.random());
- }
-
- @Override
- public RetryWithAsyncCallback<T> randomBackoff(Duration firstBackoff, Duration maxBackoff) {
- return backoff(Backoff.exponential(firstBackoff, maxBackoff, 3, true)).jitter(Jitter.random());
- }
-
- @Override
- public RetryWithAsyncCallback<T> withApplicationContext(T applicationContext) {
- return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
- backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext);
- }
-
- @Override
- public RetryWithAsyncCallback<T> doOnRetry(Consumer<? super RetryContext<T>> onRetry) {
- return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
- backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext);
- }
-
- public RetryWithAsyncCallback<T> onRetryWithMono(Function<? super RetryContext<T>, Mono<?>> onRetryMono) {
- return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
- backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext);
- }
-
- @Override
- public RetryWithAsyncCallback<T> retryOnce() {
- return retryMax(1);
- }
-
- @Override
- public RetryWithAsyncCallback<T> retryMax(long maxIterations) {
- if (maxIterations < 0) {
- throw new IllegalArgumentException("maxIterations should be >= 0");
- }
- return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
- backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext);
- }
-
- @Override
- public RetryWithAsyncCallback<T> timeout(Duration timeout) {
- if (timeout.isNegative()) {
- throw new IllegalArgumentException("timeout should be >= 0");
- }
- return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
- backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext);
- }
-
- @Override
- public RetryWithAsyncCallback<T> backoff(Backoff backoff) {
- return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
- backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext);
- }
-
- @Override
- public RetryWithAsyncCallback<T> jitter(Jitter jitter) {
- return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
- backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext);
- }
-
- @Override
- public RetryWithAsyncCallback<T> withBackoffScheduler(Scheduler scheduler) {
- return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
- backoff, jitter, scheduler, onRetry, onRetryMono, applicationContext);
- }
-
- @Override
- public Publisher<Long> apply(Flux<Throwable> errors) {
- Instant timeoutInstant = calculateTimeout();
- DefaultContext<T> context = new DefaultContext<>(applicationContext, 0L, null, null);
- return errors.index()
- .concatMap(tuple -> retry(tuple.getT2(), tuple.getT1() + 1L, timeoutInstant, context));
- }
-
- Publisher<Long> retry(Throwable e, long iteration, Instant timeoutInstant, DefaultContext<T> context) {
- DefaultContext<T> tmpContext = new DefaultContext<>(applicationContext, iteration, context.lastBackoff, e);
- BackoffDelay nextBackoff = calculateBackoff(tmpContext, timeoutInstant);
- DefaultContext<T> retryContext = new DefaultContext<T>(applicationContext, iteration, nextBackoff, e);
- context.lastBackoff = nextBackoff;
-
- if (!retryPredicate.test(retryContext)) {
- log.debug("Stopping retries since predicate returned false, retry context: {}", retryContext);
- return Mono.error(e);
- } else if (nextBackoff == RETRY_EXHAUSTED) {
- log.debug("Retries exhausted, retry context: {}", retryContext);
- return Mono.error(new RetryExhaustedException(e));
- } else {
- log.debug("Scheduling retry attempt, retry context: {}", retryContext);
- onRetry.accept(retryContext);
- return onRetryMono.apply(retryContext)
- .then(Mono.from(retryMono(nextBackoff.delay())));
- }
- }
-
- @Override
- public String toString() {
- return "Retry{max=" + this.maxIterations + ",backoff=" + backoff + ",jitter=" +
- jitter + "}";
- }
-}
diff --git a/server/container/util/src/test/java/reactor/retry/RetryTestUtils.java b/server/container/util/src/test/java/reactor/retry/RetryTestUtils.java
deleted file mode 100644
index 075dcd9..0000000
--- a/server/container/util/src/test/java/reactor/retry/RetryTestUtils.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package reactor.retry;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import org.reactivestreams.Publisher;
-
-import reactor.core.publisher.Flux;
-
-public class RetryTestUtils {
-
- static void assertDelays(Queue<? extends IterationContext<?>> retries, Long... delayMs) {
- assertEquals(delayMs.length, retries.size());
- int index = 0;
- for (Iterator<? extends IterationContext<?>> it = retries.iterator(); it.hasNext(); ) {
- IterationContext<?> repeatContext = it.next();
- assertEquals(delayMs[index].longValue(), repeatContext.backoff().toMillis());
- index++;
- }
- }
-
- static void assertRandomDelays(Queue<? extends IterationContext<?>> retries, int firstMs, int maxMs) {
- long prevMs = 0;
- int randomValues = 0;
- for (IterationContext<?> context : retries) {
- long backoffMs = context.backoff().toMillis();
- assertTrue("Unexpected delay " + backoffMs, backoffMs >= firstMs && backoffMs <= maxMs);
- if (backoffMs != firstMs && backoffMs != prevMs)
- randomValues++;
- prevMs = backoffMs;
- }
- assertTrue("Delays not random", randomValues >= 2); // Allow for at most one edge case.
- }
-
- static <T> void testReuseInParallel(int threads, int iterations,
- Function<Backoff, Function<Flux<T>, Publisher<Long>>> retryOrRepeat,
- Consumer<Function<Flux<T>, Publisher<Long>>> testTask) throws Exception {
- int repeatCount = iterations - 1;
- AtomicInteger nextBackoff = new AtomicInteger();
- // Keep track of the number of backoff invocations per instance
- ConcurrentHashMap<Long, Integer> backoffCounts = new ConcurrentHashMap<>();
- // Use a countdown latch to get all instances to stop in the first backoff callback
- CountDownLatch latch = new CountDownLatch(threads);
- Backoff customBackoff = context -> {
- Duration backoff = context.backoff();
- if (latch.getCount() > 0) {
- assertNull("Wrong context, backoff must be null", backoff);
- backoff = Duration.ofMillis(nextBackoff.incrementAndGet());
- backoffCounts.put(backoff.toMillis(), 1);
- latch.countDown();
- try {
- latch.await(10, TimeUnit.SECONDS);
- }
- catch (Exception e) {
- // ignore, errors are handled later
- }
- } else {
- assertNotNull("Wrong context, backoff must not be null", backoff);
- long index = backoff.toMillis();
- backoffCounts.put(index, backoffCounts.get(index) + 1);
- }
- return new BackoffDelay(backoff);
- };
- Function<Flux<T>, Publisher<Long>> retryFunc = retryOrRepeat.apply(customBackoff);
- ExecutorService executor = Executors.newFixedThreadPool(threads);
- List<Future<?>> futures = new ArrayList<>();
- try {
- for (int i = 0; i < threads; i++) {
- Runnable runnable = () -> testTask.accept(retryFunc);
- futures.add(executor.submit(runnable));
- }
- for (Future<?> future : futures)
- future.get(5, TimeUnit.SECONDS);
- }
- finally {
- executor.shutdownNow();
- }
-
- assertEquals(0, latch.getCount());
- assertEquals(threads, backoffCounts.size());
- for (Integer count : backoffCounts.values()) {
- //backoff not invoked anymore when maxIteration reached
- assertEquals(repeatCount, count.intValue());
- }
- }
-}
diff --git a/server/container/util/src/test/java/reactor/retry/RetryWithAsyncCallbackTest.java b/server/container/util/src/test/java/reactor/retry/RetryWithAsyncCallbackTest.java
deleted file mode 100644
index 2f2aad5..0000000
--- a/server/container/util/src/test/java/reactor/retry/RetryWithAsyncCallbackTest.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package reactor.retry;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-
-import java.io.IOException;
-import java.net.SocketException;
-import java.time.Duration;
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Semaphore;
-import java.util.function.Consumer;
-
-import org.junit.Test;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-import reactor.test.StepVerifier;
-
-public class RetryWithAsyncCallbackTest {
-
- private Queue<RetryContext<?>> retries = new ConcurrentLinkedQueue<>();
-
- @Test
- public void shouldTimeoutRetryWithVirtualTime() {
- // given
- final int minBackoff = 1;
- final int maxBackoff = 5;
- final int timeout = 10;
-
- // then
- StepVerifier.withVirtualTime(() ->
- Mono.<String>error(new RuntimeException("Something went wrong"))
- .retryWhen(RetryWithAsyncCallback.anyOf(Exception.class)
- .exponentialBackoffWithJitter(Duration.ofSeconds(minBackoff), Duration.ofSeconds(maxBackoff))
- .timeout(Duration.ofSeconds(timeout)))
- .subscribeOn(Schedulers.elastic()))
- .expectSubscription()
-// .expectNoEvent(Duration.ofSeconds(timeout))
- .thenAwait(Duration.ofSeconds(timeout))
- .expectError(RetryExhaustedException.class)
- .verify(Duration.ofSeconds(timeout));
- }
-
- @Test
- public void fluxRetryNoBackoff() {
- Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IOException()))
- .retryWhen(RetryWithAsyncCallback.any().noBackoff().retryMax(2).doOnRetry(onRetry()));
-
- StepVerifier.create(flux)
- .expectNext(0, 1, 0, 1, 0, 1)
- .verifyError(RetryExhaustedException.class);
- assertRetries(IOException.class, IOException.class);
- RetryTestUtils.assertDelays(retries, 0L, 0L);
- }
-
- @Test
- public void monoRetryNoBackoff() {
- Mono<?> mono = Mono.error(new IOException())
- .retryWhen(RetryWithAsyncCallback.any().noBackoff().retryMax(2).doOnRetry(onRetry()));
-
- StepVerifier.create(mono)
- .verifyError(RetryExhaustedException.class);
- assertRetries(IOException.class, IOException.class);
- RetryTestUtils.assertDelays(retries, 0L, 0L);
- }
-
- @Test
- public void fluxRetryFixedBackoff() {
- Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IOException()))
- .retryWhen(RetryWithAsyncCallback.any().fixedBackoff(Duration.ofMillis(500)).retryOnce().doOnRetry(onRetry()));
-
- StepVerifier.withVirtualTime(() -> flux)
- .expectNext(0, 1)
- .expectNoEvent(Duration.ofMillis(300))
- .thenAwait(Duration.ofMillis(300))
- .expectNext(0, 1)
- .verifyError(RetryExhaustedException.class);
- assertRetries(IOException.class);
- RetryTestUtils.assertDelays(retries, 500L);
- }
-
- @Test
- public void monoRetryFixedBackoff() {
- Mono<?> mono = Mono.error(new IOException())
- .retryWhen(RetryWithAsyncCallback.any().fixedBackoff(Duration.ofMillis(500)).retryOnce().doOnRetry(onRetry()));
-
- StepVerifier.withVirtualTime(() -> mono)
- .expectSubscription()
- .expectNoEvent(Duration.ofMillis(300))
- .thenAwait(Duration.ofMillis(300))
- .verifyError(RetryExhaustedException.class);
-
- assertRetries(IOException.class);
- RetryTestUtils.assertDelays(retries, 500L);
- }
-
-
- @Test
- public void fluxRetryExponentialBackoff() {
- Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IOException()))
- .retryWhen(RetryWithAsyncCallback.any()
- .exponentialBackoff(Duration.ofMillis(100), Duration.ofMillis(500))
- .timeout(Duration.ofMillis(1500))
- .doOnRetry(onRetry()));
-
- StepVerifier.create(flux)
- .expectNext(0, 1)
- .expectNoEvent(Duration.ofMillis(50)) // delay=100
- .expectNext(0, 1)
- .expectNoEvent(Duration.ofMillis(150)) // delay=200
- .expectNext(0, 1)
- .expectNoEvent(Duration.ofMillis(250)) // delay=400
- .expectNext(0, 1)
- .expectNoEvent(Duration.ofMillis(450)) // delay=500
- .expectNext(0, 1)
- .verifyErrorMatches(e -> isRetryExhausted(e, IOException.class));
-
- assertRetries(IOException.class, IOException.class, IOException.class, IOException.class);
- RetryTestUtils.assertDelays(retries, 100L, 200L, 400L, 500L);
- }
- @Test
- public void monoRetryExponentialBackoff() {
- Mono<?> mono = Mono.error(new IOException())
- .retryWhen(RetryWithAsyncCallback.any()
- .exponentialBackoff(Duration.ofMillis(100), Duration.ofMillis(500))
- .retryMax(4)
- .doOnRetry(onRetry()));
-
- StepVerifier.withVirtualTime(() -> mono)
- .expectSubscription()
- .thenAwait(Duration.ofMillis(100))
- .thenAwait(Duration.ofMillis(200))
- .thenAwait(Duration.ofMillis(400))
- .thenAwait(Duration.ofMillis(500))
- .verifyError(RetryExhaustedException.class);
-
- assertRetries(IOException.class, IOException.class, IOException.class, IOException.class);
- RetryTestUtils.assertDelays(retries, 100L, 200L, 400L, 500L);
- }
-
- @Test
- public void fluxRetryRandomBackoff() {
- Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IOException()))
- .retryWhen(RetryWithAsyncCallback.any()
- .randomBackoff(Duration.ofMillis(100), Duration.ofMillis(2000))
- .retryMax(4)
- .doOnRetry(onRetry()));
-
- StepVerifier.create(flux)
- .expectNext(0, 1, 0, 1, 0, 1, 0, 1, 0, 1)
- .verifyErrorMatches(e -> isRetryExhausted(e, IOException.class));
-
- assertRetries(IOException.class, IOException.class, IOException.class, IOException.class);
- RetryTestUtils.assertRandomDelays(retries, 100, 2000);
- }
-
- @Test
- public void monoRetryRandomBackoff() {
- Mono<?> mono = Mono.error(new IOException())
- .retryWhen(RetryWithAsyncCallback.any()
- .randomBackoff(Duration.ofMillis(100), Duration.ofMillis(2000))
- .retryMax(4)
- .doOnRetry(onRetry()));
-
- StepVerifier.withVirtualTime(() -> mono)
- .expectSubscription()
- .thenAwait(Duration.ofMillis(100))
- .thenAwait(Duration.ofMillis(2000))
- .thenAwait(Duration.ofMillis(2000))
- .thenAwait(Duration.ofMillis(2000))
- .verifyError(RetryExhaustedException.class);
-
- assertRetries(IOException.class, IOException.class, IOException.class, IOException.class);
- RetryTestUtils.assertRandomDelays(retries, 100, 2000);
- }
-
-
- @Test
- public void fluxRetriableExceptions() {
- Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new SocketException()))
- .retryWhen(RetryWithAsyncCallback.anyOf(IOException.class).retryOnce().doOnRetry(onRetry()));
-
- StepVerifier.create(flux)
- .expectNext(0, 1, 0, 1)
- .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class));
-
- Flux<Integer> nonRetriable = Flux.concat(Flux.range(0, 2), Flux.error(new RuntimeException()))
- .retryWhen(RetryWithAsyncCallback.anyOf(IOException.class).retryOnce().doOnRetry(onRetry()));
- StepVerifier.create(nonRetriable)
- .expectNext(0, 1)
- .verifyError(RuntimeException.class);
-
- }
-
- @Test
- public void fluxNonRetriableExceptions() {
-
- Retry<?> retry = RetryWithAsyncCallback.allBut(RuntimeException.class).retryOnce().doOnRetry(onRetry());
- Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IllegalStateException())).retryWhen(retry);
-
- StepVerifier.create(flux)
- .expectNext(0, 1)
- .verifyError(IllegalStateException.class);
-
-
- Flux<Integer> retriable = Flux.concat(Flux.range(0, 2), Flux.error(new SocketException())).retryWhen(retry);
- StepVerifier.create(retriable)
- .expectNext(0, 1, 0, 1)
- .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class));
- }
-
- @Test
- public void fluxRetryAnyException() {
- Retry<?> retry = RetryWithAsyncCallback.any().retryOnce().doOnRetry(onRetry());
-
- Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new SocketException())).retryWhen(retry);
- StepVerifier.create(flux)
- .expectNext(0, 1, 0, 1)
- .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class));
-
- Flux<Integer> flux2 = Flux.concat(Flux.range(0, 2), Flux.error(new RuntimeException())).retryWhen(retry);
- StepVerifier.create(flux2)
- .expectNext(0, 1, 0, 1)
- .verifyErrorMatches(e -> isRetryExhausted(e, RuntimeException.class));
-
- }
-
- @Test
- public void fluxRetryOnPredicate() {
- Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new SocketException()))
- .retryWhen(RetryWithAsyncCallback.onlyIf(context -> context.iteration() < 3).doOnRetry(onRetry()));
-
- StepVerifier.create(flux)
- .expectNext(0, 1, 0, 1, 0, 1)
- .verifyError(SocketException.class);
- }
-
-
- @Test
- public void doOnRetry() {
- Semaphore semaphore = new Semaphore(0);
- Retry<?> retry = RetryWithAsyncCallback.any()
- .retryOnce()
- .fixedBackoff(Duration.ofMillis(500))
- .doOnRetry(context -> semaphore.release());
-
- StepVerifier.withVirtualTime(() -> Flux.range(0, 2).concatWith(Mono.error(new SocketException())).retryWhen(retry))
- .expectNext(0, 1)
- .then(semaphore::acquireUninterruptibly)
- .expectNoEvent(Duration.ofMillis(400))
- .thenAwait(Duration.ofMillis(200))
- .expectNext(0, 1)
- .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class));
-
- StepVerifier.withVirtualTime(() -> Mono.error(new SocketException()).retryWhen(retry.noBackoff()))
- .then(semaphore::acquireUninterruptibly)
- .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class));
- }
-
- @Test
- public void onRetryWithMono() {
- Semaphore semaphore = new Semaphore(0);
- Retry<?> retry = RetryWithAsyncCallback.any()
- .retryOnce()
- .fixedBackoff(Duration.ofMillis(500))
- .onRetryWithMono(context -> Mono.fromCallable(() -> { semaphore.release(); return 0; }));
-
- StepVerifier.withVirtualTime(() -> Flux.range(0, 2).concatWith(Mono.error(new SocketException())).retryWhen(retry))
- .expectNext(0, 1)
- .then(semaphore::acquireUninterruptibly)
- .expectNoEvent(Duration.ofMillis(400))
- .thenAwait(Duration.ofMillis(200))
- .expectNext(0, 1)
- .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class));
-
- StepVerifier.withVirtualTime(() -> Mono.error(new SocketException()).retryWhen(retry.noBackoff()))
- .then(semaphore::acquireUninterruptibly)
- .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class));
- }
-
- @Test
- public void retryApplicationContext() {
- class AppContext {
- boolean needsRollback;
- void rollback() {
- needsRollback = false;
- }
- void run() {
- assertFalse("Rollback not performed", needsRollback);
- needsRollback = true;
- }
- }
- AppContext appContext = new AppContext();
- Retry<?> retry = RetryWithAsyncCallback.<AppContext>any().withApplicationContext(appContext)
- .retryMax(2)
- .doOnRetry(context -> {
- AppContext ac = context.applicationContext();
- assertNotNull("Application context not propagated", ac);
- ac.rollback();
- });
-
- StepVerifier.withVirtualTime(() -> Mono.error(new RuntimeException()).doOnNext(i -> appContext.run()).retryWhen(retry))
- .verifyErrorMatches(e -> isRetryExhausted(e, RuntimeException.class));
-
- }
-
- @Test
- public void fluxRetryCompose() {
- Retry<?> retry = RetryWithAsyncCallback.any().noBackoff().retryMax(2).doOnRetry(this.onRetry());
- Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IOException())).as(retry::apply);
-
- StepVerifier.create(flux)
- .expectNext(0, 1, 0, 1, 0, 1)
- .verifyError(RetryExhaustedException.class);
- assertRetries(IOException.class, IOException.class);
- }
-
- @Test
- public void monoRetryCompose() {
- Retry<?> retry = RetryWithAsyncCallback.any().noBackoff().retryMax(2).doOnRetry(this.onRetry());
- Flux<?> flux = Mono.error(new IOException()).as(retry::apply);
-
- StepVerifier.create(flux)
- .verifyError(RetryExhaustedException.class);
- assertRetries(IOException.class, IOException.class);
- }
-
- @Test
- public void functionReuseInParallel() throws Exception {
- int retryCount = 19;
- int range = 100;
- Integer[] values = new Integer[(retryCount + 1) * range];
- for (int i = 0; i <= retryCount; i++) {
- for (int j = 1; j <= range; j++)
- values[i * range + j - 1] = j;
- }
- RetryTestUtils.testReuseInParallel(2, 20,
- backoff -> RetryWithAsyncCallback.<Integer>any().retryMax(19).backoff(backoff),
- retryFunc -> StepVerifier.create(Flux.range(1, range).concatWith(Mono.error(new SocketException())).retryWhen(retryFunc))
- .expectNext(values)
- .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class)));
- }
-
- Consumer<? super RetryContext<?>> onRetry() {
- return context -> retries.add(context);
- }
-
- @SafeVarargs
- private final void assertRetries(Class<? extends Throwable>... exceptions) {
- assertEquals(exceptions.length, retries.size());
- int index = 0;
- for (Iterator<RetryContext<?>> it = retries.iterator(); it.hasNext(); ) {
- RetryContext<?> retryContext = it.next();
- assertEquals(index + 1, retryContext.iteration());
- assertEquals(exceptions[index], retryContext.exception().getClass());
- index++;
- }
- }
-
- static boolean isRetryExhausted(Throwable e, Class<? extends Throwable> cause) {
- return e instanceof RetryExhaustedException && cause.isInstance(e.getCause());
- }
-
- @Test
- public void retryToString() {
- System.out.println(RetryWithAsyncCallback.any().noBackoff().retryMax(2).toString());
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org