You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/01/29 03:09:07 UTC

[james-project] 07/13: JAMES-3494 Remove RetryWithAsyncCallback

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5d7afcb8debc4f1d00b444be0cab05a647f0b824
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Mon Jan 25 15:07:55 2021 +0100

    JAMES-3494 Remove RetryWithAsyncCallback
    
    Now that reactor is updated we have RetryBackoffSpec.doBeforeRetryAsync
---
 .../blob/objectstorage/aws/S3BlobStoreDAO.java     |  25 +-
 .../java/reactor/retry/RetryWithAsyncCallback.java | 270 --------------
 .../test/java/reactor/retry/RetryTestUtils.java    | 122 -------
 .../reactor/retry/RetryWithAsyncCallbackTest.java  | 391 ---------------------
 4 files changed, 15 insertions(+), 793 deletions(-)

diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
index fa00d0c..37f2810 100644
--- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
+++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
@@ -55,8 +55,7 @@ import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 import reactor.pool.InstrumentedPool;
 import reactor.pool.PoolBuilder;
-import reactor.retry.Retry;
-import reactor.retry.RetryWithAsyncCallback;
+import reactor.util.retry.RetryBackoffSpec;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.core.BytesWrapper;
@@ -250,14 +249,20 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
             .then();
     }
 
-    private Retry<Object> createBucketOnRetry(BucketName bucketName) {
-        return RetryWithAsyncCallback.onlyIf(retryContext -> retryContext.exception() instanceof NoSuchBucketException)
-            .exponentialBackoff(FIRST_BACK_OFF, FOREVER)
-            .withBackoffScheduler(Schedulers.elastic())
-            .retryMax(MAX_RETRIES)
-            .onRetryWithMono(retryContext -> clientPool.withPoolable(client -> Mono
-                .fromFuture(client.createBucket(builder -> builder.bucket(bucketName.asString())))
-                .onErrorResume(BucketAlreadyOwnedByYouException.class, e -> Mono.empty())).next());
+    private RetryBackoffSpec createBucketOnRetry(BucketName bucketName) {
+        return RetryBackoffSpec.backoff(MAX_RETRIES, FIRST_BACK_OFF)
+            .maxAttempts(MAX_RETRIES)
+            .doBeforeRetryAsync(retrySignal -> {
+                if (retrySignal.failure() instanceof NoSuchBucketException) {
+                    return clientPool.withPoolable(client -> Mono
+                        .fromFuture(client.createBucket(builder -> builder.bucket(bucketName.asString())))
+                        .onErrorResume(BucketAlreadyOwnedByYouException.class, e -> Mono.empty()))
+                        .next()
+                        .then();
+                } else {
+                    return Mono.error(retrySignal.failure());
+                }
+            });
     }
 
     @Override
diff --git a/server/container/util/src/main/java/reactor/retry/RetryWithAsyncCallback.java b/server/container/util/src/main/java/reactor/retry/RetryWithAsyncCallback.java
deleted file mode 100644
index 33220bc..0000000
--- a/server/container/util/src/main/java/reactor/retry/RetryWithAsyncCallback.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package reactor.retry;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.Predicate;
-
-import org.reactivestreams.Publisher;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Scheduler;
-import reactor.util.Logger;
-import reactor.util.Loggers;
-
-/**
- * This class is a copy of reactor.retry.DefaultRetry.
- * Its goal is to provide a way to execute an async action before retrying.
- * To do so it provides a retryWithMono method which is the async equivalent of the synchronous method doOnRetry.
- *
- * This is a temporary solution as this new requirement has been exposed in an issue in the reactor project.
- * see : https://github.com/reactor/reactor-addons/issues/220
- *
- */
-public class RetryWithAsyncCallback<T> extends AbstractRetry<T, Throwable> implements Retry<T> {
-
-    static final Logger log = Loggers.getLogger(RetryWithAsyncCallback.class);
-    static final Consumer<? super RetryContext<?>> NOOP_ON_RETRY = r -> { };
-    static final Function<? super RetryContext<?>, Mono<?>> NOOP_ON_RETRY_MONO = r -> Mono.empty();
-
-    /**
-     * Returns a retry function that retries any exception, once.
-     * More constraints may be added using {@link #retryMax(long)} or {@link #timeout(Duration)}.
-     *
-     * @return retry function that retries on any exception
-     */
-    public static <T> RetryWithAsyncCallback<T> any() {
-        return RetryWithAsyncCallback.<T>create(context -> true);
-    }
-
-    /**
-     * Returns a retry function that retries errors resulting from any of the
-     * specified exceptions, once.
-     * More constraints may be added using {@link #retryMax(long)}
-     * or {@link #timeout(Duration)}.
-     *
-     * @param retriableExceptions Exceptions that may be retried
-     * @return retry function that retries indefinitely, only for specified exceptions
-     */
-    @SafeVarargs
-    public static <T> RetryWithAsyncCallback<T> anyOf(Class<? extends Throwable>... retriableExceptions) {
-        Predicate<? super RetryContext<T>> predicate = context -> {
-            Throwable exception = context.exception();
-            if (exception == null) {
-                return true;
-            }
-            for (Class<? extends Throwable> clazz : retriableExceptions) {
-                if (clazz.isInstance(exception)) {
-                    return true;
-                }
-            }
-            return false;
-        };
-        return RetryWithAsyncCallback.<T>create(predicate);
-    }
-
-    /**
-     * Returns a retry function that retries errors resulting from all exceptions except
-     * the specified non-retriable exceptions, once.
-     * More constraints may be added using
-     * {@link #retryMax(long)} or {@link #timeout(Duration)}.
-     *
-     * @param nonRetriableExceptions exceptions that may not be retried
-     * @return retry function that retries all exceptions except the specified non-retriable exceptions.
-     */
-    @SafeVarargs
-    public static <T> RetryWithAsyncCallback<T> allBut(final Class<? extends Throwable>... nonRetriableExceptions) {
-        Predicate<? super RetryContext<T>> predicate = context -> {
-            Throwable exception = context.exception();
-            if (exception == null) {
-                return true;
-            }
-            for (Class<? extends Throwable> clazz : nonRetriableExceptions) {
-                if (clazz.isInstance(exception)) {
-                    return false;
-                }
-            }
-            return true;
-        };
-        return RetryWithAsyncCallback.<T>create(predicate);
-    }
-
-    /**
-     * Retry function that retries only if the predicate returns true, with no limit to
-     * the number of attempts.
-     * @param predicate Predicate that determines if next retry is performed
-     * @return Retry function with predicate
-     */
-    public static <T> RetryWithAsyncCallback<T> onlyIf(Predicate<? super RetryContext<T>> predicate) {
-        return RetryWithAsyncCallback.create(predicate).retryMax(Long.MAX_VALUE);
-    }
-
-    public static <T> RetryWithAsyncCallback<T> create(Predicate<? super RetryContext<T>> retryPredicate) {
-        return new RetryWithAsyncCallback<T>(retryPredicate,
-            Long.MAX_VALUE,
-            null,
-            Backoff.zero(),
-            Jitter.noJitter(),
-            null,
-            NOOP_ON_RETRY,
-            NOOP_ON_RETRY_MONO,
-            (T) null);
-    }
-
-    final Predicate<? super RetryContext<T>> retryPredicate;
-    final Consumer<? super RetryContext<T>> onRetry;
-    final Function<? super RetryContext<T>, Mono<?>> onRetryMono;
-
-    RetryWithAsyncCallback(Predicate<? super RetryContext<T>> retryPredicate,
-                           long maxIterations,
-                           Duration timeout,
-                           Backoff backoff,
-                           Jitter jitter,
-                           Scheduler backoffScheduler,
-                           final Consumer<? super RetryContext<T>> onRetry,
-                           Function<? super RetryContext<T>, Mono<?>> onRetryMono,
-                           T applicationContext) {
-        super(maxIterations, timeout, backoff, jitter, backoffScheduler, applicationContext);
-        this.retryPredicate = retryPredicate;
-        this.onRetry = onRetry;
-        this.onRetryMono = onRetryMono;
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> fixedBackoff(Duration backoffInterval) {
-        return backoff(Backoff.fixed(backoffInterval));
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> noBackoff() {
-        return backoff(Backoff.zero());
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> exponentialBackoff(Duration firstBackoff, Duration maxBackoff) {
-        return backoff(Backoff.exponential(firstBackoff, maxBackoff, 2, false));
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> exponentialBackoffWithJitter(Duration firstBackoff, Duration maxBackoff) {
-        return backoff(Backoff.exponential(firstBackoff, maxBackoff, 2, false)).jitter(Jitter.random());
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> randomBackoff(Duration firstBackoff, Duration maxBackoff) {
-        return backoff(Backoff.exponential(firstBackoff, maxBackoff, 3, true)).jitter(Jitter.random());
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> withApplicationContext(T applicationContext) {
-        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
-            backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext);
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> doOnRetry(Consumer<? super RetryContext<T>> onRetry) {
-        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
-            backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext);
-    }
-
-    public RetryWithAsyncCallback<T> onRetryWithMono(Function<? super RetryContext<T>, Mono<?>> onRetryMono) {
-        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
-            backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext);
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> retryOnce() {
-        return retryMax(1);
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> retryMax(long maxIterations) {
-        if (maxIterations < 0) {
-            throw new IllegalArgumentException("maxIterations should be >= 0");
-        }
-        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
-            backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext);
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> timeout(Duration timeout) {
-        if (timeout.isNegative()) {
-            throw new IllegalArgumentException("timeout should be >= 0");
-        }
-        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
-            backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext);
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> backoff(Backoff backoff) {
-        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
-            backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext);
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> jitter(Jitter jitter) {
-        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
-            backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext);
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> withBackoffScheduler(Scheduler scheduler) {
-        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
-            backoff, jitter, scheduler, onRetry, onRetryMono, applicationContext);
-    }
-
-    @Override
-    public Publisher<Long> apply(Flux<Throwable> errors) {
-        Instant timeoutInstant = calculateTimeout();
-        DefaultContext<T> context = new DefaultContext<>(applicationContext, 0L, null, null);
-        return errors.index()
-            .concatMap(tuple -> retry(tuple.getT2(), tuple.getT1() + 1L, timeoutInstant, context));
-    }
-
-    Publisher<Long> retry(Throwable e, long iteration, Instant timeoutInstant, DefaultContext<T> context) {
-        DefaultContext<T> tmpContext = new DefaultContext<>(applicationContext, iteration, context.lastBackoff, e);
-        BackoffDelay nextBackoff = calculateBackoff(tmpContext, timeoutInstant);
-        DefaultContext<T> retryContext = new DefaultContext<T>(applicationContext, iteration, nextBackoff, e);
-        context.lastBackoff = nextBackoff;
-
-        if (!retryPredicate.test(retryContext)) {
-            log.debug("Stopping retries since predicate returned false, retry context: {}", retryContext);
-            return Mono.error(e);
-        } else if (nextBackoff == RETRY_EXHAUSTED) {
-            log.debug("Retries exhausted, retry context: {}", retryContext);
-            return Mono.error(new RetryExhaustedException(e));
-        } else {
-            log.debug("Scheduling retry attempt, retry context: {}", retryContext);
-            onRetry.accept(retryContext);
-            return onRetryMono.apply(retryContext)
-                .then(Mono.from(retryMono(nextBackoff.delay())));
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "Retry{max=" + this.maxIterations + ",backoff=" + backoff + ",jitter=" +
-            jitter + "}";
-    }
-}
diff --git a/server/container/util/src/test/java/reactor/retry/RetryTestUtils.java b/server/container/util/src/test/java/reactor/retry/RetryTestUtils.java
deleted file mode 100644
index 075dcd9..0000000
--- a/server/container/util/src/test/java/reactor/retry/RetryTestUtils.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package reactor.retry;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import org.reactivestreams.Publisher;
-
-import reactor.core.publisher.Flux;
-
-public class RetryTestUtils {
-
-	static void assertDelays(Queue<? extends IterationContext<?>> retries, Long... delayMs) {
-		assertEquals(delayMs.length, retries.size());
-		int index = 0;
-		for (Iterator<? extends IterationContext<?>> it = retries.iterator(); it.hasNext(); ) {
-			IterationContext<?> repeatContext = it.next();
-			assertEquals(delayMs[index].longValue(), repeatContext.backoff().toMillis());
-			index++;
-		}
-	}
-
-	static void assertRandomDelays(Queue<? extends IterationContext<?>> retries, int firstMs, int maxMs) {
-		long prevMs = 0;
-		int randomValues = 0;
-		for (IterationContext<?> context : retries) {
-			long backoffMs = context.backoff().toMillis();
-			assertTrue("Unexpected delay " + backoffMs, backoffMs >= firstMs && backoffMs <= maxMs);
-			if (backoffMs != firstMs && backoffMs != prevMs)
-				randomValues++;
-			prevMs = backoffMs;
-		}
-		assertTrue("Delays not random", randomValues >= 2); // Allow for at most one edge case.
-	}
-
-	static <T> void testReuseInParallel(int threads, int iterations,
-			Function<Backoff, Function<Flux<T>, Publisher<Long>>> retryOrRepeat,
-			Consumer<Function<Flux<T>, Publisher<Long>>> testTask) throws Exception {
-		int repeatCount = iterations - 1;
-		AtomicInteger nextBackoff = new AtomicInteger();
-		// Keep track of the number of backoff invocations per instance
-		ConcurrentHashMap<Long, Integer> backoffCounts = new ConcurrentHashMap<>();
-		// Use a countdown latch to get all instances to stop in the first backoff callback
-		CountDownLatch latch = new CountDownLatch(threads);
-		Backoff customBackoff = context -> {
-			Duration backoff = context.backoff();
-			if (latch.getCount() > 0) {
-				assertNull("Wrong context, backoff must be null", backoff);
-				backoff = Duration.ofMillis(nextBackoff.incrementAndGet());
-				backoffCounts.put(backoff.toMillis(), 1);
-				latch.countDown();
-				try {
-					latch.await(10, TimeUnit.SECONDS);
-				}
-				catch (Exception e) {
-					// ignore, errors are handled later
-				}
-			} else {
-				assertNotNull("Wrong context, backoff must not be null", backoff);
-				long index = backoff.toMillis();
-				backoffCounts.put(index, backoffCounts.get(index) + 1);
-			}
-			return new BackoffDelay(backoff);
-		};
-		Function<Flux<T>, Publisher<Long>> retryFunc = retryOrRepeat.apply(customBackoff);
-		ExecutorService executor = Executors.newFixedThreadPool(threads);
-		List<Future<?>> futures = new ArrayList<>();
-		try {
-			for (int i = 0; i < threads; i++) {
-				Runnable runnable = () -> testTask.accept(retryFunc);
-				futures.add(executor.submit(runnable));
-			}
-			for (Future<?> future : futures)
-				future.get(5, TimeUnit.SECONDS);
-		}
-		finally {
-			executor.shutdownNow();
-		}
-
-		assertEquals(0, latch.getCount());
-		assertEquals(threads, backoffCounts.size());
-		for (Integer count : backoffCounts.values()) {
-			//backoff not invoked anymore when maxIteration reached
-			assertEquals(repeatCount, count.intValue());
-		}
-	}
-}
diff --git a/server/container/util/src/test/java/reactor/retry/RetryWithAsyncCallbackTest.java b/server/container/util/src/test/java/reactor/retry/RetryWithAsyncCallbackTest.java
deleted file mode 100644
index 2f2aad5..0000000
--- a/server/container/util/src/test/java/reactor/retry/RetryWithAsyncCallbackTest.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package reactor.retry;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-
-import java.io.IOException;
-import java.net.SocketException;
-import java.time.Duration;
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Semaphore;
-import java.util.function.Consumer;
-
-import org.junit.Test;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-import reactor.test.StepVerifier;
-
-public class RetryWithAsyncCallbackTest {
-
-    private Queue<RetryContext<?>> retries = new ConcurrentLinkedQueue<>();
-
-    @Test
-    public void shouldTimeoutRetryWithVirtualTime() {
-        // given
-        final int minBackoff = 1;
-        final int maxBackoff = 5;
-        final int timeout = 10;
-
-        // then
-        StepVerifier.withVirtualTime(() ->
-            Mono.<String>error(new RuntimeException("Something went wrong"))
-                .retryWhen(RetryWithAsyncCallback.anyOf(Exception.class)
-                    .exponentialBackoffWithJitter(Duration.ofSeconds(minBackoff), Duration.ofSeconds(maxBackoff))
-                    .timeout(Duration.ofSeconds(timeout)))
-                .subscribeOn(Schedulers.elastic()))
-            .expectSubscription()
-//				.expectNoEvent(Duration.ofSeconds(timeout))
-            .thenAwait(Duration.ofSeconds(timeout))
-            .expectError(RetryExhaustedException.class)
-            .verify(Duration.ofSeconds(timeout));
-    }
-
-    @Test
-    public void fluxRetryNoBackoff() {
-        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IOException()))
-            .retryWhen(RetryWithAsyncCallback.any().noBackoff().retryMax(2).doOnRetry(onRetry()));
-
-        StepVerifier.create(flux)
-            .expectNext(0, 1, 0, 1, 0, 1)
-            .verifyError(RetryExhaustedException.class);
-        assertRetries(IOException.class, IOException.class);
-        RetryTestUtils.assertDelays(retries, 0L, 0L);
-    }
-
-    @Test
-    public void monoRetryNoBackoff() {
-        Mono<?> mono = Mono.error(new IOException())
-            .retryWhen(RetryWithAsyncCallback.any().noBackoff().retryMax(2).doOnRetry(onRetry()));
-
-        StepVerifier.create(mono)
-            .verifyError(RetryExhaustedException.class);
-        assertRetries(IOException.class, IOException.class);
-        RetryTestUtils.assertDelays(retries, 0L, 0L);
-    }
-
-    @Test
-    public void fluxRetryFixedBackoff() {
-        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IOException()))
-            .retryWhen(RetryWithAsyncCallback.any().fixedBackoff(Duration.ofMillis(500)).retryOnce().doOnRetry(onRetry()));
-
-        StepVerifier.withVirtualTime(() -> flux)
-            .expectNext(0, 1)
-            .expectNoEvent(Duration.ofMillis(300))
-            .thenAwait(Duration.ofMillis(300))
-            .expectNext(0, 1)
-            .verifyError(RetryExhaustedException.class);
-        assertRetries(IOException.class);
-        RetryTestUtils.assertDelays(retries, 500L);
-    }
-
-    @Test
-    public void monoRetryFixedBackoff() {
-        Mono<?> mono = Mono.error(new IOException())
-            .retryWhen(RetryWithAsyncCallback.any().fixedBackoff(Duration.ofMillis(500)).retryOnce().doOnRetry(onRetry()));
-
-        StepVerifier.withVirtualTime(() -> mono)
-            .expectSubscription()
-            .expectNoEvent(Duration.ofMillis(300))
-            .thenAwait(Duration.ofMillis(300))
-            .verifyError(RetryExhaustedException.class);
-
-        assertRetries(IOException.class);
-        RetryTestUtils.assertDelays(retries, 500L);
-    }
-
-
-    @Test
-    public void fluxRetryExponentialBackoff() {
-        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IOException()))
-            .retryWhen(RetryWithAsyncCallback.any()
-                .exponentialBackoff(Duration.ofMillis(100), Duration.ofMillis(500))
-                .timeout(Duration.ofMillis(1500))
-                .doOnRetry(onRetry()));
-
-        StepVerifier.create(flux)
-            .expectNext(0, 1)
-            .expectNoEvent(Duration.ofMillis(50))  // delay=100
-            .expectNext(0, 1)
-            .expectNoEvent(Duration.ofMillis(150)) // delay=200
-            .expectNext(0, 1)
-            .expectNoEvent(Duration.ofMillis(250)) // delay=400
-            .expectNext(0, 1)
-            .expectNoEvent(Duration.ofMillis(450)) // delay=500
-            .expectNext(0, 1)
-            .verifyErrorMatches(e -> isRetryExhausted(e, IOException.class));
-
-        assertRetries(IOException.class, IOException.class, IOException.class, IOException.class);
-        RetryTestUtils.assertDelays(retries, 100L, 200L, 400L, 500L);
-    }
-    @Test
-    public void monoRetryExponentialBackoff() {
-        Mono<?> mono = Mono.error(new IOException())
-            .retryWhen(RetryWithAsyncCallback.any()
-                .exponentialBackoff(Duration.ofMillis(100), Duration.ofMillis(500))
-                .retryMax(4)
-                .doOnRetry(onRetry()));
-
-        StepVerifier.withVirtualTime(() -> mono)
-            .expectSubscription()
-            .thenAwait(Duration.ofMillis(100))
-            .thenAwait(Duration.ofMillis(200))
-            .thenAwait(Duration.ofMillis(400))
-            .thenAwait(Duration.ofMillis(500))
-            .verifyError(RetryExhaustedException.class);
-
-        assertRetries(IOException.class, IOException.class, IOException.class, IOException.class);
-        RetryTestUtils.assertDelays(retries, 100L, 200L, 400L, 500L);
-    }
-
-    @Test
-    public void fluxRetryRandomBackoff() {
-        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IOException()))
-            .retryWhen(RetryWithAsyncCallback.any()
-                .randomBackoff(Duration.ofMillis(100), Duration.ofMillis(2000))
-                .retryMax(4)
-                .doOnRetry(onRetry()));
-
-        StepVerifier.create(flux)
-            .expectNext(0, 1, 0, 1, 0, 1, 0, 1, 0, 1)
-            .verifyErrorMatches(e -> isRetryExhausted(e, IOException.class));
-
-        assertRetries(IOException.class, IOException.class, IOException.class, IOException.class);
-        RetryTestUtils.assertRandomDelays(retries, 100, 2000);
-    }
-
-    @Test
-    public void monoRetryRandomBackoff() {
-        Mono<?> mono = Mono.error(new IOException())
-            .retryWhen(RetryWithAsyncCallback.any()
-                .randomBackoff(Duration.ofMillis(100), Duration.ofMillis(2000))
-                .retryMax(4)
-                .doOnRetry(onRetry()));
-
-        StepVerifier.withVirtualTime(() -> mono)
-            .expectSubscription()
-            .thenAwait(Duration.ofMillis(100))
-            .thenAwait(Duration.ofMillis(2000))
-            .thenAwait(Duration.ofMillis(2000))
-            .thenAwait(Duration.ofMillis(2000))
-            .verifyError(RetryExhaustedException.class);
-
-        assertRetries(IOException.class, IOException.class, IOException.class, IOException.class);
-        RetryTestUtils.assertRandomDelays(retries, 100, 2000);
-    }
-
-
-    @Test
-    public void fluxRetriableExceptions() {
-        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new SocketException()))
-            .retryWhen(RetryWithAsyncCallback.anyOf(IOException.class).retryOnce().doOnRetry(onRetry()));
-
-        StepVerifier.create(flux)
-            .expectNext(0, 1, 0, 1)
-            .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class));
-
-        Flux<Integer> nonRetriable = Flux.concat(Flux.range(0, 2), Flux.error(new RuntimeException()))
-            .retryWhen(RetryWithAsyncCallback.anyOf(IOException.class).retryOnce().doOnRetry(onRetry()));
-        StepVerifier.create(nonRetriable)
-            .expectNext(0, 1)
-            .verifyError(RuntimeException.class);
-
-    }
-
-    @Test
-    public void fluxNonRetriableExceptions() {
-
-        Retry<?> retry = RetryWithAsyncCallback.allBut(RuntimeException.class).retryOnce().doOnRetry(onRetry());
-        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IllegalStateException())).retryWhen(retry);
-
-        StepVerifier.create(flux)
-            .expectNext(0, 1)
-            .verifyError(IllegalStateException.class);
-
-
-        Flux<Integer> retriable = Flux.concat(Flux.range(0, 2), Flux.error(new SocketException())).retryWhen(retry);
-        StepVerifier.create(retriable)
-            .expectNext(0, 1, 0, 1)
-            .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class));
-    }
-
-    @Test
-    public void fluxRetryAnyException() {
-        Retry<?> retry = RetryWithAsyncCallback.any().retryOnce().doOnRetry(onRetry());
-
-        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new SocketException())).retryWhen(retry);
-        StepVerifier.create(flux)
-            .expectNext(0, 1, 0, 1)
-            .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class));
-
-        Flux<Integer> flux2 = Flux.concat(Flux.range(0, 2), Flux.error(new RuntimeException())).retryWhen(retry);
-        StepVerifier.create(flux2)
-            .expectNext(0, 1, 0, 1)
-            .verifyErrorMatches(e -> isRetryExhausted(e, RuntimeException.class));
-
-    }
-
-    @Test
-    public void fluxRetryOnPredicate() {
-        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new SocketException()))
-            .retryWhen(RetryWithAsyncCallback.onlyIf(context -> context.iteration() < 3).doOnRetry(onRetry()));
-
-        StepVerifier.create(flux)
-            .expectNext(0, 1, 0, 1, 0, 1)
-            .verifyError(SocketException.class);
-    }
-
-
-    @Test
-    public void doOnRetry() {
-        Semaphore semaphore = new Semaphore(0);
-        Retry<?> retry = RetryWithAsyncCallback.any()
-            .retryOnce()
-            .fixedBackoff(Duration.ofMillis(500))
-            .doOnRetry(context -> semaphore.release());
-
-        StepVerifier.withVirtualTime(() -> Flux.range(0, 2).concatWith(Mono.error(new SocketException())).retryWhen(retry))
-            .expectNext(0, 1)
-            .then(semaphore::acquireUninterruptibly)
-            .expectNoEvent(Duration.ofMillis(400))
-            .thenAwait(Duration.ofMillis(200))
-            .expectNext(0, 1)
-            .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class));
-
-        StepVerifier.withVirtualTime(() -> Mono.error(new SocketException()).retryWhen(retry.noBackoff()))
-            .then(semaphore::acquireUninterruptibly)
-            .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class));
-    }
-
-    @Test
-    public void onRetryWithMono() {
-        Semaphore semaphore = new Semaphore(0);
-        Retry<?> retry = RetryWithAsyncCallback.any()
-            .retryOnce()
-            .fixedBackoff(Duration.ofMillis(500))
-            .onRetryWithMono(context -> Mono.fromCallable(() -> { semaphore.release(); return 0; }));
-
-        StepVerifier.withVirtualTime(() -> Flux.range(0, 2).concatWith(Mono.error(new SocketException())).retryWhen(retry))
-            .expectNext(0, 1)
-            .then(semaphore::acquireUninterruptibly)
-            .expectNoEvent(Duration.ofMillis(400))
-            .thenAwait(Duration.ofMillis(200))
-            .expectNext(0, 1)
-            .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class));
-
-        StepVerifier.withVirtualTime(() -> Mono.error(new SocketException()).retryWhen(retry.noBackoff()))
-            .then(semaphore::acquireUninterruptibly)
-            .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class));
-    }
-
-    @Test
-    public void retryApplicationContext() {
-        class AppContext {
-            boolean needsRollback;
-            void rollback() {
-                needsRollback = false;
-            }
-            void run() {
-                assertFalse("Rollback not performed", needsRollback);
-                needsRollback = true;
-            }
-        }
-        AppContext appContext = new AppContext();
-        Retry<?> retry = RetryWithAsyncCallback.<AppContext>any().withApplicationContext(appContext)
-            .retryMax(2)
-            .doOnRetry(context -> {
-                AppContext ac = context.applicationContext();
-                assertNotNull("Application context not propagated", ac);
-                ac.rollback();
-            });
-
-        StepVerifier.withVirtualTime(() -> Mono.error(new RuntimeException()).doOnNext(i -> appContext.run()).retryWhen(retry))
-            .verifyErrorMatches(e -> isRetryExhausted(e, RuntimeException.class));
-
-    }
-
-    @Test
-    public void fluxRetryCompose() {
-        Retry<?> retry = RetryWithAsyncCallback.any().noBackoff().retryMax(2).doOnRetry(this.onRetry());
-        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IOException())).as(retry::apply);
-
-        StepVerifier.create(flux)
-            .expectNext(0, 1, 0, 1, 0, 1)
-            .verifyError(RetryExhaustedException.class);
-        assertRetries(IOException.class, IOException.class);
-    }
-
-    @Test
-    public void monoRetryCompose() {
-        Retry<?> retry = RetryWithAsyncCallback.any().noBackoff().retryMax(2).doOnRetry(this.onRetry());
-        Flux<?> flux = Mono.error(new IOException()).as(retry::apply);
-
-        StepVerifier.create(flux)
-            .verifyError(RetryExhaustedException.class);
-        assertRetries(IOException.class, IOException.class);
-    }
-
-    @Test
-    public void functionReuseInParallel() throws Exception {
-        int retryCount = 19;
-        int range = 100;
-        Integer[] values = new Integer[(retryCount + 1) * range];
-        for (int i = 0; i <= retryCount; i++) {
-            for (int j = 1; j <= range; j++)
-                values[i * range + j - 1] = j;
-        }
-        RetryTestUtils.testReuseInParallel(2, 20,
-            backoff -> RetryWithAsyncCallback.<Integer>any().retryMax(19).backoff(backoff),
-            retryFunc -> StepVerifier.create(Flux.range(1, range).concatWith(Mono.error(new SocketException())).retryWhen(retryFunc))
-                .expectNext(values)
-                .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class)));
-    }
-
-    Consumer<? super RetryContext<?>> onRetry() {
-        return context -> retries.add(context);
-    }
-
-    @SafeVarargs
-    private final void assertRetries(Class<? extends Throwable>... exceptions) {
-        assertEquals(exceptions.length, retries.size());
-        int index = 0;
-        for (Iterator<RetryContext<?>> it = retries.iterator(); it.hasNext(); ) {
-            RetryContext<?> retryContext = it.next();
-            assertEquals(index + 1, retryContext.iteration());
-            assertEquals(exceptions[index], retryContext.exception().getClass());
-            index++;
-        }
-    }
-
-    static boolean isRetryExhausted(Throwable e, Class<? extends Throwable> cause) {
-        return e instanceof RetryExhaustedException && cause.isInstance(e.getCause());
-    }
-
-    @Test
-    public void retryToString() {
-        System.out.println(RetryWithAsyncCallback.any().noBackoff().retryMax(2).toString());
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org