You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@synapse.apache.org by as...@apache.org on 2006/10/25 19:50:44 UTC

svn commit: r467721 - in /incubator/synapse/branches/NIO/modules/niohttp/src: ./ org/apache/axis2/transport/niohttp/impl/

Author: asankha
Date: Wed Oct 25 10:50:44 2006
New Revision: 467721

URL: http://svn.apache.org/viewvc?view=rev&rev=467721
Log:
clean and refactor IOHandlers

Added:
    incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/AbstractIOHandler.java
Modified:
    incubator/synapse/branches/NIO/modules/niohttp/src/log4j.properties
    incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/GenericIOHandler.java
    incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IOHandler.java
    incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java
    incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/MessageReader.java
    incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java
    incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java
    incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java

Modified: incubator/synapse/branches/NIO/modules/niohttp/src/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/log4j.properties?view=diff&rev=467721&r1=467720&r2=467721
==============================================================================
--- incubator/synapse/branches/NIO/modules/niohttp/src/log4j.properties (original)
+++ incubator/synapse/branches/NIO/modules/niohttp/src/log4j.properties Wed Oct 25 10:50:44 2006
@@ -3,7 +3,7 @@
 # Set the level to DEBUG if you want to log all SlideExceptions (some of them aren't errors)
 #log4j.category.org.apache.axis2=DEBUG
 log4j.category.org.apache.synapse=DEBUG
-log4j.category.org.apache.axis2.transport.niohttp=DEBUG
+log4j.category.org.apache.axis2.transport.niohttp=TRACE
 
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender

Added: incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/AbstractIOHandler.java
URL: http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/AbstractIOHandler.java?view=auto&rev=467721
==============================================================================
--- incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/AbstractIOHandler.java (added)
+++ incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/AbstractIOHandler.java Wed Oct 25 10:50:44 2006
@@ -0,0 +1,33 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl;
+
+public abstract class AbstractIOHandler implements IOHandler {
+    
+    private boolean beingProcessed = false;
+
+    public boolean isBeingProcessed() {
+        return beingProcessed;
+    }
+
+    public synchronized void lock() {
+        beingProcessed = true;
+    }
+
+    public synchronized void unlock() {
+        beingProcessed = false;
+    }
+}

Modified: incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/GenericIOHandler.java
URL: http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/GenericIOHandler.java?view=diff&rev=467721&r1=467720&r2=467721
==============================================================================
--- incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/GenericIOHandler.java (original)
+++ incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/GenericIOHandler.java Wed Oct 25 10:50:44 2006
@@ -24,7 +24,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 
-public abstract class GenericIOHandler implements Runnable {
+public abstract class GenericIOHandler extends AbstractIOHandler {
 
     private static final Log log = LogFactory.getLog(GenericIOHandler.class);
 
@@ -32,82 +32,95 @@
     protected SocketChannel socket;
 
     protected MessageWriter msgWriter;
-    protected HttpService httpService;
+    protected MessageReader msgReader;
+    protected HttpService   httpService;
 
     protected ByteBuffer nwReadBuffer   = ByteBuffer.allocate(4096);
     protected ByteBuffer nwWriteBuffer  = ByteBuffer.allocate(4096);
     protected ByteBuffer appReadBuffer  = ByteBuffer.allocate(4096);
     protected ByteBuffer appWriteBuffer = ByteBuffer.allocate(4096);
+
     protected int nwReadPos, nwWritePos;
     protected int appReadPos, appWritePos;
 
-    private boolean beingProcessed = false;
+    /** acp
+     * Process a ready read event, and reject it if we are unable to process.
+     * If content was read in, then it would be processed as well - ie. fire event
+     * for received message header, or stream in content received
+     */
+    protected void processReadyRead() {
+
+        if (msgReader.availableForWrite() == 0) {
+            log.trace("reject available read as message reader is full");
+            return;
+        }
 
-    public boolean isBeingProcessed() {
-        return beingProcessed;
-    }
+        log.trace("attempting to read into NW a maximum of "
+            + msgReader.availableForWrite() + " bytes");
 
-    public synchronized void lock() {
-        beingProcessed = true;
-    }
+        if (readFromPhysicalNetwork() > 0) {
 
-    public synchronized void unlock() {
-        beingProcessed = false;
-    }
+            if (log.isTraceEnabled()) {
+                log.trace("read into NW buffer : \n" +
+                    Util.dumpAsHex(nwReadBuffer.array(), nwReadPos));
+            }
 
-    public GenericIOHandler() {}
+            log.trace("attempt to read NW buffer into App buffer");
+            readFromNetworkBuffer();
 
-    protected int readNetworkBuffer(int maxBytes) {
+            if (log.isTraceEnabled()) {
+                log.trace("read into App buffer : \n" +
+                    Util.dumpAsHex(appReadBuffer.array(), appReadPos));
+            }
 
-        try {
-            // set position within buffer to read from channel
-            nwReadBuffer.position(nwReadPos);
+            log.trace("attempt to process App buffer contents");
+            processAppReadBuffer();
+        }
+    }
 
-            if (nwReadPos == nwReadBuffer.capacity()) {
-                ByteBuffer newBuf = ByteBuffer.allocate(nwReadBuffer.capacity() * 2);
-                log.debug("Expanding ByteBuffer to " + newBuf.capacity() + " bytes");
-                nwReadBuffer.flip();
-                nwReadBuffer = newBuf.put(nwReadBuffer);
-            }
-
-            // perform read from channel to this location
-            // *** Read a maximum of maxBytes ***
-            ByteBuffer temp = ByteBuffer.allocate(maxBytes);
-            int bytesRead = socket.read(temp);            
-            //System.out.println("Read : " + maxBytes + " bytes\n" + Util.dumpAsHex(temp.array(), bytesRead));
-            temp.flip();
-            nwReadBuffer.put(temp);
-
-            if (bytesRead != -1) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Read from socket to buffer position: " + nwReadPos + " to: " + nwReadBuffer.position());
-                    log.debug(Util.dumpAsHex(nwReadBuffer.array(), nwReadPos));
-                }
+    /** acp
+     * Read from the physical NW until the NW read buffer is filled up
+     * @return the number of bytes actually read
+     */
+    protected int readFromPhysicalNetwork() {
 
-                // save position for next read
-                nwReadPos = nwReadBuffer.position();
-                return bytesRead;
+        log.debug("attempting to read from the physical NW");
 
-            } else {
-                // end of stream reached
-                log.info("end-of-stream detected and socket closed");
-                socket.close();
-                sk.cancel();
-            }
+        // set position within buffer to read from channel
+        nwReadBuffer.position(nwReadPos);
+        int bytesRead = 0;
 
+        try {
+            bytesRead = socket.read(nwReadBuffer);
         } catch (IOException e) {
-            log.warn(e.getMessage() + " Closing socket: " + socket);
-            try {
-                socket.close();
-            } catch (IOException e1) {
-                e1.printStackTrace();
+            handleException("Error reading into NW buffer from socket : " + e.getMessage(), e);
+        }
+
+        if (bytesRead >= 0) {
+            log.debug("read " + bytesRead + " bytes from the physical NW");
+            nwReadPos += bytesRead;
+            if (log.isTraceEnabled() && bytesRead > 0) {
+                log.trace("current nwReadBuffer\n" +
+                    Util.dumpAsHex(nwReadBuffer.array(), bytesRead));
             }
+            return bytesRead;
+
+        } else {
+            // end of stream reached
+            log.debug("end-of-stream detected and socket closed");
             sk.cancel();
+            try {
+                socket.close();
+            } catch (IOException e) {}
+
         }
         return 0;
     }
 
-    protected void readApplicationBuffer() {
+    /** acp
+     * Read from the NW buffer into the App buffer, the complete NW buffer contents
+     */
+    protected void readFromNetworkBuffer() {
 
         nwReadBuffer.position(nwReadPos);
         nwReadBuffer.flip();
@@ -115,93 +128,95 @@
         appReadBuffer.position(appReadPos);
         while (appReadBuffer.remaining() < nwReadBuffer.limit()) {
             ByteBuffer newBuf = ByteBuffer.allocate(appReadBuffer.capacity() * 2);
-            log.debug("Expanding ByteBuffer to " + newBuf.capacity() + " bytes");
+            log.debug("expanding appReadBuffer to " + newBuf.capacity() + " bytes");
             appReadBuffer.flip();
             appReadBuffer = newBuf.put(appReadBuffer);
         }
 
         appReadBuffer.put(nwReadBuffer);
         appReadPos = appReadBuffer.position();
-        appReadBuffer.flip();        
-        
+        appReadBuffer.flip();
+
         nwReadBuffer.clear();
         nwReadPos = 0;
     }
-
-    protected void writeNetworkBuffer() {
-        nwWriteBuffer.position(nwWritePos);
-        if (nwWritePos > 0) {
-            nwWriteBuffer.compact();
-        }
-
-        appWriteBuffer.position(appWritePos);
-        appWriteBuffer.flip();
-
-        while (nwWriteBuffer.remaining() < appWriteBuffer.limit()) {
-            ByteBuffer newBuf = ByteBuffer.allocate(nwWriteBuffer.capacity() * 2);
-            log.debug("Expanding ByteBuffer to " + newBuf.capacity() + " bytes");
-            nwWriteBuffer.flip();
-            nwWriteBuffer = newBuf.put(nwWriteBuffer);
+    
+    /** acp
+     * The Incoming and Outgoing Handlers should implement this method, and provide
+     * thier custom implementations to handle received http messages
+     */
+    abstract protected void processAppReadBuffer();
+
+    /** acp
+     * Process a ready write event
+     */
+    protected void processReadyWrite(boolean closeConnectionIfDone) {
+
+        log.debug("attempt to write to the App buffer from the message writer");
+        writeToApplicationBuffer();
+
+        log.debug("attempt to write to the NW buffer from the App buffer");
+        writeToNetworkBuffer();
+
+        log.debug("attempt to write to the physical NW from the NW buffer");
+        writeToPhysicalNetwork();
+
+        if (nwWritePos == 0 && appWritePos == 0 && !msgWriter.isStreamingBody()) {
+            // if both our App & NW buffers are empty, and body stream has ended
+            log.debug("message written completely to the wire");
+            if (closeConnectionIfDone && msgWriter.isConnectionClose()) {
+                log.debug("closing connection normally as connection-close requested");
+                sk.cancel();
+                try {
+                    socket.close();
+                } catch (IOException e) {}
+            } else {
+                // response has been written completely
+                // now read response or result code
+                sk.interestOps(SelectionKey.OP_READ);
+            }
         }
-
-        nwWriteBuffer.put(appWriteBuffer);
-        nwWritePos = nwWriteBuffer.position();
-        appWritePos = 0;
-
-        System.out.println("nwWriteBuffer : \n" + Util.dumpAsHex(nwWriteBuffer.array(), nwWritePos));
     }
 
-    protected void writeToNetwork() {
-        // write as much as we can
-        nwWriteBuffer.position(nwWritePos);
-        nwWriteBuffer.flip();
-        int total = 0;
-        int write = 0;
-        do {
-            try {
-                write = socket.write(nwWriteBuffer);
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-            if (write > 0) {
-                total += write;
-            }
-        } while (write > 0);
-
-        // compact the left overs
-        nwWriteBuffer.compact();
-        nwWritePos = nwWriteBuffer.position();
-    }
+    /** acp
+     * Reads from the msgWriter and writes to the application buffer
+     * If the message header has not been read, the complete header is read (which
+     * is known not to block) and written at once, expanding the application buffer
+     * as required. If the header has been written, the msgWriter would be in the
+     * body-streaming mode, in which case, it is attempted to read until the application
+     * buffer is filled up - or the end of stream is reached
+     */
+    protected void writeToApplicationBuffer() {
 
-    protected void writeApplicationBuffer() {
         if (!msgWriter.isStreamingBody()) {
+            log.debug("attempting to write message header into App buffer");
             byte[] header = msgWriter.getMessageHeader();
 
+            appWriteBuffer.position(appWritePos);
             while (appWriteBuffer.remaining() < header.length) {
                 ByteBuffer newBuf = ByteBuffer.allocate(appWriteBuffer.capacity() * 2);
-                log.debug("Expanding ByteBuffer to " + newBuf.capacity() + " bytes");
+                log.debug("expanding App buffer to " + newBuf.capacity() + " bytes");
                 appWriteBuffer.flip();
                 appWriteBuffer = newBuf.put(appWriteBuffer);
             }
 
             appWriteBuffer.put(header);
             appWritePos = appWriteBuffer.position();
-            msgWriter.setStreamingBody(true);
+            msgWriter.setStreamingBody(true);   // switch into body streaming mode
 
         } else {
 
-            // fill the appWriteBuffer from the message body
+            log.debug("attempting to write from message body into App buffer");
+
             InputStream is = msgWriter.getInputStream();
             appWriteBuffer.position(appWritePos);
-            if (appWritePos > 0) {
-                appWriteBuffer.compact();
-            }
 
             try {
                 while (appWriteBuffer.remaining() > 0) {
                     int c = is.read();
                     if (c == -1) {
                         msgWriter.setStreamingBody(false);
+                        log.debug("end of message body stream detected");
                         break;
                     }
                     appWriteBuffer.put((byte) c);
@@ -209,11 +224,88 @@
 
                 appWritePos = appWriteBuffer.position();
 
-                System.out.println("appWriteBuffer : \n" + Util.dumpAsHex(appWriteBuffer.array(), appWritePos));
+            } catch (IOException e) {
+                handleException("Error reading message body stream : " + e.getMessage(), e);
+            }
+        }
+
+        if (log.isTraceEnabled()) {
+            log.trace("current appWriteBuffer : \n" +
+                Util.dumpAsHex(appWriteBuffer.array(), appWritePos));
+        }
+    }
+
+    /** acp
+     * Write all of the contents of the App buffer into the NW buffer, expanding
+     * it as required
+     * TODO handle SSL later
+     */
+    protected void writeToNetworkBuffer() {
+
+        appWriteBuffer.position(appWritePos);
+        appWriteBuffer.flip();
+
+        // expand NW buffer to hold App buffer contents
+        while (nwWriteBuffer.remaining() < appWriteBuffer.limit()) {
+            ByteBuffer newBuf = ByteBuffer.allocate(nwWriteBuffer.capacity() * 2);
+            log.debug("expanding NW buffer to " + newBuf.capacity() + " bytes");
+            nwWriteBuffer.flip();
+            nwWriteBuffer = newBuf.put(nwWriteBuffer);
+        }
+
+        nwWriteBuffer.position(nwWritePos);
+        nwWriteBuffer.put(appWriteBuffer);
+        nwWritePos = nwWriteBuffer.position();
 
+        // clear App buffer as we wrote all of it to the NW buffer
+        appWritePos = 0;
+        appWriteBuffer.clear();
+
+        if (log.isTraceEnabled()) {
+            log.trace("current nwWriteBuffer : \n" +
+                Util.dumpAsHex(nwWriteBuffer.array(), nwWritePos));
+        }
+    }
+
+    /** acp
+     * Write the contents of the NW buffer into the physical socket
+     */
+    protected void writeToPhysicalNetwork() {
+        // write as much as we can from our NW buffer, without blocking of course!
+        nwWriteBuffer.position(nwWritePos);
+        nwWriteBuffer.flip();
+        int write = 0;
+        do {
+            try {
+                write = socket.write(nwWriteBuffer);
             } catch (IOException e) {
-                e.printStackTrace();
+                handleException("Error writing to socket : " + e.getMessage(), e);
             }
+        } while (write > 0 && nwWriteBuffer.remaining() > 0);
+
+        // compact any left overs and capture new position
+        nwWriteBuffer.compact();
+        nwWritePos = nwWriteBuffer.position();
+    }
+
+    /**
+     * Handle an exception encountered. TODO need to streamline error handling and cleanup
+     * @param msg a message indicating the exception
+     * @param e the Exception thrown
+     */
+    protected void handleException(String msg, Exception e) {
+        log.error(msg, e);
+
+        if (sk.isValid()) {
+            sk.cancel();
         }
+        try {
+            if (socket != null && socket.isOpen()) {
+                socket.close();
+            }
+        } catch (IOException e1) {}
+
+        //TODO throw new ();
     }
+    
 }

Modified: incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IOHandler.java
URL: http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IOHandler.java?view=diff&rev=467721&r1=467720&r2=467721
==============================================================================
--- incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IOHandler.java (original)
+++ incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IOHandler.java Wed Oct 25 10:50:44 2006
@@ -20,86 +20,13 @@
 import java.io.IOException;
 import java.util.Date;
 
-public class IOHandler {
+public interface IOHandler extends Runnable {
 
-    class Message {
-        Message(String msg) {
-            write(msg.getBytes());
-        }
+    public boolean isBeingProcessed();
 
-        private ByteBuffer buffer = ByteBuffer.allocate(1024);
+    public void lock();
 
-        public InputStream getInputStream() {
-            buffer.position(0);
-            return new InputStream() {
-                public synchronized int read() throws IOException {
-                    if (!buffer.hasRemaining()) {
-                        return -1;
-                    }
-                    return buffer.get();
-                }
-                public synchronized int read(byte[] bytes, int off, int len) throws IOException {
-                    len = Math.min(len, buffer.remaining());
-                    buffer.get(bytes, off, len);
-                    return len;
-                }
-            };
-        }
+    public void unlock();
 
-        private int writePos;
-
-        public void write(byte[] data) {
-            buffer.position(writePos);
-            buffer.put(data);
-            writePos = buffer.position();
-        }
-    }
-
-    class B implements Runnable {
-        Message m;
-        B(Message m) {
-            this.m = m;
-        }
-        public void run() {
-            try {
-                while (true) {
-                    Thread.sleep(1000);
-                    m.write(("Message : " + new Date()).getBytes());
-                }
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    class C implements Runnable {
-        InputStream is = null;
-        byte[] data = new byte[1024];
-        C(InputStream is) {
-            this.is = is;
-        }
-        public void run() {
-            try {
-                while (true) {
-                    Thread.sleep(1000);
-                    int r = is.read(data);
-                    if (r > 0) {
-                        System.out.println("data : \n" + Util.dumpAsHex(data, r));
-                    }
-                }
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    public void test() {
-        Message m = new Message("Hello World");
-        new Thread(new B(m)).start();
-        new Thread(new C(m.getInputStream())).start();
-    }
-
-    public static void main(String[] args) {
-        new IOHandler().test();
-    }
+    public void run();
 }

Modified: incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java
URL: http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java?view=diff&rev=467721&r1=467720&r2=467721
==============================================================================
--- incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java (original)
+++ incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java Wed Oct 25 10:50:44 2006
@@ -38,15 +38,13 @@
 
     private static final Log log = LogFactory.getLog(IncomingHandler.class);
 
-    private MessageReader msgReader = new MessageReader(true);
-
     public IncomingHandler(SocketChannel socket, Selector selector,
         HttpService httpService) throws IOException {
 
+        this.msgReader = new MessageReader(true); // set up in 'request' parse mode
         this.httpService = httpService;
         this.socket = socket;
-        socket.configureBlocking(false);
-        // Optionally try first read now
+        this.socket.configureBlocking(false);
         sk = socket.register(selector, 0);
         sk.attach(this);
         sk.interestOps(SelectionKey.OP_READ);   // we are only interested to read
@@ -54,87 +52,43 @@
     }
 
     public void setResponse(HttpResponse response) {
-        // TODO writeHandler.setMessage(response.getWireBuffer(), response.isConnectionClose());
         sk.interestOps(SelectionKey.OP_WRITE);
         sk.selector().wakeup();
-        log.debug("\tIncomingHandler.setResponse()");
     }
 
-    /**
-     * The main handler routine
+    /** acp
+     * The main handler routine for incoming requests and responses
      */
     public void run() {
 
-        if (sk.isReadable()) {
-            log.debug("\tIncomingHandler run() - READABLE");
-            
-            if (msgReader.availableForWrite() == 0) {
-                //System.out.println("Reject read");
-                return;
-            } else {
-                //System.out.println("Accept read");
-            }
-
-            if (readNetworkBuffer(msgReader.availableForWrite()) > 0) {
-                //System.out.println("NW Buffer read : \n" + Util.dumpAsHex(nwReadBuffer.array(), nwReadPos));
-                readApplicationBuffer();
-
-                //System.out.println(Thread.currentThread().getName() + " Processing App Buffer : \n" + Util.dumpAsHex(appReadBuffer.array(), appReadPos));
-                processAppReadBuffer();
-            }
-
-        } else if (sk.isWritable()) {
-            log.debug("\tIncomingHandler run() - WRITEABLE");
-
-            log.debug("\tIncomingHandler run() - WRITEABLE");
-
-            writeApplicationBuffer();
-            writeNetworkBuffer();
-            writeToNetwork();
-
-            if (nwWritePos == 0 && appWritePos == 0 && !msgWriter.isStreamingBody()) {
-                log.debug("\tRequest written completely");
-                if (msgWriter.isConnectionClose()) {
-                    log.debug("\tClosing connection normally");
-                    sk.cancel();
-                    try {
-                        socket.close();
-                    } catch (IOException e) {
-                        log.warn("Error during socket close : " + e.getMessage(), e);
-                    }
-                } else {
-                    // response has been written completely
-                    // now read response or at least result code
-                    sk.interestOps(SelectionKey.OP_READ);
-                }
+        try {
+            if (sk.isReadable()) {
+                log.debug("readable");
+                processReadyRead();
+
+            } else if (sk.isWritable()) {
+                log.debug("writable");
+                processReadyWrite(true);
             }
 
-            /*if (writeHandler.handle(socket)) {
-                log.debug("\tThe response has been written completely");
-                // response has been written completely
-                if (writeHandler.isConnectionClose()) {
-                    log.debug("\tClosing connection normally");
-                    sk.cancel();
-                    try {
-                        socket.close();
-                    } catch (IOException e) {
-                        log.warn("Error during socket close : " + e.getMessage(), e);
-                    }
-                } else {
-                    // now we are again interested to read
-                    sk.interestOps(SelectionKey.OP_READ);
-                }
-            }*/
-        } else {
-            log.warn("IncomingHandler run(!!!unknown event!!!) : " + sk.readyOps());
+        } finally{
+            if (isBeingProcessed())
+                unlock();
         }
     }
 
-    private void processAppReadBuffer() {
+    /** acp
+     * Process the App buffer which has been read. If we just read the message
+     * header right now, then this method fires the handleRequest() call on the
+     * http service passing. However, at this point, the body may not have been
+     * read completely.
+     */
+    protected void processAppReadBuffer() {
         boolean readHeader = msgReader.isStreamingBody();
 
         try {
             int pos = msgReader.process(appReadBuffer);
+
             // if the handler digested any bytes, discard and compact the buffer
             if (pos > 0) {
                 appReadBuffer.position(pos);
@@ -146,18 +100,14 @@
             if (!readHeader && msgReader.isStreamingBody()) {
                 HttpRequest request = (HttpRequest) msgReader.getHttpMessage();
                 request.setHandler(this);
-                log.debug("\tFire event for received HttpRequest");
+                log.debug("fire event for received HttpResponse");
                 httpService.handleRequest(request);
             }
 
-            /*socket.close();
-            sk.cancel();
-            log.debug("Socket closed and SelectionKey cancelled");*/
-
         } catch (IOException e) {
-            e.printStackTrace();
+            handleException("Error piping the received App buffer : " + e. getMessage(), e);
         } catch (NHttpException e) {
-            e.printStackTrace();
+            handleException(e.getMessage(), e);
         }
     }
 

Modified: incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/MessageReader.java
URL: http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/MessageReader.java?view=diff&rev=467721&r1=467720&r2=467721
==============================================================================
--- incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/MessageReader.java (original)
+++ incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/MessageReader.java Wed Oct 25 10:50:44 2006
@@ -25,6 +25,7 @@
 import java.nio.ByteBuffer;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.StringTokenizer;
 
 /**
  * TODO
@@ -156,11 +157,19 @@
             // this effectively skips any blank lines
             messageLine = getNextLine(buffer);
         }
-        String[] parts = messageLine.split("\\s");
-        if (parts.length != 3) {
-            throw new NHttpException("Invalid " +
-                (requestMode ? "request" : "response") + " line : " + messageLine);
+
+        String[] parts = new String[3];
+        for (int i=0; i<2; i++) {
+            int sep = messageLine.indexOf(' ');
+            if (sep == -1) {
+                throw new NHttpException("Invalid " +
+                    (requestMode ? "request" : "response") + " line : " + messageLine);
+            } else {
+                parts[i] = messageLine.substring(0, sep);
+                messageLine = messageLine.substring(sep+1);
+            }
         }
+        parts[2] = messageLine;
 
         if (requestMode) {
             HttpRequest req = (HttpRequest) httpMessage;
@@ -295,6 +304,14 @@
      */
     public boolean isStreamingBody() {
         return streamingBody;
+    }
+
+    /**
+     * Should we close the connection?
+     * @return true if we should close the connection
+     */
+    public boolean isConnectionClose() {
+        return httpMessage.isConnectionClose();
     }
 
     /**

Modified: incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java
URL: http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java?view=diff&rev=467721&r1=467720&r2=467721
==============================================================================
--- incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java (original)
+++ incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java Wed Oct 25 10:50:44 2006
@@ -19,32 +19,37 @@
 import org.apache.commons.logging.LogFactory;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
+import java.nio.channels.Selector;
+import java.nio.channels.ClosedChannelException;
 
 /**
- * This handler owns sending of a httpMessage to an external endpoint using a WriteHandler
- * and reading back the response. This does not handle persistent/pipelined connections
+ * This handler owns the asynchronous sending of a httpMessage to an external endpoint
+ * and reading back the response. This does not handle persistent/pipelined connections yet
  */
-public class OutgoingHandler extends GenericIOHandler implements Runnable {
+public class OutgoingHandler extends GenericIOHandler {
 
     private static final Log log = LogFactory.getLog(OutgoingHandler.class);
 
+    /** A runnable callback object that would be invoked once a reply to the message is received */
     private Runnable callback = null;
-    private MessageReader msgReader = new MessageReader(false);
 
-    OutgoingHandler(SocketChannel socket, SelectionKey sk, HttpRequest request, HttpService httpService) {
+    OutgoingHandler(SocketChannel socket, Selector selector, HttpService httpService)
+        throws ClosedChannelException {
         this.httpService = httpService;
         this.socket = socket;
-        this.sk = sk;
-        request.getWireBuffer().position(0);
+        this.msgReader = new MessageReader(false);
+        this.sk = socket.register(selector, SelectionKey.OP_CONNECT);
+        sk.attach(this);
+    }
+
+    public void setRequest(HttpRequest request) {
         if (!request.isChunked()) {
-            request.addHeader(Constants.CONTENT_LENGTH, Integer.toString(request.getBuffer().limit()));
+            request.addHeader(Constants.CONTENT_LENGTH,
+                Integer.toString(request.getBuffer().limit()));
         }
-
         msgWriter = new MessageWriter(true, request);
-        //writeHandler.setMessage(request.getWireBuffer(), true /* connection close */);
     }
 
     public Runnable getCallback() {
@@ -55,57 +60,49 @@
         this.callback = callback;
     }
 
+    /** acp
+     * The main handler routing for outgoing messages and responses
+     */
     public void run() {
         try {
             if (sk.isConnectable() && socket.finishConnect()) {
-                log.debug("\tIncomingHandler run() - CONNECTABLE and CONNECTED");
+                log.debug("socket was connectable and now connected");
                 sk.interestOps(SelectionKey.OP_WRITE);
 
             } else if (sk.isWritable()) {
-                log.debug("\tIncomingHandler run() - WRITEABLE");
-
-                writeApplicationBuffer();
-                writeNetworkBuffer();
-                writeToNetwork();
-
-                if (nwWritePos == 0 && appWritePos == 0 && !msgWriter.isStreamingBody()) {
-                    log.debug("\tRequest written completely");
-                    // response has been written completely
-                    // now read response or at least result code
-                    sk.interestOps(SelectionKey.OP_READ);
-                }
+                log.debug("writable");
+                processReadyWrite(false);
 
             } else if (sk.isReadable()) {
-                log.debug("\tOutgoingHandler run() - READABLE");
-
-                if (msgReader.availableForWrite() == 0) {
-                    return; // reject read
-                }
-
-                if (readNetworkBuffer(msgReader.availableForWrite()) > 0) {
-                    //System.out.println("NW Buffer read : \n" + Util.dumpAsHex(nwReadBuffer.array(), nwReadPos));
-                    readApplicationBuffer();
-                    
-                    //System.out.println(Thread.currentThread().getName() + " Processing App Buffer : \n" + Util.dumpAsHex(appReadBuffer.array(), appReadPos));
-                    processAppReadBuffer();
-                }
+                log.debug("readable");
+                processReadyRead();
             }
-        }
-        catch (IOException e) {
-            log.error("Error in OutGoingHandler : " + e.getMessage(), e);
+            
+        } catch (IOException e) {
+            handleException("Error connecting to socket : " + e.getMessage(), e);
         } finally{
             if (isBeingProcessed())
                 unlock();
         }
     }
 
-    private void processAppReadBuffer() {
+    /** acp
+     * Process the App buffer which has been read. If we just read the message
+     * header right now, then this method fires the handleResponse() callback
+     * to the http service passing the registered callback object as well. However,
+     * at this point, the body may not have been read completely.
+     */
+    protected void processAppReadBuffer() {
+
         boolean readHeader = msgReader.isStreamingBody();
 
         try {
+            // let MessageReader digest what it can. The returned position is where
+            // it digested the buffer, which we can then discard
             int pos = msgReader.process(appReadBuffer);
-            // if the handler digested any bytes, discard and compact the buffer
+
             if (pos > 0) {
+                // if the handler digested any bytes, discard and compact the buffer
                 appReadBuffer.position(pos);
                 appReadBuffer.compact();
                 appReadPos = appReadBuffer.position();
@@ -113,19 +110,27 @@
 
             // if we hadn't read the full header earlier, and read it just now
             if (!readHeader && msgReader.isStreamingBody()) {
-                log.debug("\tFire event for received HttpResponse");
+                log.debug("fire event for received HttpResponse");
                 unlock();
                 httpService.handleResponse((HttpResponse) msgReader.getHttpMessage(), callback);
             }
 
-            /*socket.close();
-            sk.cancel();
-            log.debug("Socket closed and SelectionKey cancelled");*/
+            // if we had read the header before, and finished receiving the streamed
+            // body just now, we want to close the stream and cancel the SK if
+            // a connection close was mentioned
+            if (readHeader && !msgReader.isStreamingBody() && msgReader.isConnectionClose()) {
+                log.debug("closing the connection as the body has been received completely" +
+                    " and the received response indicated a connection close");
+                try {
+                    socket.close();
+                } catch (IOException e) {}
+                sk.cancel();
+            }
 
         } catch (IOException e) {
-            e.printStackTrace();
+            handleException("Error piping the received App buffer : " + e. getMessage(), e);
         } catch (NHttpException e) {
-            e.printStackTrace();
+            handleException(e.getMessage(), e);
         }
     }
 

Modified: incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java
URL: http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java?view=diff&rev=467721&r1=467720&r2=467721
==============================================================================
--- incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java (original)
+++ incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java Wed Oct 25 10:50:44 2006
@@ -54,7 +54,7 @@
     /**
      * The maximum number of threads used for the worker thread pool
      */
-    private static final int WORKERS_MAX_THREADS = 100;
+    private static final int WORKERS_MAX_THREADS = 4;
     /**
      * The keep alive time of an idle worker thread
      */
@@ -142,7 +142,7 @@
 
         // create thread pool of workers
         workerPool = new ThreadPoolExecutor(
-            10,
+            4,
             WORKERS_MAX_THREADS, WORKER_KEEP_ALIVE, TIME_UNIT,
             new LinkedBlockingQueue(),
             new org.apache.axis2.util.threadpool.DefaultThreadFactory(
@@ -181,25 +181,21 @@
      */
     void dispatch(SelectionKey k) {
         Runnable r = (Runnable) k.attachment();
-        if (r != null && r instanceof GenericIOHandler) {
-            GenericIOHandler h = (GenericIOHandler) r;
+        if (r != null && r instanceof IOHandler) {
+            IOHandler h = (IOHandler) r;
             if (!h.isBeingProcessed()) {
                 h.lock();
                 //r.run();
                 workerPool.execute(r);
                 //lfPool.submit(r);
             }
-        /*} else {
-            if (r != null) {
-                workerPool.execute(r);
-            }*/
         }
     }
 
     /**
      * Accepts a new connection and hands it off to a new IncomingHandler instance
      */
-    class Acceptor extends GenericIOHandler implements Runnable {
+    class Acceptor extends AbstractIOHandler {
         private HttpService httpService = null;
 
         public Acceptor(HttpService httpService) {
@@ -221,33 +217,52 @@
         }
     }
 
+    /**
+     * Queue this HttpRequest to be sent to its destination, and invoke the Callback
+     * on recepit of a response
+     * @param request the HttpRequest which is to be sent. The body of this message could
+     *      be streamed before or after the invocation of this method, and the end of the
+     *      body would be determined when the output stream is closed.
+     * @param callback an optional call back to be invoked when a response is received
+     *      for this request
+     */
     public void send(HttpRequest request, Runnable callback) {
+        SocketChannel socket = null;
         try {
             InetSocketAddress addr = new InetSocketAddress(
                 request.getHost(), request.getPort());
 
-            SocketChannel socket = SocketChannel.open();
+            socket = SocketChannel.open();
             socket.configureBlocking(false);
             socket.connect(addr);
 
-            SelectionKey sk = socket.register(selector, SelectionKey.OP_CONNECT);
-            OutgoingHandler outHandler = new OutgoingHandler(socket, sk, request, httpService);
-            if (callback != null) {
-                outHandler.setCallback(callback);
-            }
-            sk.attach(outHandler);
+            OutgoingHandler outHandler = new OutgoingHandler(socket, selector, httpService);
+
+            outHandler.setRequest(request);
+            outHandler.setCallback(callback);
 
         } catch (IOException e) {
-            e.printStackTrace();
+            // close the socket if we opened it, selection key would be cancelled if invoked
+            if (socket != null && socket.isOpen()) {
+                try {
+                    socket.close();
+                } catch (IOException ioe) {}
+            }
+            handleException("IO Exception : " + e.getMessage() +
+                " sending request : " + request + " : " , e);
         }
     }
 
     private static void handleException(String msg, Exception e) {
+        // todo decide how to handle exceptions and cleanup resources etc
         log.error(msg, e);
-        e.printStackTrace(); // TODO
-        // throw new xxxx TODO
+        e.printStackTrace();
     }
 
+    /**
+     * Request to shutdown the reactor
+     * @param shutdownRequested if true, will request the reactor to shutdown
+     */
     public void setShutdownRequested(boolean shutdownRequested) {
         this.shutdownRequested = shutdownRequested;
     }

Modified: incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java
URL: http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java?view=diff&rev=467721&r1=467720&r2=467721
==============================================================================
--- incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java (original)
+++ incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java Wed Oct 25 10:50:44 2006
@@ -81,7 +81,7 @@
             }
         });
         new Thread(r).start();
-        r.send(request, null);
+        //r.send(request, null);
 
         byte[] bodyBytes = body.getBytes();
         int incr = 32;
@@ -91,6 +91,8 @@
         }
         os.flush();
         os.close();
+
+        r.send(request, null);
     }
 
     private void simpleGet() throws IOException {



---------------------------------------------------------------------
To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: synapse-dev-help@ws.apache.org