You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by as...@apache.org on 2008/05/26 10:01:06 UTC
svn commit: r660119 - in
/synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp:
./ util/
Author: asankha
Date: Mon May 26 01:01:04 2008
New Revision: 660119
URL: http://svn.apache.org/viewvc?rev=660119&view=rev
Log:
fixed SYNAPSE-321
implemented suggestion from Oleg K. to do away with using Pipes for channel to stream bridging
Added:
synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/util/SharedInputBuffer.java
Removed:
synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/util/PipeImpl.java
Modified:
synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java
synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ConnectionPool.java
synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/NHttpConfiguration.java
synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
Modified: synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java?rev=660119&r1=660118&r2=660119&view=diff
==============================================================================
--- synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java (original)
+++ synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java Mon May 26 01:01:04 2008
@@ -31,20 +31,18 @@
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpVersion;
+import org.apache.http.nio.util.ContentOutputBuffer;
+import org.apache.http.nio.entity.ContentOutputStream;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.message.BasicHttpEntityEnclosingRequest;
import org.apache.http.message.BasicHttpRequest;
import org.apache.http.protocol.HTTP;
import org.apache.synapse.transport.nhttp.util.MessageFormatterDecoratorFactory;
-import org.apache.synapse.transport.nhttp.util.PipeImpl;
import org.apache.synapse.transport.nhttp.util.RESTUtil;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
import java.nio.channels.ClosedChannelException;
import java.util.Iterator;
import java.util.Map;
@@ -64,13 +62,15 @@
private HttpHost httpHost = null;
/** the message context being sent */
private MessageContext msgContext = null;
- /** the Pipe which facilitates the serialization output to be written to the channel */
- private PipeImpl pipe = null;
/** The Axis2 MessageFormatter that will ensure proper serialization as per Axis2 semantics */
MessageFormatter messageFormatter = null;
/** The OM Output format holder */
OMOutputFormat format = null;
- protected boolean completed = false; //added for request complete checking
+ private ContentOutputBuffer outputBuffer = null;
+ /** ready to begin streaming? */
+ private boolean readyToStream = false;
+ /** for request complete checking */
+ private boolean completed = false;
public Axis2HttpRequest(EndpointReference epr, HttpHost httpHost, MessageContext msgContext) {
this.epr = epr;
@@ -79,11 +79,22 @@
this.format = NhttpUtils.getOMOutputFormat(msgContext);
this.messageFormatter =
MessageFormatterDecoratorFactory.createMessageFormatterDecorator(msgContext);
- try {
- this.pipe = new PipeImpl();
- } catch (IOException e) {
- log.error("Error creating pipe to write message body", e);
- }
+ }
+
+ public void setReadyToStream(boolean readyToStream) {
+ this.readyToStream = readyToStream;
+ }
+
+ public void setOutputBuffer(ContentOutputBuffer outputBuffer) {
+ this.outputBuffer = outputBuffer;
+ }
+
+ public void clear() {
+ this.epr = null;
+ this.httpHost = null;
+ this.msgContext = null;
+ this.format = null;
+ this.messageFormatter = null;
}
public EndpointReference getEpr() {
@@ -185,28 +196,6 @@
}
/**
- * Return the source channel of the pipe that bridges the serialized output to the socket
- * @return source channel to read serialized message contents
- */
- public ReadableByteChannel getSourceChannel() {
- if (log.isDebugEnabled()) {
- log.debug("get source channel of the pipe on which the outgoing response is written");
- }
- return pipe.source();
- }
-
- /**
- * Return the sink channel of the pipe that bridges the serialized output to the socket
- * @return sink channel to read serialized message contents
- */
- public WritableByteChannel getSinkChannel() {
- if (log.isDebugEnabled()) {
- log.debug("get sink channel of the pipe on which the outgoing response is written");
- }
- return pipe.sink();
- }
-
- /**
* Start streaming the message into the Pipe, so that the contents could be read off the source
* channel returned by getSourceChannel()
* @throws AxisFault on error
@@ -216,7 +205,16 @@
if (log.isDebugEnabled()) {
log.debug("start streaming outgoing http request");
}
- OutputStream out = Channels.newOutputStream(pipe.sink());
+
+ synchronized(this) {
+ while (!readyToStream) {
+ try {
+ this.wait();
+ } catch (InterruptedException ignore) {}
+ }
+ }
+
+ OutputStream out = new ContentOutputStream(outputBuffer);
try {
messageFormatter.writeTo(msgContext, format, out, true);
} catch (Exception e) {
@@ -257,9 +255,6 @@
}
public void setCompleted(boolean completed) {
- if (completed && !isCompleted()) {
- this.pipe.close();
- }
this.completed = completed;
}
}
Modified: synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java?rev=660119&r1=660118&r2=660119&view=diff
==============================================================================
--- synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java (original)
+++ synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java Mon May 26 01:01:04 2008
@@ -39,20 +39,17 @@
import org.apache.http.nio.ContentEncoder;
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.NHttpClientHandler;
+import org.apache.http.nio.entity.ContentInputStream;
+import org.apache.http.nio.util.*;
import org.apache.http.params.DefaultedHttpParams;
import org.apache.http.params.HttpParams;
import org.apache.http.protocol.*;
import org.apache.synapse.transport.base.MetricsCollector;
import org.apache.synapse.transport.base.threads.WorkerPool;
import org.apache.synapse.transport.base.threads.WorkerPoolFactory;
-import org.apache.synapse.transport.nhttp.util.PipeImpl;
+import org.apache.synapse.transport.nhttp.util.SharedInputBuffer;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channel;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
/**
* The client connection handler. An instance of this class is used by each IOReactor, to
@@ -69,6 +66,8 @@
private final HttpProcessor httpProcessor;
/** the connection re-use strategy */
private final ConnectionReuseStrategy connStrategy;
+ /** the buffer allocator */
+ private final ByteBufferAllocator allocator;
/** the Axis2 configuration context */
ConfigurationContext cfgCtx = null;
@@ -79,15 +78,12 @@
/** the metrics collector */
private MetricsCollector metrics = null;
- private static final String REQUEST_BUFFER = "request-buffer";
- private static final String RESPONSE_BUFFER = "response-buffer";
- private static final String OUTGOING_MESSAGE_CONTEXT = "axis2_message_context";
- private static final String REQUEST_SOURCE_CHANNEL = "request-source-channel";
- private static final String RESPONSE_SINK_CHANNEL = "response-sink-channel";
- private static final String REQUEST_SINK_CHANNEL = "request-sink-channel";
- private static final String RESPONSE_SOURCE_CHANNEL = "response-source-channel";
+ public static final String OUTGOING_MESSAGE_CONTEXT = "synapse.axis2_message_context";
+ public static final String AXIS2_HTTP_REQUEST = "synapse.axis2-http-request";
+
+ public static final String REQUEST_SOURCE_BUFFER = "synapse.request-source-buffer";
+ public static final String RESPONSE_SINK_BUFFER = "synapse.response-sink-buffer";
- private static final String AXIS2_HTTP_REQUEST = "synapse.axis2-http-request";
private static final String CONTENT_TYPE = "Content-Type";
/**
@@ -105,6 +101,7 @@
this.httpProcessor = getHttpProcessor();
this.connStrategy = new DefaultConnectionReuseStrategy();
this.metrics = metrics;
+ this.allocator = new HeapByteBufferAllocator();
this.cfg = NHttpConfiguration.getInstance();
workerPool = WorkerPoolFactory.getWorkerPool(
@@ -129,13 +126,14 @@
try {
HttpContext context = conn.getContext();
+ ContentOutputBuffer outputBuffer = new SharedOutputBuffer(cfg.getBufferSize(), conn, allocator);
+ axis2Req.setOutputBuffer(outputBuffer);
+ context.setAttribute(REQUEST_SOURCE_BUFFER, outputBuffer);
+ context.setAttribute(AXIS2_HTTP_REQUEST, axis2Req);
context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
context.setAttribute(ExecutionContext.HTTP_TARGET_HOST, axis2Req.getHttpHost());
-
context.setAttribute(OUTGOING_MESSAGE_CONTEXT, axis2Req.getMsgContext());
- context.setAttribute(REQUEST_SOURCE_CHANNEL, axis2Req.getSourceChannel());
- context.setAttribute(REQUEST_SINK_CHANNEL, axis2Req.getSinkChannel());
HttpRequest request = axis2Req.getRequest();
request.setParams(new DefaultedHttpParams(request.getParams(), this.params));
@@ -144,6 +142,11 @@
conn.submitRequest(request);
context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
+ synchronized(axis2Req) {
+ axis2Req.setReadyToStream(true);
+ axis2Req.notifyAll();
+ }
+
} catch (IOException e) {
handleException("I/O Error : " + e.getMessage(), e, conn);
} catch (HttpException e) {
@@ -165,18 +168,14 @@
try {
HttpContext context = conn.getContext();
Axis2HttpRequest axis2Req = (Axis2HttpRequest) attachment;
+ ContentOutputBuffer outputBuffer = new SharedOutputBuffer(cfg.getBufferSize(), conn, allocator);
+ axis2Req.setOutputBuffer(outputBuffer);
+ context.setAttribute(REQUEST_SOURCE_BUFFER, outputBuffer);
context.setAttribute(AXIS2_HTTP_REQUEST, axis2Req);
context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
context.setAttribute(ExecutionContext.HTTP_TARGET_HOST, axis2Req.getHttpHost());
-
- // allocate temporary buffers to process this request
- context.setAttribute(REQUEST_BUFFER, ByteBuffer.allocate(cfg.getBufferSize()));
- context.setAttribute(RESPONSE_BUFFER, ByteBuffer.allocate(cfg.getBufferSize()));
-
context.setAttribute(OUTGOING_MESSAGE_CONTEXT, axis2Req.getMsgContext());
- context.setAttribute(REQUEST_SOURCE_CHANNEL, axis2Req.getSourceChannel());
- context.setAttribute(REQUEST_SINK_CHANNEL, axis2Req.getSinkChannel());
HttpRequest request = axis2Req.getRequest();
request.setParams(new DefaultedHttpParams(request.getParams(), this.params));
@@ -185,6 +184,11 @@
conn.submitRequest(request);
context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
+ synchronized(axis2Req) {
+ axis2Req.setReadyToStream(true);
+ axis2Req.notifyAll();
+ }
+
} catch (IOException e) {
handleException("I/O Error : " + e.getMessage(), e, conn);
} catch (HttpException e) {
@@ -193,33 +197,18 @@
}
public void closed(final NHttpClientConnection conn) {
- checkAxisRequestComplete(conn, "Abnormal connection close", null);
-
- // Check sink and source channels and close them if they aren't closed already.
- // Normally these should be closed by inputReady() and outputReady(). A null request
- // or response will not hit inputReady and outputReady however.
+ ConnectionPool.forget(conn);
+ checkAxisRequestComplete(conn, "Abnormal connection close", null);
HttpContext context = conn.getContext();
- closeChannel((ReadableByteChannel) context.getAttribute(REQUEST_SOURCE_CHANNEL));
- // Note: We do not close the RESPONSE_SOURCE_CHANNEL at this point in time as its closed
- // by the ClientWorker:run() in the finally block, after the response was processed
- // fix for https://issues.apache.org/jira/browse/SYNAPSE-289
- closeChannel((WritableByteChannel) context.getAttribute(RESPONSE_SINK_CHANNEL));
- closeChannel((WritableByteChannel) context.getAttribute(REQUEST_SINK_CHANNEL));
-
+ context.removeAttribute(RESPONSE_SINK_BUFFER);
+ context.removeAttribute(REQUEST_SOURCE_BUFFER);
+
if (log.isTraceEnabled()) {
log.trace("Connection closed");
}
}
- private void closeChannel(Channel chn) {
- try {
- if (chn != null && chn.isOpen()) {
- chn.close();
- }
- } catch (IOException ignore) {}
- }
-
/**
* Handle connection timeouts by shutting down the connections
* @param conn the connection being processed
@@ -325,25 +314,18 @@
public void inputReady(final NHttpClientConnection conn, final ContentDecoder decoder) {
HttpContext context = conn.getContext();
HttpResponse response = conn.getHttpResponse();
- WritableByteChannel sink
- = (WritableByteChannel) context.getAttribute(RESPONSE_SINK_CHANNEL);
- ByteBuffer inbuf = (ByteBuffer) context.getAttribute(REQUEST_BUFFER);
+ ContentInputBuffer inBuf = (ContentInputBuffer) context.getAttribute(RESPONSE_SINK_BUFFER);
try {
- while (decoder.read(inbuf) > 0) {
- inbuf.flip();
- sink.write(inbuf);
- if (metrics != null) {
- metrics.incrementBytesReceived(inbuf.position());
- }
- inbuf.compact();
+ int bytesRead = inBuf.consumeContent(decoder);
+ if (metrics != null && bytesRead > 0) {
+ metrics.incrementBytesReceived(bytesRead);
}
if (decoder.isCompleted()) {
if (metrics != null) {
metrics.incrementMessagesReceived();
}
- if (sink != null) sink.close();
if (!connStrategy.keepAlive(response, context)) {
conn.close();
} else {
@@ -364,28 +346,18 @@
public void outputReady(final NHttpClientConnection conn, final ContentEncoder encoder) {
HttpContext context = conn.getContext();
- ReadableByteChannel source
- = (ReadableByteChannel) context.getAttribute(REQUEST_SOURCE_CHANNEL);
- ByteBuffer outbuf = (ByteBuffer) context.getAttribute(RESPONSE_BUFFER);
+ ContentOutputBuffer outBuf = (ContentOutputBuffer) context.getAttribute(REQUEST_SOURCE_BUFFER);
try {
- int bytesRead = source.read(outbuf);
- if (bytesRead == -1) {
- encoder.complete();
- } else {
- outbuf.flip();
- encoder.write(outbuf);
- if (metrics != null) {
- metrics.incrementBytesSent(outbuf.position());
- }
- outbuf.compact();
+ int bytesWritten = outBuf.produceContent(encoder);
+ if (metrics != null && bytesWritten > 0) {
+ metrics.incrementBytesSent(bytesWritten);
}
if (encoder.isCompleted()) {
if (metrics != null) {
metrics.incrementMessagesSent();
}
- source.close();
}
} catch (IOException e) {
@@ -556,25 +528,20 @@
private void processResponse(final NHttpClientConnection conn, HttpContext context,
HttpResponse response) {
- try {
- PipeImpl responsePipe = new PipeImpl();
- context.setAttribute(RESPONSE_SINK_CHANNEL, responsePipe.sink());
- context.setAttribute(RESPONSE_SOURCE_CHANNEL, responsePipe.source());
-
- BasicHttpEntity entity = new BasicHttpEntity();
- if (response.getStatusLine().getProtocolVersion().greaterEquals(HttpVersion.HTTP_1_1)) {
- entity.setChunked(true);
- }
- response.setEntity(entity);
- context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
-
- workerPool.execute(
- new ClientWorker(cfgCtx, Channels.newInputStream(responsePipe.source()), response,
- (MessageContext) context.getAttribute(OUTGOING_MESSAGE_CONTEXT)));
+ ContentInputBuffer inputBuffer = new SharedInputBuffer(cfg.getBufferSize(), conn, allocator);
+ context.setAttribute(RESPONSE_SINK_BUFFER, inputBuffer);
+
+ BasicHttpEntity entity = new BasicHttpEntity();
+ if (response.getStatusLine().getProtocolVersion().greaterEquals(HttpVersion.HTTP_1_1)) {
+ entity.setChunked(true);
+ }
+ response.setEntity(entity);
+ context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
+
+ workerPool.execute(
+ new ClientWorker(cfgCtx, new ContentInputStream(inputBuffer), response,
+ (MessageContext) context.getAttribute(OUTGOING_MESSAGE_CONTEXT)));
- } catch (IOException e) {
- handleException("I/O Error : " + e.getMessage(), e, conn);
- }
}
// ----------- utility methods -----------
Modified: synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ConnectionPool.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ConnectionPool.java?rev=660119&r1=660118&r2=660119&view=diff
==============================================================================
--- synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ConnectionPool.java (original)
+++ synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ConnectionPool.java Mon May 26 01:01:04 2008
@@ -20,6 +20,7 @@
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.protocol.ExecutionContext;
+import org.apache.http.protocol.HttpContext;
import org.apache.http.HttpHost;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -59,6 +60,7 @@
log.debug("A connection to host : " + host + " on port : " +
port + " is available in the pool, and will be reused");
}
+ conn.requestInput(); // asankha - make sure keep alives work properly when reused with throttling
return conn;
} else {
if (log.isDebugEnabled()) {
@@ -93,6 +95,7 @@
}
}
+ cleanConnectionReferences(conn);
connections.add(conn);
if (log.isDebugEnabled()) {
@@ -100,4 +103,37 @@
host.getPort() + " to the connection pool of current size : " + connections.size());
}
}
+
+ private static void cleanConnectionReferences(NHttpClientConnection conn) {
+
+ HttpContext ctx = conn.getContext();
+ Axis2HttpRequest axis2Req =
+ (Axis2HttpRequest) ctx.getAttribute(ClientHandler.AXIS2_HTTP_REQUEST);
+ axis2Req.clear(); // this is linked via the selection key attachment and will free itself
+ // on timeout of the keep alive connection. Till then minimize the
+ // memory usage to a few bytes
+
+ ctx.removeAttribute(ClientHandler.AXIS2_HTTP_REQUEST);
+ ctx.removeAttribute(ClientHandler.OUTGOING_MESSAGE_CONTEXT);
+ ctx.removeAttribute(ClientHandler.REQUEST_SOURCE_BUFFER);
+ ctx.removeAttribute(ClientHandler.RESPONSE_SINK_BUFFER);
+
+ ctx.removeAttribute(ExecutionContext.HTTP_REQUEST);
+ ctx.removeAttribute(ExecutionContext.HTTP_RESPONSE);
+ ctx.removeAttribute(ExecutionContext.HTTP_CONNECTION);
+ }
+
+ public static void forget(NHttpClientConnection conn) {
+
+ HttpHost host = (HttpHost) conn.getContext().getAttribute(
+ ExecutionContext.HTTP_TARGET_HOST);
+ String key = host.getHostName() + ":" + Integer.toString(host.getPort());
+
+ List connections = (List) connMap.get(key);
+ if (connections != null) {
+ synchronized(connections) {
+ connections.remove(conn);
+ }
+ }
+ }
}
Modified: synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/NHttpConfiguration.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/NHttpConfiguration.java?rev=660119&r1=660118&r2=660119&view=diff
==============================================================================
--- synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/NHttpConfiguration.java (original)
+++ synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/NHttpConfiguration.java Mon May 26 01:01:04 2008
@@ -38,7 +38,7 @@
private static final int WORKER_KEEP_ALIVE = 5;
private static final int BLOCKING_QUEUE_LENGTH = -1;
private static final int IO_WORKER_COUNT = 2;
- private static final int BUFFER_SIZE = 2048;
+ private static final int BUFFER_SIZE = 8192;
// server listener
private static final String S_T_CORE = "snd_t_core";
Modified: synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java?rev=660119&r1=660118&r2=660119&view=diff
==============================================================================
--- synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java (original)
+++ synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java Mon May 26 01:01:04 2008
@@ -19,10 +19,10 @@
package org.apache.synapse.transport.nhttp;
import org.apache.axis2.context.ConfigurationContext;
-import org.apache.synapse.transport.nhttp.util.PipeImpl;
import org.apache.synapse.transport.base.MetricsCollector;
import org.apache.synapse.transport.base.threads.WorkerPoolFactory;
import org.apache.synapse.transport.base.threads.WorkerPool;
+import org.apache.synapse.transport.nhttp.util.SharedInputBuffer;
import org.apache.http.*;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.entity.ByteArrayEntity;
@@ -32,6 +32,10 @@
import org.apache.http.nio.ContentEncoder;
import org.apache.http.nio.NHttpServerConnection;
import org.apache.http.nio.NHttpServiceHandler;
+import org.apache.http.nio.reactor.IOSession;
+import org.apache.http.nio.entity.ContentInputStream;
+import org.apache.http.nio.entity.ContentOutputStream;
+import org.apache.http.nio.util.*;
import org.apache.http.params.HttpParams;
import org.apache.http.protocol.*;
import org.apache.http.util.EncodingUtils;
@@ -39,11 +43,6 @@
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.Channel;
/**
* The server connection handler. An instance of this class is used by each IOReactor, to
@@ -62,6 +61,8 @@
private final HttpProcessor httpProcessor;
/** the strategy to re-use connections */
private final ConnectionReuseStrategy connStrategy;
+ /** the buffer allocator */
+ private final ByteBufferAllocator allocator;
/** the Axis2 configuration context */
ConfigurationContext cfgCtx = null;
@@ -75,12 +76,8 @@
/** the metrics collector */
private MetricsCollector metrics = null;
- private static final String REQUEST_SINK_CHANNEL = "request-sink-channel";
- private static final String RESPONSE_SOURCE_CHANNEL = "response-source-channel";
- private static final String REQUEST_SOURCE_CHANNEL = "request-source-channel";
- private static final String RESPONSE_SINK_CHANNEL = "response-sink-channel";
- private static final String REQUEST_BUFFER = "request-buffer";
- private static final String RESPONSE_BUFFER = "response-buffer";
+ public static final String REQUEST_SINK_BUFFER = "synapse.request-sink-buffer";
+ public static final String RESPONSE_SOURCE_BUFFER = "synapse.response-source-buffer";
public ServerHandler(final ConfigurationContext cfgCtx, final HttpParams params,
final boolean isHttps, final MetricsCollector metrics) {
@@ -92,6 +89,7 @@
this.responseFactory = new DefaultHttpResponseFactory();
this.httpProcessor = getHttpProcessor();
this.connStrategy = new DefaultConnectionReuseStrategy();
+ this.allocator = new HeapByteBufferAllocator();
this.cfg = NHttpConfiguration.getInstance();
this.workerPool = WorkerPoolFactory.getWorkerPool(
@@ -112,17 +110,11 @@
HttpRequest request = conn.getHttpRequest();
context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
- // allocate temporary buffers to process this request
- context.setAttribute(REQUEST_BUFFER, ByteBuffer.allocate(cfg.getBufferSize()));
- context.setAttribute(RESPONSE_BUFFER, ByteBuffer.allocate(cfg.getBufferSize()));
-
try {
- PipeImpl requestPipe = new PipeImpl(); // the pipe used to process the request
- PipeImpl responsePipe = new PipeImpl(); // the pipe used to process the response
- context.setAttribute(REQUEST_SINK_CHANNEL, requestPipe.sink());
- context.setAttribute(REQUEST_SOURCE_CHANNEL, requestPipe.source());
- context.setAttribute(RESPONSE_SOURCE_CHANNEL, responsePipe.source());
- context.setAttribute(RESPONSE_SINK_CHANNEL, responsePipe.sink());
+ ContentInputBuffer inputBuffer = new SharedInputBuffer(cfg.getBufferSize(), conn, allocator);
+ ContentOutputBuffer outputBuffer = new SharedOutputBuffer(cfg.getBufferSize(), conn, allocator);
+ context.setAttribute(REQUEST_SINK_BUFFER, inputBuffer);
+ context.setAttribute(RESPONSE_SOURCE_BUFFER, outputBuffer);
// create the default response to this request
ProtocolVersion httpVersion = request.getRequestLine().getProtocolVersion();
@@ -132,7 +124,6 @@
// create a basic HttpEntity using the source channel of the response pipe
BasicHttpEntity entity = new BasicHttpEntity();
- entity.setContent(Channels.newInputStream(responsePipe.source()));
if (httpVersion.greaterEquals(HttpVersion.HTTP_1_1)) {
entity.setChunked(true);
}
@@ -141,12 +132,9 @@
// hand off processing of the request to a thread off the pool
workerPool.execute(
new ServerWorker(cfgCtx, conn, isHttps, metrics, this,
- request, Channels.newInputStream(requestPipe.source()),
- response, Channels.newOutputStream(responsePipe.sink())));
+ request, new ContentInputStream(inputBuffer),
+ response, new ContentOutputStream(outputBuffer)));
- } catch (IOException e) {
- handleException("Error processing request received for : " +
- request.getRequestLine().getUri(), e, conn);
} catch (Exception e) {
handleException("Error processing request received for : " +
request.getRequestLine().getUri(), e, conn);
@@ -161,24 +149,18 @@
public void inputReady(final NHttpServerConnection conn, final ContentDecoder decoder) {
HttpContext context = conn.getContext();
- WritableByteChannel sink = (WritableByteChannel) context.getAttribute(REQUEST_SINK_CHANNEL);
- ByteBuffer inbuf = (ByteBuffer) context.getAttribute(REQUEST_BUFFER);
+ ContentInputBuffer inBuf = (ContentInputBuffer) context.getAttribute(REQUEST_SINK_BUFFER);
try {
- while (decoder.read(inbuf) > 0) {
- inbuf.flip();
- sink.write(inbuf);
- if (metrics != null) {
- metrics.incrementBytesReceived(inbuf.position());
- }
- inbuf.compact();
+ int bytesRead = inBuf.consumeContent(decoder);
+ if (metrics != null && bytesRead > 0) {
+ metrics.incrementBytesReceived(bytesRead);
}
if (decoder.isCompleted()) {
if (metrics != null) {
metrics.incrementMessagesReceived();
}
- sink.close();
}
} catch (IOException e) {
@@ -195,29 +177,22 @@
HttpContext context = conn.getContext();
HttpResponse response = conn.getHttpResponse();
- ReadableByteChannel source = (ReadableByteChannel) context.getAttribute(RESPONSE_SOURCE_CHANNEL);
- ByteBuffer outbuf = (ByteBuffer) context.getAttribute(RESPONSE_BUFFER);
+ ContentOutputBuffer outBuf = (ContentOutputBuffer) context.getAttribute(RESPONSE_SOURCE_BUFFER);
try {
- int bytesRead = source.read(outbuf);
- if (bytesRead == -1 && outbuf.position() == 0) {
- encoder.complete();
- } else {
- outbuf.flip();
- encoder.write(outbuf);
- if (metrics != null) {
- metrics.incrementBytesSent(outbuf.position());
- }
- outbuf.compact();
+ int bytesWritten = outBuf.produceContent(encoder);
+ if (metrics != null && bytesWritten > 0) {
+ metrics.incrementBytesSent(bytesWritten);
}
if (encoder.isCompleted()) {
if (metrics != null) {
metrics.incrementMessagesSent();
}
- source.close();
if (!connStrategy.keepAlive(response, context)) {
conn.close();
+ } else {
+ conn.requestInput();
}
}
@@ -279,29 +254,15 @@
public void closed(final NHttpServerConnection conn) {
- // Check sink and source channels and close them if they aren't closed already.
- // Normally these should be closed by inputReady() and outputReady(). A null request
- // or response will not hit inputReady and outputReady however.
-
HttpContext context = conn.getContext();
- closeChannel((ReadableByteChannel) context.getAttribute(REQUEST_SOURCE_CHANNEL));
- closeChannel((ReadableByteChannel) context.getAttribute(RESPONSE_SOURCE_CHANNEL));
- closeChannel((WritableByteChannel) context.getAttribute(RESPONSE_SINK_CHANNEL));
- closeChannel((WritableByteChannel) context.getAttribute(REQUEST_SINK_CHANNEL));
+ context.removeAttribute(REQUEST_SINK_BUFFER);
+ context.removeAttribute(RESPONSE_SOURCE_BUFFER);
if (log.isTraceEnabled()) {
log.trace("Connection closed");
}
}
- private void closeChannel(Channel chn) {
- try {
- if (chn != null && chn.isOpen()) {
- chn.close();
- }
- } catch (IOException ignore) {}
- }
-
/**
* Handle HTTP Protocol violations with an error response
* @param conn the connection being processed
Added: synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/util/SharedInputBuffer.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/util/SharedInputBuffer.java?rev=660119&view=auto
==============================================================================
--- synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/util/SharedInputBuffer.java (added)
+++ synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/util/SharedInputBuffer.java Mon May 26 01:01:04 2008
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.transport.nhttp.util;
+
+import org.apache.http.nio.util.ExpandableBuffer;
+import org.apache.http.nio.util.ContentInputBuffer;
+import org.apache.http.nio.util.ByteBufferAllocator;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.ContentDecoder;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+
+/**
+ * A copy of the SharedInputBuffer implementation of Apache HttpComponents - HttpCore/NIO
+ * found at http://svn.apache.org/repos/asf/httpcomponents/httpcore/trunk/module-nio/
+ * src/main/java/org/apache/http/nio/util/SharedInputBuffer.java
+ *
+ * To include the fix described here : http://svn.apache.org/viewvc/httpcomponents/httpcore/
+ * trunk/module-nio/src/main/java/org/apache/http/nio/util/SharedInputBuffer.java
+ * ?view=diff&r1=659956&r2=659957&pathrev=659957
+ * with the HttpCore version 4.0-beta1
+ *
+ * TODO : This class to be removed as soon as we update the HttpCore dependency from 4.0-beta1
+ */
+public class SharedInputBuffer extends ExpandableBuffer implements ContentInputBuffer {
+
+ private final IOControl ioctrl;
+ private final Object mutex;
+
+ private volatile boolean shutdown = false;
+ private volatile boolean endOfStream = false;
+
+ public SharedInputBuffer(int buffersize, final IOControl ioctrl, final ByteBufferAllocator allocator) {
+ super(buffersize, allocator);
+ if (ioctrl == null) {
+ throw new IllegalArgumentException("I/O content control may not be null");
+ }
+ this.ioctrl = ioctrl;
+ this.mutex = new Object();
+ }
+
+ public void reset() {
+ if (this.shutdown) {
+ return;
+ }
+ synchronized (this.mutex) {
+ clear();
+ this.endOfStream = false;
+ }
+ }
+
+ public int consumeContent(final ContentDecoder decoder) throws IOException {
+ if (this.shutdown) {
+ return -1;
+ }
+ synchronized (this.mutex) {
+ setInputMode();
+ int totalRead = 0;
+ int bytesRead;
+ while ((bytesRead = decoder.read(this.buffer)) > 0) {
+ totalRead += bytesRead;
+ }
+ if (bytesRead == -1 || decoder.isCompleted()) {
+ this.endOfStream = true;
+ }
+ if (!this.buffer.hasRemaining()) {
+ this.ioctrl.suspendInput();
+ }
+ this.mutex.notifyAll();
+
+ if (totalRead > 0) {
+ return totalRead;
+ } else {
+ if (this.endOfStream) {
+ return -1;
+ } else {
+ return 0;
+ }
+ }
+ }
+ }
+
+ protected void waitForData() throws IOException {
+ synchronized (this.mutex) {
+ try {
+ while (!hasData() && !this.endOfStream) {
+ if (this.shutdown) {
+ throw new InterruptedIOException("Input operation aborted");
+ }
+ this.ioctrl.requestInput();
+ this.mutex.wait();
+ }
+ } catch (InterruptedException ex) {
+ throw new IOException("Interrupted while waiting for more data");
+ }
+ }
+ }
+
+ public void shutdown() {
+ if (this.shutdown) {
+ return;
+ }
+ this.shutdown = true;
+ synchronized (this.mutex) {
+ this.mutex.notifyAll();
+ }
+ }
+
+ protected boolean isShutdown() {
+ return this.shutdown;
+ }
+
+ protected boolean isEndOfStream() {
+ return this.shutdown || (!hasData() && this.endOfStream);
+ }
+
+ public int read() throws IOException {
+ if (this.shutdown) {
+ return -1;
+ }
+ synchronized (this.mutex) {
+ if (!hasData()) {
+ waitForData();
+ }
+ if (isEndOfStream()) {
+ return -1;
+ }
+ return this.buffer.get() & 0xff;
+ }
+ }
+
+ public int read(final byte[] b, int off, int len) throws IOException {
+ if (this.shutdown) {
+ return -1;
+ }
+ if (b == null) {
+ return 0;
+ }
+ synchronized (this.mutex) {
+ if (!hasData()) {
+ waitForData();
+ }
+ if (isEndOfStream()) {
+ return -1;
+ }
+ setOutputMode();
+ int chunk = len;
+ if (chunk > this.buffer.remaining()) {
+ chunk = this.buffer.remaining();
+ }
+ this.buffer.get(b, off, chunk);
+ return chunk;
+ }
+ }
+
+ public int read(final byte[] b) throws IOException {
+ if (this.shutdown) {
+ return -1;
+ }
+ if (b == null) {
+ return 0;
+ }
+ return read(b, 0, b.length);
+ }
+
+}