You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2020/09/12 12:39:06 UTC

[httpcomponents-core] 13/18: HTTPCORE-639: Add a configurable ResponseOutOfOrder strategy for DefaultBHttpClientConnection

This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git

commit bf7759d3794a35979a3a5d07e823cde771b19790
Author: Carter Kozak <c4...@gmail.com>
AuthorDate: Mon Jul 27 10:17:45 2020 -0400

    HTTPCORE-639: Add a configurable ResponseOutOfOrder strategy for DefaultBHttpClientConnection
    
    This adds a configurable ResponseOutOfOrderStrategy in place of the
    previous always-enabled behavior, and uses the no-op
    NoResponseOutOfOrderStrategy implementation by default.
    
    The previous behavior can be used by selecting the
    MonitoringResponseOutOfOrderStrategy, which has been updated
    to support more flexible behavior. Note that this strategy
    results in a 1 ms pause for every chunk transferred, limiting
    upload speed using the default 8 KiB chunk size to at most
    8 MiB/second.
    
    The original discussion can be found on the mailing list:
    https://www.mail-archive.com/httpclient-users@hc.apache.org/msg09911.html
    
    This closes #206
---
 ...gResponseOutOfOrderStrategyIntegrationTest.java | 215 +++++++++++++++++++++
 .../http/impl/io/DefaultBHttpClientConnection.java | 105 +++++++++-
 .../io/DefaultBHttpClientConnectionFactory.java    |  32 ++-
 .../io/MonitoringResponseOutOfOrderStrategy.java   | 111 +++++++++++
 .../http/impl/io/NoResponseOutOfOrderStrategy.java |  61 ++++++
 .../http/impl/io/ResponseOutOfOrderException.java  |  41 ++++
 .../core5/http/io/ResponseOutOfOrderStrategy.java  |  66 +++++++
 .../java/org/apache/hc/core5/util/Timeout.java     |   5 +
 .../TestMonitoringResponseOutOfOrderStrategy.java  | 133 +++++++++++++
 9 files changed, 764 insertions(+), 5 deletions(-)

diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/classic/MonitoringResponseOutOfOrderStrategyIntegrationTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/classic/MonitoringResponseOutOfOrderStrategyIntegrationTest.java
new file mode 100644
index 0000000..fad4491
--- /dev/null
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/classic/MonitoringResponseOutOfOrderStrategyIntegrationTest.java
@@ -0,0 +1,215 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.testing.classic;
+
+import org.apache.hc.core5.http.ClassicHttpRequest;
+import org.apache.hc.core5.http.ClassicHttpResponse;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.Method;
+import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.http.impl.bootstrap.HttpRequester;
+import org.apache.hc.core5.http.impl.bootstrap.RequesterBootstrap;
+import org.apache.hc.core5.http.impl.io.DefaultBHttpClientConnectionFactory;
+import org.apache.hc.core5.http.impl.io.MonitoringResponseOutOfOrderStrategy;
+import org.apache.hc.core5.http.io.HttpRequestHandler;
+import org.apache.hc.core5.http.io.SocketConfig;
+import org.apache.hc.core5.http.io.entity.AbstractHttpEntity;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.hc.core5.http.message.BasicClassicHttpRequest;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http.protocol.HttpCoreContext;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.testing.SSLTestContexts;
+import org.apache.hc.core5.util.Timeout;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+
+@RunWith(Parameterized.class)
+public class MonitoringResponseOutOfOrderStrategyIntegrationTest {
+
+    // Use a 16k buffer for consistent results across systems
+    private static final int BUFFER_SIZE = 16 * 1024;
+    private static final Timeout TIMEOUT = Timeout.ofSeconds(3); // used for socket timeouts and passed to requester.execute
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<Object[]> protocols() { // each test runs once per scheme listed here
+        return Arrays.asList(new Object[][]{
+                { URIScheme.HTTP },
+                { URIScheme.HTTPS }
+        });
+    }
+
+    private final URIScheme scheme;
+    private ClassicTestServer server;
+    private HttpRequester requester;
+
+    public MonitoringResponseOutOfOrderStrategyIntegrationTest(final URIScheme scheme) {
+        this.scheme = scheme;
+    }
+
+    @Rule
+    public ExternalResource serverResource = new ExternalResource() {
+
+        @Override
+        protected void before() throws Throwable {
+            server = new ClassicTestServer(
+                    scheme == URIScheme.HTTPS ? SSLTestContexts.createServerSSLContext() : null, // no SSL context for plain HTTP
+                    SocketConfig.custom()
+                            .setSoTimeout(TIMEOUT)
+                            .setSndBufSize(BUFFER_SIZE)
+                            .setRcvBufSize(BUFFER_SIZE)
+                            .setSoKeepAlive(false)
+                            .build());
+        }
+
+        @Override
+        protected void after() {
+            if (server != null) {
+                try {
+                    server.shutdown(CloseMode.IMMEDIATE);
+                    server = null;
+                } catch (final Exception ignore) { // shutdown failures are ignored during cleanup
+                }
+            }
+        }
+
+    };
+
+    @Rule
+    public ExternalResource requesterResource = new ExternalResource() {
+
+        @Override
+        protected void before() throws Throwable {
+            requester = RequesterBootstrap.bootstrap()
+                    .setSslContext(scheme == URIScheme.HTTPS  ? SSLTestContexts.createClientSSLContext() : null)
+                    .setSocketConfig(SocketConfig.custom()
+                            .setSoTimeout(TIMEOUT)
+                            .setRcvBufSize(BUFFER_SIZE)
+                            .setSndBufSize(BUFFER_SIZE)
+                            .setSoKeepAlive(false)
+                            .build())
+                    .setStreamListener(LoggingHttp1StreamListener.INSTANCE)
+                    .setConnPoolListener(LoggingConnPoolListener.INSTANCE)
+                    .setConnectionFactory(DefaultBHttpClientConnectionFactory.builder()
+                            .responseOutOfOrderStrategy(MonitoringResponseOutOfOrderStrategy.INSTANCE) // strategy under test
+                            .build())
+                    .create();
+        }
+
+        @Override
+        protected void after() {
+            if (requester != null) {
+                try {
+                    requester.close(CloseMode.IMMEDIATE);
+                    requester = null;
+                } catch (final Exception ignore) { // close failures are ignored during cleanup
+                }
+            }
+        }
+
+    };
+
+    @Test(timeout = 5000) // Failures may hang
+    public void testResponseOutOfOrderWithDefaultStrategy() throws Exception {
+        this.server.registerHandler("*", new HttpRequestHandler() {
+
+            @Override
+            public void handle(
+                    final ClassicHttpRequest request,
+                    final ClassicHttpResponse response,
+                    final HttpContext context) throws IOException {
+                response.setCode(400); // the handler never reads the request; it always answers 400
+                response.setEntity(new AllOnesHttpEntity(200000));
+            }
+
+        });
+
+        this.server.start(null, null, null);
+
+        final HttpCoreContext context = HttpCoreContext.create();
+        final HttpHost host = new HttpHost(scheme.id, "localhost", this.server.getPort());
+
+        final ClassicHttpRequest post = new BasicClassicHttpRequest(Method.POST, "/");
+        post.setEntity(new AllOnesHttpEntity(200000)); // 200000-byte request body, same size as the response body
+
+        try (final ClassicHttpResponse response = requester.execute(host, post, TIMEOUT, context)) {
+            Assert.assertEquals(400, response.getCode());
+            EntityUtils.consumeQuietly(response.getEntity());
+        }
+    }
+
+    private static final class AllOnesHttpEntity extends AbstractHttpEntity {
+        private long remaining; // bytes still to be written by writeTo
+
+        protected AllOnesHttpEntity(final long length) {
+            super(ContentType.APPLICATION_OCTET_STREAM, null, true);
+            this.remaining = length;
+        }
+
+        @Override
+        public InputStream getContent() {
+            throw new UnsupportedOperationException(); // content is only available through writeTo
+        }
+
+        @Override
+        public void writeTo(final OutputStream outStream) throws IOException {
+            final byte[] buf = new byte[1024]; // NOTE(review): buffer is zero-filled despite the "AllOnes" name - confirm intent
+            while (remaining > 0) {
+                final int writeLength = (int) Math.min(remaining, buf.length);
+                outStream.write(buf, 0, writeLength);
+                outStream.flush(); // flush after every write of at most 1024 bytes
+                remaining -= writeLength;
+            }
+        }
+
+        @Override
+        public boolean isStreaming() {
+            return true;
+        }
+
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public long getContentLength() {
+            return -1L;
+        }
+    }
+}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/DefaultBHttpClientConnection.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/DefaultBHttpClientConnection.java
index dd5c1e3..d905bab 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/DefaultBHttpClientConnection.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/DefaultBHttpClientConnection.java
@@ -28,6 +28,7 @@
 package org.apache.hc.core5.http.impl.io;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
 import java.nio.charset.CharsetDecoder;
@@ -51,6 +52,7 @@ import org.apache.hc.core5.http.io.HttpMessageParser;
 import org.apache.hc.core5.http.io.HttpMessageParserFactory;
 import org.apache.hc.core5.http.io.HttpMessageWriter;
 import org.apache.hc.core5.http.io.HttpMessageWriterFactory;
+import org.apache.hc.core5.http.io.ResponseOutOfOrderStrategy;
 import org.apache.hc.core5.util.Args;
 
 /**
@@ -65,6 +67,7 @@ public class DefaultBHttpClientConnection extends BHttpConnectionBase
     private final HttpMessageWriter<ClassicHttpRequest> requestWriter;
     private final ContentLengthStrategy incomingContentStrategy;
     private final ContentLengthStrategy outgoingContentStrategy;
+    private final ResponseOutOfOrderStrategy responseOutOfOrderStrategy;
     private volatile boolean consistent;
 
     /**
@@ -80,6 +83,8 @@ public class DefaultBHttpClientConnection extends BHttpConnectionBase
      *   {@link DefaultContentLengthStrategy#INSTANCE} will be used.
      * @param outgoingContentStrategy outgoing content length strategy. If {@code null}
      *   {@link DefaultContentLengthStrategy#INSTANCE} will be used.
+     * @param responseOutOfOrderStrategy response out of order strategy. If {@code null}
+     *   {@link NoResponseOutOfOrderStrategy#INSTANCE} will be used.
      * @param requestWriterFactory request writer factory. If {@code null}
      *   {@link DefaultHttpRequestWriterFactory#INSTANCE} will be used.
      * @param responseParserFactory response parser factory. If {@code null}
@@ -91,6 +96,7 @@ public class DefaultBHttpClientConnection extends BHttpConnectionBase
             final CharsetEncoder charEncoder,
             final ContentLengthStrategy incomingContentStrategy,
             final ContentLengthStrategy outgoingContentStrategy,
+            final ResponseOutOfOrderStrategy responseOutOfOrderStrategy,
             final HttpMessageWriterFactory<ClassicHttpRequest> requestWriterFactory,
             final HttpMessageParserFactory<ClassicHttpResponse> responseParserFactory) {
         super(http1Config, charDecoder, charEncoder);
@@ -99,12 +105,51 @@ public class DefaultBHttpClientConnection extends BHttpConnectionBase
         this.responseParser = (responseParserFactory != null ? responseParserFactory :
             DefaultHttpResponseParserFactory.INSTANCE).create(http1Config);
         this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
-                DefaultContentLengthStrategy.INSTANCE;
+            DefaultContentLengthStrategy.INSTANCE;
         this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
-                DefaultContentLengthStrategy.INSTANCE;
+            DefaultContentLengthStrategy.INSTANCE;
+        this.responseOutOfOrderStrategy = responseOutOfOrderStrategy != null ? responseOutOfOrderStrategy :
+            NoResponseOutOfOrderStrategy.INSTANCE;
         this.consistent = true;
     }
 
+    /**
+     * Creates new instance of DefaultBHttpClientConnection using the default response out of order strategy.
+     *
+     * @param http1Config Message http1Config. If {@code null}
+     *   {@link Http1Config#DEFAULT} will be used.
+     * @param charDecoder decoder to be used for decoding HTTP protocol elements.
+     *   If {@code null} simple type cast will be used for byte to char conversion.
+     * @param charEncoder encoder to be used for encoding HTTP protocol elements.
+     *   If {@code null} simple type cast will be used for char to byte conversion.
+     * @param incomingContentStrategy incoming content length strategy. If {@code null}
+     *   {@link DefaultContentLengthStrategy#INSTANCE} will be used.
+     * @param outgoingContentStrategy outgoing content length strategy. If {@code null}
+     *   {@link DefaultContentLengthStrategy#INSTANCE} will be used.
+     * @param requestWriterFactory request writer factory. If {@code null}
+     *   {@link DefaultHttpRequestWriterFactory#INSTANCE} will be used.
+     * @param responseParserFactory response parser factory. If {@code null}
+     *   {@link DefaultHttpResponseParserFactory#INSTANCE} will be used.
+     */
+    public DefaultBHttpClientConnection(
+            final Http1Config http1Config,
+            final CharsetDecoder charDecoder,
+            final CharsetEncoder charEncoder,
+            final ContentLengthStrategy incomingContentStrategy,
+            final ContentLengthStrategy outgoingContentStrategy,
+            final HttpMessageWriterFactory<ClassicHttpRequest> requestWriterFactory,
+            final HttpMessageParserFactory<ClassicHttpResponse> responseParserFactory) {
+        this(
+                http1Config,
+                charDecoder,
+                charEncoder,
+                incomingContentStrategy,
+                outgoingContentStrategy,
+                null, // responseOutOfOrderStrategy: null falls back to NoResponseOutOfOrderStrategy.INSTANCE
+                requestWriterFactory,
+                responseParserFactory);
+    }
+
     public DefaultBHttpClientConnection(
             final Http1Config http1Config,
             final CharsetDecoder charDecoder,
@@ -149,8 +194,62 @@ public class DefaultBHttpClientConnection extends BHttpConnectionBase
         if (len == ContentLengthStrategy.UNDEFINED) {
             throw new LengthRequiredException();
         }
-        try (final OutputStream outStream = createContentOutputStream(len, this.outbuffer, socketHolder.getOutputStream(), entity.getTrailers())) {
+        try (final OutputStream outStream = createContentOutputStream(
+                len, this.outbuffer, new OutputStream() {
+
+                    final OutputStream socketOutputStream = socketHolder.getOutputStream();
+                    final InputStream socketInputStream = socketHolder.getInputStream();
+
+                    long totalBytes = 0;
+
+                    void checkForEarlyResponse(final long totalBytesSent, final int nextWriteSize) throws IOException {
+                        if (responseOutOfOrderStrategy.isEarlyResponseDetected(
+                                request,
+                                DefaultBHttpClientConnection.this,
+                                socketInputStream,
+                                totalBytesSent,
+                                nextWriteSize)) {
+                            throw new ResponseOutOfOrderException();
+                        }
+                    }
+
+                    @Override
+                    public void write(final byte[] b) throws IOException {
+                        checkForEarlyResponse(totalBytes, b.length);
+                        totalBytes += b.length;
+                        socketOutputStream.write(b);
+                    }
+
+                    @Override
+                    public void write(final byte[] b, final int off, final int len) throws IOException {
+                        checkForEarlyResponse(totalBytes, len);
+                        totalBytes += len;
+                        socketOutputStream.write(b, off, len);
+                    }
+
+                    @Override
+                    public void write(final int b) throws IOException {
+                        checkForEarlyResponse(totalBytes, 1);
+                        totalBytes++;
+                        socketOutputStream.write(b);
+                    }
+
+                    @Override
+                    public void flush() throws IOException {
+                        socketOutputStream.flush();
+                    }
+
+                    @Override
+                    public void close() throws IOException {
+                        socketOutputStream.close();
+                    }
+
+                }, entity.getTrailers())) {
             entity.writeTo(outStream);
+        } catch (final ResponseOutOfOrderException ex) {
+            if (len > 0) {
+                this.consistent = false;
+            }
         }
     }
 
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/DefaultBHttpClientConnectionFactory.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/DefaultBHttpClientConnectionFactory.java
index 37c2e48..c2becd2 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/DefaultBHttpClientConnectionFactory.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/DefaultBHttpClientConnectionFactory.java
@@ -41,6 +41,7 @@ import org.apache.hc.core5.http.impl.CharCodingSupport;
 import org.apache.hc.core5.http.io.HttpConnectionFactory;
 import org.apache.hc.core5.http.io.HttpMessageParserFactory;
 import org.apache.hc.core5.http.io.HttpMessageWriterFactory;
+import org.apache.hc.core5.http.io.ResponseOutOfOrderStrategy;
 
 /**
  * Default factory for {@link org.apache.hc.core5.http.io.HttpClientConnection}s.
@@ -55,21 +56,23 @@ public class DefaultBHttpClientConnectionFactory
     private final CharCodingConfig charCodingConfig;
     private final ContentLengthStrategy incomingContentStrategy;
     private final ContentLengthStrategy outgoingContentStrategy;
+    private final ResponseOutOfOrderStrategy responseOutOfOrderStrategy;
     private final HttpMessageWriterFactory<ClassicHttpRequest> requestWriterFactory;
     private final HttpMessageParserFactory<ClassicHttpResponse> responseParserFactory;
 
-    public DefaultBHttpClientConnectionFactory(
+    private DefaultBHttpClientConnectionFactory(
             final Http1Config http1Config,
             final CharCodingConfig charCodingConfig,
             final ContentLengthStrategy incomingContentStrategy,
             final ContentLengthStrategy outgoingContentStrategy,
+            final ResponseOutOfOrderStrategy responseOutOfOrderStrategy,
             final HttpMessageWriterFactory<ClassicHttpRequest> requestWriterFactory,
             final HttpMessageParserFactory<ClassicHttpResponse> responseParserFactory) {
-        super();
         this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
         this.charCodingConfig = charCodingConfig != null ? charCodingConfig : CharCodingConfig.DEFAULT;
         this.incomingContentStrategy = incomingContentStrategy;
         this.outgoingContentStrategy = outgoingContentStrategy;
+        this.responseOutOfOrderStrategy = responseOutOfOrderStrategy;
         this.requestWriterFactory = requestWriterFactory;
         this.responseParserFactory = responseParserFactory;
     }
@@ -77,6 +80,23 @@ public class DefaultBHttpClientConnectionFactory
     public DefaultBHttpClientConnectionFactory(
             final Http1Config http1Config,
             final CharCodingConfig charCodingConfig,
+            final ContentLengthStrategy incomingContentStrategy,
+            final ContentLengthStrategy outgoingContentStrategy,
+            final HttpMessageWriterFactory<ClassicHttpRequest> requestWriterFactory,
+            final HttpMessageParserFactory<ClassicHttpResponse> responseParserFactory) {
+        this(
+                http1Config,
+                charCodingConfig,
+                incomingContentStrategy,
+                outgoingContentStrategy,
+                null,
+                requestWriterFactory,
+                responseParserFactory);
+    }
+
+    public DefaultBHttpClientConnectionFactory(
+            final Http1Config http1Config,
+            final CharCodingConfig charCodingConfig,
             final HttpMessageWriterFactory<ClassicHttpRequest> requestWriterFactory,
             final HttpMessageParserFactory<ClassicHttpResponse> responseParserFactory) {
         this(http1Config, charCodingConfig, null, null, requestWriterFactory, responseParserFactory);
@@ -100,6 +120,7 @@ public class DefaultBHttpClientConnectionFactory
                 CharCodingSupport.createEncoder(this.charCodingConfig),
                 this.incomingContentStrategy,
                 this.outgoingContentStrategy,
+                this.responseOutOfOrderStrategy,
                 this.requestWriterFactory,
                 this.responseParserFactory);
         conn.bind(socket);
@@ -125,6 +146,7 @@ public class DefaultBHttpClientConnectionFactory
         private CharCodingConfig charCodingConfig;
         private ContentLengthStrategy incomingContentLengthStrategy;
         private ContentLengthStrategy outgoingContentLengthStrategy;
+        private ResponseOutOfOrderStrategy responseOutOfOrderStrategy;
         private HttpMessageWriterFactory<ClassicHttpRequest> requestWriterFactory;
         private HttpMessageParserFactory<ClassicHttpResponse> responseParserFactory;
 
@@ -150,6 +172,11 @@ public class DefaultBHttpClientConnectionFactory
             return this;
         }
 
+        public Builder responseOutOfOrderStrategy(final ResponseOutOfOrderStrategy responseOutOfOrderStrategy) {
+            this.responseOutOfOrderStrategy = responseOutOfOrderStrategy;
+            return this;
+        }
+
         public Builder requestWriterFactory(
                 final HttpMessageWriterFactory<ClassicHttpRequest> requestWriterFactory) {
             this.requestWriterFactory = requestWriterFactory;
@@ -168,6 +195,7 @@ public class DefaultBHttpClientConnectionFactory
                     charCodingConfig,
                     incomingContentLengthStrategy,
                     outgoingContentLengthStrategy,
+                    responseOutOfOrderStrategy,
                     requestWriterFactory,
                     responseParserFactory);
         }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/MonitoringResponseOutOfOrderStrategy.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/MonitoringResponseOutOfOrderStrategy.java
new file mode 100644
index 0000000..303e61b
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/MonitoringResponseOutOfOrderStrategy.java
@@ -0,0 +1,111 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.impl.io;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.ClassicHttpRequest;
+import org.apache.hc.core5.http.io.HttpClientConnection;
+import org.apache.hc.core5.http.io.ResponseOutOfOrderStrategy;
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.Timeout;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A {@link ResponseOutOfOrderStrategy} implementation which checks for premature responses every {@code chunkSize}
+ * bytes. An 8 KiB chunk size is used by default based on testing using values between 4 KiB and 128 KiB. This is
+ * optimized for correctness and results in a maximum upload speed of 8 MiB/s until {@code maxChunksToCheck} is
+ * reached.
+ *
+ * @since 5.1
+ */
+@Contract(threading = ThreadingBehavior.IMMUTABLE)
+public final class MonitoringResponseOutOfOrderStrategy implements ResponseOutOfOrderStrategy {
+
+    private static final int DEFAULT_CHUNK_SIZE = 8 * 1024;
+
+    public static final MonitoringResponseOutOfOrderStrategy INSTANCE = new MonitoringResponseOutOfOrderStrategy();
+
+    private final long chunkSize; // number of bytes between two response checks
+    private final long maxChunksToCheck; // chunk index cap; no checks are triggered beyond it
+
+    /**
+     * Instantiates a default {@link MonitoringResponseOutOfOrderStrategy}. {@link #INSTANCE} may be used instead.
+     */
+    public MonitoringResponseOutOfOrderStrategy() {
+        this(DEFAULT_CHUNK_SIZE);
+    }
+
+    /**
+     * Instantiates a {@link MonitoringResponseOutOfOrderStrategy} with unlimited {@code maxChunksToCheck}.
+     *
+     * @param chunkSize The chunk size after which a response check is executed.
+     */
+    public MonitoringResponseOutOfOrderStrategy(final long chunkSize) {
+        this(chunkSize, Long.MAX_VALUE);
+    }
+
+    /**
+     * Instantiates a {@link MonitoringResponseOutOfOrderStrategy}.
+     *
+     * @param chunkSize The chunk size after which a response check is executed.
+     * @param maxChunksToCheck The maximum number of chunks to check, allowing expensive checks to be avoided
+     *                         after a sufficient portion of the request entity has been transferred.
+     */
+    public MonitoringResponseOutOfOrderStrategy(final long chunkSize, final long maxChunksToCheck) {
+        this.chunkSize = Args.positive(chunkSize, "chunkSize");
+        this.maxChunksToCheck = Args.positive(maxChunksToCheck, "maxChunksToCheck");
+    }
+
+    @Override
+    public boolean isEarlyResponseDetected(
+            final ClassicHttpRequest request,
+            final HttpClientConnection connection,
+            final InputStream inputStream,
+            final long totalBytesSent,
+            final long nextWriteSize) throws IOException {
+        if (nextWriteStartsNewChunk(totalBytesSent, nextWriteSize)) { // probe only when a chunk boundary is crossed
+            final boolean ssl = connection.getSSLSession() != null; // SSL: poll connection for 1 ms; otherwise use available()
+            return ssl ? connection.isDataAvailable(Timeout.ONE_MILLISECOND) : (inputStream.available() > 0);
+        }
+        return false;
+    }
+
+    private boolean nextWriteStartsNewChunk(final long totalBytesSent, final long nextWriteSize) {
+        final long currentChunkIndex = Math.min(totalBytesSent / chunkSize, maxChunksToCheck);
+        final long newChunkIndex = Math.min((totalBytesSent + nextWriteSize) / chunkSize, maxChunksToCheck);
+        return currentChunkIndex < newChunkIndex; // both indices are capped, so nothing triggers past maxChunksToCheck
+    }
+
+    @Override
+    public String toString() {
+        return "MonitoringResponseOutOfOrderStrategy{chunkSize=" + chunkSize + ", maxChunksToCheck=" + maxChunksToCheck + '}';
+    }
+}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/NoResponseOutOfOrderStrategy.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/NoResponseOutOfOrderStrategy.java
new file mode 100644
index 0000000..94606e2
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/NoResponseOutOfOrderStrategy.java
@@ -0,0 +1,61 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.impl.io;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.ClassicHttpRequest;
+import org.apache.hc.core5.http.io.HttpClientConnection;
+import org.apache.hc.core5.http.io.ResponseOutOfOrderStrategy;
+
+import java.io.InputStream;
+
+/**
+ * An implementation of {@link ResponseOutOfOrderStrategy} which does not check for early responses.
+ * <p>
+ * Early response detection requires 1ms blocking reads and incurs a hefty performance cost for
+ * large uploads.
+ *
+ * @see MonitoringResponseOutOfOrderStrategy
+ * @since 5.1
+ */
+@Contract(threading = ThreadingBehavior.IMMUTABLE)
+public final class NoResponseOutOfOrderStrategy implements ResponseOutOfOrderStrategy {
+
+    public static final NoResponseOutOfOrderStrategy INSTANCE = new NoResponseOutOfOrderStrategy();
+
+    @Override
+    public boolean isEarlyResponseDetected(
+            final ClassicHttpRequest request,
+            final HttpClientConnection connection,
+            final InputStream inputStream,
+            final long totalBytesSent,
+            final long nextWriteSize) {
+        return false; // never reports an early response; none of the arguments are inspected
+    }
+}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/ResponseOutOfOrderException.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/ResponseOutOfOrderException.java
new file mode 100644
index 0000000..5744342
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/ResponseOutOfOrderException.java
@@ -0,0 +1,41 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.impl.io;
+
+import java.io.IOException;
+
+/** Signals an early (out of order) response, sent before the server has read the full request. */
+class ResponseOutOfOrderException extends IOException {
+    /** Declared explicitly because {@link IOException} is serializable. */
+    private static final long serialVersionUID = 1L;
+
+    /** Creates the exception with no detail message and no cause. */
+    public ResponseOutOfOrderException() {
+        super();
+    }
+}
\ No newline at end of file
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/io/ResponseOutOfOrderStrategy.java b/httpcore5/src/main/java/org/apache/hc/core5/http/io/ResponseOutOfOrderStrategy.java
new file mode 100644
index 0000000..ff03f68
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/io/ResponseOutOfOrderStrategy.java
@@ -0,0 +1,66 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.io;
+
+import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.http.ClassicHttpRequest;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Represents a strategy to determine how frequently the client should check for an out of order response.
+ * An out of order response is sent before the server has read the full request. If the client fails to
+ * check for an early response then a {@link java.net.SocketException} or {@link java.net.SocketTimeoutException}
+ * may be thrown while writing the request entity after a timeout is reached on either the client or server.
+ *
+ * @since 5.1
+ */
+@Internal
+public interface ResponseOutOfOrderStrategy {
+
+    /**
+     * Called before each write to a socket {@link java.io.OutputStream} with the number of
+     * bytes that have already been sent, and the size of the write that will occur if this check
+     * does not encounter an out of order response.
+     *
+     * @param request The current request.
+     * @param connection The connection used to send the current request.
+     * @param inputStream The response stream, this may be used to check for an early response.
+     * @param totalBytesSent Number of bytes that have already been sent.
+     * @param nextWriteSize The size of a socket write operation that will follow this check.
+     * @return {@code true} if an early response was detected, otherwise {@code false}.
+     * @throws IOException in case of a network failure while checking for an early response.
+     */
+    boolean isEarlyResponseDetected(
+            ClassicHttpRequest request,
+            HttpClientConnection connection,
+            InputStream inputStream,
+            long totalBytesSent,
+            long nextWriteSize) throws IOException;
+}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/util/Timeout.java b/httpcore5/src/main/java/org/apache/hc/core5/util/Timeout.java
index f6a835a..77619f6 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/util/Timeout.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/util/Timeout.java
@@ -47,6 +47,11 @@ public class Timeout extends TimeValue {
     public static final Timeout ZERO_MILLISECONDS = Timeout.of(0, TimeUnit.MILLISECONDS);
 
     /**
+     * A one millisecond {@link Timeout}.
+     */
+    public static final Timeout ONE_MILLISECOND = Timeout.of(1, TimeUnit.MILLISECONDS);
+
+    /**
      * A disabled timeout represented as 0 {@code MILLISECONDS}.
      */
     public static final Timeout DISABLED = ZERO_MILLISECONDS;
diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/impl/io/TestMonitoringResponseOutOfOrderStrategy.java b/httpcore5/src/test/java/org/apache/hc/core5/http/impl/io/TestMonitoringResponseOutOfOrderStrategy.java
new file mode 100644
index 0000000..4c8966e
--- /dev/null
+++ b/httpcore5/src/test/java/org/apache/hc/core5/http/impl/io/TestMonitoringResponseOutOfOrderStrategy.java
@@ -0,0 +1,133 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.impl.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.net.ssl.SSLSession;
+
+import org.apache.hc.core5.http.ClassicHttpRequest;
+import org.apache.hc.core5.http.io.HttpClientConnection;
+import org.apache.hc.core5.http.io.ResponseOutOfOrderStrategy;
+import org.apache.hc.core5.http.message.BasicClassicHttpRequest;
+import org.apache.hc.core5.util.Timeout;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+public class TestMonitoringResponseOutOfOrderStrategy {
+
+    private static final ClassicHttpRequest REQUEST = new BasicClassicHttpRequest("POST", "/path");
+
+    @Test
+    public void testFirstByteIsNotCheckedSsl() throws IOException {
+        final boolean earlyResponse = MonitoringResponseOutOfOrderStrategy.INSTANCE.isEarlyResponseDetected(
+                REQUEST,
+                connection(true, true), // SSL connection mock that reports data available
+                // SSLSocket streams report zero bytes available
+                socketInputStream(0),
+                0, // totalBytesSent: nothing has been written yet
+                1); // nextWriteSize
+        Assert.assertFalse(earlyResponse);
+    }
+
+    @Test
+    public void testFirstByteIsNotCheckedPlain() throws IOException {
+        final boolean earlyResponse = MonitoringResponseOutOfOrderStrategy.INSTANCE.isEarlyResponseDetected(
+                REQUEST,
+                connection(true, false), // non-SSL connection mock
+                socketInputStream(1), // stream mock reports one byte available
+                0, // totalBytesSent: nothing has been written yet
+                1); // nextWriteSize
+        Assert.assertFalse(earlyResponse);
+    }
+
+    @Test
+    public void testWritesWithinChunkAreNotChecked() throws IOException {
+        final boolean earlyResponse = MonitoringResponseOutOfOrderStrategy.INSTANCE.isEarlyResponseDetected(
+                REQUEST,
+                connection(true, true),
+                socketInputStream(0),
+                1, // totalBytesSent
+                8190); // nextWriteSize: 1 + 8190 = 8191 bytes, one short of 8192
+        Assert.assertFalse(
+                "There is data available, but checks shouldn't occur until just prior to the 8192nd byte",
+                earlyResponse);
+    }
+
+    @Test
+    public void testWritesAcrossChunksAreChecked() throws IOException {
+        final boolean earlyResponse = MonitoringResponseOutOfOrderStrategy.INSTANCE.isEarlyResponseDetected(
+                REQUEST,
+                connection(true, true),
+                socketInputStream(0),
+                8191, // totalBytesSent
+                1); // nextWriteSize: this write brings the total to 8192 bytes
+        Assert.assertTrue(earlyResponse);
+    }
+
+    @Test
+    public void testMaximumChunks() throws IOException {
+        final ResponseOutOfOrderStrategy strategy = new MonitoringResponseOutOfOrderStrategy(1, 2); // presumably (chunk size, max chunks to check) - TODO confirm
+        Assert.assertTrue(strategy.isEarlyResponseDetected(
+                REQUEST,
+                connection(true, true),
+                socketInputStream(0),
+                0, // totalBytesSent
+                1)); // nextWriteSize: first write, early response expected
+        Assert.assertTrue(strategy.isEarlyResponseDetected(
+                REQUEST,
+                connection(true, true),
+                socketInputStream(0),
+                1, // totalBytesSent
+                2)); // nextWriteSize: second call, early response still expected
+        Assert.assertFalse(strategy.isEarlyResponseDetected(
+                REQUEST,
+                connection(true, true),
+                socketInputStream(0),
+                2, // totalBytesSent
+                3)); // nextWriteSize: third call, no early response expected any more
+    }
+
+    private static InputStream socketInputStream(final int available) throws IOException {
+        final InputStream stream = Mockito.mock(InputStream.class);
+        Mockito.when(stream.available()).thenReturn(available); // only available() is stubbed
+        return stream;
+    }
+
+    private static HttpClientConnection connection(final boolean dataAvailable, final boolean ssl) throws IOException {
+        final HttpClientConnection connection = Mockito.mock(HttpClientConnection.class);
+        Mockito.when(connection.isDataAvailable(ArgumentMatchers.any(Timeout.class))).thenReturn(dataAvailable); // same answer for any timeout
+        if (ssl) { // an SSL connection is represented by handing back a mock SSLSession
+            Mockito.when(connection.getSSLSession()).thenReturn(Mockito.mock(SSLSession.class));
+        }
+        return connection;
+    }
+}