You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2019/09/26 20:38:06 UTC

[httpcomponents-core] 03/03: Simplification of HTTP/1.1 read event handling logic; better fix for HTTPCORE-599

This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git

commit dac54708766ece837f2755b7928ef83e12f9e384
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Tue Sep 24 19:56:46 2019 +0200

    Simplification of HTTP/1.1 read event handling logic; better fix for HTTPCORE-599
---
 .../http/impl/nio/AbstractHttp1StreamDuplexer.java | 100 ++++++++++-----------
 .../org/apache/hc/core5/reactor/IOSessionImpl.java |   3 -
 2 files changed, 48 insertions(+), 55 deletions(-)

diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
index 6860bd3..4fe2a88 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
@@ -238,61 +238,51 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
         if (src != null) {
             inbuf.put(src);
         }
-        while (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
-            int totalBytesRead = 0;
-            int messagesReceived = 0;
+
+        if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inbuf.hasData() && inputIdle()) {
+            ioSession.clearEvent(SelectionKey.OP_READ);
+            return;
+        }
+
+        boolean endOfStream = false;
+        if (incomingMessage == null) {
+            final int bytesRead = inbuf.fill(ioSession);
+            if (bytesRead > 0) {
+                inTransportMetrics.incrementBytesTransferred(bytesRead);
+            }
+            endOfStream = bytesRead == -1;
+        }
+
+        do {
             if (incomingMessage == null) {
 
-                if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle()) {
-                    ioSession.clearEvent(SelectionKey.OP_READ);
-                    return;
-                }
+                final IncomingMessage messageHead = incomingMessageParser.parse(inbuf, endOfStream);
+                if (messageHead != null) {
+                    incomingMessageParser.reset();
+
+                    this.version = messageHead.getVersion();
 
-                int bytesRead;
-                do {
-                    bytesRead = inbuf.fill(ioSession);
-                    if (bytesRead > 0) {
-                        totalBytesRead += bytesRead;
-                        inTransportMetrics.incrementBytesTransferred(bytesRead);
+                    updateInputMetrics(messageHead, connMetrics);
+                    final ContentDecoder contentDecoder;
+                    if (handleIncomingMessage(messageHead)) {
+                        final long len = incomingContentStrategy.determineLength(messageHead);
+                        contentDecoder = createContentDecoder(len, ioSession, inbuf, inTransportMetrics);
+                        consumeHeader(messageHead, contentDecoder != null ? new IncomingEntityDetails(messageHead, len) : null);
+                    } else {
+                        consumeHeader(messageHead, null);
+                        contentDecoder = null;
                     }
-                    final IncomingMessage messageHead = incomingMessageParser.parse(inbuf, bytesRead == -1);
-                    if (messageHead != null) {
-                        messagesReceived++;
-                        incomingMessageParser.reset();
-
-                        this.version = messageHead.getVersion();
-
-                        updateInputMetrics(messageHead, connMetrics);
-                        final ContentDecoder contentDecoder;
-                        if (handleIncomingMessage(messageHead)) {
-                            final long len = incomingContentStrategy.determineLength(messageHead);
-                            contentDecoder = createContentDecoder(len, ioSession, inbuf, inTransportMetrics);
-                            consumeHeader(messageHead, contentDecoder != null ? new IncomingEntityDetails(messageHead, len) : null);
-                        } else {
-                            consumeHeader(messageHead, null);
-                            contentDecoder = null;
-                        }
-                        capacityWindow = new CapacityWindow(http1Config.getInitialWindowSize(), ioSession);
-                        if (contentDecoder != null) {
-                            incomingMessage = new Message<>(messageHead, contentDecoder);
-                            break;
-                        }
+                    capacityWindow = new CapacityWindow(http1Config.getInitialWindowSize(), ioSession);
+                    if (contentDecoder != null) {
+                        incomingMessage = new Message<>(messageHead, contentDecoder);
+                    } else {
                         inputEnd();
                         if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
                             ioSession.setEvent(SelectionKey.OP_READ);
-                        } else {
-                            break;
                         }
                     }
-                } while (bytesRead > 0);
-
-                if (bytesRead == -1 && !inbuf.hasData()) {
-                    if (outputIdle() && inputIdle()) {
-                        requestShutdown(CloseMode.GRACEFUL);
-                    } else {
-                        shutdownSession(new ConnectionClosedException("Connection closed by peer"));
-                    }
-                    return;
+                } else {
+                    break;
                 }
             }
 
@@ -303,9 +293,8 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
                 // over its declared capacity in order to avoid having
                 // unprocessed message body content stuck in the session
                 // input buffer
-                int bytesRead;
-                while ((bytesRead = contentDecoder.read(contentBuffer)) > 0) {
-                    totalBytesRead += bytesRead;
+                final int bytesRead = contentDecoder.read(contentBuffer);
+                if (bytesRead > 0) {
                     contentBuffer.flip();
                     consumeData(contentBuffer);
                     contentBuffer.clear();
@@ -314,7 +303,6 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
                         if (!contentDecoder.isCompleted()) {
                             updateCapacity(capacityWindow);
                         }
-                        break;
                     }
                 }
                 if (contentDecoder.isCompleted()) {
@@ -324,9 +312,17 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
                     ioSession.setEvent(SelectionKey.OP_READ);
                     inputEnd();
                 }
+                if (bytesRead == 0) {
+                    break;
+                }
             }
-            if (totalBytesRead == 0 && messagesReceived == 0) {
-                break;
+        } while (inbuf.hasData());
+
+        if (endOfStream && !inbuf.hasData()) {
+            if (outputIdle() && inputIdle()) {
+                requestShutdown(CloseMode.GRACEFUL);
+            } else {
+                shutdownSession(new ConnectionClosedException("Connection closed by peer"));
             }
         }
     }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
index 930acde..6cd742a 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
@@ -196,9 +196,6 @@ class IOSessionImpl implements IOSession {
 
     @Override
     public int read(final ByteBuffer dst) throws IOException {
-        if ((this.key.interestOps() & SelectionKey.OP_READ) == 0) {
-            return 0;
-        }
         return this.channel.read(dst);
     }