You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2021/04/27 05:13:23 UTC

[httpcomponents-client] branch master updated (656d0dd -> e10ba08)

This is an automated email from the ASF dual-hosted git repository.

olegk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-client.git.


    from 656d0dd  HTTPCLIENT-2148: fluent Executor volatile access thread safety (#301)
     new 00ca810  Async clients to support scheduled (delayed) re-execution of requests
     new e10ba08  HTTPCLIENT-2152: Fixed handling of unexpected unchecked exception by the async request retry exec interceptor

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../client5/http/impl/cache/AsyncCachingExec.java  |   4 +-
 ....java => ServiceUnavailableAsyncDecorator.java} |  86 ++++------
 .../testing/async/TestHttp1RequestReExecution.java | 187 +++++++++++++++++++++
 .../hc/client5/http/async/AsyncExecChain.java      |  36 +++-
 .../http/impl/async/AsyncHttpRequestRetryExec.java |  28 ++-
 .../client5/http/impl/async/AsyncRedirectExec.java |  11 +-
 .../async/InternalAbstractHttpAsyncClient.java     |  93 +++++++++-
 7 files changed, 370 insertions(+), 75 deletions(-)
 copy httpclient5-testing/src/main/java/org/apache/hc/client5/testing/async/{RedirectingAsyncDecorator.java => ServiceUnavailableAsyncDecorator.java} (62%)
 create mode 100644 httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1RequestReExecution.java

[httpcomponents-client] 02/02: HTTPCLIENT-2152: Fixed handling of unexpected unchecked exception by the async request retry exec interceptor

Posted by ol...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-client.git

commit e10ba0873b1a123a8f3dae96389b14b81428d0ed
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Sun Apr 25 15:08:52 2021 +0200

    HTTPCLIENT-2152: Fixed handling of unexpected unchecked exception by the async request retry exec interceptor
---
 .../async/ServiceUnavailableAsyncDecorator.java    | 138 +++++++++++++++
 .../testing/async/TestHttp1RequestReExecution.java | 187 +++++++++++++++++++++
 .../http/impl/async/AsyncHttpRequestRetryExec.java |  28 ++-
 3 files changed, 337 insertions(+), 16 deletions(-)

diff --git a/httpclient5-testing/src/main/java/org/apache/hc/client5/testing/async/ServiceUnavailableAsyncDecorator.java b/httpclient5-testing/src/main/java/org/apache/hc/client5/testing/async/ServiceUnavailableAsyncDecorator.java
new file mode 100644
index 0000000..16d883e
--- /dev/null
+++ b/httpclient5-testing/src/main/java/org/apache/hc/client5/testing/async/ServiceUnavailableAsyncDecorator.java
@@ -0,0 +1,138 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.client5.testing.async;
+
+import org.apache.hc.core5.function.Resolver;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.HttpVersion;
+import org.apache.hc.core5.http.ProtocolVersion;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.TimeValue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ServiceUnavailableAsyncDecorator implements AsyncServerExchangeHandler {
+
+    private final AsyncServerExchangeHandler exchangeHandler;
+    private final Resolver<HttpRequest, TimeValue> serviceAvailabilityResolver;
+    private final AtomicBoolean serviceUnavailable;
+
+    public ServiceUnavailableAsyncDecorator(final AsyncServerExchangeHandler exchangeHandler,
+                                            final Resolver<HttpRequest, TimeValue> serviceAvailabilityResolver) {
+        this.exchangeHandler = Args.notNull(exchangeHandler, "Exchange handler");
+        this.serviceAvailabilityResolver = Args.notNull(serviceAvailabilityResolver, "Service availability resolver");
+        this.serviceUnavailable = new AtomicBoolean();
+    }
+
+    @Override
+    public void handleRequest(final HttpRequest request,
+                              final EntityDetails entityDetails,
+                              final ResponseChannel responseChannel,
+                              final HttpContext context) throws HttpException, IOException {
+        final TimeValue retryAfter = serviceAvailabilityResolver.resolve(request);
+        serviceUnavailable.set(TimeValue.isPositive(retryAfter));
+        if (serviceUnavailable.get()) {
+            final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_SERVICE_UNAVAILABLE);
+            response.addHeader(HttpHeaders.RETRY_AFTER, Long.toString(retryAfter.toSeconds()));
+            final ProtocolVersion version = request.getVersion();
+            if (version != null && version.compareToVersion(HttpVersion.HTTP_2) < 0) {
+                response.addHeader(HttpHeaders.CONNECTION, "Close");
+            }
+            responseChannel.sendResponse(response, null, context);
+        } else {
+            exchangeHandler.handleRequest(request, entityDetails, responseChannel, context);
+        }
+    }
+
+    @Override
+    public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+        if (!serviceUnavailable.get()) {
+            exchangeHandler.updateCapacity(capacityChannel);
+        } else {
+            capacityChannel.update(Integer.MAX_VALUE);
+        }
+    }
+
+    @Override
+    public final void consume(final ByteBuffer src) throws IOException {
+        if (!serviceUnavailable.get()) {
+            exchangeHandler.consume(src);
+        }
+    }
+
+    @Override
+    public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+        if (!serviceUnavailable.get()) {
+            exchangeHandler.streamEnd(trailers);
+        }
+    }
+
+    @Override
+    public int available() {
+        if (!serviceUnavailable.get()) {
+            return exchangeHandler.available();
+        } else {
+            return 0;
+        }
+    }
+
+    @Override
+    public void produce(final DataStreamChannel channel) throws IOException {
+        if (!serviceUnavailable.get()) {
+            exchangeHandler.produce(channel);
+        }
+    }
+
+    @Override
+    public void failed(final Exception cause) {
+        if (!serviceUnavailable.get()) {
+            exchangeHandler.failed(cause);
+        }
+    }
+
+    @Override
+    public void releaseResources() {
+        exchangeHandler.releaseResources();
+    }
+
+}
diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1RequestReExecution.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1RequestReExecution.java
new file mode 100644
index 0000000..88aeb84
--- /dev/null
+++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1RequestReExecution.java
@@ -0,0 +1,187 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.testing.async;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.impl.DefaultHttpRequestRetryStrategy;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
+import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
+import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
+import org.apache.hc.core5.function.Decorator;
+import org.apache.hc.core5.function.Resolver;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.HttpVersion;
+import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.http.config.Http1Config;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.http2.config.H2Config;
+import org.apache.hc.core5.util.TimeValue;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@RunWith(Parameterized.class)
+public class TestHttp1RequestReExecution extends AbstractIntegrationTestBase<CloseableHttpAsyncClient> {
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<Object[]> protocolVersions() {
+        return Arrays.asList(new Object[][]{
+                { HttpVersion.HTTP_1_1 },
+                { HttpVersion.HTTP_2 }
+        });
+    }
+
+    private final HttpVersion version;
+
+    public TestHttp1RequestReExecution(final HttpVersion version) {
+        super(URIScheme.HTTP);
+        this.version = version;
+    }
+
+    HttpAsyncClientBuilder clientBuilder;
+    PoolingAsyncClientConnectionManager connManager;
+
+    @Rule
+    public ExternalResource connManagerResource = new ExternalResource() {
+
+        @Override
+        protected void before() throws Throwable {
+            connManager = PoolingAsyncClientConnectionManagerBuilder.create()
+                    .build();
+        }
+
+        @Override
+        protected void after() {
+            if (connManager != null) {
+                connManager.close();
+                connManager = null;
+            }
+        }
+
+    };
+
+    @Rule
+    public ExternalResource clientBuilderResource = new ExternalResource() {
+
+        @Override
+        protected void before() throws Throwable {
+            clientBuilder = HttpAsyncClientBuilder.create()
+                    .setDefaultRequestConfig(RequestConfig.custom()
+                            .setConnectionRequestTimeout(TIMEOUT)
+                            .setConnectTimeout(TIMEOUT)
+                            .build())
+                    .setConnectionManager(connManager)
+                    .setVersionPolicy(version.greaterEquals(HttpVersion.HTTP_2) ? HttpVersionPolicy.FORCE_HTTP_2 : HttpVersionPolicy.FORCE_HTTP_1);
+        }
+
+    };
+
+    @Override
+    public final HttpHost start() throws Exception {
+
+        final Resolver<HttpRequest, TimeValue> serviceAvailabilityResolver = new Resolver<HttpRequest, TimeValue>() {
+
+            private final AtomicInteger count = new AtomicInteger(0);
+
+            @Override
+            public TimeValue resolve(final HttpRequest request) {
+                final int n = count.incrementAndGet();
+                return n <= 3 ? TimeValue.ofSeconds(1) : null;
+            }
+
+        };
+
+        if (version.greaterEquals(HttpVersion.HTTP_2)) {
+            return super.start(null, new Decorator<AsyncServerExchangeHandler>() {
+
+                @Override
+                public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler handler) {
+                    return new ServiceUnavailableAsyncDecorator(handler, serviceAvailabilityResolver);
+                }
+
+            }, H2Config.DEFAULT);
+        } else {
+            return super.start(null, new Decorator<AsyncServerExchangeHandler>() {
+
+                @Override
+                public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler handler) {
+                    return new ServiceUnavailableAsyncDecorator(handler, serviceAvailabilityResolver);
+                }
+
+            }, Http1Config.DEFAULT);
+        }
+    }
+
+    @Override
+    protected CloseableHttpAsyncClient createClient() throws Exception {
+        return clientBuilder.build();
+    }
+
+    @Test
+    public void testGiveUpAfterOneRetry() throws Exception {
+        clientBuilder.setRetryStrategy(new DefaultHttpRequestRetryStrategy(1, TimeValue.ofSeconds(1)));
+        final HttpHost target = start();
+        final Future<SimpleHttpResponse> future = httpclient.execute(
+                SimpleRequestBuilder.get()
+                        .setHttpHost(target)
+                        .setPath("/random/2048")
+                        .build(), null);
+        final SimpleHttpResponse response = future.get();
+        Assert.assertThat(response, CoreMatchers.notNullValue());
+        Assert.assertThat(response.getCode(), CoreMatchers.equalTo(HttpStatus.SC_SERVICE_UNAVAILABLE));
+    }
+
+    @Test
+    public void testDoNotGiveUpEasily() throws Exception {
+        clientBuilder.setRetryStrategy(new DefaultHttpRequestRetryStrategy(5, TimeValue.ofSeconds(1)));
+        final HttpHost target = start();
+        final Future<SimpleHttpResponse> future = httpclient.execute(
+                SimpleRequestBuilder.get()
+                        .setHttpHost(target)
+                        .setPath("/random/2048")
+                        .build(), null);
+        final SimpleHttpResponse response = future.get();
+        Assert.assertThat(response, CoreMatchers.notNullValue());
+        Assert.assertThat(response.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
+    }
+
+}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncHttpRequestRetryExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncHttpRequestRetryExec.java
index 8111828..186a331 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncHttpRequestRetryExec.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncHttpRequestRetryExec.java
@@ -46,6 +46,7 @@ import org.apache.hc.core5.http.nio.AsyncEntityProducer;
 import org.apache.hc.core5.http.nio.entity.NoopEntityConsumer;
 import org.apache.hc.core5.http.support.BasicRequestBuilder;
 import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.TimeValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,8 +79,8 @@ public final class AsyncHttpRequestRetryExec implements AsyncExecChainHandler {
 
     private static class State {
 
-        volatile int execCount;
         volatile boolean retrying;
+        volatile TimeValue delay;
 
     }
 
@@ -106,8 +107,12 @@ public final class AsyncHttpRequestRetryExec implements AsyncExecChainHandler {
                     }
                     return asyncExecCallback.handleResponse(response, entityDetails);
                 }
-                state.retrying = retryStrategy.retryRequest(response, state.execCount, clientContext);
+                state.retrying = retryStrategy.retryRequest(response, scope.execCount.get(), clientContext);
                 if (state.retrying) {
+                    state.delay = retryStrategy.getRetryInterval(response, scope.execCount.get(), clientContext);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("{} retrying request in {}", exchangeId, state.delay);
+                    }
                     return new NoopEntityConsumer();
                 } else {
                     return asyncExecCallback.handleResponse(response, entityDetails);
@@ -122,12 +127,8 @@ public final class AsyncHttpRequestRetryExec implements AsyncExecChainHandler {
             @Override
             public void completed() {
                 if (state.retrying) {
-                    state.execCount++;
-                    try {
-                        internalExecute(state, request, entityProducer, scope, chain, asyncExecCallback);
-                    } catch (final IOException | HttpException ex) {
-                        asyncExecCallback.failed(ex);
-                    }
+                    scope.execCount.incrementAndGet();
+                    scope.scheduler.scheduleExecution(request, entityProducer, scope, asyncExecCallback, state.delay);
                 } else {
                     asyncExecCallback.completed();
                 }
@@ -142,7 +143,7 @@ public final class AsyncHttpRequestRetryExec implements AsyncExecChainHandler {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("{} cannot retry non-repeatable request", exchangeId);
                         }
-                    } else if (retryStrategy.retryRequest(request, (IOException) cause, state.execCount, clientContext)) {
+                    } else if (retryStrategy.retryRequest(request, (IOException) cause, scope.execCount.get(), clientContext)) {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("{} {}", exchangeId, cause.getMessage(), cause);
                         }
@@ -155,12 +156,8 @@ public final class AsyncHttpRequestRetryExec implements AsyncExecChainHandler {
                             entityProducer.releaseResources();
                         }
                         state.retrying = true;
-                        state.execCount++;
-                        try {
-                            internalExecute(state, request, entityProducer, scope, chain, asyncExecCallback);
-                        } catch (final IOException | HttpException ex) {
-                            asyncExecCallback.failed(ex);
-                        }
+                        scope.execCount.incrementAndGet();
+                        scope.scheduler.scheduleExecution(request, entityProducer, scope, asyncExecCallback, state.delay);
                         return;
                     }
                 }
@@ -179,7 +176,6 @@ public final class AsyncHttpRequestRetryExec implements AsyncExecChainHandler {
             final AsyncExecChain chain,
             final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
         final State state = new State();
-        state.execCount = 1;
         state.retrying = false;
         internalExecute(state, request, entityProducer, scope, chain, asyncExecCallback);
     }

[httpcomponents-client] 01/02: Async clients to support scheduled (delayed) re-execution of requests

Posted by ol...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-client.git

commit 00ca8105f49aa92fcea75f6c91e5395e9bf2c7ff
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Sun Apr 25 15:04:36 2021 +0200

    Async clients to support scheduled (delayed) re-execution of requests
---
 .../client5/http/impl/cache/AsyncCachingExec.java  |  4 +-
 .../hc/client5/http/async/AsyncExecChain.java      | 36 ++++++++-
 .../client5/http/impl/async/AsyncRedirectExec.java | 11 ++-
 .../async/InternalAbstractHttpAsyncClient.java     | 93 +++++++++++++++++++++-
 4 files changed, 138 insertions(+), 6 deletions(-)

diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java
index d12215a..93419a7 100644
--- a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java
+++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java
@@ -669,7 +669,9 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
                             scope.originalRequest,
                             new ComplexFuture<>(null),
                             HttpClientContext.create(),
-                            scope.execRuntime.fork());
+                            scope.execRuntime.fork(),
+                            scope.scheduler,
+                            scope.execCount);
                     cacheRevalidator.revalidateCacheEntry(
                             responseCache.generateKey(target, request, entry),
                             asyncExecCallback,
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java b/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java
index b2cff0b..fb7d3d2 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java
@@ -27,6 +27,7 @@
 package org.apache.hc.client5.http.async;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hc.client5.http.HttpRoute;
 import org.apache.hc.client5.http.protocol.HttpClientContext;
@@ -37,6 +38,7 @@ import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
 import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.TimeValue;
 
 /**
  * Represents a single element in the client side asynchronous request execution chain.
@@ -59,6 +61,8 @@ public interface AsyncExecChain {
         public final CancellableDependency cancellableDependency;
         public final HttpClientContext clientContext;
         public final AsyncExecRuntime execRuntime;
+        public final Scheduler scheduler;
+        public final AtomicInteger execCount;
 
         public Scope(
                 final String exchangeId,
@@ -66,18 +70,48 @@ public interface AsyncExecChain {
                 final HttpRequest originalRequest,
                 final CancellableDependency cancellableDependency,
                 final HttpClientContext clientContext,
-                final AsyncExecRuntime execRuntime) {
+                final AsyncExecRuntime execRuntime,
+                final Scheduler scheduler,
+                final AtomicInteger execCount) {
             this.exchangeId = Args.notBlank(exchangeId, "Exchange id");
             this.route = Args.notNull(route, "Route");
             this.originalRequest = Args.notNull(originalRequest, "Original request");
             this.cancellableDependency = Args.notNull(cancellableDependency, "Dependency");
             this.clientContext = clientContext != null ? clientContext : HttpClientContext.create();
             this.execRuntime = Args.notNull(execRuntime, "Exec runtime");
+            this.scheduler = Args.notNull(scheduler, "Exec scheduler");
+            this.execCount = execCount != null ? execCount : new AtomicInteger(1);
         }
 
     }
 
     /**
+     * Request execution scheduler
+     *
+     * @since 5.1
+     */
+    interface Scheduler {
+
+        /**
+         * Schedules request re-execution immediately or after a delay.
+         * @param request the actual request.
+         * @param entityProducer the request entity producer or {@code null} if the request
+         *                      does not enclose an entity.
+         * @param scope the execution scope.
+         * @param asyncExecCallback the execution callback.
+         * @param delay re-execution delay. Can be {@code null} if the request is to be
+         *              re-executed immediately.
+         */
+        void scheduleExecution(
+                HttpRequest request,
+                AsyncEntityProducer entityProducer,
+                AsyncExecChain.Scope scope,
+                AsyncExecCallback asyncExecCallback,
+                TimeValue delay);
+
+    }
+
+    /**
      * Proceeds to the next element in the request execution chain.
      *
      * @param request the actual request.
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRedirectExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRedirectExec.java
index b5c7eaf..d04944b 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRedirectExec.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRedirectExec.java
@@ -187,8 +187,15 @@ public final class AsyncRedirectExec implements AsyncExecChainHandler {
                                     proxyAuthExchange.reset();
                                 }
                             }
-                            state.currentScope = new AsyncExecChain.Scope(scope.exchangeId, newRoute,
-                                    scope.originalRequest, scope.cancellableDependency, clientContext, scope.execRuntime);
+                            state.currentScope = new AsyncExecChain.Scope(
+                                    scope.exchangeId,
+                                    newRoute,
+                                    scope.originalRequest,
+                                    scope.cancellableDependency,
+                                    scope.clientContext,
+                                    scope.execRuntime,
+                                    scope.scheduler,
+                                    scope.execCount);
                         }
                     }
                 }
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java
index 0b08573..6dac275 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java
@@ -32,9 +32,12 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hc.client5.http.HttpRoute;
 import org.apache.hc.client5.http.async.AsyncExecCallback;
@@ -49,7 +52,9 @@ import org.apache.hc.client5.http.cookie.CookieStore;
 import org.apache.hc.client5.http.impl.ExecSupport;
 import org.apache.hc.client5.http.protocol.HttpClientContext;
 import org.apache.hc.client5.http.routing.RoutingSupport;
+import org.apache.hc.core5.concurrent.Cancellable;
 import org.apache.hc.core5.concurrent.ComplexFuture;
+import org.apache.hc.core5.concurrent.DefaultThreadFactory;
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.HttpException;
@@ -71,11 +76,14 @@ import org.apache.hc.core5.http.support.BasicRequestBuilder;
 import org.apache.hc.core5.io.CloseMode;
 import org.apache.hc.core5.io.ModalCloseable;
 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
+import org.apache.hc.core5.util.TimeValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBase {
 
+    private final static ThreadFactory SCHEDULER_THREAD_FACTORY = new DefaultThreadFactory("Scheduled-executor");
+
     private static final Logger LOG = LoggerFactory.getLogger(InternalAbstractHttpAsyncClient.class);
     private final AsyncExecChainElement execChain;
     private final Lookup<CookieSpecFactory> cookieSpecRegistry;
@@ -84,6 +92,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
     private final CredentialsProvider credentialsProvider;
     private final RequestConfig defaultConfig;
     private final ConcurrentLinkedQueue<Closeable> closeables;
+    private final ScheduledExecutorService scheduledExecutorService;
 
     InternalAbstractHttpAsyncClient(
             final DefaultConnectingIOReactor ioReactor,
@@ -104,6 +113,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
         this.credentialsProvider = credentialsProvider;
         this.defaultConfig = defaultConfig;
         this.closeables = closeables != null ? new ConcurrentLinkedQueue<>(closeables) : null;
+        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(SCHEDULER_THREAD_FACTORY);
     }
 
     @Override
@@ -122,6 +132,12 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
                 }
             }
         }
+        final List<Runnable> runnables = this.scheduledExecutorService.shutdownNow();
+        for (final Runnable runnable: runnables) {
+            if (runnable instanceof Cancellable) {
+                ((Cancellable) runnable).cancel();
+            }
+        }
     }
 
     private void setupContext(final HttpClientContext context) {
@@ -187,10 +203,23 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
                     clientContext.setExchangeId(exchangeId);
                     setupContext(clientContext);
 
+                    final AsyncExecChain.Scheduler scheduler = new AsyncExecChain.Scheduler() {
+
+                        @Override
+                        public void scheduleExecution(final HttpRequest request,
+                                                      final AsyncEntityProducer entityProducer,
+                                                      final AsyncExecChain.Scope scope,
+                                                      final AsyncExecCallback asyncExecCallback,
+                                                      final TimeValue delay) {
+                            executeScheduled(request, entityProducer, scope, asyncExecCallback, delay);
+                        }
+
+                    };
+
                     final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, future,
-                            clientContext, execRuntime);
+                            clientContext, execRuntime, scheduler, new AtomicInteger(1));
                     final AtomicBoolean outputTerminated = new AtomicBoolean(false);
-                    execChain.execute(
+                    executeImmediate(
                             BasicRequestBuilder.copy(request).build(),
                             entityDetails != null ? new AsyncEntityProducer() {
 
@@ -329,4 +358,64 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
         return future;
     }
 
+    /**
+     * Passes the request directly to the execution chain from the calling thread.
+     *
+     * @param request the request to execute.
+     * @param entityProducer the request entity producer forwarded to the chain.
+     * @param scope the execution scope forwarded to the chain.
+     * @param asyncExecCallback the callback forwarded to the chain.
+     * @throws HttpException if thrown by the execution chain.
+     * @throws IOException if thrown by the execution chain.
+     */
+    void executeImmediate(
+            final HttpRequest request,
+            final AsyncEntityProducer entityProducer,
+            final AsyncExecChain.Scope scope,
+            final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
+        this.execChain.execute(request, entityProducer, scope, asyncExecCallback);
+    }
+
+    /**
+     * Hands the request to the scheduled executor, which later runs it through the
+     * execution chain as a {@link ScheduledRequestExecution}.
+     * <p>
+     * When {@link TimeValue#isPositive(TimeValue)} reports the delay as positive the task
+     * is scheduled to run after that delay; otherwise it is submitted to the executor
+     * without a delay.
+     *
+     * @param request the request to execute.
+     * @param entityProducer the request entity producer forwarded to the chain.
+     * @param scope the execution scope forwarded to the chain.
+     * @param asyncExecCallback the callback forwarded to the chain and notified of failures.
+     * @param delay the time to wait before the request is executed.
+     */
+    void executeScheduled(
+            final HttpRequest request,
+            final AsyncEntityProducer entityProducer,
+            final AsyncExecChain.Scope scope,
+            final AsyncExecCallback asyncExecCallback,
+            final TimeValue delay) {
+        final Runnable task = new ScheduledRequestExecution(
+                request, entityProducer, scope, asyncExecCallback, delay);
+        if (!TimeValue.isPositive(delay)) {
+            // No usable delay: queue the task for execution right away.
+            scheduledExecutorService.execute(task);
+            return;
+        }
+        scheduledExecutorService.schedule(task, delay.getDuration(), delay.getTimeUnit());
+    }
+
+    /**
+     * Task queued on the scheduled executor. Running it executes one request through the
+     * execution chain of the enclosing client; cancelling it reports a
+     * {@link CancellationException} to the exec callback.
+     */
+    class ScheduledRequestExecution implements Runnable, Cancellable {
+
+        final HttpRequest request;
+        final AsyncEntityProducer entityProducer;
+        final AsyncExecChain.Scope scope;
+        final AsyncExecCallback asyncExecCallback;
+        // Delay requested by the scheduling caller; stored here but not read by this class.
+        final TimeValue delay;
+
+        /**
+         * Captures everything needed to run the request later.
+         *
+         * @param request the request to execute.
+         * @param entityProducer the request entity producer forwarded to the chain.
+         * @param scope the execution scope forwarded to the chain.
+         * @param asyncExecCallback the callback forwarded to the chain and notified of failures.
+         * @param delay the delay the execution was scheduled with.
+         */
+        ScheduledRequestExecution(final HttpRequest request,
+                                  final AsyncEntityProducer entityProducer,
+                                  final AsyncExecChain.Scope scope,
+                                  final AsyncExecCallback asyncExecCallback,
+                                  final TimeValue delay) {
+            this.request = request;
+            this.entityProducer = entityProducer;
+            this.scope = scope;
+            this.asyncExecCallback = asyncExecCallback;
+            this.delay = delay;
+        }
+
+        /**
+         * Executes the captured request through the enclosing client's execution chain.
+         */
+        @Override
+        public void run() {
+            try {
+                InternalAbstractHttpAsyncClient.this.execChain.execute(
+                        this.request, this.entityProducer, this.scope, this.asyncExecCallback);
+            } catch (final Exception cause) {
+                // Report the failure through the callback instead of letting it escape the task.
+                this.asyncExecCallback.failed(cause);
+            }
+        }
+
+        /**
+         * Fails the callback with a {@link CancellationException}.
+         *
+         * @return always {@code true}.
+         */
+        @Override
+        public boolean cancel() {
+            final CancellationException cancellation = new CancellationException("Request execution cancelled");
+            this.asyncExecCallback.failed(cancellation);
+            return true;
+        }
+
+    }
+
 }