You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2017/04/14 11:13:28 UTC
svn commit: r1791350 [1/2] - in /httpcomponents/httpcore/trunk:
httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/
httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/
httpcore5/src/examples/org/apache/hc/core5/http/examples/...
Author: olegk
Date: Fri Apr 14 11:13:28 2017
New Revision: 1791350
URL: http://svn.apache.org/viewvc?rev=1791350&view=rev
Log:
Rewrite of non-blocking HTTP/1.1 connection persistence and re-use code; improved HttpAsyncRequester; added integration tests for HttpAsyncRequester and HttpAsyncServer
Added:
httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1ServerAndRequesterTest.java (with props)
Removed:
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/PoolEntryHolder.java
Modified:
httpcomponents/httpcore/trunk/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2RequestExecutionExample.java
httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncRequestExecutionExample.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpRequester.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/Http1StreamChannel.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexer.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamHandler.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientEndpoint.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/BasicClientExchangeHandler.java
Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2RequestExecutionExample.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2RequestExecutionExample.java?rev=1791350&r1=1791349&r2=1791350&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2RequestExecutionExample.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2RequestExecutionExample.java Fri Apr 14 11:13:28 2017
@@ -112,30 +112,33 @@ public class Http2RequestExecutionExampl
for (final String requestUri: requestUris) {
final Future<AsyncClientEndpoint> future = requester.connect(target, TimeValue.ofSeconds(5));
final AsyncClientEndpoint clientEndpoint = future.get();
- clientEndpoint.executeAndRelease(
+ clientEndpoint.execute(
new BasicRequestProducer("GET", target, requestUri),
new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
new FutureCallback<Message<HttpResponse, String>>() {
@Override
public void completed(final Message<HttpResponse, String> message) {
- latch.countDown();
+ clientEndpoint.releaseAndReuse();
HttpResponse response = message.getHead();
String body = message.getBody();
System.out.println(requestUri + "->" + response.getCode());
System.out.println(body);
+ latch.countDown();
}
@Override
public void failed(final Exception ex) {
- latch.countDown();
+ clientEndpoint.releaseAndDiscard();
System.out.println(requestUri + "->" + ex);
+ latch.countDown();
}
@Override
public void cancelled() {
- latch.countDown();
+ clientEndpoint.releaseAndDiscard();
System.out.println(requestUri + " cancelled");
+ latch.countDown();
}
});
Added: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1ServerAndRequesterTest.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1ServerAndRequesterTest.java?rev=1791350&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1ServerAndRequesterTest.java (added)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1ServerAndRequesterTest.java Fri Apr 14 11:13:28 2017
@@ -0,0 +1,357 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.testing.nio.http;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Future;
+
+import org.apache.hc.core5.function.Supplier;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HeaderElements;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
+import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
+import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
+import org.apache.hc.core5.http.nio.AsyncPushProducer;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.BasicRequestProducer;
+import org.apache.hc.core5.http.nio.BasicResponseConsumer;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
+import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.reactor.ListenerEndpoint;
+import org.apache.hc.core5.util.TimeValue;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+
+public class Http1ServerAndRequesterTest {
+
+ private static final TimeValue TIMEOUT = TimeValue.ofSeconds(30);
+
+ private HttpAsyncServer server;
+
+ @Rule
+ public ExternalResource serverResource = new ExternalResource() {
+
+ @Override
+ protected void before() throws Throwable {
+ server = AsyncServerBootstrap.bootstrap()
+ .setIOReactorConfig(
+ IOReactorConfig.custom()
+ .setConnectTimeout(TIMEOUT)
+ .setSoTimeout(TIMEOUT)
+ .build())
+ .register("/no-keep-alive*", new Supplier<AsyncServerExchangeHandler>() {
+
+ @Override
+ public AsyncServerExchangeHandler get() {
+ return new EchoHandler(2048) {
+
+ @Override
+ public void handleRequest(
+ final HttpRequest request,
+ final EntityDetails entityDetails,
+ final ResponseChannel responseChannel) throws HttpException, IOException {
+ super.handleRequest(request, entityDetails, new ResponseChannel() {
+
+ @Override
+ public void sendInformation(final HttpResponse response) throws HttpException, IOException {
+ responseChannel.sendInformation(response);
+ }
+
+ @Override
+ public void sendResponse(
+ final HttpResponse response,
+ final EntityDetails entityDetails) throws HttpException, IOException {
+ response.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
+ responseChannel.sendResponse(response, entityDetails);
+ }
+
+ @Override
+ public void pushPromise(
+ final HttpRequest promise,
+ final AsyncPushProducer pushProducer) throws HttpException, IOException {
+ responseChannel.pushPromise(promise, pushProducer);
+ }
+
+ });
+ }
+ };
+ }
+
+ })
+ .register("*", new Supplier<AsyncServerExchangeHandler>() {
+
+ @Override
+ public AsyncServerExchangeHandler get() {
+ return new EchoHandler(2048);
+ }
+
+ })
+ .create();
+ }
+
+ @Override
+ protected void after() {
+ if (server != null) {
+ try {
+ server.shutdown(ShutdownType.IMMEDIATE);
+ server = null;
+ } catch (final Exception ignore) {
+ }
+ }
+ }
+
+ };
+
+ private HttpAsyncRequester requester;
+
+ @Rule
+ public ExternalResource clientResource = new ExternalResource() {
+
+ @Override
+ protected void before() throws Throwable {
+ requester = AsyncRequesterBootstrap.bootstrap()
+ .setIOReactorConfig(IOReactorConfig.custom()
+ .setConnectTimeout(TIMEOUT)
+ .setSoTimeout(TIMEOUT)
+ .build())
+ .create();
+ }
+
+ @Override
+ protected void after() {
+ if (requester != null) {
+ try {
+ requester.shutdown(ShutdownType.IMMEDIATE);
+ requester = null;
+ } catch (final Exception ignore) {
+ }
+ }
+ }
+
+ };
+
+ @Test
+ public void testSequentialRequests() throws Exception {
+ server.start();
+ final ListenerEndpoint listener = server.listen(new InetSocketAddress(0));
+ listener.waitFor();
+ final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
+ requester.start();
+
+ final HttpHost target = new HttpHost("localhost", address.getPort());
+ final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
+ new BasicRequestProducer("POST", target, "/stuff",
+ new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
+ final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ Assert.assertThat(message1, CoreMatchers.notNullValue());
+ final HttpResponse response1 = message1.getHead();
+ Assert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
+ final String body1 = message1.getBody();
+ Assert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
+
+ final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
+ new BasicRequestProducer("POST", target, "/other-stuff",
+ new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
+ final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ Assert.assertThat(message2, CoreMatchers.notNullValue());
+ final HttpResponse response2 = message2.getHead();
+ Assert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
+ final String body2 = message2.getBody();
+ Assert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
+
+ final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
+ new BasicRequestProducer("POST", target, "/more-stuff",
+ new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
+ final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ Assert.assertThat(message3, CoreMatchers.notNullValue());
+ final HttpResponse response3 = message3.getHead();
+ Assert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
+ final String body3 = message3.getBody();
+ Assert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
+ }
+
+ @Test
+ public void testSequentialRequestsNonPersistentConnection() throws Exception {
+ server.start();
+ final ListenerEndpoint listener = server.listen(new InetSocketAddress(0));
+ listener.waitFor();
+ final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
+ requester.start();
+
+ final HttpHost target = new HttpHost("localhost", address.getPort());
+ final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
+ new BasicRequestProducer("POST", target, "/no-keep-alive/stuff",
+ new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
+ final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ Assert.assertThat(message1, CoreMatchers.notNullValue());
+ final HttpResponse response1 = message1.getHead();
+ Assert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
+ final String body1 = message1.getBody();
+ Assert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
+
+ final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
+ new BasicRequestProducer("POST", target, "/no-keep-alive/other-stuff",
+ new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
+ final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ Assert.assertThat(message2, CoreMatchers.notNullValue());
+ final HttpResponse response2 = message2.getHead();
+ Assert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
+ final String body2 = message2.getBody();
+ Assert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
+
+ final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
+ new BasicRequestProducer("POST", target, "/no-keep-alive/more-stuff",
+ new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
+ final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ Assert.assertThat(message3, CoreMatchers.notNullValue());
+ final HttpResponse response3 = message3.getHead();
+ Assert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
+ final String body3 = message3.getBody();
+ Assert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
+ }
+
+ @Test
+ public void testSequentialRequestsSameEndpoint() throws Exception {
+ server.start();
+ final ListenerEndpoint listener = server.listen(new InetSocketAddress(0));
+ listener.waitFor();
+ final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
+ requester.start();
+
+ final HttpHost target = new HttpHost("localhost", address.getPort());
+ final Future<AsyncClientEndpoint> endpointFuture = requester.connect(target, TimeValue.ofSeconds(5));
+ final AsyncClientEndpoint endpoint = endpointFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ try {
+
+ final Future<Message<HttpResponse, String>> resultFuture1 = endpoint.execute(
+ new BasicRequestProducer("POST", target, "/stuff",
+ new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+ final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ Assert.assertThat(message1, CoreMatchers.notNullValue());
+ final HttpResponse response1 = message1.getHead();
+ Assert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
+ final String body1 = message1.getBody();
+ Assert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
+
+ final Future<Message<HttpResponse, String>> resultFuture2 = endpoint.execute(
+ new BasicRequestProducer("POST", target, "/other-stuff",
+ new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+ final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ Assert.assertThat(message2, CoreMatchers.notNullValue());
+ final HttpResponse response2 = message2.getHead();
+ Assert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
+ final String body2 = message2.getBody();
+ Assert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
+
+ final Future<Message<HttpResponse, String>> resultFuture3 = endpoint.execute(
+ new BasicRequestProducer("POST", target, "/more-stuff",
+ new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+ final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ Assert.assertThat(message3, CoreMatchers.notNullValue());
+ final HttpResponse response3 = message3.getHead();
+ Assert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
+ final String body3 = message3.getBody();
+ Assert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
+
+ } finally {
+ endpoint.releaseAndReuse();
+ }
+ }
+
+ @Test
+ public void testPipelinedRequests() throws Exception {
+ server.start();
+ final ListenerEndpoint listener = server.listen(new InetSocketAddress(0));
+ listener.waitFor();
+ final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
+ requester.start();
+
+ final HttpHost target = new HttpHost("localhost", address.getPort());
+ final Future<AsyncClientEndpoint> endpointFuture = requester.connect(target, TimeValue.ofSeconds(5));
+ final AsyncClientEndpoint endpoint = endpointFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ try {
+
+ final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
+
+ queue.add(endpoint.execute(
+ new BasicRequestProducer("POST", target, "/stuff",
+ new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
+ queue.add(endpoint.execute(
+ new BasicRequestProducer("POST", target, "/other-stuff",
+ new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
+ queue.add(endpoint.execute(
+ new BasicRequestProducer("POST", target, "/more-stuff",
+ new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
+
+ while (!queue.isEmpty()) {
+ final Future<Message<HttpResponse, String>> resultFuture = queue.remove();
+ final Message<HttpResponse, String> message = resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ Assert.assertThat(message, CoreMatchers.notNullValue());
+ final HttpResponse response = message.getHead();
+ Assert.assertThat(response.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
+ final String body = message.getBody();
+ Assert.assertThat(body, CoreMatchers.containsString("stuff"));
+ }
+
+ } finally {
+ endpoint.releaseAndReuse();
+ }
+ }
+
+}
Propchange: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1ServerAndRequesterTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1ServerAndRequesterTest.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1ServerAndRequesterTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncRequestExecutionExample.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncRequestExecutionExample.java?rev=1791350&r1=1791349&r2=1791350&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncRequestExecutionExample.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncRequestExecutionExample.java Fri Apr 14 11:13:28 2017
@@ -27,7 +27,6 @@
package org.apache.hc.core5.http.examples;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hc.core5.concurrent.FutureCallback;
@@ -41,7 +40,6 @@ import org.apache.hc.core5.http.impl.boo
import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
import org.apache.hc.core5.http.message.RequestLine;
import org.apache.hc.core5.http.message.StatusLine;
-import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
import org.apache.hc.core5.http.nio.BasicRequestProducer;
import org.apache.hc.core5.http.nio.BasicResponseConsumer;
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
@@ -103,32 +101,31 @@ public class AsyncRequestExecutionExampl
final CountDownLatch latch = new CountDownLatch(requestUris.length);
for (final String requestUri: requestUris) {
- final Future<AsyncClientEndpoint> future = requester.connect(target, TimeValue.ofSeconds(5));
- final AsyncClientEndpoint clientEndpoint = future.get();
- clientEndpoint.executeAndRelease(
+ requester.execute(
new BasicRequestProducer("GET", target, requestUri),
new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
+ TimeValue.ofSeconds(5),
new FutureCallback<Message<HttpResponse, String>>() {
@Override
public void completed(final Message<HttpResponse, String> message) {
- latch.countDown();
HttpResponse response = message.getHead();
String body = message.getBody();
System.out.println(requestUri + "->" + response.getCode());
System.out.println(body);
+ latch.countDown();
}
@Override
public void failed(final Exception ex) {
- latch.countDown();
System.out.println(requestUri + "->" + ex);
+ latch.countDown();
}
@Override
public void cancelled() {
- latch.countDown();
System.out.println(requestUri + " cancelled");
+ latch.countDown();
}
});
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java?rev=1791350&r1=1791349&r2=1791350&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java Fri Apr 14 11:13:28 2017
@@ -29,20 +29,38 @@ package org.apache.hc.core5.http.impl.bo
import java.io.IOException;
import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.util.List;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.ExceptionListener;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.ProtocolException;
import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncRequestProducer;
+import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.RequestChannel;
import org.apache.hc.core5.http.nio.command.ExecutionCommand;
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
+import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.net.URIAuthority;
import org.apache.hc.core5.pool.ControlledConnPool;
import org.apache.hc.core5.pool.PoolEntry;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
@@ -97,13 +115,13 @@ public class HttpAsyncRequester extends
@Override
public void completed(final PoolEntry<HttpHost, IOSession> poolEntry) {
- final PoolEntryHolder<HttpHost, IOSession> poolEntryHolder = new PoolEntryHolder<>(connPool, poolEntry);
+ final AsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(poolEntry);
final IOSession ioSession = poolEntry.getConnection();
if (ioSession != null && ioSession.isClosed()) {
poolEntry.discardConnection(ShutdownType.IMMEDIATE);
}
if (poolEntry.hasConnection()) {
- resultFuture.completed(new InternalAsyncClientEndpoint(poolEntryHolder));
+ resultFuture.completed(endpoint);
} else {
final SessionRequest sessionRequest = requestSession(host, timeout, new SessionRequestCallback() {
@@ -117,8 +135,9 @@ public class HttpAsyncRequester extends
session.getLocalAddress(),
session.getRemoteAddress());
}
+ session.setSocketTimeout(timeout.toMillisIntBound());
poolEntry.assignConnection(session);
- resultFuture.completed(new InternalAsyncClientEndpoint(poolEntryHolder));
+ resultFuture.completed(endpoint);
}
@Override
@@ -126,7 +145,7 @@ public class HttpAsyncRequester extends
try {
resultFuture.failed(request.getException());
} finally {
- poolEntryHolder.abortConnection();
+ endpoint.releaseAndDiscard();
}
}
@@ -135,7 +154,7 @@ public class HttpAsyncRequester extends
try {
resultFuture.failed(new SocketTimeoutException("Connect timeout"));
} finally {
- poolEntryHolder.abortConnection();
+ endpoint.releaseAndDiscard();
}
}
@@ -144,7 +163,7 @@ public class HttpAsyncRequester extends
try {
resultFuture.cancel();
} finally {
- poolEntryHolder.abortConnection();
+ endpoint.releaseAndDiscard();
}
}
@@ -172,31 +191,197 @@ public class HttpAsyncRequester extends
return connect(host, timeout, null);
}
- private static class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
+ public void execute(
+ final AsyncClientExchangeHandler exchangeHandler,
+ final TimeValue timeout,
+ final HttpContext context) {
+ Args.notNull(exchangeHandler, "Exchange handler");
+ Args.notNull(timeout, "Timeout");
+ Args.notNull(context, "Context");
+ try {
+ exchangeHandler.produceRequest(new RequestChannel() {
+
+ @Override
+ public void sendRequest(
+ final HttpRequest request,
+ final EntityDetails entityDetails) throws HttpException, IOException {
+ final String scheme = request.getScheme();
+ final URIAuthority authority = request.getAuthority();
+ if (authority == null) {
+ throw new ProtocolException("Request authority not specified");
+ }
+ final HttpHost target = new HttpHost(authority, scheme);
+ connect(target, timeout, new FutureCallback<AsyncClientEndpoint>() {
+
+ @Override
+ public void completed(final AsyncClientEndpoint endpoint) {
+ endpoint.execute(new AsyncClientExchangeHandler() {
+
+ @Override
+ public void releaseResources() {
+ endpoint.releaseAndDiscard();
+ exchangeHandler.releaseResources();
+ }
+
+ @Override
+ public void failed(final Exception cause) {
+ endpoint.releaseAndDiscard();
+ exchangeHandler.failed(cause);
+ }
+
+ @Override
+ public void cancel() {
+ endpoint.releaseAndDiscard();
+ exchangeHandler.cancel();
+ }
+
+ @Override
+ public void produceRequest(final RequestChannel channel) throws HttpException, IOException {
+ channel.sendRequest(request, entityDetails);
+ }
+
+ @Override
+ public int available() {
+ return exchangeHandler.available();
+ }
+
+ @Override
+ public void produce(final DataStreamChannel channel) throws IOException {
+ exchangeHandler.produce(channel);
+ }
+
+ @Override
+ public void consumeInformation(final HttpResponse response) throws HttpException, IOException {
+ exchangeHandler.consumeInformation(response);
+ }
+
+ @Override
+ public void consumeResponse(
+ final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
+ if (entityDetails == null) {
+ endpoint.releaseAndReuse();
+ }
+ exchangeHandler.consumeResponse(response, entityDetails);
+ }
+
+ @Override
+ public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+ exchangeHandler.updateCapacity(capacityChannel);
+ }
+
+ @Override
+ public int consume(final ByteBuffer src) throws IOException {
+ return exchangeHandler.consume(src);
+ }
+
+ @Override
+ public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+ endpoint.releaseAndReuse();
+ exchangeHandler.streamEnd(trailers);
+ }
+
+ }, context);
+
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ exchangeHandler.failed(ex);
+ }
+
+ @Override
+ public void cancelled() {
+ exchangeHandler.cancel();
+ }
+
+ });
+
+ }
- final PoolEntryHolder<HttpHost, IOSession> poolEntryHolder;
+ });
- InternalAsyncClientEndpoint(final PoolEntryHolder<HttpHost, IOSession> poolEntryHolder) {
- this.poolEntryHolder = poolEntryHolder;
+ } catch (IOException | HttpException ex) {
+ exchangeHandler.failed(ex);
+ }
+ }
+
+ public final <T> Future<T> execute(
+ final AsyncRequestProducer requestProducer,
+ final AsyncResponseConsumer<T> responseConsumer,
+ final TimeValue timeout,
+ final HttpContext context,
+ final FutureCallback<T> callback) {
+ Args.notNull(requestProducer, "Request producer");
+ Args.notNull(responseConsumer, "Response consumer");
+ Args.notNull(timeout, "Timeout");
+ final BasicFuture<T> future = new BasicFuture<>(callback);
+ final AsyncClientExchangeHandler exchangeHandler = new BasicClientExchangeHandler<>(requestProducer, responseConsumer, new FutureCallback<T>() {
+
+ @Override
+ public void completed(final T result) {
+ future.completed(result);
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ future.failed(ex);
+ }
+
+ @Override
+ public void cancelled() {
+ future.cancel();
+ }
+
+ });
+ execute(exchangeHandler, timeout, context != null ? context : HttpCoreContext.create());
+ return future;
+ }
+
+ public final <T> Future<T> execute(
+ final AsyncRequestProducer requestProducer,
+ final AsyncResponseConsumer<T> responseConsumer,
+ final TimeValue timeout,
+ final FutureCallback<T> callback) {
+ return execute(requestProducer, responseConsumer, timeout, null, callback);
+ }
+
+ private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
+
+ final AtomicReference<PoolEntry<HttpHost, IOSession>> poolEntryRef;
+
+ InternalAsyncClientEndpoint(final PoolEntry<HttpHost, IOSession> poolEntry) {
+ this.poolEntryRef = new AtomicReference<>(poolEntry);
}
@Override
public void execute(final AsyncClientExchangeHandler exchangeHandler, final HttpContext context) {
- final IOSession connection = poolEntryHolder.getConnection();
- if (connection == null) {
+ final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
+ if (poolEntry == null) {
throw new IllegalStateException("Endpoint has already been released");
}
- connection.addLast(new ExecutionCommand(exchangeHandler, context));
+ final IOSession ioSession = poolEntry.getConnection();
+ if (ioSession == null) {
+ throw new IllegalStateException("I/O session is invalid");
+ }
+ ioSession.addLast(new ExecutionCommand(exchangeHandler, context));
}
@Override
public void releaseAndReuse() {
- poolEntryHolder.releaseConnection();
+ final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
+ if (poolEntry != null) {
+ final IOSession ioSession = poolEntry.getConnection();
+ connPool.release(poolEntry, ioSession != null && !ioSession.isClosed()); ;
+ }
}
@Override
public void releaseAndDiscard() {
- poolEntryHolder.abortConnection();
+ final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
+ if (poolEntry != null) {
+ poolEntry.discardConnection(ShutdownType.IMMEDIATE);
+ connPool.release(poolEntry, false); ;
+ }
}
}
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpRequester.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpRequester.java?rev=1791350&r1=1791349&r2=1791350&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpRequester.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpRequester.java Fri Apr 14 11:13:28 2017
@@ -37,6 +37,7 @@ import java.net.SocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLSocketFactory;
@@ -185,7 +186,7 @@ public class HttpRequester implements Gr
} catch (final TimeoutException ex) {
throw new ConnectionRequestTimeoutException("Connection request timeout");
}
- final PoolEntryHolder<HttpHost, HttpClientConnection> connectionHolder = new PoolEntryHolder<>(connPool, poolEntry);
+ final PoolEntryHolder connectionHolder = new PoolEntryHolder(poolEntry);
try {
HttpClientConnection connection = poolEntry.getConnection();
if (connection == null) {
@@ -200,11 +201,8 @@ public class HttpRequester implements Gr
response.setEntity(new HttpEntityWrapper(entity) {
private void releaseConnection() throws IOException {
- if (connectionHolder.isReleased()) {
- return;
- }
try {
- final HttpClientConnection localConn = poolEntry.getConnection();
+ final HttpClientConnection localConn = connectionHolder.getConnection();
if (localConn != null) {
if (requestExecutor.keepAlive(request, response, localConn, context)) {
if (super.isStreaming()) {
@@ -213,16 +211,16 @@ public class HttpRequester implements Gr
content.close();
}
}
- connectionHolder.markReusable();
+ connectionHolder.releaseConnection();
}
}
} finally {
- connectionHolder.releaseConnection();
+ connectionHolder.discardConnection();
}
}
private void abortConnection() {
- connectionHolder.releaseConnection();
+ connectionHolder.discardConnection();
}
@Override
@@ -276,7 +274,7 @@ public class HttpRequester implements Gr
}
return response;
} catch (HttpException | IOException | RuntimeException ex) {
- connectionHolder.abortConnection();
+ connectionHolder.discardConnection();
throw ex;
}
}
@@ -308,4 +306,35 @@ public class HttpRequester implements Gr
connPool.close();
}
+ private class PoolEntryHolder {
+
+ private final AtomicReference<PoolEntry<HttpHost, HttpClientConnection>> poolEntryRef;
+
+ PoolEntryHolder(final PoolEntry<HttpHost, HttpClientConnection> poolEntry) {
+ this.poolEntryRef = new AtomicReference<>(poolEntry);
+ }
+
+ HttpClientConnection getConnection() {
+ final PoolEntry<HttpHost, HttpClientConnection> poolEntry = poolEntryRef.get();
+ return poolEntry != null ? poolEntry.getConnection() : null;
+ }
+
+ void releaseConnection() {
+ final PoolEntry<HttpHost, HttpClientConnection> poolEntry = poolEntryRef.getAndSet(null);
+ if (poolEntry != null) {
+ final HttpClientConnection connection = poolEntry.getConnection();
+ connPool.release(poolEntry, connection != null && connection.isOpen());
+ }
+ }
+
+ void discardConnection() {
+ final PoolEntry<HttpHost, HttpClientConnection> poolEntry = poolEntryRef.getAndSet(null);
+ if (poolEntry != null) {
+ poolEntry.discardConnection(ShutdownType.IMMEDIATE);
+ connPool.release(poolEntry, false);
+ }
+ }
+
+ }
+
}
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java?rev=1791350&r1=1791349&r2=1791350&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java Fri Apr 14 11:13:28 2017
@@ -57,6 +57,7 @@ import org.apache.hc.core5.http.impl.Bas
import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
import org.apache.hc.core5.http.impl.CharCodingSupport;
import org.apache.hc.core5.http.impl.ConnectionListener;
+import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.nio.ContentDecoder;
import org.apache.hc.core5.http.nio.ContentEncoder;
import org.apache.hc.core5.http.nio.NHttpMessageParser;
@@ -92,6 +93,7 @@ abstract class AbstractHttp1StreamDuplex
private final BasicHttpConnectionMetrics connMetrics;
private final NHttpMessageParser<IncomingMessage> incomingMessageParser;
private final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter;
+ private final ByteBuffer contentBuffer;
private final ConnectionListener connectionListener;
private final Lock outputLock;
private final AtomicInteger outputRequests;
@@ -122,13 +124,24 @@ abstract class AbstractHttp1StreamDuplex
this.connMetrics = new BasicHttpConnectionMetrics(inTransportMetrics, outTransportMetrics);
this.incomingMessageParser = incomingMessageParser;
this.outgoingMessageWriter = outgoingMessageWriter;
+ this.contentBuffer = ByteBuffer.allocate(this.h1Config.getBufferSize());
this.connectionListener = connectionListener;
this.outputLock = new ReentrantLock();
this.outputRequests = new AtomicInteger(0);
this.connState = ConnectionState.READY;
}
- void doTerminate(final Exception exception) throws IOException {
+ void shutdownSession(final ShutdownType shutdownType) {
+ if (shutdownType == ShutdownType.GRACEFUL) {
+ connState = ConnectionState.GRACEFUL_SHUTDOWN;
+ ioSession.addLast(new ShutdownCommand(ShutdownType.GRACEFUL));
+ } else {
+ connState = ConnectionState.SHUTDOWN;
+ ioSession.close();
+ }
+ }
+
+ void shutdownSession(final Exception exception) throws IOException {
connState = ConnectionState.SHUTDOWN;
try {
terminate(exception);
@@ -137,6 +150,8 @@ abstract class AbstractHttp1StreamDuplex
}
}
+ abstract void disconnected();
+
abstract void terminate(final Exception exception);
abstract void updateInputMetrics(IncomingMessage incomingMessage, BasicHttpConnectionMetrics connMetrics);
@@ -157,7 +172,11 @@ abstract class AbstractHttp1StreamDuplex
SessionOutputBuffer buffer,
BasicHttpTransportMetrics metrics) throws HttpException;
- abstract int consumeData(ContentDecoder contentDecoder) throws HttpException, IOException;
+ abstract int consumeData(ByteBuffer src) throws HttpException, IOException;
+
+ abstract void updateCapacity(CapacityChannel capacityChannel) throws HttpException, IOException;
+
+ abstract void dataEnd(List<? extends Header> trailers) throws HttpException, IOException;
abstract boolean isOutputReady();
@@ -238,7 +257,11 @@ abstract class AbstractHttp1StreamDuplex
break;
} else {
inputEnd();
- ioSession.setEvent(SelectionKey.OP_READ);
+ if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
+ ioSession.setEvent(SelectionKey.OP_READ);
+ } else {
+ break;
+ }
}
}
} while (bytesRead > 0);
@@ -247,7 +270,7 @@ abstract class AbstractHttp1StreamDuplex
if (outputIdle() && inputIdle()) {
requestShutdown(ShutdownType.IMMEDIATE);
} else {
- doTerminate(new ConnectionClosedException("Connection closed by peer"));
+ shutdownSession(new ConnectionClosedException("Connection closed by peer"));
}
return;
}
@@ -255,14 +278,37 @@ abstract class AbstractHttp1StreamDuplex
if (incomingMessage != null) {
final ContentDecoder contentDecoder = incomingMessage.getBody();
- final int bytesRead = consumeData(contentDecoder);
- if (bytesRead > 0) {
- totalBytesRead += bytesRead;
+
+ int bytesRead;
+ while ((bytesRead = contentDecoder.read(contentBuffer)) > 0) {
+ if (bytesRead > 0) {
+ totalBytesRead += bytesRead;
+ }
+ contentBuffer.flip();
+ final int capacity = consumeData(contentBuffer);
+ contentBuffer.clear();
+ if (capacity <= 0) {
+ if (!contentDecoder.isCompleted()) {
+ ioSession.clearEvent(SelectionKey.OP_READ);
+ updateCapacity(new CapacityChannel() {
+
+ @Override
+ public void update(final int increment) throws IOException {
+ if (increment > 0) {
+ requestSessionInput();
+ }
+ }
+
+ });
+ }
+ break;
+ }
}
if (contentDecoder.isCompleted()) {
+ dataEnd(contentDecoder.getTrailers());
incomingMessage = null;
- inputEnd();
ioSession.setEvent(SelectionKey.OP_READ);
+ inputEnd();
}
}
if (totalBytesRead == 0 && messagesReceived == 0) {
@@ -327,7 +373,7 @@ abstract class AbstractHttp1StreamDuplex
public final void onTimeout() throws IOException, HttpException {
if (!handleTimeout()) {
- doTerminate(new SocketTimeoutException());
+ shutdownSession(new SocketTimeoutException());
}
}
@@ -336,7 +382,7 @@ abstract class AbstractHttp1StreamDuplex
connectionListener.onError(this, ex);
}
try {
- doTerminate(ex);
+ shutdownSession(ex);
} catch (IOException ex2) {
if (connectionListener != null) {
connectionListener.onError(this, ex2);
@@ -346,6 +392,7 @@ abstract class AbstractHttp1StreamDuplex
public final void onDisconnect() {
cancelPendingCommands();
+ disconnected();
releaseResources();
if (connectionListener != null) {
connectionListener.onDisconnect(this);
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java?rev=1791350&r1=1791349&r2=1791350&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java Fri Apr 14 11:13:28 2017
@@ -53,6 +53,7 @@ import org.apache.hc.core5.http.impl.Def
import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
import org.apache.hc.core5.http.impl.Http1StreamListener;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.nio.ContentDecoder;
import org.apache.hc.core5.http.nio.ContentEncoder;
import org.apache.hc.core5.http.nio.NHttpMessageParser;
@@ -60,9 +61,9 @@ import org.apache.hc.core5.http.nio.NHtt
import org.apache.hc.core5.http.nio.SessionInputBuffer;
import org.apache.hc.core5.http.nio.SessionOutputBuffer;
import org.apache.hc.core5.http.nio.command.ExecutionCommand;
-import org.apache.hc.core5.io.ShutdownType;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.http.protocol.HttpProcessor;
+import org.apache.hc.core5.io.ShutdownType;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Asserts;
@@ -75,11 +76,9 @@ public class ClientHttp1StreamDuplexer e
private final ContentLengthStrategy incomingContentStrategy;
private final ContentLengthStrategy outgoingContentStrategy;
private final Http1StreamListener streamListener;
- private final ByteBuffer contentBuffer;
private final Queue<ClientHttp1StreamHandler> pipeline;
private final Http1StreamChannel<HttpRequest> outputChannel;
- private volatile boolean inconsistent;
private volatile ClientHttp1StreamHandler outgoing;
private volatile ClientHttp1StreamHandler incoming;
@@ -105,11 +104,15 @@ public class ClientHttp1StreamDuplexer e
this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
DefaultContentLengthStrategy.INSTANCE;
this.streamListener = streamListener;
- this.contentBuffer = ByteBuffer.allocate(this.h1Config.getBufferSize());
this.pipeline = new ConcurrentLinkedQueue<>();
this.outputChannel = new Http1StreamChannel<HttpRequest>() {
@Override
+ public void close() {
+ shutdownSession(ShutdownType.IMMEDIATE);
+ }
+
+ @Override
public void submit(final HttpRequest request, final boolean endStream) throws HttpException, IOException {
if (streamListener != null) {
streamListener.onRequestHead(ClientHttp1StreamDuplexer.this, request);
@@ -118,23 +121,6 @@ public class ClientHttp1StreamDuplexer e
}
@Override
- public void update(final int increment) throws IOException {
- if (increment > 0) {
- requestSessionInput();
- }
- }
-
- @Override
- public void suspendInput() {
- suspendSessionInput();
- }
-
- @Override
- public void requestInput() {
- requestSessionInput();
- }
-
- @Override
public void suspendOutput() {
suspendSessionOutput();
}
@@ -170,11 +156,13 @@ public class ClientHttp1StreamDuplexer e
}
@Override
- public void abortOutput() throws IOException {
+ public boolean abortGracefully() throws IOException {
final MessageDelineation messageDelineation = endOutputStream(null);
if (messageDelineation == MessageDelineation.MESSAGE_HEAD) {
- inconsistent = true;
requestShutdown(ShutdownType.GRACEFUL);
+ return false;
+ } else {
+ return true;
}
}
@@ -182,8 +170,6 @@ public class ClientHttp1StreamDuplexer e
public void activate() throws HttpException, IOException {
}
-
-
};
}
@@ -211,16 +197,46 @@ public class ClientHttp1StreamDuplexer e
void terminate(final Exception exception) {
if (incoming != null) {
incoming.failed(exception);
+ incoming.releaseResources();
incoming = null;
}
if (outgoing != null) {
outgoing.failed(exception);
+ outgoing.releaseResources();
outgoing = null;
}
for (;;) {
final ClientHttp1StreamHandler handler = pipeline.poll();
if (handler != null) {
handler.failed(exception);
+ handler.releaseResources();
+ } else {
+ break;
+ }
+ }
+ }
+
+ @Override
+ void disconnected() {
+ if (incoming != null) {
+ if (!incoming.isCompleted()) {
+ incoming.failed(new ConnectionClosedException("Connection closed"));
+ }
+ incoming.releaseResources();
+ incoming = null;
+ }
+ if (outgoing != null) {
+ if (!outgoing.isCompleted()) {
+ outgoing.failed(new ConnectionClosedException("Connection closed"));
+ }
+ outgoing.releaseResources();
+ outgoing = null;
+ }
+ for (;;) {
+ final ClientHttp1StreamHandler handler = pipeline.poll();
+ if (handler != null) {
+ handler.failed(new ConnectionClosedException("Connection closed"));
+ handler.releaseResources();
} else {
break;
}
@@ -318,8 +334,7 @@ public class ClientHttp1StreamDuplexer e
h1Config,
connectionReuseStrategy,
exchangeHandler,
- context,
- contentBuffer);
+ context);
pipeline.add(handler);
outgoing = handler;
@@ -350,34 +365,33 @@ public class ClientHttp1StreamDuplexer e
}
@Override
- int consumeData(final ContentDecoder contentDecoder) throws HttpException, IOException {
+ int consumeData(final ByteBuffer src) throws HttpException, IOException {
Asserts.notNull(incoming, "Response stream handler");
- return incoming.consumeData(contentDecoder);
+ return incoming.consumeData(src);
}
@Override
- void inputEnd() throws HttpException, IOException {
+ void updateCapacity(final CapacityChannel capacityChannel) throws HttpException, IOException {
Asserts.notNull(incoming, "Response stream handler");
- if (incoming.isResponseCompleted()) {
- final boolean keepAlive = !inconsistent && incoming.keepAlive();
+ incoming.updateCapacity(capacityChannel);
+ }
+
+ @Override
+ void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+ Asserts.notNull(incoming, "Response stream handler");
+ incoming.dataEnd(trailers);
+ }
+
+ @Override
+ void inputEnd() throws HttpException, IOException {
+ if (incoming != null && incoming.isResponseFinal()) {
+ if (streamListener != null) {
+ streamListener.onExchangeComplete(this, isOpen());
+ }
if (incoming.isCompleted()) {
incoming.releaseResources();
}
incoming = null;
- if (streamListener != null) {
- streamListener.onExchangeComplete(this, keepAlive);
- }
- if (!keepAlive) {
- if (outgoing != null && outgoing.isCompleted()) {
- outgoing.releaseResources();
- outgoing = null;
- }
- if (outgoing == null && pipeline.isEmpty()) {
- requestShutdown(ShutdownType.IMMEDIATE);
- } else {
- doTerminate(new ConnectionClosedException("Connection cannot be kept alive"));
- }
- }
}
}
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java?rev=1791350&r1=1791349&r2=1791350&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java Fri Apr 14 11:13:28 2017
@@ -34,7 +34,6 @@ import java.util.concurrent.atomic.Atomi
import org.apache.hc.core5.http.ConnectionReuseStrategy;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
-import org.apache.hc.core5.http.HeaderElements;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpRequest;
@@ -47,7 +46,7 @@ import org.apache.hc.core5.http.Unsuppor
import org.apache.hc.core5.http.config.H1Config;
import org.apache.hc.core5.http.impl.LazyEntityDetails;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
-import org.apache.hc.core5.http.nio.ContentDecoder;
+import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.nio.DataStreamChannel;
import org.apache.hc.core5.http.nio.HttpContextAware;
import org.apache.hc.core5.http.nio.RequestChannel;
@@ -64,13 +63,12 @@ class ClientHttp1StreamHandler implement
private final ConnectionReuseStrategy connectionReuseStrategy;
private final AsyncClientExchangeHandler exchangeHandler;
private final HttpCoreContext context;
- private final ByteBuffer inputBuffer;
private final AtomicBoolean requestCommitted;
private final AtomicBoolean done;
+ private volatile boolean keepAlive;
private volatile int timeout;
private volatile HttpRequest committedRequest;
- private volatile HttpResponse receivedResponse;
private volatile MessageState requestState;
private volatile MessageState responseState;
@@ -80,8 +78,7 @@ class ClientHttp1StreamHandler implement
final H1Config h1Config,
final ConnectionReuseStrategy connectionReuseStrategy,
final AsyncClientExchangeHandler exchangeHandler,
- final HttpCoreContext context,
- final ByteBuffer inputBuffer) {
+ final HttpCoreContext context) {
this.outputChannel = outputChannel;
this.internalDataChannel = new DataStreamChannel() {
@@ -113,14 +110,14 @@ class ClientHttp1StreamHandler implement
this.connectionReuseStrategy = connectionReuseStrategy;
this.exchangeHandler = exchangeHandler;
this.context = context;
- this.inputBuffer = inputBuffer;
this.requestCommitted = new AtomicBoolean(false);
this.done = new AtomicBoolean(false);
+ this.keepAlive = true;
this.requestState = MessageState.IDLE;
this.responseState = MessageState.HEADERS;
}
- boolean isResponseCompleted() {
+ boolean isResponseFinal() {
return responseState == MessageState.COMPLETE;
}
@@ -128,11 +125,6 @@ class ClientHttp1StreamHandler implement
return requestState == MessageState.COMPLETE && responseState == MessageState.COMPLETE;
}
- boolean keepAlive() {
- return committedRequest != null && receivedResponse != null &&
- connectionReuseStrategy.keepAlive(committedRequest, receivedResponse, context);
- }
-
boolean isHeadRequest() {
return committedRequest != null && "HEAD".equalsIgnoreCase(committedRequest.getMethod());
}
@@ -224,6 +216,10 @@ class ClientHttp1StreamHandler implement
}
if (status < HttpStatus.SC_SUCCESS) {
exchangeHandler.consumeInformation(response);
+ } else {
+ if (!connectionReuseStrategy.keepAlive(committedRequest, response, context)) {
+ keepAlive = false;
+ }
}
if (requestState == MessageState.ACK) {
if (status == HttpStatus.SC_CONTINUE || status >= HttpStatus.SC_SUCCESS) {
@@ -238,54 +234,49 @@ class ClientHttp1StreamHandler implement
return;
}
if (requestState == MessageState.BODY) {
- boolean keepAlive = status < HttpStatus.SC_CLIENT_ERROR;
- if (keepAlive) {
- final Header h = response.getFirstHeader(HttpHeaders.CONNECTION);
- if (h != null && HeaderElements.CLOSE.equalsIgnoreCase(h.getValue())) {
+ if (keepAlive && status >= HttpStatus.SC_CLIENT_ERROR) {
+ requestState = MessageState.COMPLETE;
+ if (!outputChannel.abortGracefully()) {
keepAlive = false;
}
}
- if (!keepAlive) {
- requestState = MessageState.COMPLETE;
- outputChannel.abortOutput();
- }
}
final EntityDetails entityDetails = endStream ? null : new LazyEntityDetails(response);
context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
httpProcessor.process(response, entityDetails, context);
- receivedResponse = response;
exchangeHandler.consumeResponse(response, entityDetails);
- responseState = endStream ? MessageState.COMPLETE : MessageState.BODY;
+ if (endStream) {
+ if (!keepAlive) {
+ outputChannel.close();
+ }
+ responseState = MessageState.COMPLETE;
+ } else {
+ responseState = MessageState.BODY;
+ }
}
- int consumeData(final ContentDecoder contentDecoder) throws HttpException, IOException {
+ int consumeData(final ByteBuffer src) throws HttpException, IOException {
if (done.get() || responseState != MessageState.BODY) {
throw new ProtocolException("Unexpected message data");
}
- int total = 0;
- int byteRead;
- while ((byteRead = contentDecoder.read(inputBuffer)) > 0) {
- total += byteRead;
- inputBuffer.flip();
- final int capacity = exchangeHandler.consume(inputBuffer);
- inputBuffer.clear();
- if (capacity <= 0) {
- if (!contentDecoder.isCompleted()) {
- outputChannel.suspendInput();
- exchangeHandler.updateCapacity(outputChannel);
- }
- break;
- }
+ return exchangeHandler.consume(src);
+ }
+
+ void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+ exchangeHandler.updateCapacity(capacityChannel);
+ }
+
+ void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+ if (done.get() || responseState != MessageState.BODY) {
+ throw new ProtocolException("Unexpected message data");
}
- if (contentDecoder.isCompleted()) {
- responseState = MessageState.COMPLETE;
- exchangeHandler.streamEnd(contentDecoder.getTrailers());
- return total > 0 ? total : -1;
- } else {
- return total;
+ if (!keepAlive) {
+ outputChannel.close();
}
+ responseState = MessageState.COMPLETE;
+ exchangeHandler.streamEnd(trailers);
}
boolean handleTimeout() {
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/Http1StreamChannel.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/Http1StreamChannel.java?rev=1791350&r1=1791349&r2=1791350&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/Http1StreamChannel.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/Http1StreamChannel.java Fri Apr 14 11:13:28 2017
@@ -30,24 +30,21 @@ import java.io.IOException;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpMessage;
-import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.nio.ContentEncoder;
-interface Http1StreamChannel<OutgoingMessage extends HttpMessage> extends ContentEncoder, CapacityChannel {
+interface Http1StreamChannel<OutgoingMessage extends HttpMessage> extends ContentEncoder {
+
+ void close();
void activate() throws HttpException, IOException;
void submit(OutgoingMessage messageHead, boolean endStream) throws HttpException, IOException;
- void suspendInput();
-
- void requestInput();
-
void requestOutput();
void suspendOutput();
- void abortOutput() throws IOException;
+ boolean abortGracefully() throws IOException;
int getSocketTimeout();
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexer.java?rev=1791350&r1=1791349&r2=1791350&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexer.java Fri Apr 14 11:13:28 2017
@@ -51,6 +51,7 @@ import org.apache.hc.core5.http.impl.Def
import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
import org.apache.hc.core5.http.impl.Http1StreamListener;
import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.nio.ContentDecoder;
import org.apache.hc.core5.http.nio.ContentEncoder;
import org.apache.hc.core5.http.nio.HandlerFactory;
@@ -59,9 +60,9 @@ import org.apache.hc.core5.http.nio.NHtt
import org.apache.hc.core5.http.nio.SessionInputBuffer;
import org.apache.hc.core5.http.nio.SessionOutputBuffer;
import org.apache.hc.core5.http.nio.command.ExecutionCommand;
-import org.apache.hc.core5.io.ShutdownType;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.http.protocol.HttpProcessor;
+import org.apache.hc.core5.io.ShutdownType;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Asserts;
@@ -75,11 +76,9 @@ public class ServerHttp1StreamDuplexer e
private final ContentLengthStrategy incomingContentStrategy;
private final ContentLengthStrategy outgoingContentStrategy;
private final Http1StreamListener streamListener;
- private final ByteBuffer contentBuffer;
private final Queue<ServerHttp1StreamHandler> pipeline;
private final Http1StreamChannel<HttpResponse> outputChannel;
- private volatile boolean inconsistent;
private volatile ServerHttp1StreamHandler outgoing;
private volatile ServerHttp1StreamHandler incoming;
@@ -107,11 +106,15 @@ public class ServerHttp1StreamDuplexer e
this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
DefaultContentLengthStrategy.INSTANCE;
this.streamListener = streamListener;
- this.contentBuffer = ByteBuffer.allocate(this.h1Config.getBufferSize());
this.pipeline = new ConcurrentLinkedQueue<>();
this.outputChannel = new Http1StreamChannel<HttpResponse>() {
@Override
+ public void close() {
+ shutdown(ShutdownType.IMMEDIATE);
+ }
+
+ @Override
public void submit(final HttpResponse response, final boolean endStream) throws HttpException, IOException {
if (streamListener != null) {
streamListener.onResponseHead(ServerHttp1StreamDuplexer.this, response);
@@ -120,23 +123,6 @@ public class ServerHttp1StreamDuplexer e
}
@Override
- public void update(final int increment) throws IOException {
- if (increment > 0) {
- requestSessionInput();
- }
- }
-
- @Override
- public void suspendInput() {
- suspendSessionInput();
- }
-
- @Override
- public void requestInput() {
- requestSessionInput();
- }
-
- @Override
public void requestOutput() {
requestSessionOutput();
}
@@ -172,11 +158,9 @@ public class ServerHttp1StreamDuplexer e
}
@Override
- public void abortOutput() throws IOException {
+ public boolean abortGracefully() throws IOException {
final MessageDelineation messageDelineation = endOutputStream(null);
- if (messageDelineation == MessageDelineation.MESSAGE_HEAD) {
- inconsistent = true;
- }
+ return messageDelineation != MessageDelineation.MESSAGE_HEAD;
}
@Override
@@ -210,16 +194,46 @@ public class ServerHttp1StreamDuplexer e
void terminate(final Exception exception) {
if (incoming != null) {
incoming.failed(exception);
+ incoming.releaseResources();
incoming = null;
}
if (outgoing != null) {
outgoing.failed(exception);
+ outgoing.releaseResources();
outgoing = null;
}
for (;;) {
final ServerHttp1StreamHandler handler = pipeline.poll();
if (handler != null) {
handler.failed(exception);
+ handler.releaseResources();
+ } else {
+ break;
+ }
+ }
+ }
+
+ @Override
+ void disconnected() {
+ if (incoming != null) {
+ if (!incoming.isCompleted()) {
+ incoming.failed(new ConnectionClosedException("Connection closed"));
+ }
+ incoming.releaseResources();
+ incoming = null;
+ }
+ if (outgoing != null) {
+ if (!outgoing.isCompleted()) {
+ outgoing.failed(new ConnectionClosedException("Connection closed"));
+ }
+ outgoing.releaseResources();
+ outgoing = null;
+ }
+ for (;;) {
+ final ServerHttp1StreamHandler handler = pipeline.poll();
+ if (handler != null) {
+ handler.failed(new ConnectionClosedException("Connection closed"));
+ handler.releaseResources();
} else {
break;
}
@@ -295,8 +309,7 @@ public class ServerHttp1StreamDuplexer e
httpProcessor,
connectionReuseStrategy,
exchangeHandlerFactory,
- context,
- contentBuffer);
+ context);
outgoing = streamHandler;
} else {
streamHandler = new ServerHttp1StreamHandler(
@@ -304,8 +317,7 @@ public class ServerHttp1StreamDuplexer e
httpProcessor,
connectionReuseStrategy,
exchangeHandlerFactory,
- context,
- contentBuffer);
+ context);
pipeline.add(streamHandler);
}
streamHandler.consumeHeader(request, endStream);
@@ -313,9 +325,21 @@ public class ServerHttp1StreamDuplexer e
}
@Override
- int consumeData(final ContentDecoder contentDecoder) throws HttpException, IOException {
+ int consumeData(final ByteBuffer src) throws HttpException, IOException {
+ Asserts.notNull(incoming, "Request stream handler");
+ return incoming.consumeData(src);
+ }
+
+ @Override
+ void updateCapacity(final CapacityChannel capacityChannel) throws HttpException, IOException {
Asserts.notNull(incoming, "Request stream handler");
- return incoming.consumeData(contentDecoder);
+ incoming.updateCapacity(capacityChannel);
+ }
+
+ @Override
+ void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+ Asserts.notNull(incoming, "Request stream handler");
+ incoming.dataEnd(trailers);
}
@Override
@@ -346,24 +370,16 @@ public class ServerHttp1StreamDuplexer e
@Override
void outputEnd() throws HttpException, IOException {
- if (outgoing != null && outgoing.isResponseCompleted()) {
- final boolean keepAlive = !inconsistent && outgoing.keepAlive();
+ if (outgoing != null && outgoing.isResponseFinal()) {
+ if (streamListener != null) {
+ streamListener.onExchangeComplete(this, isOpen());
+ }
if (outgoing.isCompleted()) {
outgoing.releaseResources();
}
outgoing = null;
- if (streamListener != null) {
- streamListener.onExchangeComplete(this, keepAlive);
- }
- if (!keepAlive) {
- if (incoming == null && pipeline.isEmpty()) {
- requestShutdown(ShutdownType.IMMEDIATE);
- } else {
- doTerminate(new ConnectionClosedException("Connection cannot be kept alive"));
- }
- }
}
- if (outgoing == null) {
+ if (outgoing == null && isOpen()) {
final ServerHttp1StreamHandler handler = pipeline.poll();
if (handler != null) {
outgoing = handler;
@@ -393,6 +409,11 @@ public class ServerHttp1StreamDuplexer e
}
@Override
+ public void close() {
+ channel.close();
+ }
+
+ @Override
public void submit(final HttpResponse response, final boolean endStream) throws HttpException, IOException {
synchronized (this) {
if (direct) {
@@ -405,23 +426,6 @@ public class ServerHttp1StreamDuplexer e
}
@Override
- public void update(final int increment) throws IOException {
- if (increment > 0) {
- channel.requestInput();
- }
- }
-
- @Override
- public void suspendInput() {
- channel.suspendInput();
- }
-
- @Override
- public void requestInput() {
- channel.requestInput();
- }
-
- @Override
public void suspendOutput() {
channel.suspendOutput();
}
@@ -464,12 +468,13 @@ public class ServerHttp1StreamDuplexer e
}
@Override
- public void abortOutput() throws IOException {
+ public boolean abortGracefully() throws IOException {
synchronized (this) {
if (direct) {
- channel.abortOutput();
+ return channel.abortGracefully();
} else {
completed = true;
+ return true;
}
}
}