You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2021/04/27 11:08:41 UTC

[httpcomponents-client] branch master updated (e10ba08 -> 09f50cd)

This is an automated email from the ASF dual-hosted git repository.

olegk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-client.git.


 discard e10ba08  HTTPCLIENT-2152: Fixed handling of unexpected unchecked exception by the async request retry exec interceptor
 discard 00ca810  Async clients to support scheduled (delayed) re-execution of requests
     new 13137eb  Async clients to support scheduled (delayed) re-execution of requests
     new 09f50cd  HTTPCLIENT-2152: Fixed handling of unexpected unchecked exception by the async request retry exec interceptor

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (e10ba08)
            \
             N -- N -- N   refs/heads/master (09f50cd)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../hc/client5/http/async/AsyncExecChain.java       | 21 ++++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

[httpcomponents-client] 02/02: HTTPCLIENT-2152: Fixed handling of unexpected unchecked exception by the async request retry exec interceptor

Posted by ol...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-client.git

commit 09f50cd80c9cabc9f7532b6063fe67695f5484fa
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Sun Apr 25 15:08:52 2021 +0200

    HTTPCLIENT-2152: Fixed handling of unexpected unchecked exception by the async request retry exec interceptor
---
 .../async/ServiceUnavailableAsyncDecorator.java    | 138 +++++++++++++++
 .../testing/async/TestHttp1RequestReExecution.java | 187 +++++++++++++++++++++
 .../http/impl/async/AsyncHttpRequestRetryExec.java |  28 ++-
 3 files changed, 337 insertions(+), 16 deletions(-)

diff --git a/httpclient5-testing/src/main/java/org/apache/hc/client5/testing/async/ServiceUnavailableAsyncDecorator.java b/httpclient5-testing/src/main/java/org/apache/hc/client5/testing/async/ServiceUnavailableAsyncDecorator.java
new file mode 100644
index 0000000..16d883e
--- /dev/null
+++ b/httpclient5-testing/src/main/java/org/apache/hc/client5/testing/async/ServiceUnavailableAsyncDecorator.java
@@ -0,0 +1,138 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.client5.testing.async;
+
+import org.apache.hc.core5.function.Resolver;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.HttpVersion;
+import org.apache.hc.core5.http.ProtocolVersion;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.TimeValue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ServiceUnavailableAsyncDecorator implements AsyncServerExchangeHandler {
+
+    private final AsyncServerExchangeHandler exchangeHandler;
+    private final Resolver<HttpRequest, TimeValue> serviceAvailabilityResolver;
+    private final AtomicBoolean serviceUnavailable;
+
+    public ServiceUnavailableAsyncDecorator(final AsyncServerExchangeHandler exchangeHandler,
+                                            final Resolver<HttpRequest, TimeValue> serviceAvailabilityResolver) {
+        this.exchangeHandler = Args.notNull(exchangeHandler, "Exchange handler");
+        this.serviceAvailabilityResolver = Args.notNull(serviceAvailabilityResolver, "Service availability resolver");
+        this.serviceUnavailable = new AtomicBoolean();
+    }
+
+    @Override
+    public void handleRequest(final HttpRequest request,
+                              final EntityDetails entityDetails,
+                              final ResponseChannel responseChannel,
+                              final HttpContext context) throws HttpException, IOException {
+        final TimeValue retryAfter = serviceAvailabilityResolver.resolve(request);
+        serviceUnavailable.set(TimeValue.isPositive(retryAfter));
+        if (serviceUnavailable.get()) {
+            final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_SERVICE_UNAVAILABLE);
+            response.addHeader(HttpHeaders.RETRY_AFTER, Long.toString(retryAfter.toSeconds()));
+            final ProtocolVersion version = request.getVersion();
+            if (version != null && version.compareToVersion(HttpVersion.HTTP_2) < 0) {
+                response.addHeader(HttpHeaders.CONNECTION, "Close");
+            }
+            responseChannel.sendResponse(response, null, context);
+        } else {
+            exchangeHandler.handleRequest(request, entityDetails, responseChannel, context);
+        }
+    }
+
+    @Override
+    public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+        if (!serviceUnavailable.get()) {
+            exchangeHandler.updateCapacity(capacityChannel);
+        } else {
+            capacityChannel.update(Integer.MAX_VALUE);
+        }
+    }
+
+    @Override
+    public final void consume(final ByteBuffer src) throws IOException {
+        if (!serviceUnavailable.get()) {
+            exchangeHandler.consume(src);
+        }
+    }
+
+    @Override
+    public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+        if (!serviceUnavailable.get()) {
+            exchangeHandler.streamEnd(trailers);
+        }
+    }
+
+    @Override
+    public int available() {
+        if (!serviceUnavailable.get()) {
+            return exchangeHandler.available();
+        } else {
+            return 0;
+        }
+    }
+
+    @Override
+    public void produce(final DataStreamChannel channel) throws IOException {
+        if (!serviceUnavailable.get()) {
+            exchangeHandler.produce(channel);
+        }
+    }
+
+    @Override
+    public void failed(final Exception cause) {
+        if (!serviceUnavailable.get()) {
+            exchangeHandler.failed(cause);
+        }
+    }
+
+    @Override
+    public void releaseResources() {
+        exchangeHandler.releaseResources();
+    }
+
+}
diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1RequestReExecution.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1RequestReExecution.java
new file mode 100644
index 0000000..88aeb84
--- /dev/null
+++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp1RequestReExecution.java
@@ -0,0 +1,187 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.testing.async;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.impl.DefaultHttpRequestRetryStrategy;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
+import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
+import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
+import org.apache.hc.core5.function.Decorator;
+import org.apache.hc.core5.function.Resolver;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.HttpVersion;
+import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.http.config.Http1Config;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.http2.config.H2Config;
+import org.apache.hc.core5.util.TimeValue;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@RunWith(Parameterized.class)
+public class TestHttp1RequestReExecution extends AbstractIntegrationTestBase<CloseableHttpAsyncClient> {
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<Object[]> protocolVersions() {
+        return Arrays.asList(new Object[][]{
+                { HttpVersion.HTTP_1_1 },
+                { HttpVersion.HTTP_2 }
+        });
+    }
+
+    private final HttpVersion version;
+
+    public TestHttp1RequestReExecution(final HttpVersion version) {
+        super(URIScheme.HTTP);
+        this.version = version;
+    }
+
+    HttpAsyncClientBuilder clientBuilder;
+    PoolingAsyncClientConnectionManager connManager;
+
+    @Rule
+    public ExternalResource connManagerResource = new ExternalResource() {
+
+        @Override
+        protected void before() throws Throwable {
+            connManager = PoolingAsyncClientConnectionManagerBuilder.create()
+                    .build();
+        }
+
+        @Override
+        protected void after() {
+            if (connManager != null) {
+                connManager.close();
+                connManager = null;
+            }
+        }
+
+    };
+
+    @Rule
+    public ExternalResource clientBuilderResource = new ExternalResource() {
+
+        @Override
+        protected void before() throws Throwable {
+            clientBuilder = HttpAsyncClientBuilder.create()
+                    .setDefaultRequestConfig(RequestConfig.custom()
+                            .setConnectionRequestTimeout(TIMEOUT)
+                            .setConnectTimeout(TIMEOUT)
+                            .build())
+                    .setConnectionManager(connManager)
+                    .setVersionPolicy(version.greaterEquals(HttpVersion.HTTP_2) ? HttpVersionPolicy.FORCE_HTTP_2 : HttpVersionPolicy.FORCE_HTTP_1);
+        }
+
+    };
+
+    @Override
+    public final HttpHost start() throws Exception {
+
+        final Resolver<HttpRequest, TimeValue> serviceAvailabilityResolver = new Resolver<HttpRequest, TimeValue>() {
+
+            private final AtomicInteger count = new AtomicInteger(0);
+
+            @Override
+            public TimeValue resolve(final HttpRequest request) {
+                final int n = count.incrementAndGet();
+                return n <= 3 ? TimeValue.ofSeconds(1) : null;
+            }
+
+        };
+
+        if (version.greaterEquals(HttpVersion.HTTP_2)) {
+            return super.start(null, new Decorator<AsyncServerExchangeHandler>() {
+
+                @Override
+                public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler handler) {
+                    return new ServiceUnavailableAsyncDecorator(handler, serviceAvailabilityResolver);
+                }
+
+            }, H2Config.DEFAULT);
+        } else {
+            return super.start(null, new Decorator<AsyncServerExchangeHandler>() {
+
+                @Override
+                public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler handler) {
+                    return new ServiceUnavailableAsyncDecorator(handler, serviceAvailabilityResolver);
+                }
+
+            }, Http1Config.DEFAULT);
+        }
+    }
+
+    @Override
+    protected CloseableHttpAsyncClient createClient() throws Exception {
+        return clientBuilder.build();
+    }
+
+    @Test
+    public void testGiveUpAfterOneRetry() throws Exception {
+        clientBuilder.setRetryStrategy(new DefaultHttpRequestRetryStrategy(1, TimeValue.ofSeconds(1)));
+        final HttpHost target = start();
+        final Future<SimpleHttpResponse> future = httpclient.execute(
+                SimpleRequestBuilder.get()
+                        .setHttpHost(target)
+                        .setPath("/random/2048")
+                        .build(), null);
+        final SimpleHttpResponse response = future.get();
+        Assert.assertThat(response, CoreMatchers.notNullValue());
+        Assert.assertThat(response.getCode(), CoreMatchers.equalTo(HttpStatus.SC_SERVICE_UNAVAILABLE));
+    }
+
+    @Test
+    public void testDoNotGiveUpEasily() throws Exception {
+        clientBuilder.setRetryStrategy(new DefaultHttpRequestRetryStrategy(5, TimeValue.ofSeconds(1)));
+        final HttpHost target = start();
+        final Future<SimpleHttpResponse> future = httpclient.execute(
+                SimpleRequestBuilder.get()
+                        .setHttpHost(target)
+                        .setPath("/random/2048")
+                        .build(), null);
+        final SimpleHttpResponse response = future.get();
+        Assert.assertThat(response, CoreMatchers.notNullValue());
+        Assert.assertThat(response.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
+    }
+
+}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncHttpRequestRetryExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncHttpRequestRetryExec.java
index 8111828..186a331 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncHttpRequestRetryExec.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncHttpRequestRetryExec.java
@@ -46,6 +46,7 @@ import org.apache.hc.core5.http.nio.AsyncEntityProducer;
 import org.apache.hc.core5.http.nio.entity.NoopEntityConsumer;
 import org.apache.hc.core5.http.support.BasicRequestBuilder;
 import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.TimeValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,8 +79,8 @@ public final class AsyncHttpRequestRetryExec implements AsyncExecChainHandler {
 
     private static class State {
 
-        volatile int execCount;
         volatile boolean retrying;
+        volatile TimeValue delay;
 
     }
 
@@ -106,8 +107,12 @@ public final class AsyncHttpRequestRetryExec implements AsyncExecChainHandler {
                     }
                     return asyncExecCallback.handleResponse(response, entityDetails);
                 }
-                state.retrying = retryStrategy.retryRequest(response, state.execCount, clientContext);
+                state.retrying = retryStrategy.retryRequest(response, scope.execCount.get(), clientContext);
                 if (state.retrying) {
+                    state.delay = retryStrategy.getRetryInterval(response, scope.execCount.get(), clientContext);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("{} retrying request in {}", exchangeId, state.delay);
+                    }
                     return new NoopEntityConsumer();
                 } else {
                     return asyncExecCallback.handleResponse(response, entityDetails);
@@ -122,12 +127,8 @@ public final class AsyncHttpRequestRetryExec implements AsyncExecChainHandler {
             @Override
             public void completed() {
                 if (state.retrying) {
-                    state.execCount++;
-                    try {
-                        internalExecute(state, request, entityProducer, scope, chain, asyncExecCallback);
-                    } catch (final IOException | HttpException ex) {
-                        asyncExecCallback.failed(ex);
-                    }
+                    scope.execCount.incrementAndGet();
+                    scope.scheduler.scheduleExecution(request, entityProducer, scope, asyncExecCallback, state.delay);
                 } else {
                     asyncExecCallback.completed();
                 }
@@ -142,7 +143,7 @@ public final class AsyncHttpRequestRetryExec implements AsyncExecChainHandler {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("{} cannot retry non-repeatable request", exchangeId);
                         }
-                    } else if (retryStrategy.retryRequest(request, (IOException) cause, state.execCount, clientContext)) {
+                    } else if (retryStrategy.retryRequest(request, (IOException) cause, scope.execCount.get(), clientContext)) {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("{} {}", exchangeId, cause.getMessage(), cause);
                         }
@@ -155,12 +156,8 @@ public final class AsyncHttpRequestRetryExec implements AsyncExecChainHandler {
                             entityProducer.releaseResources();
                         }
                         state.retrying = true;
-                        state.execCount++;
-                        try {
-                            internalExecute(state, request, entityProducer, scope, chain, asyncExecCallback);
-                        } catch (final IOException | HttpException ex) {
-                            asyncExecCallback.failed(ex);
-                        }
+                        scope.execCount.incrementAndGet();
+                        scope.scheduler.scheduleExecution(request, entityProducer, scope, asyncExecCallback, state.delay);
                         return;
                     }
                 }
@@ -179,7 +176,6 @@ public final class AsyncHttpRequestRetryExec implements AsyncExecChainHandler {
             final AsyncExecChain chain,
             final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
         final State state = new State();
-        state.execCount = 1;
         state.retrying = false;
         internalExecute(state, request, entityProducer, scope, chain, asyncExecCallback);
     }

[httpcomponents-client] 01/02: Async clients to support scheduled (delayed) re-execution of requests

Posted by ol...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-client.git

commit 13137eb6c7071d9753121bd3013a8ba273abbd48
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Sun Apr 25 15:04:36 2021 +0200

    Async clients to support scheduled (delayed) re-execution of requests
---
 .../client5/http/impl/cache/AsyncCachingExec.java  |  4 +-
 .../hc/client5/http/async/AsyncExecChain.java      | 55 ++++++++++++-
 .../client5/http/impl/async/AsyncRedirectExec.java | 11 ++-
 .../async/InternalAbstractHttpAsyncClient.java     | 93 +++++++++++++++++++++-
 4 files changed, 157 insertions(+), 6 deletions(-)

diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java
index d12215a..93419a7 100644
--- a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java
+++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java
@@ -669,7 +669,9 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
                             scope.originalRequest,
                             new ComplexFuture<>(null),
                             HttpClientContext.create(),
-                            scope.execRuntime.fork());
+                            scope.execRuntime.fork(),
+                            scope.scheduler,
+                            scope.execCount);
                     cacheRevalidator.revalidateCacheEntry(
                             responseCache.generateKey(target, request, entry),
                             asyncExecCallback,
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java b/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java
index b2cff0b..0819460 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java
@@ -27,6 +27,7 @@
 package org.apache.hc.client5.http.async;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hc.client5.http.HttpRoute;
 import org.apache.hc.client5.http.protocol.HttpClientContext;
@@ -37,6 +38,7 @@ import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
 import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.TimeValue;
 
 /**
  * Represents a single element in the client side asynchronous request execution chain.
@@ -59,22 +61,73 @@ public interface AsyncExecChain {
         public final CancellableDependency cancellableDependency;
         public final HttpClientContext clientContext;
         public final AsyncExecRuntime execRuntime;
+        public final Scheduler scheduler;
+        public final AtomicInteger execCount;
 
+        /**
+         * @since 5.1
+         */
         public Scope(
                 final String exchangeId,
                 final HttpRoute route,
                 final HttpRequest originalRequest,
                 final CancellableDependency cancellableDependency,
                 final HttpClientContext clientContext,
-                final AsyncExecRuntime execRuntime) {
+                final AsyncExecRuntime execRuntime,
+                final Scheduler scheduler,
+                final AtomicInteger execCount) {
             this.exchangeId = Args.notBlank(exchangeId, "Exchange id");
             this.route = Args.notNull(route, "Route");
             this.originalRequest = Args.notNull(originalRequest, "Original request");
             this.cancellableDependency = Args.notNull(cancellableDependency, "Dependency");
             this.clientContext = clientContext != null ? clientContext : HttpClientContext.create();
             this.execRuntime = Args.notNull(execRuntime, "Exec runtime");
+            this.scheduler = scheduler;
+            this.execCount = execCount != null ? execCount : new AtomicInteger(1);
         }
 
+        /**
+         * @deprecated Use {@link Scope#Scope(String, HttpRoute, HttpRequest, CancellableDependency, HttpClientContext,
+         * AsyncExecRuntime, Scheduler, AtomicInteger)}
+         */
+        @Deprecated
+        public Scope(
+                final String exchangeId,
+                final HttpRoute route,
+                final HttpRequest originalRequest,
+                final CancellableDependency cancellableDependency,
+                final HttpClientContext clientContext,
+                final AsyncExecRuntime execRuntime) {
+            this(exchangeId, route, originalRequest, cancellableDependency, clientContext, execRuntime,
+                    null, new AtomicInteger(1));
+        }
+
+    }
+
+    /**
+     * Request execution scheduler.
+     *
+     * @since 5.1
+     */
+    interface Scheduler {
+
+        /**
+         * Schedules request re-execution immediately or after a delay.
+         * @param request the actual request.
+         * @param entityProducer the request entity producer or {@code null} if the request
+         *                      does not enclose an entity.
+         * @param scope the execution scope.
+         * @param asyncExecCallback the execution callback.
+         * @param delay re-execution delay. Can be {@code null} if the request is to be
+         *              re-executed immediately.
+         */
+        void scheduleExecution(
+                HttpRequest request,
+                AsyncEntityProducer entityProducer,
+                AsyncExecChain.Scope scope,
+                AsyncExecCallback asyncExecCallback,
+                TimeValue delay);
+
     }
 
     /**
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRedirectExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRedirectExec.java
index b5c7eaf..d04944b 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRedirectExec.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRedirectExec.java
@@ -187,8 +187,15 @@ public final class AsyncRedirectExec implements AsyncExecChainHandler {
                                     proxyAuthExchange.reset();
                                 }
                             }
-                            state.currentScope = new AsyncExecChain.Scope(scope.exchangeId, newRoute,
-                                    scope.originalRequest, scope.cancellableDependency, clientContext, scope.execRuntime);
+                            state.currentScope = new AsyncExecChain.Scope(
+                                    scope.exchangeId,
+                                    newRoute,
+                                    scope.originalRequest,
+                                    scope.cancellableDependency,
+                                    scope.clientContext,
+                                    scope.execRuntime,
+                                    scope.scheduler,
+                                    scope.execCount);
                         }
                     }
                 }
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java
index 0b08573..6dac275 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java
@@ -32,9 +32,12 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hc.client5.http.HttpRoute;
 import org.apache.hc.client5.http.async.AsyncExecCallback;
@@ -49,7 +52,9 @@ import org.apache.hc.client5.http.cookie.CookieStore;
 import org.apache.hc.client5.http.impl.ExecSupport;
 import org.apache.hc.client5.http.protocol.HttpClientContext;
 import org.apache.hc.client5.http.routing.RoutingSupport;
+import org.apache.hc.core5.concurrent.Cancellable;
 import org.apache.hc.core5.concurrent.ComplexFuture;
+import org.apache.hc.core5.concurrent.DefaultThreadFactory;
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.HttpException;
@@ -71,11 +76,14 @@ import org.apache.hc.core5.http.support.BasicRequestBuilder;
 import org.apache.hc.core5.io.CloseMode;
 import org.apache.hc.core5.io.ModalCloseable;
 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
+import org.apache.hc.core5.util.TimeValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBase {
 
+    private final static ThreadFactory SCHEDULER_THREAD_FACTORY = new DefaultThreadFactory("Scheduled-executor");
+
     private static final Logger LOG = LoggerFactory.getLogger(InternalAbstractHttpAsyncClient.class);
     private final AsyncExecChainElement execChain;
     private final Lookup<CookieSpecFactory> cookieSpecRegistry;
@@ -84,6 +92,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
     private final CredentialsProvider credentialsProvider;
     private final RequestConfig defaultConfig;
     private final ConcurrentLinkedQueue<Closeable> closeables;
+    private final ScheduledExecutorService scheduledExecutorService;
 
     InternalAbstractHttpAsyncClient(
             final DefaultConnectingIOReactor ioReactor,
@@ -104,6 +113,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
         this.credentialsProvider = credentialsProvider;
         this.defaultConfig = defaultConfig;
         this.closeables = closeables != null ? new ConcurrentLinkedQueue<>(closeables) : null;
+        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(SCHEDULER_THREAD_FACTORY);
     }
 
     @Override
@@ -122,6 +132,12 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
                 }
             }
         }
+        final List<Runnable> runnables = this.scheduledExecutorService.shutdownNow();
+        for (final Runnable runnable: runnables) {
+            if (runnable instanceof Cancellable) {
+                ((Cancellable) runnable).cancel();
+            }
+        }
     }
 
     private void setupContext(final HttpClientContext context) {
@@ -187,10 +203,23 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
                     clientContext.setExchangeId(exchangeId);
                     setupContext(clientContext);
 
+                    final AsyncExecChain.Scheduler scheduler = new AsyncExecChain.Scheduler() {
+
+                        @Override
+                        public void scheduleExecution(final HttpRequest request,
+                                                      final AsyncEntityProducer entityProducer,
+                                                      final AsyncExecChain.Scope scope,
+                                                      final AsyncExecCallback asyncExecCallback,
+                                                      final TimeValue delay) {
+                            executeScheduled(request, entityProducer, scope, asyncExecCallback, delay);
+                        }
+
+                    };
+
                     final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, future,
-                            clientContext, execRuntime);
+                            clientContext, execRuntime, scheduler, new AtomicInteger(1));
                     final AtomicBoolean outputTerminated = new AtomicBoolean(false);
-                    execChain.execute(
+                    executeImmediate(
                             BasicRequestBuilder.copy(request).build(),
                             entityDetails != null ? new AsyncEntityProducer() {
 
@@ -329,4 +358,64 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
         return future;
     }
 
+    /** Hands the exchange directly to the exec chain; HttpException / IOException from the chain propagate. */
+    void executeImmediate(
+            final HttpRequest request, final AsyncEntityProducer entityProducer,
+            final AsyncExecChain.Scope scope, final AsyncExecCallback asyncExecCallback)
+            throws HttpException, IOException {
+        execChain.execute(request, entityProducer, scope, asyncExecCallback);
+    }
+
+    /** Queues the exchange on the scheduler: after {@code delay} when it is positive, otherwise at once. */
+    void executeScheduled(
+            final HttpRequest request, final AsyncEntityProducer entityProducer,
+            final AsyncExecChain.Scope scope, final AsyncExecCallback asyncExecCallback,
+            final TimeValue delay) {
+        final Runnable task = new ScheduledRequestExecution(
+                request, entityProducer, scope, asyncExecCallback, delay);
+        if (!TimeValue.isPositive(delay)) {
+            // No usable delay: submit for execution as soon as the scheduler thread is free.
+            scheduledExecutorService.execute(task);
+            return;
+        }
+        scheduledExecutorService.schedule(task, delay.getDuration(), delay.getTimeUnit());
+    }
+
+    /** Deferred exchange handed to the scheduler: run() executes it, cancel() fails its callback instead. */
+    class ScheduledRequestExecution implements Runnable, Cancellable {
+        final HttpRequest request;
+        final AsyncEntityProducer entityProducer;
+        final AsyncExecChain.Scope scope;
+        final AsyncExecCallback asyncExecCallback;
+        final TimeValue delay; // stored for reference; not read by run() or cancel()
+
+        ScheduledRequestExecution(
+                final HttpRequest request, final AsyncEntityProducer entityProducer,
+                final AsyncExecChain.Scope scope, final AsyncExecCallback asyncExecCallback,
+                final TimeValue delay) {
+            this.request = request;
+            this.entityProducer = entityProducer;
+            this.scope = scope;
+            this.asyncExecCallback = asyncExecCallback;
+            this.delay = delay;
+        }
+
+        @Override
+        public void run() {
+            try {
+                execChain.execute(request, entityProducer, scope, asyncExecCallback);
+            } catch (final Exception cause) {
+                asyncExecCallback.failed(cause); // report via the callback instead of propagating
+            }
+        }
+
+        @Override
+        public boolean cancel() {
+            // NOTE(review): executors may wrap queued tasks; confirm the shutdown path can still reach this method.
+            final CancellationException cancelled = new CancellationException("Request execution cancelled");
+            asyncExecCallback.failed(cancelled);
+            return true;
+        }
+    }
+
 }