You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by as...@apache.org on 2008/05/26 10:01:06 UTC

svn commit: r660119 - in /synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp: ./ util/

Author: asankha
Date: Mon May 26 01:01:04 2008
New Revision: 660119

URL: http://svn.apache.org/viewvc?rev=660119&view=rev
Log:
Fixed SYNAPSE-321:
implemented the suggestion from Oleg K. to do away with Pipes for channel-to-stream bridging.


Added:
    synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/util/SharedInputBuffer.java
Removed:
    synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/util/PipeImpl.java
Modified:
    synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java
    synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
    synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ConnectionPool.java
    synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/NHttpConfiguration.java
    synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java

Modified: synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java?rev=660119&r1=660118&r2=660119&view=diff
==============================================================================
--- synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java (original)
+++ synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java Mon May 26 01:01:04 2008
@@ -31,20 +31,18 @@
 import org.apache.http.HttpHost;
 import org.apache.http.HttpRequest;
 import org.apache.http.HttpVersion;
+import org.apache.http.nio.util.ContentOutputBuffer;
+import org.apache.http.nio.entity.ContentOutputStream;
 import org.apache.http.entity.BasicHttpEntity;
 import org.apache.http.message.BasicHttpEntityEnclosingRequest;
 import org.apache.http.message.BasicHttpRequest;
 import org.apache.http.protocol.HTTP;
 import org.apache.synapse.transport.nhttp.util.MessageFormatterDecoratorFactory;
-import org.apache.synapse.transport.nhttp.util.PipeImpl;
 import org.apache.synapse.transport.nhttp.util.RESTUtil;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
 import java.nio.channels.ClosedChannelException;
 import java.util.Iterator;
 import java.util.Map;
@@ -64,13 +62,15 @@
     private HttpHost httpHost = null;
     /** the message context being sent */
     private MessageContext msgContext = null;
-    /** the Pipe which facilitates the serialization output to be written to the channel */
-    private PipeImpl pipe = null;
     /** The Axis2 MessageFormatter that will ensure proper serialization as per Axis2 semantics */
     MessageFormatter messageFormatter = null;
     /** The OM Output format holder */
     OMOutputFormat format = null;
-    protected boolean completed = false; //added for request complete checking
+    private ContentOutputBuffer outputBuffer = null;
+    /** ready to begin streaming? */
+    private boolean readyToStream = false;
+    /** for request complete checking */
+    private boolean completed = false;
 
     public Axis2HttpRequest(EndpointReference epr, HttpHost httpHost, MessageContext msgContext) {
         this.epr = epr;
@@ -79,11 +79,22 @@
         this.format = NhttpUtils.getOMOutputFormat(msgContext);
         this.messageFormatter =
                 MessageFormatterDecoratorFactory.createMessageFormatterDecorator(msgContext);
-        try {
-            this.pipe = new PipeImpl();
-        } catch (IOException e) {
-            log.error("Error creating pipe to write message body", e);
-        }
+    }
+
+    public void setReadyToStream(boolean readyToStream) {
+        this.readyToStream = readyToStream;
+    }
+    
+    public void setOutputBuffer(ContentOutputBuffer outputBuffer) {
+        this.outputBuffer = outputBuffer;
+    }
+
+    public void clear() {
+        this.epr = null;
+        this.httpHost = null;
+        this.msgContext = null;
+        this.format = null;
+        this.messageFormatter = null;
     }
 
     public EndpointReference getEpr() {
@@ -185,28 +196,6 @@
     }
 
     /**
-     * Return the source channel of the pipe that bridges the serialized output to the socket
-     * @return source channel to read serialized message contents
-     */
-    public ReadableByteChannel getSourceChannel() {
-        if (log.isDebugEnabled()) {
-            log.debug("get source channel of the pipe on which the outgoing response is written");
-        }
-        return pipe.source();
-    }
-
-    /**
-     * Return the sink channel of the pipe that bridges the serialized output to the socket
-     * @return sink channel to read serialized message contents
-     */
-    public WritableByteChannel getSinkChannel() {
-        if (log.isDebugEnabled()) {
-            log.debug("get sink channel of the pipe on which the outgoing response is written");
-        }
-        return pipe.sink();
-    }
-
-    /**
      * Start streaming the message into the Pipe, so that the contents could be read off the source
      * channel returned by getSourceChannel()
      * @throws AxisFault on error
@@ -216,7 +205,16 @@
         if (log.isDebugEnabled()) {
             log.debug("start streaming outgoing http request");
         }
-        OutputStream out = Channels.newOutputStream(pipe.sink());
+
+        synchronized(this) {
+            while (!readyToStream) {
+                try {
+                    this.wait();
+                } catch (InterruptedException ignore) {}
+            }
+        }
+
+        OutputStream out = new ContentOutputStream(outputBuffer);
         try {
             messageFormatter.writeTo(msgContext, format, out, true);
         } catch (Exception e) {
@@ -257,9 +255,6 @@
     }
 
     public void setCompleted(boolean completed) {
-        if (completed && !isCompleted()) {
-            this.pipe.close();
-        }
         this.completed = completed;
     }
 }

Modified: synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java?rev=660119&r1=660118&r2=660119&view=diff
==============================================================================
--- synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java (original)
+++ synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java Mon May 26 01:01:04 2008
@@ -39,20 +39,17 @@
 import org.apache.http.nio.ContentEncoder;
 import org.apache.http.nio.NHttpClientConnection;
 import org.apache.http.nio.NHttpClientHandler;
+import org.apache.http.nio.entity.ContentInputStream;
+import org.apache.http.nio.util.*;
 import org.apache.http.params.DefaultedHttpParams;
 import org.apache.http.params.HttpParams;
 import org.apache.http.protocol.*;
 import org.apache.synapse.transport.base.MetricsCollector;
 import org.apache.synapse.transport.base.threads.WorkerPool;
 import org.apache.synapse.transport.base.threads.WorkerPoolFactory;
-import org.apache.synapse.transport.nhttp.util.PipeImpl;
+import org.apache.synapse.transport.nhttp.util.SharedInputBuffer;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channel;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
 
 /**
  * The client connection handler. An instance of this class is used by each IOReactor, to
@@ -69,6 +66,8 @@
     private final HttpProcessor httpProcessor;
     /** the connection re-use strategy */
     private final ConnectionReuseStrategy connStrategy;
+    /** the buffer allocator */
+    private final ByteBufferAllocator allocator;
 
     /** the Axis2 configuration context */
     ConfigurationContext cfgCtx = null;
@@ -79,15 +78,12 @@
     /** the metrics collector */
     private MetricsCollector metrics = null;
 
-    private static final String REQUEST_BUFFER = "request-buffer";
-    private static final String RESPONSE_BUFFER = "response-buffer";
-    private static final String OUTGOING_MESSAGE_CONTEXT = "axis2_message_context";
-    private static final String REQUEST_SOURCE_CHANNEL = "request-source-channel";
-    private static final String RESPONSE_SINK_CHANNEL = "response-sink-channel";
-    private static final String REQUEST_SINK_CHANNEL = "request-sink-channel";
-    private static final String RESPONSE_SOURCE_CHANNEL = "response-source-channel";
+    public static final String OUTGOING_MESSAGE_CONTEXT = "synapse.axis2_message_context";
+    public static final String AXIS2_HTTP_REQUEST = "synapse.axis2-http-request";
+
+    public static final String REQUEST_SOURCE_BUFFER = "synapse.request-source-buffer";
+    public static final String RESPONSE_SINK_BUFFER = "synapse.response-sink-buffer";
 
-    private static final String AXIS2_HTTP_REQUEST = "synapse.axis2-http-request";
     private static final String CONTENT_TYPE = "Content-Type";
 
     /**
@@ -105,6 +101,7 @@
         this.httpProcessor = getHttpProcessor();
         this.connStrategy = new DefaultConnectionReuseStrategy();
         this.metrics = metrics;
+        this.allocator = new HeapByteBufferAllocator();
 
         this.cfg = NHttpConfiguration.getInstance();
         workerPool = WorkerPoolFactory.getWorkerPool(
@@ -129,13 +126,14 @@
 
         try {
             HttpContext context = conn.getContext();
+            ContentOutputBuffer outputBuffer = new SharedOutputBuffer(cfg.getBufferSize(), conn, allocator);
+            axis2Req.setOutputBuffer(outputBuffer);
+            context.setAttribute(REQUEST_SOURCE_BUFFER, outputBuffer);            
 
+            context.setAttribute(AXIS2_HTTP_REQUEST, axis2Req);
             context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
             context.setAttribute(ExecutionContext.HTTP_TARGET_HOST, axis2Req.getHttpHost());
-
             context.setAttribute(OUTGOING_MESSAGE_CONTEXT, axis2Req.getMsgContext());
-            context.setAttribute(REQUEST_SOURCE_CHANNEL, axis2Req.getSourceChannel());
-            context.setAttribute(REQUEST_SINK_CHANNEL, axis2Req.getSinkChannel());
 
             HttpRequest request = axis2Req.getRequest();
             request.setParams(new DefaultedHttpParams(request.getParams(), this.params));
@@ -144,6 +142,11 @@
             conn.submitRequest(request);
             context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
 
+            synchronized(axis2Req) {
+                axis2Req.setReadyToStream(true);
+                axis2Req.notifyAll();
+            }
+
         } catch (IOException e) {
             handleException("I/O Error : " + e.getMessage(), e, conn);
         } catch (HttpException e) {
@@ -165,18 +168,14 @@
         try {
             HttpContext context = conn.getContext();
             Axis2HttpRequest axis2Req = (Axis2HttpRequest) attachment;
+            ContentOutputBuffer outputBuffer = new SharedOutputBuffer(cfg.getBufferSize(), conn, allocator);
+            axis2Req.setOutputBuffer(outputBuffer);
+            context.setAttribute(REQUEST_SOURCE_BUFFER, outputBuffer);
 
             context.setAttribute(AXIS2_HTTP_REQUEST, axis2Req);
             context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
             context.setAttribute(ExecutionContext.HTTP_TARGET_HOST, axis2Req.getHttpHost());
-
-            // allocate temporary buffers to process this request
-            context.setAttribute(REQUEST_BUFFER, ByteBuffer.allocate(cfg.getBufferSize()));
-            context.setAttribute(RESPONSE_BUFFER, ByteBuffer.allocate(cfg.getBufferSize()));
-
             context.setAttribute(OUTGOING_MESSAGE_CONTEXT, axis2Req.getMsgContext());
-            context.setAttribute(REQUEST_SOURCE_CHANNEL, axis2Req.getSourceChannel());
-            context.setAttribute(REQUEST_SINK_CHANNEL, axis2Req.getSinkChannel());
 
             HttpRequest request = axis2Req.getRequest();
             request.setParams(new DefaultedHttpParams(request.getParams(), this.params));
@@ -185,6 +184,11 @@
             conn.submitRequest(request);
             context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
 
+            synchronized(axis2Req) {
+                axis2Req.setReadyToStream(true);
+                axis2Req.notifyAll();
+            }
+
         } catch (IOException e) {
             handleException("I/O Error : " + e.getMessage(), e, conn);
         } catch (HttpException e) {
@@ -193,33 +197,18 @@
     }
 
     public void closed(final NHttpClientConnection conn) {
-    	checkAxisRequestComplete(conn, "Abnormal connection close", null);
-
-        // Check sink and source channels and close them if they aren't closed already.
-        // Normally these should be closed by inputReady() and outputReady(). A null request
-        // or response will not hit inputReady and outputReady however.
+        ConnectionPool.forget(conn);
+        checkAxisRequestComplete(conn, "Abnormal connection close", null);
 
         HttpContext context = conn.getContext();
-        closeChannel((ReadableByteChannel) context.getAttribute(REQUEST_SOURCE_CHANNEL));
-        // Note: We do not close the RESPONSE_SOURCE_CHANNEL at this point in time as its closed
-        // by the ClientWorker:run() in the finally block, after the response was processed
-        // fix for https://issues.apache.org/jira/browse/SYNAPSE-289
-        closeChannel((WritableByteChannel) context.getAttribute(RESPONSE_SINK_CHANNEL));
-        closeChannel((WritableByteChannel) context.getAttribute(REQUEST_SINK_CHANNEL));
-        
+        context.removeAttribute(RESPONSE_SINK_BUFFER);
+        context.removeAttribute(REQUEST_SOURCE_BUFFER);        
+
         if (log.isTraceEnabled()) {
             log.trace("Connection closed");
         }
     }
 
-    private void closeChannel(Channel chn) {
-        try {
-            if (chn != null && chn.isOpen()) {
-                chn.close();
-            }
-        } catch (IOException ignore) {}
-    }
-
     /**
      * Handle connection timeouts by shutting down the connections
      * @param conn the connection being processed
@@ -325,25 +314,18 @@
     public void inputReady(final NHttpClientConnection conn, final ContentDecoder decoder) {
         HttpContext context = conn.getContext();
         HttpResponse response = conn.getHttpResponse();
-        WritableByteChannel sink
-                = (WritableByteChannel) context.getAttribute(RESPONSE_SINK_CHANNEL);
-        ByteBuffer inbuf = (ByteBuffer) context.getAttribute(REQUEST_BUFFER);
+        ContentInputBuffer inBuf = (ContentInputBuffer) context.getAttribute(RESPONSE_SINK_BUFFER);
 
         try {
-            while (decoder.read(inbuf) > 0) {
-                inbuf.flip();
-                sink.write(inbuf);
-                if (metrics != null) {
-                    metrics.incrementBytesReceived(inbuf.position());
-                }
-                inbuf.compact();
+            int bytesRead = inBuf.consumeContent(decoder);
+            if (metrics != null && bytesRead > 0) {
+                metrics.incrementBytesReceived(bytesRead);
             }
 
             if (decoder.isCompleted()) {
                 if (metrics != null) {
                     metrics.incrementMessagesReceived();
                 }
-                if (sink != null) sink.close();
                 if (!connStrategy.keepAlive(response, context)) {
                     conn.close();
                 } else {
@@ -364,28 +346,18 @@
     public void outputReady(final NHttpClientConnection conn, final ContentEncoder encoder) {
         HttpContext context = conn.getContext();
 
-        ReadableByteChannel source
-                = (ReadableByteChannel) context.getAttribute(REQUEST_SOURCE_CHANNEL);
-        ByteBuffer outbuf = (ByteBuffer) context.getAttribute(RESPONSE_BUFFER);
+        ContentOutputBuffer outBuf = (ContentOutputBuffer) context.getAttribute(REQUEST_SOURCE_BUFFER);
 
         try {
-            int bytesRead = source.read(outbuf);
-            if (bytesRead == -1) {
-                encoder.complete();
-            } else {
-                outbuf.flip();
-                encoder.write(outbuf);
-                if (metrics != null) {
-                    metrics.incrementBytesSent(outbuf.position());
-                }
-                outbuf.compact();
+            int bytesWritten = outBuf.produceContent(encoder);
+            if (metrics != null && bytesWritten > 0) {
+                metrics.incrementBytesSent(bytesWritten);
             }
 
             if (encoder.isCompleted()) {
                 if (metrics != null) {
                     metrics.incrementMessagesSent();
                 }
-                source.close();
             }
 
         } catch (IOException e) {
@@ -556,25 +528,20 @@
     private void processResponse(final NHttpClientConnection conn, HttpContext context,
         HttpResponse response) {
 
-        try {
-            PipeImpl responsePipe = new PipeImpl();
-            context.setAttribute(RESPONSE_SINK_CHANNEL, responsePipe.sink());
-            context.setAttribute(RESPONSE_SOURCE_CHANNEL, responsePipe.source());
-
-            BasicHttpEntity entity = new BasicHttpEntity();
-            if (response.getStatusLine().getProtocolVersion().greaterEquals(HttpVersion.HTTP_1_1)) {
-                entity.setChunked(true);
-            }
-            response.setEntity(entity);
-            context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
-
-            workerPool.execute(
-                new ClientWorker(cfgCtx, Channels.newInputStream(responsePipe.source()), response,
-                    (MessageContext) context.getAttribute(OUTGOING_MESSAGE_CONTEXT)));
+        ContentInputBuffer inputBuffer = new SharedInputBuffer(cfg.getBufferSize(), conn, allocator);
+        context.setAttribute(RESPONSE_SINK_BUFFER, inputBuffer);
+
+        BasicHttpEntity entity = new BasicHttpEntity();
+        if (response.getStatusLine().getProtocolVersion().greaterEquals(HttpVersion.HTTP_1_1)) {
+            entity.setChunked(true);
+        }
+        response.setEntity(entity);
+        context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
+
+        workerPool.execute(
+            new ClientWorker(cfgCtx, new ContentInputStream(inputBuffer), response,
+                (MessageContext) context.getAttribute(OUTGOING_MESSAGE_CONTEXT)));
 
-        } catch (IOException e) {
-            handleException("I/O Error : " + e.getMessage(), e, conn);
-        }
     }
 
     // ----------- utility methods -----------

Modified: synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ConnectionPool.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ConnectionPool.java?rev=660119&r1=660118&r2=660119&view=diff
==============================================================================
--- synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ConnectionPool.java (original)
+++ synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ConnectionPool.java Mon May 26 01:01:04 2008
@@ -20,6 +20,7 @@
 
 import org.apache.http.nio.NHttpClientConnection;
 import org.apache.http.protocol.ExecutionContext;
+import org.apache.http.protocol.HttpContext;
 import org.apache.http.HttpHost;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -59,6 +60,7 @@
                             log.debug("A connection to host : " + host + " on port : " +
                                 port + " is available in the pool, and will be reused");
                         }
+                        conn.requestInput(); // asankha - make sure keep alives work properly when reused with throttling
                         return conn;
                     } else {
                         if (log.isDebugEnabled()) {
@@ -93,6 +95,7 @@
             }
         }
 
+        cleanConnectionReferences(conn);
         connections.add(conn);
 
         if (log.isDebugEnabled()) {
@@ -100,4 +103,37 @@
                     host.getPort() + " to the connection pool of current size : " + connections.size());
         }
     }
+
+    private static void cleanConnectionReferences(NHttpClientConnection conn) {
+
+        HttpContext ctx = conn.getContext();        
+        Axis2HttpRequest axis2Req =
+            (Axis2HttpRequest) ctx.getAttribute(ClientHandler.AXIS2_HTTP_REQUEST);
+        axis2Req.clear();   // this is linked via the selection key attachment and will free itself
+                            // on timeout of the keep-alive connection. Until then, minimize the
+                            // memory usage to a few bytes
+
+        ctx.removeAttribute(ClientHandler.AXIS2_HTTP_REQUEST);
+        ctx.removeAttribute(ClientHandler.OUTGOING_MESSAGE_CONTEXT);
+        ctx.removeAttribute(ClientHandler.REQUEST_SOURCE_BUFFER);
+        ctx.removeAttribute(ClientHandler.RESPONSE_SINK_BUFFER);
+
+        ctx.removeAttribute(ExecutionContext.HTTP_REQUEST);
+        ctx.removeAttribute(ExecutionContext.HTTP_RESPONSE);
+        ctx.removeAttribute(ExecutionContext.HTTP_CONNECTION);
+    }
+
+    public static void forget(NHttpClientConnection conn) {
+
+        HttpHost host = (HttpHost) conn.getContext().getAttribute(
+            ExecutionContext.HTTP_TARGET_HOST);
+        String key = host.getHostName() + ":" + Integer.toString(host.getPort());
+
+        List connections = (List) connMap.get(key);
+        if (connections != null) {
+            synchronized(connections) {
+                connections.remove(conn);
+            }
+        }
+    }
 }

Modified: synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/NHttpConfiguration.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/NHttpConfiguration.java?rev=660119&r1=660118&r2=660119&view=diff
==============================================================================
--- synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/NHttpConfiguration.java (original)
+++ synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/NHttpConfiguration.java Mon May 26 01:01:04 2008
@@ -38,7 +38,7 @@
     private static final int WORKER_KEEP_ALIVE     = 5;
     private static final int BLOCKING_QUEUE_LENGTH = -1;
     private static final int IO_WORKER_COUNT = 2;
-    private static final int BUFFER_SIZE           = 2048;
+    private static final int BUFFER_SIZE           = 8192;
 
     // server listener
     private static final String S_T_CORE     = "snd_t_core";

Modified: synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java?rev=660119&r1=660118&r2=660119&view=diff
==============================================================================
--- synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java (original)
+++ synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java Mon May 26 01:01:04 2008
@@ -19,10 +19,10 @@
 package org.apache.synapse.transport.nhttp;
 
 import org.apache.axis2.context.ConfigurationContext;
-import org.apache.synapse.transport.nhttp.util.PipeImpl;
 import org.apache.synapse.transport.base.MetricsCollector;
 import org.apache.synapse.transport.base.threads.WorkerPoolFactory;
 import org.apache.synapse.transport.base.threads.WorkerPool;
+import org.apache.synapse.transport.nhttp.util.SharedInputBuffer;
 import org.apache.http.*;
 import org.apache.http.entity.BasicHttpEntity;
 import org.apache.http.entity.ByteArrayEntity;
@@ -32,6 +32,10 @@
 import org.apache.http.nio.ContentEncoder;
 import org.apache.http.nio.NHttpServerConnection;
 import org.apache.http.nio.NHttpServiceHandler;
+import org.apache.http.nio.reactor.IOSession;
+import org.apache.http.nio.entity.ContentInputStream;
+import org.apache.http.nio.entity.ContentOutputStream;
+import org.apache.http.nio.util.*;
 import org.apache.http.params.HttpParams;
 import org.apache.http.protocol.*;
 import org.apache.http.util.EncodingUtils;
@@ -39,11 +43,6 @@
 import org.apache.commons.logging.LogFactory;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.Channel;
 
 /**
  * The server connection handler. An instance of this class is used by each IOReactor, to
@@ -62,6 +61,8 @@
     private final HttpProcessor httpProcessor;
     /** the strategy to re-use connections */
     private final ConnectionReuseStrategy connStrategy;
+    /** the buffer allocator */
+    private final ByteBufferAllocator allocator;
 
     /** the Axis2 configuration context */
     ConfigurationContext cfgCtx = null;
@@ -75,12 +76,8 @@
     /** the metrics collector */
     private MetricsCollector metrics = null;
 
-    private static final String REQUEST_SINK_CHANNEL = "request-sink-channel";
-    private static final String RESPONSE_SOURCE_CHANNEL = "response-source-channel";
-    private static final String REQUEST_SOURCE_CHANNEL = "request-source-channel";
-    private static final String RESPONSE_SINK_CHANNEL = "response-sink-channel";
-    private static final String REQUEST_BUFFER = "request-buffer";
-    private static final String RESPONSE_BUFFER = "response-buffer";
+    public static final String REQUEST_SINK_BUFFER = "synapse.request-sink-buffer";
+    public static final String RESPONSE_SOURCE_BUFFER = "synapse.response-source-buffer";
 
     public ServerHandler(final ConfigurationContext cfgCtx, final HttpParams params,
         final boolean isHttps, final MetricsCollector metrics) {
@@ -92,6 +89,7 @@
         this.responseFactory = new DefaultHttpResponseFactory();
         this.httpProcessor = getHttpProcessor();
         this.connStrategy = new DefaultConnectionReuseStrategy();
+        this.allocator = new HeapByteBufferAllocator();
 
         this.cfg = NHttpConfiguration.getInstance();
         this.workerPool = WorkerPoolFactory.getWorkerPool(
@@ -112,17 +110,11 @@
         HttpRequest request = conn.getHttpRequest();
         context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
 
-        // allocate temporary buffers to process this request
-        context.setAttribute(REQUEST_BUFFER, ByteBuffer.allocate(cfg.getBufferSize()));
-        context.setAttribute(RESPONSE_BUFFER, ByteBuffer.allocate(cfg.getBufferSize()));
-
         try {
-            PipeImpl requestPipe  = new PipeImpl(); // the pipe used to process the request
-            PipeImpl responsePipe = new PipeImpl(); // the pipe used to process the response
-            context.setAttribute(REQUEST_SINK_CHANNEL, requestPipe.sink());
-            context.setAttribute(REQUEST_SOURCE_CHANNEL, requestPipe.source());
-            context.setAttribute(RESPONSE_SOURCE_CHANNEL, responsePipe.source());
-            context.setAttribute(RESPONSE_SINK_CHANNEL, responsePipe.sink());
+            ContentInputBuffer inputBuffer = new SharedInputBuffer(cfg.getBufferSize(), conn, allocator);
+            ContentOutputBuffer outputBuffer = new SharedOutputBuffer(cfg.getBufferSize(), conn, allocator);
+            context.setAttribute(REQUEST_SINK_BUFFER, inputBuffer);
+            context.setAttribute(RESPONSE_SOURCE_BUFFER, outputBuffer);
 
             // create the default response to this request
             ProtocolVersion httpVersion = request.getRequestLine().getProtocolVersion();
@@ -132,7 +124,6 @@
 
             // create a basic HttpEntity using the source channel of the response pipe
             BasicHttpEntity entity = new BasicHttpEntity();
-            entity.setContent(Channels.newInputStream(responsePipe.source()));
             if (httpVersion.greaterEquals(HttpVersion.HTTP_1_1)) {
                 entity.setChunked(true);
             }
@@ -141,12 +132,9 @@
             // hand off processing of the request to a thread off the pool
             workerPool.execute(
                 new ServerWorker(cfgCtx, conn, isHttps, metrics, this,
-                    request, Channels.newInputStream(requestPipe.source()),
-                    response, Channels.newOutputStream(responsePipe.sink())));
+                    request, new ContentInputStream(inputBuffer),
+                    response, new ContentOutputStream(outputBuffer)));
 
-        } catch (IOException e) {
-            handleException("Error processing request received for : " +
-                request.getRequestLine().getUri(), e, conn);
         } catch (Exception e) {
             handleException("Error processing request received for : " +
                 request.getRequestLine().getUri(), e, conn);
@@ -161,24 +149,18 @@
     public void inputReady(final NHttpServerConnection conn, final ContentDecoder decoder) {
 
         HttpContext context = conn.getContext();
-        WritableByteChannel sink = (WritableByteChannel) context.getAttribute(REQUEST_SINK_CHANNEL);
-        ByteBuffer inbuf = (ByteBuffer) context.getAttribute(REQUEST_BUFFER);
+        ContentInputBuffer inBuf = (ContentInputBuffer) context.getAttribute(REQUEST_SINK_BUFFER);
 
         try {
-            while (decoder.read(inbuf) > 0) {
-                inbuf.flip();
-                sink.write(inbuf);
-                if (metrics != null) {
-                    metrics.incrementBytesReceived(inbuf.position());
-                }
-                inbuf.compact();
+            int bytesRead = inBuf.consumeContent(decoder);
+            if (metrics != null && bytesRead > 0) {
+                metrics.incrementBytesReceived(bytesRead);
             }
 
             if (decoder.isCompleted()) {
                 if (metrics != null) {
                     metrics.incrementMessagesReceived();
                 }
-                sink.close();
             }
 
         } catch (IOException e) {
@@ -195,29 +177,22 @@
 
         HttpContext context = conn.getContext();
         HttpResponse response = conn.getHttpResponse();
-        ReadableByteChannel source = (ReadableByteChannel) context.getAttribute(RESPONSE_SOURCE_CHANNEL);
-        ByteBuffer outbuf = (ByteBuffer) context.getAttribute(RESPONSE_BUFFER);
+        ContentOutputBuffer outBuf = (ContentOutputBuffer) context.getAttribute(RESPONSE_SOURCE_BUFFER);
 
         try {
-            int bytesRead = source.read(outbuf);
-            if (bytesRead == -1 && outbuf.position() == 0) {
-                encoder.complete();
-            } else {
-                outbuf.flip();
-                encoder.write(outbuf);
-                if (metrics != null) {
-                    metrics.incrementBytesSent(outbuf.position());
-                }
-                outbuf.compact();
+            int bytesWritten = outBuf.produceContent(encoder);
+            if (metrics != null && bytesWritten > 0) {
+                metrics.incrementBytesSent(bytesWritten);
             }
 
             if (encoder.isCompleted()) {
                 if (metrics != null) {
                     metrics.incrementMessagesSent();
                 }
-                source.close();
                 if (!connStrategy.keepAlive(response, context)) {
                     conn.close();
+                } else {
+                    conn.requestInput();
                 }
             }
 
@@ -279,29 +254,15 @@
 
     public void closed(final NHttpServerConnection conn) {
 
-        // Check sink and source channels and close them if they aren't closed already.
-        // Normally these should be closed by inputReady() and outputReady(). A null request
-        // or response will not hit inputReady and outputReady however.
-
         HttpContext context = conn.getContext();
-        closeChannel((ReadableByteChannel) context.getAttribute(REQUEST_SOURCE_CHANNEL));
-        closeChannel((ReadableByteChannel) context.getAttribute(RESPONSE_SOURCE_CHANNEL));
-        closeChannel((WritableByteChannel) context.getAttribute(RESPONSE_SINK_CHANNEL));
-        closeChannel((WritableByteChannel) context.getAttribute(REQUEST_SINK_CHANNEL));
+        context.removeAttribute(REQUEST_SINK_BUFFER);
+        context.removeAttribute(RESPONSE_SOURCE_BUFFER);
 
         if (log.isTraceEnabled()) {
             log.trace("Connection closed");
         }
     }
 
-    private void closeChannel(Channel chn) {
-        try {
-            if (chn != null && chn.isOpen()) {
-                chn.close();
-            }
-        } catch (IOException ignore) {}
-    }
-
     /**
      * Handle HTTP Protocol violations with an error response
      * @param conn the connection being processed

Added: synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/util/SharedInputBuffer.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/util/SharedInputBuffer.java?rev=660119&view=auto
==============================================================================
--- synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/util/SharedInputBuffer.java (added)
+++ synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/util/SharedInputBuffer.java Mon May 26 01:01:04 2008
@@ -0,0 +1,184 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.transport.nhttp.util;
+
+import org.apache.http.nio.util.ExpandableBuffer;
+import org.apache.http.nio.util.ContentInputBuffer;
+import org.apache.http.nio.util.ByteBufferAllocator;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.ContentDecoder;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+
+/**
+ * A copy of the SharedInputBuffer implementation of Apache HttpComponents - HttpCore/NIO,
+ * found at http://svn.apache.org/repos/asf/httpcomponents/httpcore/trunk/module-nio/
+ *  src/main/java/org/apache/http/nio/util/SharedInputBuffer.java
+ *
+ * Copied so that the fix described at http://svn.apache.org/viewvc/httpcomponents/httpcore/
+ *  trunk/module-nio/src/main/java/org/apache/http/nio/util/SharedInputBuffer.java
+ *  ?view=diff&r1=659956&r2=659957&pathrev=659957
+ * can be used while still depending on HttpCore version 4.0-beta1.
+ *
+ * TODO: Remove this class as soon as the HttpCore dependency is updated beyond 4.0-beta1.
+ */
+public class SharedInputBuffer extends ExpandableBuffer implements ContentInputBuffer {
+
+    private final IOControl ioctrl; // used to suspendInput() in consumeContent() and requestInput() in waitForData()
+    private final Object mutex; // lock and wait/notify monitor shared by consumeContent(), reset(), shutdown() and the read methods
+
+    private volatile boolean shutdown = false; // set by shutdown(); also read outside the lock at method entry
+    private volatile boolean endOfStream = false; // set by consumeContent(), cleared by reset()
+
+    public SharedInputBuffer(int buffersize, final IOControl ioctrl, final ByteBufferAllocator allocator) {
+        super(buffersize, allocator);
+        if (ioctrl == null) {
+            throw new IllegalArgumentException("I/O content control may not be null");
+        }
+        this.ioctrl = ioctrl;
+        this.mutex = new Object();
+    }
+
+    public void reset() { // no-op once shutdown() has been called
+        if (this.shutdown) {
+            return;
+        }
+        synchronized (this.mutex) {
+            clear(); // inherited from ExpandableBuffer; presumably discards buffered content - confirm
+            this.endOfStream = false;
+        }
+    }
+
+    public int consumeContent(final ContentDecoder decoder) throws IOException { // returns bytes read, 0 if none, -1 on end of stream or shutdown
+        if (this.shutdown) {
+            return -1;
+        }
+        synchronized (this.mutex) {
+            setInputMode(); // inherited; presumably prepares the buffer for being written into - confirm
+            int totalRead = 0;
+            int bytesRead;
+            while ((bytesRead = decoder.read(this.buffer)) > 0) { // keep reading until the decoder returns 0 or -1
+                totalRead += bytesRead;
+            }
+            if (bytesRead == -1 || decoder.isCompleted()) {
+                this.endOfStream = true;
+            }
+            if (!this.buffer.hasRemaining()) { // NOTE(review): presumably means the buffer is full - confirm against ExpandableBuffer
+                this.ioctrl.suspendInput();
+            }
+            this.mutex.notifyAll(); // wake any reader blocked in waitForData()
+
+            if (totalRead > 0) {
+                return totalRead;
+            } else {
+                if (this.endOfStream) { // -1 only when this call read nothing and the stream has ended
+                    return -1;
+                } else {
+                    return 0;
+                }
+            }
+        }
+    }
+
+    protected void waitForData() throws IOException { // blocks until hasData() or endOfStream; throws if shutdown is seen while waiting
+        synchronized (this.mutex) {
+            try {
+                while (!hasData() && !this.endOfStream) {
+                    if (this.shutdown) {
+                        throw new InterruptedIOException("Input operation aborted");
+                    }
+                    this.ioctrl.requestInput(); // ask for more input before blocking
+                    this.mutex.wait(); // woken by notifyAll() in consumeContent() or shutdown()
+                }
+            } catch (InterruptedException ex) { // NOTE(review): interrupt flag is not restored and ex is not chained as the cause
+                throw new IOException("Interrupted while waiting for more data");
+            }
+        }
+    }
+
+    public void shutdown() { // idempotent: a second call returns immediately
+        if (this.shutdown) {
+            return;
+        }
+        this.shutdown = true;
+        synchronized (this.mutex) {
+            this.mutex.notifyAll(); // wake waiters so that waitForData() observes the shutdown flag
+        }
+    }
+
+    protected boolean isShutdown() {
+        return this.shutdown;
+    }
+
+    protected boolean isEndOfStream() { // true after shutdown, or when the stream has ended and hasData() is false
+        return this.shutdown || (!hasData() && this.endOfStream);
+    }
+
+    public int read() throws IOException { // returns the next byte as 0..255, or -1 on end of stream or shutdown
+        if (this.shutdown) {
+            return -1;
+        }
+        synchronized (this.mutex) {
+            if (!hasData()) {
+                waitForData(); // may block; see waitForData() for the exit conditions
+            }
+            if (isEndOfStream()) {
+                return -1;
+            }
+            return this.buffer.get() & 0xff; // mask so the byte is returned as a non-negative int
+        }
+    }
+
+    public int read(final byte[] b, int off, int len) throws IOException { // returns bytes copied, 0 for a null array, -1 on end of stream or shutdown
+        if (this.shutdown) {
+            return -1;
+        }
+        if (b == null) {
+            return 0;
+        }
+        synchronized (this.mutex) {
+            if (!hasData()) {
+                waitForData(); // may block; see waitForData() for the exit conditions
+            }
+            if (isEndOfStream()) {
+                return -1;
+            }
+            setOutputMode(); // inherited; presumably prepares the buffer for being read from - confirm
+            int chunk = len;
+            if (chunk > this.buffer.remaining()) { // never copy more than buffer.remaining() bytes
+                chunk = this.buffer.remaining();
+            }
+            this.buffer.get(b, off, chunk);
+            return chunk;
+        }
+    }
+
+    public int read(final byte[] b) throws IOException { // convenience overload: delegates to read(b, 0, b.length)
+        if (this.shutdown) {
+            return -1;
+        }
+        if (b == null) {
+            return 0;
+        }
+        return read(b, 0, b.length);
+    }
+
+}