You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2016/10/31 17:33:33 UTC
svn commit: r1767339 [4/14] - in /httpcomponents/httpcore/trunk: ./
httpcore5-ab/src/main/java/org/apache/hc/core5/http/benchmark/
httpcore5-ab/src/test/java/org/apache/hc/core5/http/benchmark/
httpcore5-h2/src/main/java/org/apache/hc/core5/http2/boots...
Modified: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/framework/TestClassicTestClientTestingAdapter.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/framework/TestClassicTestClientTestingAdapter.java?rev=1767339&r1=1767338&r2=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/framework/TestClassicTestClientTestingAdapter.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/framework/TestClassicTestClientTestingAdapter.java Mon Oct 31 17:33:27 2016
@@ -44,9 +44,9 @@ import org.apache.hc.core5.http.HttpEnti
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpStatus;
-import org.apache.hc.core5.http.entity.ByteArrayEntity;
-import org.apache.hc.core5.http.entity.EntityUtils;
import org.apache.hc.core5.http.io.HttpRequestHandler;
+import org.apache.hc.core5.http.io.entity.ByteArrayEntity;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.testing.classic.ClassicTestServer;
import org.junit.After;
Modified: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/framework/TestTestingFramework.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/framework/TestTestingFramework.java?rev=1767339&r1=1767338&r2=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/framework/TestTestingFramework.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/framework/TestTestingFramework.java Mon Oct 31 17:33:27 2016
@@ -27,14 +27,6 @@
package org.apache.hc.core5.testing.framework;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hc.core5.http.HttpVersion;
-import org.apache.hc.core5.http.ProtocolVersion;
-import org.apache.hc.core5.http.entity.ContentType;
import static org.apache.hc.core5.testing.framework.ClientPOJOAdapter.BODY;
import static org.apache.hc.core5.testing.framework.ClientPOJOAdapter.CONTENT_TYPE;
import static org.apache.hc.core5.testing.framework.ClientPOJOAdapter.HEADERS;
@@ -46,6 +38,15 @@ import static org.apache.hc.core5.testin
import static org.apache.hc.core5.testing.framework.ClientPOJOAdapter.REQUEST;
import static org.apache.hc.core5.testing.framework.ClientPOJOAdapter.RESPONSE;
import static org.apache.hc.core5.testing.framework.ClientPOJOAdapter.STATUS;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hc.core5.http.HttpVersion;
+import org.apache.hc.core5.http.ProtocolVersion;
+import org.apache.hc.core5.http.io.entity.ContentType;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
Copied: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java (from r1765384, httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestDefaultListeningIOReactor.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java?p2=httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestDefaultListeningIOReactor.java&r1=1765384&r2=1767339&rev=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestDefaultListeningIOReactor.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java Mon Oct 31 17:33:27 2016
@@ -25,7 +25,7 @@
*
*/
-package org.apache.hc.core5.http.integration;
+package org.apache.hc.core5.testing.nio;
import java.io.IOException;
import java.net.BindException;
@@ -34,21 +34,13 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.hc.core5.http.config.ConnectionConfig;
-import org.apache.hc.core5.http.impl.nio.DefaultHttpServerIOEventHandlerFactory;
-import org.apache.hc.core5.http.impl.nio.HttpAsyncService;
-import org.apache.hc.core5.http.impl.nio.UriHttpAsyncRequestHandlerMapper;
-import org.apache.hc.core5.http.protocol.HttpProcessor;
-import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
-import org.apache.hc.core5.http.protocol.ResponseConnControl;
-import org.apache.hc.core5.http.protocol.ResponseContent;
-import org.apache.hc.core5.http.protocol.ResponseDate;
-import org.apache.hc.core5.http.protocol.ResponseServer;
import org.apache.hc.core5.reactor.DefaultListeningIOReactor;
+import org.apache.hc.core5.reactor.IOEventHandler;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.IOReactorExceptionHandler;
import org.apache.hc.core5.reactor.IOReactorStatus;
+import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.ListenerEndpoint;
import org.junit.After;
import org.junit.Assert;
@@ -64,21 +56,37 @@ public class TestDefaultListeningIOReact
@Before
public void setup() throws Exception {
- final HttpProcessor httpproc = new DefaultHttpProcessor(new ResponseDate(),
- new ResponseServer(),
- new ResponseContent(),
- new ResponseConnControl());
- final HttpAsyncService serviceHandler = new HttpAsyncService(
- httpproc,
- new UriHttpAsyncRequestHandlerMapper());
- final IOEventHandlerFactory eventHandlerFactory = new DefaultHttpServerIOEventHandlerFactory(
- serviceHandler,
- ConnectionConfig.DEFAULT);
-
final IOReactorConfig reactorConfig = IOReactorConfig.custom()
.setIoThreadCount(1)
.build();
- this.ioreactor = new DefaultListeningIOReactor(eventHandlerFactory, reactorConfig);
+ this.ioreactor = new DefaultListeningIOReactor(new IOEventHandlerFactory() {
+
+ @Override
+ public IOEventHandler createHandler(final IOSession ioSession) {
+ return new IOEventHandler() {
+
+ @Override
+ public void connected(final IOSession session) {
+ }
+
+ @Override
+ public void inputReady(final IOSession session) {
+ }
+
+ @Override
+ public void outputReady(final IOSession session) {
+ }
+
+ @Override
+ public void timeout(final IOSession session) {
+ }
+
+ @Override
+ public void disconnected(final IOSession session) {
+ }
+ };
+ }
+ }, reactorConfig);
}
@After
Propchange: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/EchoHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/EchoHandler.java?rev=1767339&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/EchoHandler.java (added)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/EchoHandler.java Mon Oct 31 17:33:27 2016
@@ -0,0 +1,159 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.testing.nio.http;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.ExpectationChannel;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+
+public class EchoHandler implements AsyncServerExchangeHandler { // echoes consumed bytes back out through the DataStreamChannel handed to produce()
+
+ private volatile ByteBuffer buffer; // bytes consumed but not yet written out; kept in write mode, so position() == bytes pending
+ private volatile CapacityChannel inputCapacityChannel; // stored by updateCapacity() when the buffer is full, read again in produce()
+ private volatile DataStreamChannel outputDataChannel; // assigned in produce(); null before that
+ private volatile boolean endStream; // set to true by streamEnd()
+
+ public EchoHandler(final int bufferSize) { // bufferSize is the initial capacity; ensureCapacity() may replace the buffer with a larger one
+ this.buffer = ByteBuffer.allocate(bufferSize);
+ }
+
+ private void ensureCapacity(final int chunk) { // guarantees at least 'chunk' bytes of free space in the buffer
+ if (buffer.remaining() < chunk) { // not enough free space: reallocate and copy the pending bytes over
+ final ByteBuffer oldBuffer = buffer;
+ oldBuffer.flip(); // read mode, so remaining() is the number of pending bytes
+ buffer = ByteBuffer.allocate(oldBuffer.remaining() + (chunk > 2048 ? chunk : 2048)); // pending bytes plus max(chunk, 2048) of headroom
+ buffer.put(oldBuffer);
+ }
+ }
+
+ @Override
+ public void setContext(final HttpContext context) { // intentionally empty: the context is not used
+ }
+
+ @Override
+ public void verify(
+ final HttpRequest request,
+ final EntityDetails entityDetails,
+ final ExpectationChannel expectationChannel) throws HttpException, IOException {
+ expectationChannel.sendContinue(); // no checks are performed; sendContinue() is called unconditionally
+ }
+
+ @Override
+ public void handleRequest(
+ final HttpRequest request,
+ final EntityDetails entityDetails,
+ final ResponseChannel responseChannel) throws HttpException, IOException {
+ final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
+ responseChannel.sendResponse(response, entityDetails); // SC_OK response, passing the request's entityDetails through unchanged
+ }
+
+ @Override
+ public int consume(final ByteBuffer src) throws IOException {
+ if (buffer.position() == 0) { // nothing pending: try to pass src straight to the output channel
+ if (outputDataChannel != null) {
+ outputDataChannel.write(src);
+ }
+ }
+ if (src.hasRemaining()) { // whatever was not written directly is buffered for a later produce()
+ ensureCapacity(src.remaining());
+ buffer.put(src);
+ if (outputDataChannel != null) {
+ outputDataChannel.requestOutput();
+ }
+ }
+ return buffer.remaining(); // free space left in the buffer
+ }
+
+ @Override
+ public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+ if (buffer.hasRemaining()) {
+ capacityChannel.update(buffer.remaining()); // report the free buffer space right away
+ inputCapacityChannel = null;
+ } else {
+ inputCapacityChannel = capacityChannel; // buffer full: keep the channel so produce() can report space once freed
+ }
+ }
+
+ @Override
+ public void streamEnd(final List<Header> trailers) throws HttpException, IOException { // trailers are ignored
+ endStream = true;
+ if (buffer.position() == 0) { // nothing pending: end the output now if a channel is already known
+ if (outputDataChannel != null) {
+ outputDataChannel.endStream();
+ }
+ } else { // pending bytes remain: request output; produce() ends the stream once the buffer is drained
+ if (outputDataChannel != null) {
+ outputDataChannel.requestOutput();
+ }
+ }
+ }
+
+ @Override
+ public int available() {
+ return buffer.position(); // number of buffered bytes waiting to be written out
+ }
+
+ @Override
+ public void produce(final DataStreamChannel channel) throws IOException {
+ outputDataChannel = channel; // remembered so consume() and streamEnd() can use it
+ buffer.flip(); // read mode for writing the pending bytes out
+ if (buffer.hasRemaining()) {
+ channel.write(buffer);
+ }
+ buffer.compact(); // back to write mode, keeping any bytes the channel did not accept
+ if (buffer.position() == 0 && endStream) { // fully drained and streamEnd() already seen
+ channel.endStream();
+ }
+ final CapacityChannel capacityChannel = inputCapacityChannel; // local copy of the volatile field
+ if (capacityChannel != null && buffer.hasRemaining()) { // a capacity channel is stored and there is free space to report
+ capacityChannel.update(buffer.remaining());
+ }
+ }
+
+ @Override
+ public void failed(final Exception cause) { // intentionally empty: failures are ignored
+ }
+
+ @Override
+ public void releaseResources() { // intentionally empty: nothing is released here
+ }
+
+}
Propchange: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/EchoHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/EchoHandler.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/EchoHandler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1IntegrationTest.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1IntegrationTest.java?rev=1767339&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1IntegrationTest.java (added)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1IntegrationTest.java Mon Oct 31 17:33:27 2016
@@ -0,0 +1,1536 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.testing.nio.http;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Random;
+import java.util.StringTokenizer;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.http.ConnectionReuseStrategy;
+import org.apache.hc.core5.http.ContentLengthStrategy;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpConnection;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.HttpVersion;
+import org.apache.hc.core5.http.MalformedChunkCodingException;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.config.ConnectionConfig;
+import org.apache.hc.core5.http.config.H1Config;
+import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
+import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.hc.core5.http.impl.HttpProcessors;
+import org.apache.hc.core5.http.impl.nio.AbstractClassicServerExchangeHandler;
+import org.apache.hc.core5.http.impl.nio.AbstractContentEncoder;
+import org.apache.hc.core5.http.impl.nio.ConnectionListener;
+import org.apache.hc.core5.http.impl.nio.Http1StreamListener;
+import org.apache.hc.core5.http.impl.nio.ServerHttp1StreamDuplexer;
+import org.apache.hc.core5.http.impl.nio.bootstrap.ClientEndpoint;
+import org.apache.hc.core5.http.impl.nio.bootstrap.ClientEndpointImpl;
+import org.apache.hc.core5.http.impl.nio.entity.AbstractClassicEntityConsumer;
+import org.apache.hc.core5.http.impl.nio.entity.AbstractClassicEntityProducer;
+import org.apache.hc.core5.http.io.entity.ContentType;
+import org.apache.hc.core5.http.message.BasicHttpRequest;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.http.nio.AsyncRequestProducer;
+import org.apache.hc.core5.http.nio.AsyncResponseProducer;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.BasicRequestProducer;
+import org.apache.hc.core5.http.nio.BasicResponseConsumer;
+import org.apache.hc.core5.http.nio.BasicResponseProducer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.ContentEncoder;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.ExpectationChannel;
+import org.apache.hc.core5.http.nio.HandlerFactory;
+import org.apache.hc.core5.http.nio.NHttpMessageParser;
+import org.apache.hc.core5.http.nio.NHttpMessageWriter;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.nio.SessionOutputBuffer;
+import org.apache.hc.core5.http.nio.Supplier;
+import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
+import org.apache.hc.core5.http.nio.support.ResponseTrigger;
+import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http.protocol.HttpProcessor;
+import org.apache.hc.core5.http.protocol.RequestConnControl;
+import org.apache.hc.core5.http.protocol.RequestContent;
+import org.apache.hc.core5.http.protocol.RequestValidateHost;
+import org.apache.hc.core5.reactor.IOEventHandler;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.reactor.SessionRequest;
+import org.apache.hc.core5.util.CharArrayBuffer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class Http1IntegrationTest extends InternalServerTestBase {
+
+ private static final long TIMEOUT = 5;
+
+ private Http1TestClient client;
+
+ @Before
+ public void setup() throws Exception {
+ client = new Http1TestClient();
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ if (client != null) {
+ client.shutdown(3, TimeUnit.SECONDS);
+ }
+ }
+
+ private URI createRequestURI(final InetSocketAddress serverEndpoint, final String path) { // builds http://localhost:<server port><path>
+ try {
+ return new URI("http", null, "localhost", serverEndpoint.getPort(), path, null, null);
+ } catch (URISyntaxException e) {
+ throw new IllegalStateException("Invalid request URI path: " + path, e); // keep the cause and the offending path for diagnosis
+ }
+ }
+
+ @Test
+ public void testSimpleGet() throws Exception {
+ server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
+
+ @Override
+ public AsyncServerExchangeHandler get() {
+ return new SingleLineResponseHandler("Hi there");
+ }
+
+ });
+ final InetSocketAddress serverEndpoint = server.start();
+
+ client.start();
+ final Future<ClientEndpoint> connectFuture = client.connect(
+ "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+ final ClientEndpoint streamEndpoint = connectFuture.get();
+
+ for (int i = 0; i < 5; i++) {
+ final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
+ new BasicRequestProducer("GET", createRequestURI(serverEndpoint, "/hello")),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+ final Message<HttpResponse, String> result = future.get(TIMEOUT, TimeUnit.SECONDS);
+ Assert.assertNotNull(result);
+ final HttpResponse response1 = result.getHead();
+ final String entity1 = result.getBody();
+ Assert.assertNotNull(response1);
+ Assert.assertEquals(200, response1.getCode());
+ Assert.assertEquals("Hi there", entity1);
+ }
+ }
+
+ @Test
+ public void testSimpleGetIdentityTransfer() throws Exception {
+ server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
+
+ @Override
+ public AsyncServerExchangeHandler get() {
+ return new SingleLineResponseHandler("Hi there");
+ }
+
+ });
+ final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
+ final InetSocketAddress serverEndpoint = server.start(httpProcessor, H1Config.DEFAULT);
+
+ client.start();
+ final Future<ClientEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+ final ClientEndpoint streamEndpoint = connectFuture.get();
+
+ final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
+ new BasicRequestProducer("GET", createRequestURI(serverEndpoint, "/hello")),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+ final Message<HttpResponse, String> result = future.get(TIMEOUT, TimeUnit.SECONDS);
+ Assert.assertNotNull(result);
+ final HttpResponse response = result.getHead();
+ final String entity = result.getBody();
+ Assert.assertNotNull(response);
+ Assert.assertEquals(200, response.getCode());
+ Assert.assertEquals("Hi there", entity);
+ }
+
+ @Test
+ public void testSimpleGetsPipelined() throws Exception {
+ server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
+
+ @Override
+ public AsyncServerExchangeHandler get() {
+ return new SingleLineResponseHandler("Hi there");
+ }
+
+ });
+ final InetSocketAddress serverEndpoint = server.start();
+
+ client.start();
+ final Future<ClientEndpoint> connectFuture = client.connect(
+ "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+ final ClientEndpoint streamEndpoint = connectFuture.get();
+
+ final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
+ for (int i = 0; i < 5; i++) {
+ queue.add(streamEndpoint.execute(
+ new BasicRequestProducer("GET", createRequestURI(serverEndpoint, "/hello")),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
+ }
+ while (!queue.isEmpty()) {
+ final Future<Message<HttpResponse, String>> future = queue.remove();
+ final Message<HttpResponse, String> result = future.get(TIMEOUT, TimeUnit.SECONDS);
+ Assert.assertNotNull(result);
+ final HttpResponse response = result.getHead();
+ final String entity = result.getBody();
+ Assert.assertNotNull(response);
+ Assert.assertEquals(200, response.getCode());
+ Assert.assertEquals("Hi there", entity);
+ }
+ }
+
+ @Test
+ public void testLargeGet() throws Exception {
+ server.register("/", new Supplier<AsyncServerExchangeHandler>() {
+
+ @Override
+ public AsyncServerExchangeHandler get() {
+ return new MultiLineResponseHandler("0123456789abcdef", 5000);
+ }
+
+ });
+ final InetSocketAddress serverEndpoint = server.start();
+
+ client.start();
+ final Future<ClientEndpoint> connectFuture = client.connect(
+ "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+ final ClientEndpoint streamEndpoint = connectFuture.get();
+
+ final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
+ new BasicRequestProducer("GET", createRequestURI(serverEndpoint, "/")),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+
+ final Message<HttpResponse, String> result1 = future1.get(TIMEOUT, TimeUnit.SECONDS);
+ Assert.assertNotNull(result1);
+ final HttpResponse response1 = result1.getHead();
+ Assert.assertNotNull(response1);
+ Assert.assertEquals(200, response1.getCode());
+ final String s1 = result1.getBody();
+ Assert.assertNotNull(s1);
+ final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
+ while (t1.hasMoreTokens()) {
+ Assert.assertEquals("0123456789abcdef", t1.nextToken());
+ }
+
+ final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
+ new BasicRequestProducer("GET", createRequestURI(serverEndpoint, "/")),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer(512)), null);
+
+ final Message<HttpResponse, String> result2 = future2.get(TIMEOUT, TimeUnit.SECONDS);
+ Assert.assertNotNull(result2);
+ final HttpResponse response2 = result2.getHead();
+ Assert.assertNotNull(response2);
+ Assert.assertEquals(200, response2.getCode());
+ final String s2 = result2.getBody();
+ Assert.assertNotNull(s2);
+ final StringTokenizer t2 = new StringTokenizer(s2, "\r\n");
+ while (t2.hasMoreTokens()) {
+ Assert.assertEquals("0123456789abcdef", t2.nextToken());
+ }
+ }
+
+ @Test
+ public void testLargeGetsPipelined() throws Exception {
+ server.register("/", new Supplier<AsyncServerExchangeHandler>() {
+
+ @Override
+ public AsyncServerExchangeHandler get() {
+ return new MultiLineResponseHandler("0123456789abcdef", 2000);
+ }
+
+ });
+ final InetSocketAddress serverEndpoint = server.start();
+
+ client.start();
+ final Future<ClientEndpoint> connectFuture = client.connect(
+ "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+ final ClientEndpoint streamEndpoint = connectFuture.get();
+
+ final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
+ for (int i = 0; i < 5; i++) {
+ queue.add(streamEndpoint.execute(
+ new BasicRequestProducer("GET", createRequestURI(serverEndpoint, "/")),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
+ }
+ while (!queue.isEmpty()) {
+ final Future<Message<HttpResponse, String>> future = queue.remove();
+ final Message<HttpResponse, String> result = future.get(TIMEOUT, TimeUnit.SECONDS);
+ Assert.assertNotNull(result);
+ final HttpResponse response = result.getHead();
+ Assert.assertNotNull(response);
+ Assert.assertEquals(200, response.getCode());
+ final String entity = result.getBody();
+ Assert.assertNotNull(entity);
+ final StringTokenizer t = new StringTokenizer(entity, "\r\n");
+ while (t.hasMoreTokens()) {
+ Assert.assertEquals("0123456789abcdef", t.nextToken());
+ }
+ }
+ }
+
+ @Test
+ public void testBasicPost() throws Exception {
+ server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
+
+ @Override
+ public AsyncServerExchangeHandler get() {
+ return new SingleLineResponseHandler("Hi back");
+ }
+
+ });
+ final InetSocketAddress serverEndpoint = server.start();
+
+ client.start();
+ final Future<ClientEndpoint> connectFuture = client.connect(
+ "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+ final ClientEndpoint streamEndpoint = connectFuture.get();
+
+ for (int i = 0; i < 5; i++) {
+ final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
+ new BasicRequestProducer("POST", createRequestURI(serverEndpoint, "/hello"),
+ new BasicAsyncEntityProducer("Hi there")),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+ final Message<HttpResponse, String> result = future.get(TIMEOUT, TimeUnit.SECONDS);
+ Assert.assertNotNull(result);
+ final HttpResponse response1 = result.getHead();
+ final String entity1 = result.getBody();
+ Assert.assertNotNull(response1);
+ Assert.assertEquals(200, response1.getCode());
+ Assert.assertEquals("Hi back", entity1);
+ }
+ }
+
+ @Test
+ public void testBasicPostPipelined() throws Exception {
+ server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
+
+ @Override
+ public AsyncServerExchangeHandler get() {
+ return new SingleLineResponseHandler("Hi back");
+ }
+
+ });
+ final InetSocketAddress serverEndpoint = server.start();
+
+ client.start();
+ final Future<ClientEndpoint> connectFuture = client.connect(
+ "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+ final ClientEndpoint streamEndpoint = connectFuture.get();
+
+ final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
+ for (int i = 0; i < 5; i++) {
+ queue.add(streamEndpoint.execute(
+ new BasicRequestProducer("POST", createRequestURI(serverEndpoint, "/hello"),
+ new BasicAsyncEntityProducer("Hi there")),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
+ }
+ while (!queue.isEmpty()) {
+ final Future<Message<HttpResponse, String>> future = queue.remove();
+ final Message<HttpResponse, String> result = future.get(TIMEOUT, TimeUnit.SECONDS);
+ Assert.assertNotNull(result);
+ final HttpResponse response = result.getHead();
+ final String entity = result.getBody();
+ Assert.assertNotNull(response);
+ Assert.assertEquals(200, response.getCode());
+ Assert.assertEquals("Hi back", entity);
+ }
+ }
+
+ /**
+  * Runs five sequential POST exchanges whose request version is set to HTTP/1.0 through a
+  * single client endpoint and expects status 200 with the body "Hi back" every time.
+  */
+ @Test
+ public void testHttp10Post() throws Exception {
+     server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
+
+         @Override
+         public AsyncServerExchangeHandler get() {
+             return new SingleLineResponseHandler("Hi back");
+         }
+
+     });
+     final InetSocketAddress address = server.start();
+
+     client.start();
+     final ClientEndpoint endpoint = client.connect(
+             "localhost", address.getPort(), TIMEOUT, TimeUnit.SECONDS).get();
+
+     for (int round = 1; round <= 5; round++) {
+         // Build the request explicitly so that its protocol version can be overridden.
+         final HttpRequest post = new BasicHttpRequest("POST", createRequestURI(address, "/hello"));
+         post.setVersion(HttpVersion.HTTP_1_0);
+         final BasicRequestProducer requestProducer = new BasicRequestProducer(
+                 post, new BasicAsyncEntityProducer("Hi there"));
+         final Future<Message<HttpResponse, String>> pending = endpoint.execute(
+                 requestProducer,
+                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+
+         final Message<HttpResponse, String> message = pending.get(TIMEOUT, TimeUnit.SECONDS);
+         Assert.assertNotNull(message);
+         final HttpResponse head = message.getHead();
+         final String body = message.getBody();
+         Assert.assertNotNull(head);
+         Assert.assertEquals(200, head.getCode());
+         Assert.assertEquals("Hi back", body);
+     }
+ }
+
+ @Test
+ public void testNoEntityPost() throws Exception {
+ server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
+
+ @Override
+ public AsyncServerExchangeHandler get() {
+ return new SingleLineResponseHandler("Hi back");
+ }
+
+ });
+ final InetSocketAddress serverEndpoint = server.start();
+
+ client.start();
+ final Future<ClientEndpoint> connectFuture = client.connect(
+ "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+ final ClientEndpoint streamEndpoint = connectFuture.get();
+
+ for (int i = 0; i < 5; i++) {
+ final HttpRequest request = new BasicHttpRequest("POST", createRequestURI(serverEndpoint, "/hello"));
+ final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
+ new BasicRequestProducer(request, null),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+ final Message<HttpResponse, String> result = future.get(TIMEOUT, TimeUnit.SECONDS);
+ Assert.assertNotNull(result);
+ final HttpResponse response1 = result.getHead();
+ final String entity1 = result.getBody();
+ Assert.assertNotNull(response1);
+ Assert.assertEquals(200, response1.getCode());
+ Assert.assertEquals("Hi back", entity1);
+ }
+ }
+
+ /**
+  * Posts a multi-line entity to an {@code EchoHandler} five times in sequence and checks
+  * that every line of each returned body equals the line that was sent.
+  */
+ @Test
+ public void testLargePost() throws Exception {
+     server.register("*", new Supplier<AsyncServerExchangeHandler>() {
+
+         @Override
+         public AsyncServerExchangeHandler get() {
+             return new EchoHandler(2048);
+         }
+
+     });
+     final InetSocketAddress address = server.start();
+
+     client.start();
+     final ClientEndpoint endpoint = client.connect(
+             "localhost", address.getPort(), TIMEOUT, TimeUnit.SECONDS).get();
+
+     for (int round = 1; round <= 5; round++) {
+         final BasicRequestProducer requestProducer = new BasicRequestProducer(
+                 "POST", createRequestURI(address, "/echo"),
+                 new MultiLineEntityProducer("0123456789abcdef", 5000));
+         final Future<Message<HttpResponse, String>> pending = endpoint.execute(
+                 requestProducer,
+                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+
+         final Message<HttpResponse, String> message = pending.get(TIMEOUT, TimeUnit.SECONDS);
+         Assert.assertNotNull(message);
+         final HttpResponse head = message.getHead();
+         Assert.assertNotNull(head);
+         Assert.assertEquals(200, head.getCode());
+         final String body = message.getBody();
+         Assert.assertNotNull(body);
+         // Only the content of the echoed lines is checked, not their number.
+         for (final StringTokenizer lines = new StringTokenizer(body, "\r\n"); lines.hasMoreTokens();) {
+             Assert.assertEquals("0123456789abcdef", lines.nextToken());
+         }
+     }
+ }
+
+ /**
+  * Submits two small POST requests back to back, without waiting in between, to a handler
+  * that answers with a multi-line body, then checks the responses in submission order.
+  */
+ @Test
+ public void testPostsPipelinedLargeResponse() throws Exception {
+     server.register("/", new Supplier<AsyncServerExchangeHandler>() {
+
+         @Override
+         public AsyncServerExchangeHandler get() {
+             return new MultiLineResponseHandler("0123456789abcdef", 2000);
+         }
+
+     });
+     final InetSocketAddress address = server.start();
+
+     client.start();
+     final ClientEndpoint endpoint = client.connect(
+             "localhost", address.getPort(), TIMEOUT, TimeUnit.SECONDS).get();
+
+     // Queue up all exchanges first; results are collected afterwards.
+     final Queue<Future<Message<HttpResponse, String>>> pendingExchanges = new LinkedList<>();
+     for (int submitted = 0; submitted < 2; submitted++) {
+         final BasicRequestProducer requestProducer = new BasicRequestProducer(
+                 "POST", createRequestURI(address, "/"),
+                 new BasicAsyncEntityProducer("Hi there"));
+         pendingExchanges.add(endpoint.execute(
+                 requestProducer,
+                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
+     }
+
+     for (final Future<Message<HttpResponse, String>> pending : pendingExchanges) {
+         final Message<HttpResponse, String> message = pending.get(TIMEOUT, TimeUnit.SECONDS);
+         Assert.assertNotNull(message);
+         final HttpResponse head = message.getHead();
+         Assert.assertNotNull(head);
+         Assert.assertEquals(200, head.getCode());
+         final String body = message.getBody();
+         Assert.assertNotNull(body);
+         for (final StringTokenizer lines = new StringTokenizer(body, "\r\n"); lines.hasMoreTokens();) {
+             Assert.assertEquals("0123456789abcdef", lines.nextToken());
+         }
+     }
+ }
+
+
+
+ /**
+  * Submits five POST requests with a multi-line entity back to back to an
+  * {@code EchoHandler}, then checks in submission order that every echoed line is intact.
+  */
+ @Test
+ public void testLargePostsPipelined() throws Exception {
+     server.register("*", new Supplier<AsyncServerExchangeHandler>() {
+
+         @Override
+         public AsyncServerExchangeHandler get() {
+             return new EchoHandler(2048);
+         }
+
+     });
+     final InetSocketAddress address = server.start();
+
+     client.start();
+     final ClientEndpoint endpoint = client.connect(
+             "localhost", address.getPort(), TIMEOUT, TimeUnit.SECONDS).get();
+
+     // Queue up all exchanges first; results are collected afterwards.
+     final Queue<Future<Message<HttpResponse, String>>> pendingExchanges = new LinkedList<>();
+     for (int submitted = 0; submitted < 5; submitted++) {
+         final BasicRequestProducer requestProducer = new BasicRequestProducer(
+                 "POST", createRequestURI(address, "/echo"),
+                 new MultiLineEntityProducer("0123456789abcdef", 5000));
+         pendingExchanges.add(endpoint.execute(
+                 requestProducer,
+                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
+     }
+
+     for (final Future<Message<HttpResponse, String>> pending : pendingExchanges) {
+         final Message<HttpResponse, String> message = pending.get(TIMEOUT, TimeUnit.SECONDS);
+         Assert.assertNotNull(message);
+         final HttpResponse head = message.getHead();
+         Assert.assertNotNull(head);
+         Assert.assertEquals(200, head.getCode());
+         final String body = message.getBody();
+         Assert.assertNotNull(body);
+         for (final StringTokenizer lines = new StringTokenizer(body, "\r\n"); lines.hasMoreTokens();) {
+             Assert.assertEquals("0123456789abcdef", lines.nextToken());
+         }
+     }
+ }
+
+ /**
+  * Issues five sequential HEAD requests and expects status 200 with a {@code null}
+  * message body for each of them.
+  */
+ @Test
+ public void testSimpleHead() throws Exception {
+     server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
+
+         @Override
+         public AsyncServerExchangeHandler get() {
+             return new SingleLineResponseHandler("Hi there");
+         }
+
+     });
+     final InetSocketAddress address = server.start();
+
+     client.start();
+     final ClientEndpoint endpoint = client.connect(
+             "localhost", address.getPort(), TIMEOUT, TimeUnit.SECONDS).get();
+
+     for (int round = 1; round <= 5; round++) {
+         final BasicRequestProducer requestProducer = new BasicRequestProducer(
+                 "HEAD", createRequestURI(address, "/hello"));
+         final Future<Message<HttpResponse, String>> pending = endpoint.execute(
+                 requestProducer,
+                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+
+         final Message<HttpResponse, String> message = pending.get(TIMEOUT, TimeUnit.SECONDS);
+         Assert.assertNotNull(message);
+         final HttpResponse head = message.getHead();
+         Assert.assertNotNull(head);
+         Assert.assertEquals(200, head.getCode());
+         Assert.assertNull(message.getBody());
+     }
+ }
+
+ /**
+  * Submits five HEAD requests back to back and then checks, in submission order, that
+  * each response has status 200 and a {@code null} message body.
+  */
+ @Test
+ public void testHeadPipelined() throws Exception {
+     server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
+
+         @Override
+         public AsyncServerExchangeHandler get() {
+             return new SingleLineResponseHandler("Hi there");
+         }
+
+     });
+     final InetSocketAddress address = server.start();
+
+     client.start();
+     final ClientEndpoint endpoint = client.connect(
+             "localhost", address.getPort(), TIMEOUT, TimeUnit.SECONDS).get();
+
+     // Queue up all exchanges first; results are collected afterwards.
+     final Queue<Future<Message<HttpResponse, String>>> pendingExchanges = new LinkedList<>();
+     for (int submitted = 0; submitted < 5; submitted++) {
+         final BasicRequestProducer requestProducer = new BasicRequestProducer(
+                 "HEAD", createRequestURI(address, "/hello"));
+         pendingExchanges.add(endpoint.execute(
+                 requestProducer,
+                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
+     }
+
+     for (final Future<Message<HttpResponse, String>> pending : pendingExchanges) {
+         final Message<HttpResponse, String> message = pending.get(TIMEOUT, TimeUnit.SECONDS);
+         Assert.assertNotNull(message);
+         final HttpResponse head = message.getHead();
+         Assert.assertNotNull(head);
+         Assert.assertEquals(200, head.getCode());
+         Assert.assertNull(message.getBody());
+     }
+ }
+
+ /**
+  * Exercises a handler whose {@code verify} step rejects requests lacking the header
+  * {@code password: secret} with a 401 response, and accepts the others with a 200.
+  * <p>
+  * Four POST requests are executed in sequence over one I/O session, alternating between
+  * accepted and rejected. The connection is expected to still be open after the first
+  * three exchanges and to be closed after the fourth, which is rejected while carrying a
+  * small ("blah") entity.
+  */
+ @Test
+ public void testExpectationFailed() throws Exception {
+     server.register("*", new Supplier<AsyncServerExchangeHandler>() {
+
+         @Override
+         public AsyncServerExchangeHandler get() {
+             return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
+
+                 @Override
+                 protected AsyncResponseProducer verify(
+                         final HttpRequest request,
+                         final HttpContext context) throws IOException, HttpException {
+                     final Header h = request.getFirstHeader("password");
+                     if (h != null && "secret".equals(h.getValue())) {
+                         // No early response: the exchange proceeds to handle().
+                         return null;
+                     } else {
+                         return new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
+                     }
+                 }
+
+                 @Override
+                 protected void handle(
+                         final Message<HttpRequest, String> request,
+                         final ResponseTrigger responseTrigger,
+                         final HttpContext context) throws IOException, HttpException {
+                     responseTrigger.submitResponse(
+                             new BasicResponseProducer(HttpStatus.SC_OK, "All is well"));
+
+                 }
+             };
+         }
+
+     });
+     final InetSocketAddress serverEndpoint = server.start();
+
+     client.start();
+     // The session is requested directly so that the underlying connection can be inspected.
+     final SessionRequest sessionRequest = client.requestSession(
+             new InetSocketAddress("localhost", serverEndpoint.getPort()), TIMEOUT, TimeUnit.SECONDS, null);
+     sessionRequest.waitFor();
+     final IOSession ioSession = sessionRequest.getSession();
+     final ClientEndpoint streamEndpoint = new ClientEndpointImpl(ioSession);
+
+     final IOEventHandler handler = ioSession.getHandler();
+     Assert.assertNotNull(handler);
+     Assert.assertTrue(handler instanceof HttpConnection);
+     final HttpConnection conn = (HttpConnection) handler;
+
+     // Exchange 1: accepted.
+     final HttpRequest request1 = new BasicHttpRequest("POST", createRequestURI(serverEndpoint, "/echo"));
+     request1.addHeader("password", "secret");
+     final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
+             new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 1000)),
+             new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+     final Message<HttpResponse, String> result1 = future1.get(TIMEOUT, TimeUnit.SECONDS);
+     Assert.assertNotNull(result1);
+     final HttpResponse response1 = result1.getHead();
+     Assert.assertNotNull(response1);
+     Assert.assertEquals(200, response1.getCode());
+     // assertEquals, not assertNotNull(message, value): the body text itself must be verified.
+     Assert.assertEquals("All is well", result1.getBody());
+
+     Assert.assertTrue(conn.isOpen());
+
+     // Exchange 2: rejected, large entity.
+     final HttpRequest request2 = new BasicHttpRequest("POST", createRequestURI(serverEndpoint, "/echo"));
+     final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
+             new BasicRequestProducer(request2, new MultiLineEntityProducer("0123456789abcdef", 5000)),
+             new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+     final Message<HttpResponse, String> result2 = future2.get(TIMEOUT, TimeUnit.SECONDS);
+     Assert.assertNotNull(result2);
+     final HttpResponse response2 = result2.getHead();
+     Assert.assertNotNull(response2);
+     Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response2.getCode());
+     Assert.assertEquals("You shall not pass", result2.getBody());
+
+     Assert.assertTrue(conn.isOpen());
+
+     // Exchange 3: accepted again on the same connection.
+     final HttpRequest request3 = new BasicHttpRequest("POST", createRequestURI(serverEndpoint, "/echo"));
+     request3.addHeader("password", "secret");
+     final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
+             new BasicRequestProducer(request3, new MultiLineEntityProducer("0123456789abcdef", 1000)),
+             new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+     final Message<HttpResponse, String> result3 = future3.get(TIMEOUT, TimeUnit.SECONDS);
+     Assert.assertNotNull(result3);
+     final HttpResponse response3 = result3.getHead();
+     Assert.assertNotNull(response3);
+     Assert.assertEquals(200, response3.getCode());
+     Assert.assertEquals("All is well", result3.getBody());
+
+     Assert.assertTrue(conn.isOpen());
+
+     // Exchange 4: rejected, small entity; the connection is expected to be closed afterwards.
+     final HttpRequest request4 = new BasicHttpRequest("POST", createRequestURI(serverEndpoint, "/echo"));
+     final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
+             new BasicRequestProducer(request4, new BasicAsyncEntityProducer("blah")),
+             new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+     final Message<HttpResponse, String> result4 = future4.get(TIMEOUT, TimeUnit.SECONDS);
+     Assert.assertNotNull(result4);
+     final HttpResponse response4 = result4.getHead();
+     Assert.assertNotNull(response4);
+     Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response4.getCode());
+     Assert.assertEquals("You shall not pass", result4.getBody());
+
+     Assert.assertFalse(conn.isOpen());
+ }
+
+ /**
+  * Checks that exchanges complete when the server acknowledges the expectation late.
+  * <p>
+  * The handler's {@code verify} step sends the "continue" signal from a separate thread
+  * after a random delay of up to one second, while the client is started with a
+  * wait-for-continue timeout setting of 100. Five POST requests are submitted back to
+  * back and each must complete with status 200 and the body "All is well".
+  */
+ @Test
+ public void testDelayedExpectationVerification() throws Exception {
+     server.register("*", new Supplier<AsyncServerExchangeHandler>() {
+
+         @Override
+         public AsyncServerExchangeHandler get() {
+             return new AsyncServerExchangeHandler() {
+
+                 private final Random random = new Random(System.currentTimeMillis());
+                 private final AsyncEntityProducer entityProducer = new BasicAsyncEntityProducer(
+                         "All is well");
+
+                 @Override
+                 public void setContext(final HttpContext context) {
+                 }
+
+                 @Override
+                 public void verify(
+                         final HttpRequest request,
+                         final EntityDetails entityDetails,
+                         final ExpectationChannel expectationChannel) throws HttpException, IOException {
+                     // Fully qualified so that this block does not depend on an extra import.
+                     final java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
+                     executor.execute(new Runnable() {
+                         @Override
+                         public void run() {
+                             try {
+                                 Thread.sleep(random.nextInt(1000));
+                                 expectationChannel.sendContinue();
+                             } catch (Exception ignore) {
+                                 // Best effort: a failure here surfaces as a client-side timeout.
+                             }
+                         }
+                     });
+                     // The submitted task still runs; shutdown only lets the worker thread
+                     // terminate afterwards instead of lingering for every verified request.
+                     executor.shutdown();
+                 }
+
+                 @Override
+                 public void handleRequest(
+                         final HttpRequest request,
+                         final EntityDetails entityDetails,
+                         final ResponseChannel responseChannel) throws HttpException, IOException {
+                     final HttpResponse response = new BasicHttpResponse(200);
+                     responseChannel.sendResponse(response, entityProducer);
+                 }
+
+                 @Override
+                 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+                     capacityChannel.update(Integer.MAX_VALUE);
+                 }
+
+                 @Override
+                 public int consume(final ByteBuffer src) throws IOException {
+                     // The request content is ignored.
+                     return Integer.MAX_VALUE;
+                 }
+
+                 @Override
+                 public void streamEnd(final List<Header> trailers) throws HttpException, IOException {
+                 }
+
+                 @Override
+                 public int available() {
+                     return entityProducer.available();
+                 }
+
+                 @Override
+                 public void produce(final DataStreamChannel channel) throws IOException {
+                     entityProducer.produce(channel);
+                 }
+
+                 @Override
+                 public void failed(final Exception cause) {
+                 }
+
+                 @Override
+                 public void releaseResources() {
+                 }
+
+             };
+
+         }
+     });
+     final InetSocketAddress serverEndpoint = server.start();
+
+     client.start(H1Config.custom().setWaitForContinueTimeout(100).build());
+     final Future<ClientEndpoint> connectFuture = client.connect(
+             "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+     final ClientEndpoint streamEndpoint = connectFuture.get();
+
+     final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
+     for (int i = 0; i < 5; i++) {
+         queue.add(streamEndpoint.execute(
+                 new BasicRequestProducer("POST", createRequestURI(serverEndpoint, "/"),
+                         new BasicAsyncEntityProducer("Some important message")),
+                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
+     }
+     while (!queue.isEmpty()) {
+         final Future<Message<HttpResponse, String>> future = queue.remove();
+         final Message<HttpResponse, String> result = future.get(5 * TIMEOUT, TimeUnit.SECONDS);
+         Assert.assertNotNull(result);
+         final HttpResponse response = result.getHead();
+         Assert.assertNotNull(response);
+         Assert.assertEquals(200, response.getCode());
+         // assertEquals, not assertNotNull(message, value): the body text itself must be verified.
+         Assert.assertEquals("All is well", result.getBody());
+     }
+ }
+
+ /**
+  * Checks that the client receives a response the server submits from
+  * {@code handleRequest}, i.e. without the handler consuming the request content first.
+  * <p>
+  * The handler answers 200 "All is well" when the request carries the header
+  * {@code password: secret} and 401 "You shall not pass" otherwise. The single POST sent
+  * here has no such header and carries a multi-line entity, so a 401 is expected.
+  */
+ @Test
+ public void testPrematureResponse() throws Exception {
+     server.register("*", new Supplier<AsyncServerExchangeHandler>() {
+
+         @Override
+         public AsyncServerExchangeHandler get() {
+             return new AsyncServerExchangeHandler() {
+
+                 // Set in handleRequest(); available() and produce() delegate to it.
+                 private final AtomicReference<AsyncResponseProducer> responseProducer = new AtomicReference<>(null);
+
+                 @Override
+                 public void setContext(final HttpContext context) {
+                 }
+
+                 @Override
+                 public void verify(
+                         final HttpRequest request,
+                         final EntityDetails entityDetails,
+                         final ExpectationChannel expectationChannel) throws HttpException, IOException {
+                     expectationChannel.sendContinue();
+                 }
+
+                 @Override
+                 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+                     capacityChannel.update(Integer.MAX_VALUE);
+                 }
+
+                 @Override
+                 public int consume(final ByteBuffer src) throws IOException {
+                     // The request content is ignored.
+                     return Integer.MAX_VALUE;
+                 }
+
+                 @Override
+                 public void streamEnd(final List<Header> trailers) throws HttpException, IOException {
+                 }
+
+                 @Override
+                 public void handleRequest(
+                         final HttpRequest request,
+                         final EntityDetails entityDetails,
+                         final ResponseChannel responseChannel) throws HttpException, IOException {
+                     final AsyncResponseProducer producer;
+                     final Header h = request.getFirstHeader("password");
+                     if (h != null && "secret".equals(h.getValue())) {
+                         producer = new BasicResponseProducer(HttpStatus.SC_OK, "All is well");
+                     } else {
+                         producer = new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
+                     }
+                     responseProducer.set(producer);
+                     responseChannel.sendResponse(producer.produceResponse(), producer.getEntityDetails());
+                 }
+
+                 @Override
+                 public int available() {
+                     final AsyncResponseProducer producer = responseProducer.get();
+                     return producer.available();
+                 }
+
+                 @Override
+                 public void produce(final DataStreamChannel channel) throws IOException {
+                     final AsyncResponseProducer producer = responseProducer.get();
+                     producer.produce(channel);
+                 }
+
+                 @Override
+                 public void failed(final Exception cause) {
+                 }
+
+                 @Override
+                 public void releaseResources() {
+                 }
+             };
+         }
+
+     });
+     final InetSocketAddress serverEndpoint = server.start();
+
+     client.start();
+     final Future<ClientEndpoint> connectFuture = client.connect(
+             "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+     final ClientEndpoint streamEndpoint = connectFuture.get();
+
+     final HttpRequest request1 = new BasicHttpRequest("POST", createRequestURI(serverEndpoint, "/echo"));
+     final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
+             new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 5000)),
+             new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+     final Message<HttpResponse, String> result1 = future1.get(TIMEOUT, TimeUnit.SECONDS);
+     Assert.assertNotNull(result1);
+     final HttpResponse response1 = result1.getHead();
+     Assert.assertNotNull(response1);
+     Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
+     // assertEquals, not assertNotNull(message, value): the body text itself must be verified.
+     Assert.assertEquals("You shall not pass", result1.getBody());
+ }
+
+ /**
+  * Checks that a response is delivered intact to a consumer that reads slowly.
+  * <p>
+  * The client is started with a connection buffer size of 256. The response body is read
+  * through a blocking {@code InputStream} in 16-byte steps with a 50 ms pause after each
+  * read, and every line of the assembled body must equal the line the server emits.
+  */
+ @Test
+ public void testSlowResponseConsumer() throws Exception {
+     server.register("/", new Supplier<AsyncServerExchangeHandler>() {
+
+         @Override
+         public AsyncServerExchangeHandler get() {
+             return new MultiLineResponseHandler("0123456789abcd", 100);
+         }
+
+     });
+     final InetSocketAddress serverEndpoint = server.start();
+
+     client.start(H1Config.DEFAULT, ConnectionConfig.custom().setBufferSize(256).build());
+
+     final Future<ClientEndpoint> connectFuture = client.connect(
+             "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+     final ClientEndpoint streamEndpoint = connectFuture.get();
+
+     // Owned by this test so that its worker thread can be released in the finally block.
+     // Fully qualified so that this block does not depend on an extra import.
+     final java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
+     try {
+         final HttpRequest request1 = new BasicHttpRequest("GET", createRequestURI(serverEndpoint, "/"));
+         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
+                 new BasicRequestProducer(request1, null),
+                 new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<String>(16, executor) {
+
+                     @Override
+                     protected String consumeData(
+                             final ContentType contentType, final InputStream inputStream) throws IOException {
+                         Charset charset = contentType != null ? contentType.getCharset() : null;
+                         if (charset == null) {
+                             charset = StandardCharsets.US_ASCII;
+                         }
+
+                         final StringBuilder buffer = new StringBuilder();
+                         try {
+                             final byte[] tmp = new byte[16];
+                             int l;
+                             while ((l = inputStream.read(tmp)) != -1) {
+                                 buffer.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
+                                 // Deliberately slow consumption.
+                                 Thread.sleep(50);
+                             }
+                         } catch (InterruptedException ex) {
+                             Thread.currentThread().interrupt();
+                             throw new InterruptedIOException(ex.getMessage());
+                         }
+                         return buffer.toString();
+                     }
+                 }),
+                 null);
+
+         final Message<HttpResponse, String> result1 = future1.get(5 * TIMEOUT, TimeUnit.SECONDS);
+         Assert.assertNotNull(result1);
+         final HttpResponse response1 = result1.getHead();
+         Assert.assertNotNull(response1);
+         Assert.assertEquals(200, response1.getCode());
+         final String s1 = result1.getBody();
+         Assert.assertNotNull(s1);
+         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
+         while (t1.hasMoreTokens()) {
+             Assert.assertEquals("0123456789abcd", t1.nextToken());
+         }
+     } finally {
+         // Lets any running task finish, then allows the worker thread to terminate.
+         executor.shutdown();
+     }
+ }
+
+ /**
+  * Checks that a request whose content is produced slowly is echoed back intact.
+  * <p>
+  * The request entity is written through a blocking {@code OutputStream}: 500 lines in
+  * total, with a flush and a 500 ms pause before every hundredth line. The server echoes
+  * the content and every line of the response body must equal the line that was written.
+  */
+ @Test
+ public void testSlowRequestProducer() throws Exception {
+     server.register("*", new Supplier<AsyncServerExchangeHandler>() {
+
+         @Override
+         public AsyncServerExchangeHandler get() {
+             return new EchoHandler(2048);
+         }
+
+     });
+     final InetSocketAddress serverEndpoint = server.start();
+
+     client.start();
+     final Future<ClientEndpoint> connectFuture = client.connect(
+             "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+     final ClientEndpoint streamEndpoint = connectFuture.get();
+
+     // Owned by this test so that its worker thread can be released in the finally block.
+     // Fully qualified so that this block does not depend on an extra import.
+     final java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
+     try {
+         final HttpRequest request1 = new BasicHttpRequest("POST", createRequestURI(serverEndpoint, "/echo"));
+         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
+                 new BasicRequestProducer(request1, new AbstractClassicEntityProducer(4096, ContentType.TEXT_PLAIN, executor) {
+
+                     @Override
+                     protected void produceData(final ContentType contentType, final OutputStream outputStream) throws IOException {
+                         Charset charset = contentType.getCharset();
+                         if (charset == null) {
+                             charset = StandardCharsets.US_ASCII;
+                         }
+                         try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, charset))) {
+                             for (int i = 0; i < 500; i++) {
+                                 if (i % 100 == 0) {
+                                     // Push out what has been written so far, then stall.
+                                     writer.flush();
+                                     Thread.sleep(500);
+                                 }
+                                 writer.write("0123456789abcdef\r\n");
+                             }
+                         } catch (InterruptedException ex) {
+                             Thread.currentThread().interrupt();
+                             throw new InterruptedIOException(ex.getMessage());
+                         }
+                     }
+
+                 }),
+                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+         final Message<HttpResponse, String> result1 = future1.get(5 * TIMEOUT, TimeUnit.SECONDS);
+         Assert.assertNotNull(result1);
+         final HttpResponse response1 = result1.getHead();
+         Assert.assertNotNull(response1);
+         Assert.assertEquals(200, response1.getCode());
+         final String s1 = result1.getBody();
+         Assert.assertNotNull(s1);
+         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
+         while (t1.hasMoreTokens()) {
+             Assert.assertEquals("0123456789abcdef", t1.nextToken());
+         }
+     } finally {
+         // Lets any running task finish, then allows the worker thread to terminate.
+         executor.shutdown();
+     }
+ }
+
+ /**
+  * Checks that a response whose content is produced slowly reaches the client intact.
+  * <p>
+  * The server handler echoes the request line by line through blocking streams and
+  * pauses for 500 ms after every 500 lines. It answers 404 for any path other than
+  * {@code /hello} and 501 for any method other than POST. The client is started with a
+  * connection buffer size of 256 and every line of the response body must equal the
+  * line that was sent.
+  */
+ @Test
+ public void testSlowResponseProducer() throws Exception {
+     // One executor for the whole test, shared by the exchange handlers the supplier
+     // creates, so that it can be shut down in the finally block. This test issues a
+     // single request. Fully qualified so that this block does not depend on an extra import.
+     final java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
+     try {
+         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
+
+             @Override
+             public AsyncServerExchangeHandler get() {
+                 return new AbstractClassicServerExchangeHandler(2048, executor) {
+
+                     @Override
+                     protected void handle(
+                             final HttpRequest request,
+                             final InputStream requestStream,
+                             final HttpResponse response,
+                             final OutputStream responseStream,
+                             final HttpContext context) throws IOException, HttpException {
+
+                         if (!"/hello".equals(request.getPath())) {
+                             response.setCode(HttpStatus.SC_NOT_FOUND);
+                             return;
+                         }
+                         if (!"POST".equalsIgnoreCase(request.getMethod())) {
+                             response.setCode(HttpStatus.SC_NOT_IMPLEMENTED);
+                             return;
+                         }
+                         if (requestStream == null) {
+                             return;
+                         }
+                         final Header h1 = request.getFirstHeader(HttpHeaders.CONTENT_TYPE);
+                         final ContentType contentType = h1 != null ? ContentType.parse(h1.getValue()) : null;
+                         Charset charset = contentType != null ? contentType.getCharset() : null;
+                         if (charset == null) {
+                             charset = StandardCharsets.US_ASCII;
+                         }
+                         response.setCode(HttpStatus.SC_OK);
+                         // Echo the request's Content-Type header on the response.
+                         response.setHeader(h1);
+                         try (final BufferedReader reader = new BufferedReader(new InputStreamReader(requestStream, charset));
+                              final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(responseStream, charset))) {
+                             try {
+                                 String l;
+                                 int count = 0;
+                                 while ((l = reader.readLine()) != null) {
+                                     writer.write(l);
+                                     writer.write("\r\n");
+                                     count++;
+                                     if (count % 500 == 0) {
+                                         // Deliberately slow production.
+                                         Thread.sleep(500);
+                                     }
+                                 }
+                                 writer.flush();
+                             } catch (InterruptedException ex) {
+                                 Thread.currentThread().interrupt();
+                                 throw new InterruptedIOException(ex.getMessage());
+                             }
+                         }
+                     }
+                 };
+             }
+
+         });
+         final InetSocketAddress serverEndpoint = server.start();
+
+         client.start(H1Config.DEFAULT, ConnectionConfig.custom().setBufferSize(256).build());
+
+         final Future<ClientEndpoint> connectFuture = client.connect(
+                 "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+         final ClientEndpoint streamEndpoint = connectFuture.get();
+
+         final HttpRequest request1 = new BasicHttpRequest("POST", createRequestURI(serverEndpoint, "/hello"));
+         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
+                 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcd", 2000)),
+                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+         final Message<HttpResponse, String> result1 = future1.get(5 * TIMEOUT, TimeUnit.SECONDS);
+         Assert.assertNotNull(result1);
+         final HttpResponse response1 = result1.getHead();
+         Assert.assertNotNull(response1);
+         Assert.assertEquals(200, response1.getCode());
+         final String s1 = result1.getBody();
+         Assert.assertNotNull(s1);
+         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
+         while (t1.hasMoreTokens()) {
+             Assert.assertEquals("0123456789abcd", t1.nextToken());
+         }
+     } finally {
+         // Lets any running task finish, then allows the worker thread to terminate.
+         executor.shutdown();
+     }
+ }
+
+ /**
+  * Submits three POST requests back to back, the second of which carries
+  * {@code Connection: close}.
+  * <p>
+  * The first two exchanges must complete with status 200 and "Hi back". The third must
+  * either fail with an {@code ExecutionException} or yield a {@code null} result on a
+  * cancelled future. A fourth request submitted afterwards through the same endpoint
+  * must yield a {@code null} result on a cancelled future.
+  */
+ @Test
+ public void testPipelinedConnectionClose() throws Exception {
+     server.register("/hello*", new Supplier<AsyncServerExchangeHandler>() {
+
+         @Override
+         public AsyncServerExchangeHandler get() {
+             return new SingleLineResponseHandler("Hi back");
+         }
+
+     });
+     final InetSocketAddress address = server.start();
+
+     client.start();
+     final ClientEndpoint endpoint = client.connect(
+             "localhost", address.getPort(), TIMEOUT, TimeUnit.SECONDS).get();
+
+     // Submit all three exchanges before waiting for any result.
+     final Future<Message<HttpResponse, String>> firstPending = endpoint.execute(
+             new BasicRequestProducer("POST", createRequestURI(address, "/hello-1"),
+                     new BasicAsyncEntityProducer("Hi there")),
+             new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+
+     final HttpRequest closingRequest = new BasicHttpRequest("POST", createRequestURI(address, "/hello-2"));
+     closingRequest.addHeader(HttpHeaders.CONNECTION, "close");
+     final Future<Message<HttpResponse, String>> secondPending = endpoint.execute(
+             new BasicRequestProducer(closingRequest,
+                     new BasicAsyncEntityProducer("Hi there")),
+             new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+
+     final Future<Message<HttpResponse, String>> thirdPending = endpoint.execute(
+             new BasicRequestProducer("POST", createRequestURI(address, "/hello-3"),
+                     new BasicAsyncEntityProducer("Hi there")),
+             new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+
+     final Message<HttpResponse, String> firstMessage = firstPending.get(TIMEOUT, TimeUnit.SECONDS);
+     Assert.assertNotNull(firstMessage);
+     final HttpResponse firstHead = firstMessage.getHead();
+     final String firstBody = firstMessage.getBody();
+     Assert.assertNotNull(firstHead);
+     Assert.assertEquals(200, firstHead.getCode());
+     Assert.assertEquals("Hi back", firstBody);
+
+     final Message<HttpResponse, String> secondMessage = secondPending.get(TIMEOUT, TimeUnit.SECONDS);
+     Assert.assertNotNull(secondMessage);
+     final HttpResponse secondHead = secondMessage.getHead();
+     final String secondBody = secondMessage.getBody();
+     Assert.assertNotNull(secondHead);
+     Assert.assertEquals(200, secondHead.getCode());
+     Assert.assertEquals("Hi back", secondBody);
+
+     try {
+         final Message<HttpResponse, String> thirdMessage = thirdPending.get(TIMEOUT, TimeUnit.SECONDS);
+         Assert.assertNull(thirdMessage);
+         Assert.assertTrue(thirdPending.isCancelled());
+     } catch (ExecutionException ignore) {
+     }
+
+     final Future<Message<HttpResponse, String>> latePending = endpoint.execute(
+             new BasicRequestProducer("POST", createRequestURI(address, "/hello-3"),
+                     new BasicAsyncEntityProducer("Hi there")),
+             new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+     final Message<HttpResponse, String> lateMessage = latePending.get(TIMEOUT, TimeUnit.SECONDS);
+     Assert.assertNull(lateMessage);
+     Assert.assertTrue(latePending.isCancelled());
+ }
+
+ /**
+  * Submits three POST requests back to back, the second of which has an empty
+  * {@code Host} header added.
+  * <p>
+  * The first exchange must complete with status 200 and "Hi back", the second with
+  * status 400 and a non-empty body. The third must either fail with an
+  * {@code ExecutionException} or yield a {@code null} result on a cancelled future.
+  */
+ @Test
+ public void testPipelinedInvalidRequest() throws Exception {
+     server.register("/hello*", new Supplier<AsyncServerExchangeHandler>() {
+
+         @Override
+         public AsyncServerExchangeHandler get() {
+             return new SingleLineResponseHandler("Hi back");
+         }
+
+     });
+     final InetSocketAddress address = server.start();
+
+     client.start();
+     final ClientEndpoint endpoint = client.connect(
+             "localhost", address.getPort(), TIMEOUT, TimeUnit.SECONDS).get();
+
+     // Submit all three exchanges before waiting for any result.
+     final Future<Message<HttpResponse, String>> firstPending = endpoint.execute(
+             new BasicRequestProducer("POST", createRequestURI(address, "/hello-1"),
+                     new BasicAsyncEntityProducer("Hi there")),
+             new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+
+     final HttpRequest invalidRequest = new BasicHttpRequest("POST", createRequestURI(address, "/hello-2"));
+     invalidRequest.addHeader(HttpHeaders.HOST, "");
+     final Future<Message<HttpResponse, String>> secondPending = endpoint.execute(
+             new BasicRequestProducer(invalidRequest,
+                     new BasicAsyncEntityProducer("Hi there")),
+             new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+
+     final Future<Message<HttpResponse, String>> thirdPending = endpoint.execute(
+             new BasicRequestProducer("POST", createRequestURI(address, "/hello-3"),
+                     new BasicAsyncEntityProducer("Hi there")),
+             new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+
+     final Message<HttpResponse, String> firstMessage = firstPending.get(TIMEOUT, TimeUnit.SECONDS);
+     Assert.assertNotNull(firstMessage);
+     final HttpResponse firstHead = firstMessage.getHead();
+     final String firstBody = firstMessage.getBody();
+     Assert.assertNotNull(firstHead);
+     Assert.assertEquals(200, firstHead.getCode());
+     Assert.assertEquals("Hi back", firstBody);
+
+     final Message<HttpResponse, String> secondMessage = secondPending.get(TIMEOUT, TimeUnit.SECONDS);
+     Assert.assertNotNull(secondMessage);
+     final HttpResponse secondHead = secondMessage.getHead();
+     final String secondBody = secondMessage.getBody();
+     Assert.assertNotNull(secondHead);
+     Assert.assertEquals(400, secondHead.getCode());
+     Assert.assertTrue(secondBody.length() > 0);
+
+     try {
+         final Message<HttpResponse, String> thirdMessage = thirdPending.get(TIMEOUT, TimeUnit.SECONDS);
+         Assert.assertNull(thirdMessage);
+         Assert.assertTrue(thirdPending.isCancelled());
+     } catch (ExecutionException ignore) {
+     }
+ }
+
+ /** Payload written by {@link BrokenChunkEncoder} in place of the real content. */
+ private static final byte[] GARBAGE = "garbage".getBytes(StandardCharsets.US_ASCII);
+
+ /**
+  * Content encoder that emits a deliberately malformed chunk: on the first write it
+  * outputs a chunk-size line announcing ten times the length of {@link #GARBAGE},
+  * followed by only the {@link #GARBAGE} bytes themselves. Once the session buffer has
+  * been flushed completely, the underlying channel is closed.
+  */
+ private static class BrokenChunkEncoder extends AbstractContentEncoder {
+
+     private final CharArrayBuffer lineBuffer;
+     private boolean done;
+
+     BrokenChunkEncoder(
+             final WritableByteChannel channel,
+             final SessionOutputBuffer buffer,
+             final BasicHttpTransportMetrics metrics) {
+         super(channel, buffer, metrics);
+         lineBuffer = new CharArrayBuffer(16);
+     }
+
+     @Override
+     public void complete() throws IOException {
+         super.complete();
+     }
+
+     /**
+      * Ignores {@code src}. The first call buffers the broken chunk and reports
+      * {@code GARBAGE.length} bytes as written; later calls report zero. Every call
+      * flushes the session buffer and closes the channel once the buffer is empty.
+      */
+     @Override
+     public int write(final ByteBuffer src) throws IOException {
+         final int reported = done ? 0 : bufferBrokenChunk();
+         final long flushed = buffer().flush(channel());
+         if (flushed > 0) {
+             metrics().incrementBytesTransferred(flushed);
+         }
+         if (!buffer().hasData()) {
+             channel().close();
+         }
+         return reported;
+     }
+
+     /** Buffers the oversized chunk header plus the garbage payload; runs only once. */
+     private int bufferBrokenChunk() throws IOException {
+         lineBuffer.clear();
+         lineBuffer.append(Integer.toHexString(GARBAGE.length * 10));
+         buffer().writeLine(lineBuffer);
+         buffer().write(ByteBuffer.wrap(GARBAGE));
+         done = true;
+         return GARBAGE.length;
+     }
+
+ }
+
+ /**
+  * Verifies client behaviour when the server cuts a chunk-coded response short.
+  * <p>
+  * The server is started with a stream duplexer that substitutes
+  * {@code BrokenChunkEncoder} for every chunk-coded response. The request is
+  * expected to fail with {@link MalformedChunkCodingException}, the same
+  * exception must be recorded by the response consumer, and the content
+  * received before the failure ({@code "garbage"}) must remain available.
+  */
+ @Test
+ public void testTruncatedChunk() throws Exception {
+     final InetSocketAddress serverEndpoint = server.start(new InternalServerHttp1EventHandlerFactory(
+             HttpProcessors.server(),
+             new HandlerFactory<AsyncServerExchangeHandler>() {
+
+                 @Override
+                 public AsyncServerExchangeHandler create(final HttpRequest request) throws HttpException {
+                     return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
+
+                         @Override
+                         protected void handle(
+                                 final Message<HttpRequest, String> request,
+                                 final ResponseTrigger responseTrigger,
+                                 final HttpContext context) throws IOException, HttpException {
+                             responseTrigger.submitResponse(
+                                     new BasicResponseProducer(new StringAsyncEntityProducer("useful stuff")));
+                         }
+
+                     };
+                 }
+
+             },
+             H1Config.DEFAULT,
+             ConnectionConfig.DEFAULT,
+             DefaultConnectionReuseStrategy.INSTANCE) {
+
+         @Override
+         protected ServerHttp1StreamDuplexer createServerHttp1StreamDuplexer(
+                 final IOSession ioSession,
+                 final HttpProcessor httpProcessor,
+                 final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
+                 final H1Config h1Config,
+                 final ConnectionConfig connectionConfig,
+                 final ConnectionReuseStrategy connectionReuseStrategy,
+                 final NHttpMessageParser<HttpRequest> incomingMessageParser,
+                 final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
+                 final ContentLengthStrategy incomingContentStrategy,
+                 final ContentLengthStrategy outgoingContentStrategy,
+                 final ConnectionListener connectionListener,
+                 final Http1StreamListener streamListener) {
+             return new ServerHttp1StreamDuplexer(ioSession, httpProcessor, exchangeHandlerFactory,
+                     h1Config, connectionConfig, connectionReuseStrategy,
+                     incomingMessageParser, outgoingMessageWriter,
+                     incomingContentStrategy, outgoingContentStrategy,
+                     connectionListener, streamListener) {
+
+                 @Override
+                 protected ContentEncoder handleOutgoingMessage(
+                         final HttpResponse response,
+                         final WritableByteChannel channel,
+                         final SessionOutputBuffer buffer,
+                         final BasicHttpTransportMetrics metrics) throws HttpException {
+                     // Only chunk-coded responses get the broken encoder; all other
+                     // responses keep the default encoder selection.
+                     final long len = outgoingContentStrategy.determineLength(response);
+                     if (len == ContentLengthStrategy.CHUNKED) {
+                         return new BrokenChunkEncoder(channel, buffer, metrics);
+                     } else {
+                         return super.handleOutgoingMessage(response, channel, buffer, metrics);
+                     }
+                 }
+
+             };
+         }
+
+     });
+
+     client.start();
+     final Future<ClientEndpoint> connectFuture = client.connect(
+             "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+     final ClientEndpoint streamEndpoint = connectFuture.get();
+
+     final AsyncRequestProducer requestProducer = new BasicRequestProducer("GET", createRequestURI(serverEndpoint, "/hello"));
+     final BasicResponseConsumer<String> responseConsumer = new BasicResponseConsumer<>(new StringAsyncEntityConsumer());
+     final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(requestProducer, responseConsumer, null);
+     try {
+         // Bounded wait: if the truncated chunk is never detected the test must
+         // fail with a TimeoutException instead of blocking the build forever.
+         future1.get(TIMEOUT, TimeUnit.SECONDS);
+         Assert.fail("ExecutionException should have been thrown");
+     } catch (final ExecutionException ex) {
+         final Throwable cause = ex.getCause();
+         Assert.assertTrue(cause instanceof MalformedChunkCodingException);
+         Assert.assertTrue(responseConsumer.getException() instanceof MalformedChunkCodingException);
+         Assert.assertEquals("garbage", responseConsumer.getResponseContent());
+     }
+ }
+
+ /**
+  * A handler registered for {@code /hello} throws {@code HttpException("Boom")};
+  * the client is expected to receive a 500 response whose body is {@code "Boom"}.
+  */
+ @Test
+ public void testExceptionInHandler() throws Exception {
+     final Supplier<AsyncServerExchangeHandler> failingHandler = new Supplier<AsyncServerExchangeHandler>() {
+
+         @Override
+         public AsyncServerExchangeHandler get() {
+             return new SingleLineResponseHandler("Hi there") {
+
+                 @Override
+                 protected void handle(
+                         final Message<HttpRequest, String> request,
+                         final ResponseTrigger responseTrigger,
+                         final HttpContext context) throws IOException, HttpException {
+                     // Never produces the "Hi there" response: always fails.
+                     throw new HttpException("Boom");
+                 }
+             };
+         }
+
+     };
+     server.register("/hello", failingHandler);
+     final InetSocketAddress address = server.start();
+
+     client.start();
+     final Future<ClientEndpoint> pendingEndpoint = client.connect(
+             "localhost", address.getPort(), TIMEOUT, TimeUnit.SECONDS);
+     final ClientEndpoint endpoint = pendingEndpoint.get();
+
+     final AsyncRequestProducer requestProducer = new BasicRequestProducer(
+             "GET", createRequestURI(address, "/hello"));
+     final BasicResponseConsumer<String> responseConsumer = new BasicResponseConsumer<>(
+             new StringAsyncEntityConsumer());
+     final Future<Message<HttpResponse, String>> pendingResponse = endpoint.execute(
+             requestProducer, responseConsumer, null);
+
+     final Message<HttpResponse, String> message = pendingResponse.get(TIMEOUT, TimeUnit.SECONDS);
+     Assert.assertNotNull(message);
+     final HttpResponse head = message.getHead();
+     final String body = message.getBody();
+     Assert.assertNotNull(head);
+     Assert.assertEquals(500, head.getCode());
+     Assert.assertEquals("Boom", body);
+ }
+
+ /**
+  * With no handler registered on the server, a GET for {@code /hello} is
+  * expected to yield a 404 response with the body {@code "Resource not found"}.
+  */
+ @Test
+ public void testNoServiceHandler() throws Exception {
+     final InetSocketAddress address = server.start();
+
+     client.start();
+     final Future<ClientEndpoint> pendingEndpoint = client.connect(
+             "localhost", address.getPort(), TIMEOUT, TimeUnit.SECONDS);
+     final ClientEndpoint endpoint = pendingEndpoint.get();
+
+     final AsyncRequestProducer requestProducer = new BasicRequestProducer(
+             "GET", createRequestURI(address, "/hello"));
+     final BasicResponseConsumer<String> responseConsumer = new BasicResponseConsumer<>(
+             new StringAsyncEntityConsumer());
+     final Future<Message<HttpResponse, String>> pendingResponse = endpoint.execute(
+             requestProducer, responseConsumer, null);
+
+     final Message<HttpResponse, String> message = pendingResponse.get(TIMEOUT, TimeUnit.SECONDS);
+     Assert.assertNotNull(message);
+     final HttpResponse head = message.getHead();
+     final String body = message.getBody();
+     Assert.assertNotNull(head);
+     Assert.assertEquals(404, head.getCode());
+     Assert.assertEquals("Resource not found", body);
+ }
+
+ /**
+  * The {@code /hello} handler submits a bare 204 (No Content) response; the
+  * client is expected to see status 204 and a {@code null} message body.
+  */
+ @Test
+ public void testResponseNoContent() throws Exception {
+     final Supplier<AsyncServerExchangeHandler> noContentHandler = new Supplier<AsyncServerExchangeHandler>() {
+
+         @Override
+         public AsyncServerExchangeHandler get() {
+             return new SingleLineResponseHandler("Hi there") {
+
+                 @Override
+                 protected void handle(
+                         final Message<HttpRequest, String> request,
+                         final ResponseTrigger responseTrigger,
+                         final HttpContext context) throws IOException, HttpException {
+                     // Response head only, no entity producer.
+                     final HttpResponse noContent = new BasicHttpResponse(HttpStatus.SC_NO_CONTENT);
+                     responseTrigger.submitResponse(new BasicResponseProducer(noContent));
+                 }
+             };
+         }
+
+     };
+     server.register("/hello", noContentHandler);
+     final InetSocketAddress address = server.start();
+
+     client.start();
+     final Future<ClientEndpoint> pendingEndpoint = client.connect(
+             "localhost", address.getPort(), TIMEOUT, TimeUnit.SECONDS);
+     final ClientEndpoint endpoint = pendingEndpoint.get();
+
+     final AsyncRequestProducer requestProducer = new BasicRequestProducer(
+             "GET", createRequestURI(address, "/hello"));
+     final BasicResponseConsumer<String> responseConsumer = new BasicResponseConsumer<>(
+             new StringAsyncEntityConsumer());
+     final Future<Message<HttpResponse, String>> pendingResponse = endpoint.execute(
+             requestProducer, responseConsumer, null);
+
+     final Message<HttpResponse, String> message = pendingResponse.get(TIMEOUT, TimeUnit.SECONDS);
+     Assert.assertNotNull(message);
+     final HttpResponse head = message.getHead();
+     Assert.assertNotNull(head);
+     Assert.assertEquals(204, head.getCode());
+     Assert.assertNull(message.getBody());
+ }
+
+ /**
+  * Sends the same GET twice over one endpoint, first as HTTP/1.0 and then as
+  * HTTP/1.1, using a client protocol processor built only from
+  * {@code RequestContent} and {@code RequestConnControl}.
+  * NOTE(review): presumably this processor leaves out the interceptor that would
+  * add a {@code Host} header - confirm against the default client processor.
+  * <p>
+  * Expected: the HTTP/1.0 request succeeds with 200 / {@code "Hi there"}, while
+  * the HTTP/1.1 request is rejected with 400 / {@code "Host header is absent"}.
+  */
+ @Test
+ public void testAbsentHostHeader() throws Exception {
+     final Supplier<AsyncServerExchangeHandler> helloHandler = new Supplier<AsyncServerExchangeHandler>() {
+
+         @Override
+         public AsyncServerExchangeHandler get() {
+             return new SingleLineResponseHandler("Hi there");
+         }
+
+     };
+     server.register("/hello", helloHandler);
+     final InetSocketAddress address = server.start();
+
+     final DefaultHttpProcessor clientProcessor = new DefaultHttpProcessor(
+             new RequestContent(), new RequestConnControl());
+     client.start(clientProcessor, H1Config.DEFAULT, ConnectionConfig.DEFAULT);
+
+     final Future<ClientEndpoint> pendingEndpoint = client.connect(
+             "localhost", address.getPort(), TIMEOUT, TimeUnit.SECONDS);
+     final ClientEndpoint endpoint = pendingEndpoint.get();
+
+     // HTTP/1.0 request.
+     final HttpRequest http10Request = new BasicHttpRequest("GET", createRequestURI(address, "/hello"));
+     http10Request.setVersion(HttpVersion.HTTP_1_0);
+     final Future<Message<HttpResponse, String>> pendingHttp10 = endpoint.execute(
+             new BasicRequestProducer(http10Request, null),
+             new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+     final Message<HttpResponse, String> http10Message = pendingHttp10.get(TIMEOUT, TimeUnit.SECONDS);
+     Assert.assertNotNull(http10Message);
+     final HttpResponse http10Head = http10Message.getHead();
+     Assert.assertNotNull(http10Head);
+     Assert.assertEquals(200, http10Head.getCode());
+     Assert.assertEquals("Hi there", http10Message.getBody());
+
+     // HTTP/1.1 request, issued only after the first exchange has completed.
+     final HttpRequest http11Request = new BasicHttpRequest("GET", createRequestURI(address, "/hello"));
+     http11Request.setVersion(HttpVersion.HTTP_1_1);
+     final Future<Message<HttpResponse, String>> pendingHttp11 = endpoint.execute(
+             new BasicRequestProducer(http11Request, null),
+             new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+     final Message<HttpResponse, String> http11Message = pendingHttp11.get(TIMEOUT, TimeUnit.SECONDS);
+     Assert.assertNotNull(http11Message);
+     final HttpResponse http11Head = http11Message.getHead();
+     Assert.assertNotNull(http11Head);
+     Assert.assertEquals(400, http11Head.getCode());
+     Assert.assertEquals("Host header is absent", http11Message.getBody());
+ }
+
+}