You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2019/09/26 20:38:06 UTC
[httpcomponents-core] 03/03: Simplification of HTTP/1.1 read event handling logic; better fix for HTTPCORE-599
This is an automated email from the ASF dual-hosted git repository.
olegk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git
commit dac54708766ece837f2755b7928ef83e12f9e384
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Tue Sep 24 19:56:46 2019 +0200
Simplification of HTTP/1.1 read event handling logic; better fix for HTTPCORE-599
---
.../http/impl/nio/AbstractHttp1StreamDuplexer.java | 100 ++++++++++-----------
.../org/apache/hc/core5/reactor/IOSessionImpl.java | 3 -
2 files changed, 48 insertions(+), 55 deletions(-)
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
index 6860bd3..4fe2a88 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
@@ -238,61 +238,51 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
if (src != null) {
inbuf.put(src);
}
- while (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
- int totalBytesRead = 0;
- int messagesReceived = 0;
+
+ if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inbuf.hasData() && inputIdle()) {
+ ioSession.clearEvent(SelectionKey.OP_READ);
+ return;
+ }
+
+ boolean endOfStream = false;
+ if (incomingMessage == null) {
+ final int bytesRead = inbuf.fill(ioSession);
+ if (bytesRead > 0) {
+ inTransportMetrics.incrementBytesTransferred(bytesRead);
+ }
+ endOfStream = bytesRead == -1;
+ }
+
+ do {
if (incomingMessage == null) {
- if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle()) {
- ioSession.clearEvent(SelectionKey.OP_READ);
- return;
- }
+ final IncomingMessage messageHead = incomingMessageParser.parse(inbuf, endOfStream);
+ if (messageHead != null) {
+ incomingMessageParser.reset();
+
+ this.version = messageHead.getVersion();
- int bytesRead;
- do {
- bytesRead = inbuf.fill(ioSession);
- if (bytesRead > 0) {
- totalBytesRead += bytesRead;
- inTransportMetrics.incrementBytesTransferred(bytesRead);
+ updateInputMetrics(messageHead, connMetrics);
+ final ContentDecoder contentDecoder;
+ if (handleIncomingMessage(messageHead)) {
+ final long len = incomingContentStrategy.determineLength(messageHead);
+ contentDecoder = createContentDecoder(len, ioSession, inbuf, inTransportMetrics);
+ consumeHeader(messageHead, contentDecoder != null ? new IncomingEntityDetails(messageHead, len) : null);
+ } else {
+ consumeHeader(messageHead, null);
+ contentDecoder = null;
}
- final IncomingMessage messageHead = incomingMessageParser.parse(inbuf, bytesRead == -1);
- if (messageHead != null) {
- messagesReceived++;
- incomingMessageParser.reset();
-
- this.version = messageHead.getVersion();
-
- updateInputMetrics(messageHead, connMetrics);
- final ContentDecoder contentDecoder;
- if (handleIncomingMessage(messageHead)) {
- final long len = incomingContentStrategy.determineLength(messageHead);
- contentDecoder = createContentDecoder(len, ioSession, inbuf, inTransportMetrics);
- consumeHeader(messageHead, contentDecoder != null ? new IncomingEntityDetails(messageHead, len) : null);
- } else {
- consumeHeader(messageHead, null);
- contentDecoder = null;
- }
- capacityWindow = new CapacityWindow(http1Config.getInitialWindowSize(), ioSession);
- if (contentDecoder != null) {
- incomingMessage = new Message<>(messageHead, contentDecoder);
- break;
- }
+ capacityWindow = new CapacityWindow(http1Config.getInitialWindowSize(), ioSession);
+ if (contentDecoder != null) {
+ incomingMessage = new Message<>(messageHead, contentDecoder);
+ } else {
inputEnd();
if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
ioSession.setEvent(SelectionKey.OP_READ);
- } else {
- break;
}
}
- } while (bytesRead > 0);
-
- if (bytesRead == -1 && !inbuf.hasData()) {
- if (outputIdle() && inputIdle()) {
- requestShutdown(CloseMode.GRACEFUL);
- } else {
- shutdownSession(new ConnectionClosedException("Connection closed by peer"));
- }
- return;
+ } else {
+ break;
}
}
@@ -303,9 +293,8 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
// over its declared capacity in order to avoid having
// unprocessed message body content stuck in the session
// input buffer
- int bytesRead;
- while ((bytesRead = contentDecoder.read(contentBuffer)) > 0) {
- totalBytesRead += bytesRead;
+ final int bytesRead = contentDecoder.read(contentBuffer);
+ if (bytesRead > 0) {
contentBuffer.flip();
consumeData(contentBuffer);
contentBuffer.clear();
@@ -314,7 +303,6 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
if (!contentDecoder.isCompleted()) {
updateCapacity(capacityWindow);
}
- break;
}
}
if (contentDecoder.isCompleted()) {
@@ -324,9 +312,17 @@ abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage,
ioSession.setEvent(SelectionKey.OP_READ);
inputEnd();
}
+ if (bytesRead == 0) {
+ break;
+ }
}
- if (totalBytesRead == 0 && messagesReceived == 0) {
- break;
+ } while (inbuf.hasData());
+
+ if (endOfStream && !inbuf.hasData()) {
+ if (outputIdle() && inputIdle()) {
+ requestShutdown(CloseMode.GRACEFUL);
+ } else {
+ shutdownSession(new ConnectionClosedException("Connection closed by peer"));
}
}
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
index 930acde..6cd742a 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
@@ -196,9 +196,6 @@ class IOSessionImpl implements IOSession {
@Override
public int read(final ByteBuffer dst) throws IOException {
- if ((this.key.interestOps() & SelectionKey.OP_READ) == 0) {
- return 0;
- }
return this.channel.read(dst);
}