You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2011/09/07 20:12:13 UTC
svn commit: r1166291 - in /httpcomponents/httpcore/trunk/httpcore-nio/src:
examples/org/apache/http/examples/nio/
main/java/org/apache/http/nio/protocol/
Author: olegk
Date: Wed Sep 7 18:12:13 2011
New Revision: 1166291
URL: http://svn.apache.org/viewvc?rev=1166291&view=rev
Log:
Updated the reverse proxy example; fixed a minor bug in HttpAsyncServiceHandler
Modified:
httpcomponents/httpcore/trunk/httpcore-nio/src/examples/org/apache/http/examples/nio/NHttpReverseProxy.java
httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseTrigger.java
httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/examples/org/apache/http/examples/nio/NHttpReverseProxy.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/examples/org/apache/http/examples/nio/NHttpReverseProxy.java?rev=1166291&r1=1166290&r2=1166291&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/examples/org/apache/http/examples/nio/NHttpReverseProxy.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/examples/org/apache/http/examples/nio/NHttpReverseProxy.java Wed Sep 7 18:12:13 2011
@@ -29,47 +29,65 @@ package org.apache.http.examples.nio;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
+import java.net.URI;
import java.nio.ByteBuffer;
+import java.util.Locale;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.http.ConnectionReuseStrategy;
-import org.apache.http.HttpConnection;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
-import org.apache.http.HttpResponseFactory;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.HttpStatus;
import org.apache.http.HttpVersion;
-import org.apache.http.ProtocolVersion;
+import org.apache.http.entity.ContentType;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
-import org.apache.http.impl.DefaultHttpResponseFactory;
+import org.apache.http.impl.EnglishReasonPhraseCatalog;
import org.apache.http.impl.nio.DefaultClientIODispatch;
import org.apache.http.impl.nio.DefaultServerIODispatch;
+import org.apache.http.impl.nio.pool.BasicNIOConnFactory;
+import org.apache.http.impl.nio.pool.BasicNIOConnPool;
+import org.apache.http.impl.nio.pool.BasicNIOPoolEntry;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.DefaultListeningIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.message.BasicHttpEntityEnclosingRequest;
+import org.apache.http.message.BasicHttpRequest;
+import org.apache.http.message.BasicHttpResponse;
import org.apache.http.nio.ContentDecoder;
import org.apache.http.nio.ContentEncoder;
import org.apache.http.nio.IOControl;
import org.apache.http.nio.NHttpClientConnection;
-import org.apache.http.nio.NHttpClientHandler;
import org.apache.http.nio.NHttpConnection;
import org.apache.http.nio.NHttpServerConnection;
-import org.apache.http.nio.NHttpServiceHandler;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.http.nio.pool.NIOConnFactory;
+import org.apache.http.nio.protocol.BasicAsyncResponseProducer;
+import org.apache.http.nio.protocol.HttpAsyncClientProtocolHandler;
+import org.apache.http.nio.protocol.HttpAsyncRequestConsumer;
+import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
+import org.apache.http.nio.protocol.HttpAsyncRequestHandler;
+import org.apache.http.nio.protocol.HttpAsyncRequestHandlerRegistry;
+import org.apache.http.nio.protocol.HttpAsyncRequestHandlerResolver;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
+import org.apache.http.nio.protocol.HttpAsyncResponseProducer;
+import org.apache.http.nio.protocol.HttpAsyncResponseTrigger;
+import org.apache.http.nio.protocol.HttpAsyncServiceHandler;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.ListeningIOReactor;
import org.apache.http.params.CoreConnectionPNames;
-import org.apache.http.params.DefaultedHttpParams;
-import org.apache.http.params.HttpParams;
import org.apache.http.params.CoreProtocolPNames;
+import org.apache.http.params.HttpParams;
import org.apache.http.params.SyncBasicHttpParams;
-import org.apache.http.protocol.HTTP;
-import org.apache.http.protocol.HttpContext;
+import org.apache.http.pool.PoolStats;
import org.apache.http.protocol.ExecutionContext;
+import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
@@ -83,35 +101,33 @@ import org.apache.http.protocol.Response
import org.apache.http.protocol.ResponseServer;
/**
- * Rudimentary HTTP/1.1 reverse proxy based on the non-blocking I/O model.
- * <p>
- * Please note the purpose of this application is demonstrate the usage of HttpCore APIs.
- * It is NOT intended to demonstrate the most efficient way of building an HTTP reverse proxy.
- *
- *
+ * Elemental HTTP/1.1 reverse proxy based on the non-blocking I/O model.
*/
public class NHttpReverseProxy {
public static void main(String[] args) throws Exception {
-
if (args.length < 1) {
System.out.println("Usage: NHttpReverseProxy <hostname> [port]");
System.exit(1);
}
- String hostname = args[0];
- int port = 80;
+ URI uri = new URI(args[0]);
+ int port = 8080;
if (args.length > 1) {
port = Integer.parseInt(args[1]);
}
// Target host
- HttpHost targetHost = new HttpHost(hostname, port);
+ HttpHost targetHost = new HttpHost(
+ uri.getHost(),
+ uri.getPort() > 0 ? uri.getPort() : 80,
+ uri.getScheme() != null ? uri.getScheme() : "http");
+
+ System.out.println("Reverse proxy to " + targetHost);
HttpParams params = new SyncBasicHttpParams();
params
.setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 30000)
.setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
- .setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, false)
.setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)
.setParameter(CoreProtocolPNames.ORIGIN_SERVER, "Test/1.1")
.setParameter(CoreProtocolPNames.USER_AGENT, "Test/1.1");
@@ -123,41 +139,43 @@ public class NHttpReverseProxy {
// Set up HTTP protocol processor for incoming connections
HttpProcessor inhttpproc = new ImmutableHttpProcessor(
+ new HttpResponseInterceptor[] {
+ new ResponseDate(),
+ new ResponseServer(),
+ new ResponseContent(),
+ new ResponseConnControl()
+ });
+
+ // Set up HTTP protocol processor for outgoing connections
+ HttpProcessor outhttpproc = new ImmutableHttpProcessor(
new HttpRequestInterceptor[] {
new RequestContent(),
new RequestTargetHost(),
new RequestConnControl(),
new RequestUserAgent(),
new RequestExpectContinue()
- });
-
- // Set up HTTP protocol processor for outgoing connections
- HttpProcessor outhttpproc = new ImmutableHttpProcessor(
- new HttpResponseInterceptor[] {
- new ResponseDate(),
- new ResponseServer(),
- new ResponseContent(),
- new ResponseConnControl()
});
- NHttpClientHandler connectingHandler = new ConnectingHandler(
- inhttpproc,
- new DefaultConnectionReuseStrategy(),
- params);
-
- NHttpServiceHandler listeningHandler = new ListeningHandler(
- targetHost,
- connectingIOReactor,
- outhttpproc,
- new DefaultHttpResponseFactory(),
- new DefaultConnectionReuseStrategy(),
- params);
+ ProxyClientProtocolHandler clientHandler = new ProxyClientProtocolHandler();
+ HttpAsyncRequestExecutor executor = new HttpAsyncRequestExecutor(
+ outhttpproc, new ProxyOutgoingConnectionReuseStrategy(), params);
+
+ ProxyConnPool connPool = new ProxyConnPool(connectingIOReactor,
+ new BasicNIOConnFactory(new LoggingClientConnectionFactory(params)), params);
+ connPool.setMaxTotal(100);
+ connPool.setDefaultMaxPerRoute(20);
+
+ HttpAsyncRequestHandlerRegistry handlerRegistry = new HttpAsyncRequestHandlerRegistry();
+ handlerRegistry.register("*", new ProxyRequestHandler(targetHost, executor, connPool));
+
+ ProxyServiceHandler serviceHandler = new ProxyServiceHandler(
+ handlerRegistry, inhttpproc, new ProxyIncomingConnectionReuseStrategy(), params);
final IOEventDispatch connectingEventDispatch = new DefaultClientIODispatch(
- connectingHandler, params);
+ clientHandler, new LoggingClientConnectionFactory(params));
final IOEventDispatch listeningEventDispatch = new DefaultServerIODispatch(
- listeningHandler, params);
+ serviceHandler, new LoggingServerConnectionFactory(params));
Thread t = new Thread(new Runnable() {
@@ -166,841 +184,654 @@ public class NHttpReverseProxy {
connectingIOReactor.execute(connectingEventDispatch);
} catch (InterruptedIOException ex) {
System.err.println("Interrupted");
- } catch (IOException e) {
- System.err.println("I/O error: " + e.getMessage());
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ } finally {
+ try {
+ listeningIOReactor.shutdown();
+ } catch (IOException ex2) {
+ ex2.printStackTrace();
+ }
}
}
});
t.start();
-
try {
- listeningIOReactor.listen(new InetSocketAddress(8888));
+ listeningIOReactor.listen(new InetSocketAddress(port));
listeningIOReactor.execute(listeningEventDispatch);
} catch (InterruptedIOException ex) {
System.err.println("Interrupted");
- } catch (IOException e) {
- System.err.println("I/O error: " + e.getMessage());
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ } finally {
+ try {
+ connectingIOReactor.shutdown();
+ } catch (IOException ex2) {
+ ex2.printStackTrace();
+ }
}
}
- static class ListeningHandler implements NHttpServiceHandler {
+ static class ProxyHttpExchange {
- private final HttpHost targetHost;
- private final ConnectingIOReactor connectingIOReactor;
- private final HttpProcessor httpProcessor;
- private final HttpResponseFactory responseFactory;
- private final ConnectionReuseStrategy connStrategy;
- private final HttpParams params;
-
- public ListeningHandler(
- final HttpHost targetHost,
- final ConnectingIOReactor connectingIOReactor,
- final HttpProcessor httpProcessor,
- final HttpResponseFactory responseFactory,
- final ConnectionReuseStrategy connStrategy,
- final HttpParams params) {
+ private final ByteBuffer inBuffer;
+ private final ByteBuffer outBuffer;
+
+ private volatile String id;
+ private volatile HttpHost target;
+ private volatile HttpAsyncResponseTrigger responseTrigger;
+ private volatile IOControl originIOControl;
+ private volatile IOControl clientIOControl;
+ private volatile HttpRequest request;
+ private volatile boolean requestReceived;
+ private volatile HttpResponse response;
+ private volatile boolean responseReceived;
+ private volatile Exception ex;
+
+ public ProxyHttpExchange() {
super();
- this.targetHost = targetHost;
- this.connectingIOReactor = connectingIOReactor;
- this.httpProcessor = httpProcessor;
- this.connStrategy = connStrategy;
- this.responseFactory = responseFactory;
- this.params = params;
+ this.inBuffer = ByteBuffer.allocateDirect(10240);
+ this.outBuffer = ByteBuffer.allocateDirect(10240);
}
- public void connected(final NHttpServerConnection conn) {
- System.out.println(conn + " [client->proxy] conn open");
-
- ProxyTask proxyTask = new ProxyTask();
+ public ByteBuffer getInBuffer() {
+ return this.inBuffer;
+ }
- synchronized (proxyTask) {
+ public ByteBuffer getOutBuffer() {
+ return this.outBuffer;
+ }
- // Initialize connection state
- proxyTask.setTarget(this.targetHost);
- proxyTask.setClientIOControl(conn);
- proxyTask.setClientState(ConnState.CONNECTED);
+ public String getId() {
+ return this.id;
+ }
- HttpContext context = conn.getContext();
- context.setAttribute(ProxyTask.ATTRIB, proxyTask);
+ public void setId(final String id) {
+ this.id = id;
+ }
- InetSocketAddress address = new InetSocketAddress(
- this.targetHost.getHostName(),
- this.targetHost.getPort());
+ public HttpHost getTarget() {
+ return this.target;
+ }
- this.connectingIOReactor.connect(
- address,
- null,
- proxyTask,
- null);
- }
+ public void setTarget(final HttpHost target) {
+ this.target = target;
}
- public void requestReceived(final NHttpServerConnection conn) {
- System.out.println(conn + " [client->proxy] request received");
+ public HttpRequest getRequest() {
+ return this.request;
+ }
- HttpContext context = conn.getContext();
- ProxyTask proxyTask = (ProxyTask) context.getAttribute(ProxyTask.ATTRIB);
+ public void setRequest(final HttpRequest request) {
+ this.request = request;
+ }
- synchronized (proxyTask) {
- ConnState connState = proxyTask.getClientState();
- if (connState != ConnState.IDLE
- && connState != ConnState.CONNECTED) {
- throw new IllegalStateException("Illegal client connection state: " + connState);
- }
+ public HttpResponse getResponse() {
+ return this.response;
+ }
- try {
+ public void setResponse(final HttpResponse response) {
+ this.response = response;
+ }
- HttpRequest request = conn.getHttpRequest();
+ public HttpAsyncResponseTrigger getResponseTrigger() {
+ return this.responseTrigger;
+ }
- System.out.println(conn + " [client->proxy] >> " + request.getRequestLine());
+ public void setResponseTrigger(final HttpAsyncResponseTrigger responseTrigger) {
+ this.responseTrigger = responseTrigger;
+ }
- ProtocolVersion ver = request.getRequestLine().getProtocolVersion();
- if (!ver.lessEquals(HttpVersion.HTTP_1_1)) {
- // Downgrade protocol version if greater than HTTP/1.1
- ver = HttpVersion.HTTP_1_1;
- }
+ public IOControl getClientIOControl() {
+ return this.clientIOControl;
+ }
- // Update connection state
- proxyTask.setRequest(request);
- proxyTask.setClientState(ConnState.REQUEST_RECEIVED);
-
- // See if the client expects a 100-Continue
- if (request instanceof HttpEntityEnclosingRequest) {
- if (((HttpEntityEnclosingRequest) request).expectContinue()) {
- HttpResponse ack = this.responseFactory.newHttpResponse(
- ver,
- HttpStatus.SC_CONTINUE,
- context);
- conn.submitResponse(ack);
- }
- } else {
- // No request content expected. Suspend client input
- conn.suspendInput();
- }
+ public void setClientIOControl(final IOControl clientIOControl) {
+ this.clientIOControl = clientIOControl;
+ }
- // If there is already a connection to the origin server
- // make sure origin output is active
- if (proxyTask.getOriginIOControl() != null) {
- proxyTask.getOriginIOControl().requestOutput();
- }
+ public IOControl getOriginIOControl() {
+ return this.originIOControl;
+ }
- } catch (IOException ex) {
- shutdownConnection(conn);
- } catch (HttpException ex) {
- shutdownConnection(conn);
- }
- }
+ public void setOriginIOControl(final IOControl originIOControl) {
+ this.originIOControl = originIOControl;
}
- public void inputReady(final NHttpServerConnection conn, final ContentDecoder decoder) {
- System.out.println(conn + " [client->proxy] input ready");
+ public boolean isRequestReceived() {
+ return this.requestReceived;
+ }
- HttpContext context = conn.getContext();
- ProxyTask proxyTask = (ProxyTask) context.getAttribute(ProxyTask.ATTRIB);
+ public void setRequestReceived() {
+ this.requestReceived = true;
+ }
- synchronized (proxyTask) {
- ConnState connState = proxyTask.getClientState();
- if (connState != ConnState.REQUEST_RECEIVED
- && connState != ConnState.REQUEST_BODY_STREAM) {
- throw new IllegalStateException("Illegal client connection state: " + connState);
- }
+ public boolean isResponseReceived() {
+ return this.responseReceived;
+ }
- try {
+ public void setResponseReceived() {
+ this.responseReceived = true;
+ }
- ByteBuffer dst = proxyTask.getInBuffer();
- int bytesRead = decoder.read(dst);
- System.out.println(conn + " [client->proxy] " + bytesRead + " bytes read");
- System.out.println(conn + " [client->proxy] " + decoder);
- if (!dst.hasRemaining()) {
- // Input buffer is full. Suspend client input
- // until the origin handler frees up some space in the buffer
- conn.suspendInput();
- }
- // If there is some content in the input buffer make sure origin
- // output is active
- if (dst.position() > 0) {
- if (proxyTask.getOriginIOControl() != null) {
- proxyTask.getOriginIOControl().requestOutput();
- }
- }
+ public Exception getException() {
+ return this.ex;
+ }
- if (decoder.isCompleted()) {
- System.out.println(conn + " [client->proxy] request body received");
- // Update connection state
- proxyTask.setClientState(ConnState.REQUEST_BODY_DONE);
- // Suspend client input
- conn.suspendInput();
- } else {
- proxyTask.setClientState(ConnState.REQUEST_BODY_STREAM);
- }
+ public void setException(final Exception ex) {
+ this.ex = ex;
+ }
- } catch (IOException ex) {
- shutdownConnection(conn);
- }
- }
+ public void reset() {
+ this.inBuffer.clear();
+ this.outBuffer.clear();
+ this.target = null;
+ this.id = null;
+ this.responseTrigger = null;
+ this.clientIOControl = null;
+ this.originIOControl = null;
+ this.request = null;
+ this.requestReceived = false;
+ this.response = null;
+ this.responseReceived = false;
+ this.ex = null;
}
- public void responseReady(final NHttpServerConnection conn) {
- System.out.println(conn + " [client<-proxy] response ready");
+ }
- HttpContext context = conn.getContext();
- ProxyTask proxyTask = (ProxyTask) context.getAttribute(ProxyTask.ATTRIB);
+ static class ProxyRequestHandler implements HttpAsyncRequestHandler<ProxyHttpExchange> {
+
+ private final HttpHost target;
+ private final HttpAsyncRequestExecutor executor;
+ private final BasicNIOConnPool connPool;
+ private final AtomicLong counter;
+
+ public ProxyRequestHandler(
+ final HttpHost target,
+ final HttpAsyncRequestExecutor executor,
+ final BasicNIOConnPool connPool) {
+ super();
+ this.target = target;
+ this.executor = executor;
+ this.connPool = connPool;
+ this.counter = new AtomicLong(1);
+ }
+
+ public HttpAsyncRequestConsumer<ProxyHttpExchange> processRequest(
+ final HttpRequest request,
+ final HttpContext context) {
+ ProxyHttpExchange httpExchange = (ProxyHttpExchange) context.getAttribute("http-exchange");
+ if (httpExchange == null) {
+ httpExchange = new ProxyHttpExchange();
+ context.setAttribute("http-exchange", httpExchange);
+ }
+ synchronized (httpExchange) {
+ httpExchange.reset();
+ String id = String.format("%08X", this.counter.getAndIncrement());
+ httpExchange.setId(id);
+ httpExchange.setTarget(this.target);
+ return new ProxyRequestConsumer(httpExchange, this.executor, this.connPool);
+ }
+ }
- synchronized (proxyTask) {
- ConnState connState = proxyTask.getClientState();
- if (connState == ConnState.IDLE) {
- // Response not available
+ public void handle(
+ final ProxyHttpExchange httpExchange,
+ final HttpAsyncResponseTrigger responseTrigger,
+ final HttpContext context) throws HttpException, IOException {
+ synchronized (httpExchange) {
+ Exception ex = httpExchange.getException();
+ if (ex != null) {
+ System.out.println("[client<-proxy] " + httpExchange.getId() + " " + ex);
+ int status = HttpStatus.SC_INTERNAL_SERVER_ERROR;
+ HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_0, status,
+ EnglishReasonPhraseCatalog.INSTANCE.getReason(status, Locale.US));
+ String message = ex.getMessage();
+ if (message == null) {
+ message = "Unexpected error";
+ }
+ response.setEntity(NStringEntity.create(message, ContentType.DEFAULT_TEXT));
+ responseTrigger.submitResponse(new BasicAsyncResponseProducer(response));
+ System.out.println("[client<-proxy] " + httpExchange.getId() + " error response triggered");
return;
}
- if (connState != ConnState.REQUEST_RECEIVED
- && connState != ConnState.REQUEST_BODY_DONE) {
- throw new IllegalStateException("Illegal client connection state: " + connState);
- }
-
- try {
-
- HttpRequest request = proxyTask.getRequest();
- HttpResponse response = proxyTask.getResponse();
- if (response == null) {
- throw new IllegalStateException("HTTP request is null");
- }
- // Remove hop-by-hop headers
- response.removeHeaders(HTTP.CONTENT_LEN);
- response.removeHeaders(HTTP.TRANSFER_ENCODING);
- response.removeHeaders(HTTP.CONN_DIRECTIVE);
- response.removeHeaders("Keep-Alive");
- response.removeHeaders("Proxy-Authenticate");
- response.removeHeaders("Proxy-Authorization");
- response.removeHeaders("TE");
- response.removeHeaders("Trailers");
- response.removeHeaders("Upgrade");
-
- response.setParams(
- new DefaultedHttpParams(response.getParams(), this.params));
-
- // Close client connection if the connection to the target
- // is no longer active / open
- if (proxyTask.getOriginState().compareTo(ConnState.CLOSING) >= 0) {
- response.addHeader(HTTP.CONN_DIRECTIVE, "Close");
- }
-
- // Pre-process HTTP request
- context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
- context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
- this.httpProcessor.process(response, context);
-
- conn.submitResponse(response);
-
- proxyTask.setClientState(ConnState.RESPONSE_SENT);
-
- System.out.println(conn + " [client<-proxy] << " + response.getStatusLine());
-
- if (!canResponseHaveBody(request, response)) {
- conn.resetInput();
- if (!this.connStrategy.keepAlive(response, context)) {
- System.out.println(conn + " [client<-proxy] close connection");
- proxyTask.setClientState(ConnState.CLOSING);
- conn.close();
- } else {
- // Reset connection state
- proxyTask.reset();
- conn.requestInput();
- // Ready to deal with a new request
- }
- }
-
- } catch (IOException ex) {
- shutdownConnection(conn);
- } catch (HttpException ex) {
- shutdownConnection(conn);
+ HttpResponse response = httpExchange.getResponse();
+ if (response != null) {
+ responseTrigger.submitResponse(new ProxyResponseProducer(httpExchange));
+ System.out.println("[client<-proxy] " + httpExchange.getId() + " response triggered");
+ return;
}
+ // No response yet.
+ httpExchange.setResponseTrigger(responseTrigger);
}
}
- private boolean canResponseHaveBody(
- final HttpRequest request, final HttpResponse response) {
+ }
- if (request != null && "HEAD".equalsIgnoreCase(request.getRequestLine().getMethod())) {
- return false;
- }
+ static class ProxyRequestConsumer implements HttpAsyncRequestConsumer<ProxyHttpExchange> {
- int status = response.getStatusLine().getStatusCode();
- return status >= HttpStatus.SC_OK
- && status != HttpStatus.SC_NO_CONTENT
- && status != HttpStatus.SC_NOT_MODIFIED
- && status != HttpStatus.SC_RESET_CONTENT;
+ private final ProxyHttpExchange httpExchange;
+ private final HttpAsyncRequestExecutor executor;
+ private final BasicNIOConnPool connPool;
+
+ private volatile boolean completed;
+
+ public ProxyRequestConsumer(
+ final ProxyHttpExchange httpExchange,
+ final HttpAsyncRequestExecutor executor,
+ final BasicNIOConnPool connPool) {
+ super();
+ this.httpExchange = httpExchange;
+ this.executor = executor;
+ this.connPool = connPool;
}
- public void outputReady(final NHttpServerConnection conn, final ContentEncoder encoder) {
- System.out.println(conn + " [client<-proxy] output ready");
+ public void close() throws IOException {
+ }
- HttpContext context = conn.getContext();
- ProxyTask proxyTask = (ProxyTask) context.getAttribute(ProxyTask.ATTRIB);
+ public void requestReceived(final HttpRequest request) {
+ synchronized (this.httpExchange) {
+ System.out.println("[client->proxy] " + this.httpExchange.getId() + " " + request.getRequestLine());
+ this.httpExchange.setRequest(request);
+ this.executor.execute(
+ new ProxyRequestProducer(this.httpExchange),
+ new ProxyResponseConsumer(this.httpExchange),
+ this.connPool);
+ }
+ }
- synchronized (proxyTask) {
- ConnState connState = proxyTask.getClientState();
- if (connState != ConnState.RESPONSE_SENT
- && connState != ConnState.RESPONSE_BODY_STREAM) {
- throw new IllegalStateException("Illegal client connection state: " + connState);
+ public void consumeContent(
+ final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
+ synchronized (this.httpExchange) {
+ this.httpExchange.setClientIOControl(ioctrl);
+ // Receive data from the client
+ ByteBuffer buf = this.httpExchange.getInBuffer();
+ int n = decoder.read(buf);
+ System.out.println("[client->proxy] " + this.httpExchange.getId() + " " + n + " bytes read");
+ if (decoder.isCompleted()) {
+ System.out.println("[client->proxy] " + this.httpExchange.getId() + " content fully read");
}
-
- HttpResponse response = proxyTask.getResponse();
- if (response == null) {
- throw new IllegalStateException("HTTP request is null");
+ // If the buffer is full, suspend client input until there is free
+ // space in the buffer
+ if (!buf.hasRemaining()) {
+ ioctrl.suspendInput();
}
-
- try {
-
- ByteBuffer src = proxyTask.getOutBuffer();
- src.flip();
- int bytesWritten = encoder.write(src);
- System.out.println(conn + " [client<-proxy] " + bytesWritten + " bytes written");
- System.out.println(conn + " [client<-proxy] " + encoder);
- src.compact();
-
- if (src.position() == 0) {
-
- if (proxyTask.getOriginState() == ConnState.RESPONSE_BODY_DONE) {
- encoder.complete();
- } else {
- // Input output is empty. Wait until the origin handler
- // fills up the buffer
- conn.suspendOutput();
- }
- }
-
- // Update connection state
- if (encoder.isCompleted()) {
- System.out.println(conn + " [proxy] response body sent");
- proxyTask.setClientState(ConnState.RESPONSE_BODY_DONE);
- if (!this.connStrategy.keepAlive(response, context)) {
- System.out.println(conn + " [client<-proxy] close connection");
- proxyTask.setClientState(ConnState.CLOSING);
- conn.close();
- } else {
- // Reset connection state
- proxyTask.reset();
- conn.requestInput();
- // Ready to deal with a new request
- }
- } else {
- proxyTask.setClientState(ConnState.RESPONSE_BODY_STREAM);
- // Make sure origin input is active
- proxyTask.getOriginIOControl().requestInput();
+ // If there is some content in the input buffer make sure origin
+ // output is active
+ if (buf.position() > 0) {
+ if (this.httpExchange.getOriginIOControl() != null) {
+ this.httpExchange.getOriginIOControl().requestOutput();
}
-
- } catch (IOException ex) {
- shutdownConnection(conn);
}
}
}
- public void closed(final NHttpServerConnection conn) {
- System.out.println(conn + " [client->proxy] conn closed");
- HttpContext context = conn.getContext();
- ProxyTask proxyTask = (ProxyTask) context.getAttribute(ProxyTask.ATTRIB);
-
- if (proxyTask != null) {
- synchronized (proxyTask) {
- proxyTask.setClientState(ConnState.CLOSED);
- }
+ public void requestCompleted(final HttpContext context) {
+ synchronized (this.httpExchange) {
+ this.completed = true;;
+ System.out.println("[client->proxy] " + this.httpExchange.getId() + " request completed");
+ this.httpExchange.setRequestReceived();
}
}
- public void exception(final NHttpServerConnection conn, final HttpException httpex) {
- System.out.println(conn + " [client->proxy] HTTP error: " + httpex.getMessage());
-
- if (conn.isResponseSubmitted()) {
- shutdownConnection(conn);
- return;
- }
-
- HttpContext context = conn.getContext();
-
- try {
- HttpResponse response = this.responseFactory.newHttpResponse(
- HttpVersion.HTTP_1_0, HttpStatus.SC_BAD_REQUEST, context);
- response.setParams(
- new DefaultedHttpParams(this.params, response.getParams()));
- response.addHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE);
- // Pre-process HTTP request
- context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
- context.setAttribute(ExecutionContext.HTTP_REQUEST, null);
- this.httpProcessor.process(response, context);
-
- conn.submitResponse(response);
-
- conn.close();
-
- } catch (IOException ex) {
- shutdownConnection(conn);
- } catch (HttpException ex) {
- shutdownConnection(conn);
- }
- }
-
- public void exception(final NHttpServerConnection conn, final IOException ex) {
- shutdownConnection(conn);
- System.out.println(conn + " [client->proxy] I/O error: " + ex.getMessage());
- }
-
- public void timeout(final NHttpServerConnection conn) {
- System.out.println(conn + " [client->proxy] timeout");
- closeConnection(conn);
+ public Exception getException() {
+ return null;
}
- private void shutdownConnection(final NHttpConnection conn) {
- try {
- conn.shutdown();
- } catch (IOException ignore) {
- }
+ public ProxyHttpExchange getResult() {
+ return this.httpExchange;
}
- private void closeConnection(final NHttpConnection conn) {
- try {
- conn.close();
- } catch (IOException ignore) {
- }
+ public boolean isDone() {
+ return this.completed;
}
}
- static class ConnectingHandler implements NHttpClientHandler {
+ static class ProxyRequestProducer implements HttpAsyncRequestProducer {
- private final HttpProcessor httpProcessor;
- private final ConnectionReuseStrategy connStrategy;
- private final HttpParams params;
+ private final ProxyHttpExchange httpExchange;
- public ConnectingHandler(
- final HttpProcessor httpProcessor,
- final ConnectionReuseStrategy connStrategy,
- final HttpParams params) {
+ public ProxyRequestProducer(final ProxyHttpExchange httpExchange) {
super();
- this.httpProcessor = httpProcessor;
- this.connStrategy = connStrategy;
- this.params = params;
+ this.httpExchange = httpExchange;
}
- public void connected(final NHttpClientConnection conn, final Object attachment) {
- System.out.println(conn + " [proxy->origin] conn open");
+ public void close() throws IOException {
+ }
- // The shared state object is expected to be passed as an attachment
- ProxyTask proxyTask = (ProxyTask) attachment;
+ public HttpHost getTarget() {
+ synchronized (this.httpExchange) {
+ return this.httpExchange.getTarget();
+ }
+ }
- synchronized (proxyTask) {
- ConnState connState = proxyTask.getOriginState();
- if (connState != ConnState.IDLE) {
- throw new IllegalStateException("Illegal target connection state: " + connState);
+ public HttpRequest generateRequest() {
+ synchronized (this.httpExchange) {
+ HttpRequest request = this.httpExchange.getRequest();
+ System.out.println("[proxy->origin] " + this.httpExchange.getId() + " " + request.getRequestLine());
+ // Rewrite request!!!!
+ if (request instanceof HttpEntityEnclosingRequest) {
+ BasicHttpEntityEnclosingRequest r = new BasicHttpEntityEnclosingRequest(
+ request.getRequestLine());
+ r.setEntity(((HttpEntityEnclosingRequest) request).getEntity());
+ return r;
+ } else {
+ return new BasicHttpRequest(request.getRequestLine());
}
+ }
+ }
- // Set origin IO control handle
- proxyTask.setOriginIOControl(conn);
- // Store the state object in the context
- HttpContext context = conn.getContext();
- context.setAttribute(ProxyTask.ATTRIB, proxyTask);
- // Update connection state
- proxyTask.setOriginState(ConnState.CONNECTED);
-
- if (proxyTask.getRequest() != null) {
- conn.requestOutput();
+ public void produceContent(
+ final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
+ synchronized (this.httpExchange) {
+ this.httpExchange.setOriginIOControl(ioctrl);
+ // Send data to the origin server
+ ByteBuffer buf = this.httpExchange.getInBuffer();
+ buf.flip();
+ int n = encoder.write(buf);
+ buf.compact();
+ System.out.println("[proxy->origin] " + this.httpExchange.getId() + " " + n + " bytes written");
+ if (encoder.isCompleted()) {
+ System.out.println("[proxy->origin] " + this.httpExchange.getId() + " content fully written");
+ }
+ // If there is space in the buffer and the message has not been
+ // transferred, make sure the client is sending more data
+ if (buf.hasRemaining() && !this.httpExchange.isRequestReceived()) {
+ if (this.httpExchange.getClientIOControl() != null) {
+ this.httpExchange.getClientIOControl().requestInput();
+ }
+ }
+ if (buf.position() == 0) {
+ if (this.httpExchange.isRequestReceived()) {
+ encoder.complete();
+ } else {
+ // Input buffer is empty. Wait until the client fills up
+ // the buffer
+ ioctrl.suspendOutput();
+ }
}
}
}
- public void requestReady(final NHttpClientConnection conn) {
- System.out.println(conn + " [proxy->origin] request ready");
-
- HttpContext context = conn.getContext();
- ProxyTask proxyTask = (ProxyTask) context.getAttribute(ProxyTask.ATTRIB);
+ public boolean isRepeatable() {
+ return false;
+ }
- synchronized (proxyTask) {
- ConnState connState = proxyTask.getOriginState();
- if (connState == ConnState.REQUEST_SENT
- || connState == ConnState.REQUEST_BODY_DONE) {
- // Request sent but no response available yet
- return;
- }
+ public void resetRequest() {
+ }
- if (connState != ConnState.IDLE
- && connState != ConnState.CONNECTED) {
- throw new IllegalStateException("Illegal target connection state: " + connState);
- }
-
- HttpRequest request = proxyTask.getRequest();
- if (request == null) {
- throw new IllegalStateException("HTTP request is null");
- }
-
- // Remove hop-by-hop headers
- request.removeHeaders(HTTP.CONTENT_LEN);
- request.removeHeaders(HTTP.TRANSFER_ENCODING);
- request.removeHeaders(HTTP.CONN_DIRECTIVE);
- request.removeHeaders("Keep-Alive");
- request.removeHeaders("Proxy-Authenticate");
- request.removeHeaders("Proxy-Authorization");
- request.removeHeaders("TE");
- request.removeHeaders("Trailers");
- request.removeHeaders("Upgrade");
- // Remove host header
- request.removeHeaders(HTTP.TARGET_HOST);
+ }
- HttpHost targetHost = proxyTask.getTarget();
+ static class ProxyResponseConsumer implements HttpAsyncResponseConsumer<ProxyHttpExchange> {
- try {
+ private final ProxyHttpExchange httpExchange;
- request.setParams(
- new DefaultedHttpParams(request.getParams(), this.params));
+ private volatile boolean completed;
- // Pre-process HTTP request
- context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
- context.setAttribute(ExecutionContext.HTTP_TARGET_HOST, targetHost);
-
- this.httpProcessor.process(request, context);
- // and send it to the origin server
- conn.submitRequest(request);
- // Update connection state
- proxyTask.setOriginState(ConnState.REQUEST_SENT);
+ public ProxyResponseConsumer(final ProxyHttpExchange httpExchange) {
+ super();
+ this.httpExchange = httpExchange;
+ }
- System.out.println(conn + " [proxy->origin] >> " + request.getRequestLine().toString());
+ public void close() throws IOException {
+ }
- } catch (IOException ex) {
- shutdownConnection(conn);
- } catch (HttpException ex) {
- shutdownConnection(conn);
+ public void responseReceived(final HttpResponse response) {
+ synchronized (this.httpExchange) {
+ System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " " + response.getStatusLine());
+ this.httpExchange.setResponse(response);
+ HttpAsyncResponseTrigger responseTrigger = this.httpExchange.getResponseTrigger();
+ if (responseTrigger != null && !responseTrigger.isTriggered()) {
+ System.out.println("[client<-proxy] " + this.httpExchange.getId() + " response triggered");
+ responseTrigger.submitResponse(new ProxyResponseProducer(this.httpExchange));
}
-
}
}
- public void outputReady(final NHttpClientConnection conn, final ContentEncoder encoder) {
- System.out.println(conn + " [proxy->origin] output ready");
-
- HttpContext context = conn.getContext();
- ProxyTask proxyTask = (ProxyTask) context.getAttribute(ProxyTask.ATTRIB);
-
- synchronized (proxyTask) {
- ConnState connState = proxyTask.getOriginState();
- if (connState != ConnState.REQUEST_SENT
- && connState != ConnState.REQUEST_BODY_STREAM) {
- throw new IllegalStateException("Illegal target connection state: " + connState);
+ public void consumeContent(
+ final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
+ synchronized (this.httpExchange) {
+ this.httpExchange.setOriginIOControl(ioctrl);
+ // Receive data from the origin
+ ByteBuffer buf = this.httpExchange.getOutBuffer();
+ int n = decoder.read(buf);
+ System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " " + n + " bytes read");
+ if (decoder.isCompleted()) {
+ System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " content fully read");
}
-
- try {
-
- ByteBuffer src = proxyTask.getInBuffer();
- src.flip();
- int bytesWritten = encoder.write(src);
- System.out.println(conn + " [proxy->origin] " + bytesWritten + " bytes written");
- System.out.println(conn + " [proxy->origin] " + encoder);
- src.compact();
-
- if (src.position() == 0) {
- if (proxyTask.getClientState() == ConnState.REQUEST_BODY_DONE) {
- encoder.complete();
- } else {
- // Input buffer is empty. Wait until the client fills up
- // the buffer
- conn.suspendOutput();
- }
- }
- // Update connection state
- if (encoder.isCompleted()) {
- System.out.println(conn + " [proxy->origin] request body sent");
- proxyTask.setOriginState(ConnState.REQUEST_BODY_DONE);
- } else {
- proxyTask.setOriginState(ConnState.REQUEST_BODY_STREAM);
- // Make sure client input is active
- proxyTask.getClientIOControl().requestInput();
+ // If the buffer is full, suspend origin input until there is free
+ // space in the buffer
+ if (!buf.hasRemaining()) {
+ ioctrl.suspendInput();
+ }
+ // If there is some content in the input buffer make sure client
+ // output is active
+ if (buf.position() > 0) {
+ if (this.httpExchange.getClientIOControl() != null) {
+ this.httpExchange.getClientIOControl().requestOutput();
}
-
- } catch (IOException ex) {
- shutdownConnection(conn);
}
}
}
- public void responseReceived(final NHttpClientConnection conn) {
- System.out.println(conn + " [proxy<-origin] response received");
-
- HttpContext context = conn.getContext();
- ProxyTask proxyTask = (ProxyTask) context.getAttribute(ProxyTask.ATTRIB);
-
- synchronized (proxyTask) {
- ConnState connState = proxyTask.getOriginState();
- if (connState != ConnState.REQUEST_SENT
- && connState != ConnState.REQUEST_BODY_DONE) {
- throw new IllegalStateException("Illegal target connection state: " + connState);
+ public void responseCompleted(final HttpContext context) {
+ synchronized (this.httpExchange) {
+ if (this.completed) {
+ return;
}
+ this.completed = true;
+ System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " response completed");
+ this.httpExchange.setResponseReceived();
+ }
+ }
- HttpResponse response = conn.getHttpResponse();
- HttpRequest request = proxyTask.getRequest();
-
- System.out.println(conn + " [proxy<-origin] << " + response.getStatusLine());
-
- int statusCode = response.getStatusLine().getStatusCode();
- if (statusCode < HttpStatus.SC_OK) {
- // Ignore 1xx response
+ public void failed(Exception ex) {
+ synchronized (this.httpExchange) {
+ if (this.completed) {
return;
}
- try {
-
- // Update connection state
- proxyTask.setResponse(response);
- proxyTask.setOriginState(ConnState.RESPONSE_RECEIVED);
-
- if (!canResponseHaveBody(request, response)) {
- conn.resetInput();
- if (!this.connStrategy.keepAlive(response, context)) {
- System.out.println(conn + " [proxy<-origin] close connection");
- proxyTask.setOriginState(ConnState.CLOSING);
- conn.close();
- }
+ this.completed = true;
+ this.httpExchange.setException(ex);
+ HttpAsyncResponseTrigger responseTrigger = this.httpExchange.getResponseTrigger();
+ if (responseTrigger != null && !responseTrigger.isTriggered()) {
+ System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + ex);
+ int status = HttpStatus.SC_INTERNAL_SERVER_ERROR;
+ HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_0, status,
+ EnglishReasonPhraseCatalog.INSTANCE.getReason(status, Locale.US));
+ String message = ex.getMessage();
+ if (message == null) {
+ message = "Unexpected error";
}
- // Make sure client output is active
- proxyTask.getClientIOControl().requestOutput();
-
- } catch (IOException ex) {
- shutdownConnection(conn);
+ response.setEntity(NStringEntity.create(message, ContentType.DEFAULT_TEXT));
+ responseTrigger.submitResponse(new BasicAsyncResponseProducer(response));
}
}
-
}
- private boolean canResponseHaveBody(
- final HttpRequest request, final HttpResponse response) {
-
- if (request != null && "HEAD".equalsIgnoreCase(request.getRequestLine().getMethod())) {
- return false;
- }
-
- int status = response.getStatusLine().getStatusCode();
- return status >= HttpStatus.SC_OK
- && status != HttpStatus.SC_NO_CONTENT
- && status != HttpStatus.SC_NOT_MODIFIED
- && status != HttpStatus.SC_RESET_CONTENT;
+ public void cancel() {
+ failed(new InterruptedIOException("Cancelled"));
}
- public void inputReady(final NHttpClientConnection conn, final ContentDecoder decoder) {
- System.out.println(conn + " [proxy<-origin] input ready");
-
- HttpContext context = conn.getContext();
- ProxyTask proxyTask = (ProxyTask) context.getAttribute(ProxyTask.ATTRIB);
-
- synchronized (proxyTask) {
- ConnState connState = proxyTask.getOriginState();
- if (connState != ConnState.RESPONSE_RECEIVED
- && connState != ConnState.RESPONSE_BODY_STREAM) {
- throw new IllegalStateException("Illegal target connection state: " + connState);
- }
-
- HttpResponse response = proxyTask.getResponse();
- try {
-
- ByteBuffer dst = proxyTask.getOutBuffer();
- int bytesRead = decoder.read(dst);
- System.out.println(conn + " [proxy<-origin] " + bytesRead + " bytes read");
- System.out.println(conn + " [proxy<-origin] " + decoder);
- if (!dst.hasRemaining()) {
- // Output buffer is full. Suspend origin input until
- // the client handler frees up some space in the buffer
- conn.suspendInput();
- }
- // If there is some content in the buffer make sure client output
- // is active
- if (dst.position() > 0) {
- proxyTask.getClientIOControl().requestOutput();
- }
+ public ProxyHttpExchange getResult() {
+ return this.httpExchange;
+ }
- if (decoder.isCompleted()) {
- System.out.println(conn + " [proxy<-origin] response body received");
- proxyTask.setOriginState(ConnState.RESPONSE_BODY_DONE);
-
- if (!this.connStrategy.keepAlive(response, context)) {
- System.out.println(conn + " [proxy<-origin] close connection");
- proxyTask.setOriginState(ConnState.CLOSING);
- conn.close();
- }
- } else {
- proxyTask.setOriginState(ConnState.RESPONSE_BODY_STREAM);
- }
+ public Exception getException() {
+ return null;
+ }
- } catch (IOException ex) {
- shutdownConnection(conn);
- }
- }
+ public boolean isDone() {
+ return this.completed;
}
- public void closed(final NHttpClientConnection conn) {
- System.out.println(conn + " [proxy->origin] conn closed");
- HttpContext context = conn.getContext();
- ProxyTask proxyTask = (ProxyTask) context.getAttribute(ProxyTask.ATTRIB);
+ }
- if (proxyTask != null) {
- synchronized (proxyTask) {
- proxyTask.setOriginState(ConnState.CLOSED);
- }
- }
- }
+ static class ProxyResponseProducer implements HttpAsyncResponseProducer {
- public void exception(final NHttpClientConnection conn, final HttpException ex) {
- shutdownConnection(conn);
- System.out.println(conn + " [proxy->origin] HTTP error: " + ex.getMessage());
- }
+ private final ProxyHttpExchange httpExchange;
- public void exception(final NHttpClientConnection conn, final IOException ex) {
- shutdownConnection(conn);
- System.out.println(conn + " [proxy->origin] I/O error: " + ex.getMessage());
+ public ProxyResponseProducer(final ProxyHttpExchange httpExchange) {
+ super();
+ this.httpExchange = httpExchange;
}
- public void timeout(final NHttpClientConnection conn) {
- System.out.println(conn + " [proxy->origin] timeout");
- closeConnection(conn);
+ public void close() throws IOException {
+ this.httpExchange.reset();
}
- private void shutdownConnection(final HttpConnection conn) {
- try {
- conn.shutdown();
- } catch (IOException ignore) {
+ public HttpResponse generateResponse() {
+ synchronized (this.httpExchange) {
+ HttpResponse response = this.httpExchange.getResponse();
+ System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + response.getStatusLine());
+ // Rewrite response!!!!
+ BasicHttpResponse r = new BasicHttpResponse(response.getStatusLine());
+ r.setEntity(response.getEntity());
+ return r;
}
}
- private void closeConnection(final HttpConnection conn) {
- try {
- conn.shutdown();
- } catch (IOException ignore) {
+ public void produceContent(
+ final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
+ synchronized (this.httpExchange) {
+ this.httpExchange.setClientIOControl(ioctrl);
+ // Send data to the client
+ ByteBuffer buf = this.httpExchange.getOutBuffer();
+ buf.flip();
+ int n = encoder.write(buf);
+ buf.compact();
+ System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + n + " bytes written");
+ if (encoder.isCompleted()) {
+ System.out.println("[client<-proxy] " + this.httpExchange.getId() + " content fully written");
+ }
+ // If there is space in the buffer and the message has not been
+ // transferred, make sure the origin is sending more data
+ if (buf.hasRemaining() && !this.httpExchange.isResponseReceived()) {
+ if (this.httpExchange.getOriginIOControl() != null) {
+ this.httpExchange.getOriginIOControl().requestInput();
+ }
+ }
+ if (buf.position() == 0) {
+ if (this.httpExchange.isResponseReceived()) {
+ encoder.complete();
+ } else {
+ // Input buffer is empty. Wait until the origin fills up
+ // the buffer
+ ioctrl.suspendOutput();
+ }
+ }
}
}
}
- enum ConnState {
- IDLE,
- CONNECTED,
- REQUEST_RECEIVED,
- REQUEST_SENT,
- REQUEST_BODY_STREAM,
- REQUEST_BODY_DONE,
- RESPONSE_RECEIVED,
- RESPONSE_SENT,
- RESPONSE_BODY_STREAM,
- RESPONSE_BODY_DONE,
- CLOSING,
- CLOSED
- }
+ static class ProxyIncomingConnectionReuseStrategy extends DefaultConnectionReuseStrategy {
- static class ProxyTask {
-
- public static final String ATTRIB = "nhttp.proxy-task";
-
- private final ByteBuffer inBuffer;
- private final ByteBuffer outBuffer;
-
- private HttpHost target;
-
- private IOControl originIOControl;
- private IOControl clientIOControl;
-
- private ConnState originState;
- private ConnState clientState;
+ @Override
+ public boolean keepAlive(final HttpResponse response, final HttpContext context) {
+ NHttpConnection conn = (NHttpConnection) context.getAttribute(
+ ExecutionContext.HTTP_CONNECTION);
+ boolean keepAlive = super.keepAlive(response, context);
+ if (keepAlive) {
+ System.out.println("[client->proxy] connection kept alive " + conn);
+ }
+ return keepAlive;
+ }
- private HttpRequest request;
- private HttpResponse response;
+ };
- public ProxyTask() {
- super();
- this.originState = ConnState.IDLE;
- this.clientState = ConnState.IDLE;
- this.inBuffer = ByteBuffer.allocateDirect(10240);
- this.outBuffer = ByteBuffer.allocateDirect(10240);
- }
+ static class ProxyOutgoingConnectionReuseStrategy extends DefaultConnectionReuseStrategy {
- public ByteBuffer getInBuffer() {
- return this.inBuffer;
+ @Override
+ public boolean keepAlive(final HttpResponse response, final HttpContext context) {
+ NHttpConnection conn = (NHttpConnection) context.getAttribute(
+ ExecutionContext.HTTP_CONNECTION);
+ boolean keepAlive = super.keepAlive(response, context);
+ if (keepAlive) {
+ System.out.println("[proxy->origin] connection kept alive " + conn);
+ }
+ return keepAlive;
}
- public ByteBuffer getOutBuffer() {
- return this.outBuffer;
- }
+ };
- public HttpHost getTarget() {
- return this.target;
- }
+ static class ProxyServiceHandler extends HttpAsyncServiceHandler {
- public void setTarget(final HttpHost target) {
- this.target = target;
+ public ProxyServiceHandler(
+ final HttpAsyncRequestHandlerResolver handlerResolver,
+ final HttpProcessor httpProcessor,
+ final ConnectionReuseStrategy reuseStrategy,
+ final HttpParams params) {
+ super(handlerResolver, httpProcessor, reuseStrategy, params);
}
- public HttpRequest getRequest() {
- return this.request;
+ @Override
+ protected void onException(final Exception ex) {
+ ex.printStackTrace();
}
- public void setRequest(final HttpRequest request) {
- this.request = request;
+ @Override
+ public void connected(final NHttpServerConnection conn) {
+ System.out.println("[client->proxy] connection open " + conn);
+ super.connected(conn);
}
- public HttpResponse getResponse() {
- return this.response;
+ @Override
+ public void closed(final NHttpServerConnection conn) {
+ System.out.println("[client->proxy] connection closed " + conn);
+ super.closed(conn);
}
- public void setResponse(final HttpResponse response) {
- this.response = response;
- }
+ }
- public IOControl getClientIOControl() {
- return this.clientIOControl;
- }
+ static class ProxyClientProtocolHandler extends HttpAsyncClientProtocolHandler {
- public void setClientIOControl(final IOControl clientIOControl) {
- this.clientIOControl = clientIOControl;
+ public ProxyClientProtocolHandler() {
+ super();
}
- public IOControl getOriginIOControl() {
- return this.originIOControl;
+ @Override
+ protected void onException(final Exception ex) {
+ ex.printStackTrace();
}
- public void setOriginIOControl(final IOControl originIOControl) {
- this.originIOControl = originIOControl;
+ @Override
+ public void connected(final NHttpClientConnection conn, final Object attachment) {
+ System.out.println("[proxy->origin] connection open " + conn);
+ super.connected(conn, attachment);
}
- public ConnState getOriginState() {
- return this.originState;
+ @Override
+ public void closed(final NHttpClientConnection conn) {
+ System.out.println("[proxy->origin] connection closed " + conn);
+ super.closed(conn);
}
- public void setOriginState(final ConnState state) {
- this.originState = state;
- }
+ }
- public ConnState getClientState() {
- return this.clientState;
- }
+ static class ProxyConnPool extends BasicNIOConnPool {
- public void setClientState(final ConnState state) {
- this.clientState = state;
+ public ProxyConnPool(final ConnectingIOReactor ioreactor, final HttpParams params) {
+ super(ioreactor, params);
}
- public void reset() {
- this.inBuffer.clear();
- this.outBuffer.clear();
- this.originState = ConnState.IDLE;
- this.clientState = ConnState.IDLE;
- this.request = null;
- this.response = null;
+ public ProxyConnPool(
+ final ConnectingIOReactor ioreactor,
+ final NIOConnFactory<HttpHost, NHttpClientConnection> connFactory,
+ final HttpParams params) {
+ super(ioreactor, connFactory, params);
}
- public void shutdown() {
- if (this.clientIOControl != null) {
- try {
- this.clientIOControl.shutdown();
- } catch (IOException ignore) {
- }
- }
- if (this.originIOControl != null) {
- try {
- this.originIOControl.shutdown();
- } catch (IOException ignore) {
- }
- }
+ @Override
+ public void release(final BasicNIOPoolEntry entry, boolean reusable) {
+ System.out.println("[proxy->origin] connection released " + entry.getConnection());
+ super.release(entry, reusable);
+ StringBuilder buf = new StringBuilder();
+ PoolStats totals = getTotalStats();
+ buf.append("[total kept alive: ").append(totals.getAvailable()).append("; ");
+ buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
+ buf.append(" of ").append(totals.getMax()).append("]");
+ System.out.println("[proxy->origin] " + buf.toString());
}
}
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseTrigger.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseTrigger.java?rev=1166291&r1=1166290&r2=1166291&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseTrigger.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseTrigger.java Wed Sep 7 18:12:13 2011
@@ -34,4 +34,6 @@ public interface HttpAsyncResponseTrigge
void submitResponse(HttpAsyncResponseProducer responseProducer);
+ boolean isTriggered();
+
}
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java?rev=1166291&r1=1166290&r2=1166291&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java Wed Sep 7 18:12:13 2011
@@ -241,12 +241,12 @@ public class HttpAsyncServiceHandler imp
producer.produceContent(encoder, conn);
if (encoder.isCompleted()) {
- httpExchange.reset();
if (!this.connStrategy.keepAlive(response, context)) {
conn.close();
} else {
conn.requestInput();
}
+ httpExchange.reset();
}
} catch (RuntimeException ex) {
shutdownConnection(conn);
@@ -280,7 +280,7 @@ public class HttpAsyncServiceHandler imp
}
}
- private ErrorResponseProducer handleException(final Exception ex) {
+ protected HttpAsyncResponseProducer handleException(final Exception ex) {
int code = HttpStatus.SC_INTERNAL_SERVER_ERROR;
if (ex instanceof MethodNotSupportedException) {
code = HttpStatus.SC_NOT_IMPLEMENTED;
@@ -317,7 +317,7 @@ public class HttpAsyncServiceHandler imp
consumer.requestCompleted(context);
Exception ex = consumer.getException();
if (ex != null) {
- ErrorResponseProducer responseProducer = handleException(ex);
+ HttpAsyncResponseProducer responseProducer = handleException(ex);
httpExchange.setResponseProducer(responseProducer);
conn.requestOutput();
} else {
@@ -352,13 +352,13 @@ public class HttpAsyncServiceHandler imp
conn.submitResponse(response);
if (entity == null) {
- httpExchange.reset();
if (!this.connStrategy.keepAlive(response, context)) {
conn.close();
} else {
// Ready to process new request
conn.requestInput();
}
+ httpExchange.reset();
}
}
@@ -524,6 +524,10 @@ public class HttpAsyncServiceHandler imp
this.iocontrol.requestOutput();
}
+ public boolean isTriggered() {
+ return this.triggered;
+ }
+
}
}