You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@synapse.apache.org by as...@apache.org on 2006/10/25 19:50:44 UTC
svn commit: r467721 - in
/incubator/synapse/branches/NIO/modules/niohttp/src: ./
org/apache/axis2/transport/niohttp/impl/
Author: asankha
Date: Wed Oct 25 10:50:44 2006
New Revision: 467721
URL: http://svn.apache.org/viewvc?view=rev&rev=467721
Log:
clean and refactor IOHandlers
Added:
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/AbstractIOHandler.java
Modified:
incubator/synapse/branches/NIO/modules/niohttp/src/log4j.properties
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/GenericIOHandler.java
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IOHandler.java
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/MessageReader.java
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java
Modified: incubator/synapse/branches/NIO/modules/niohttp/src/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/log4j.properties?view=diff&rev=467721&r1=467720&r2=467721
==============================================================================
--- incubator/synapse/branches/NIO/modules/niohttp/src/log4j.properties (original)
+++ incubator/synapse/branches/NIO/modules/niohttp/src/log4j.properties Wed Oct 25 10:50:44 2006
@@ -3,7 +3,7 @@
# Set the level to DEBUG if you want to log all SlideExceptions (some of them aren't errors)
#log4j.category.org.apache.axis2=DEBUG
log4j.category.org.apache.synapse=DEBUG
-log4j.category.org.apache.axis2.transport.niohttp=DEBUG
+log4j.category.org.apache.axis2.transport.niohttp=TRACE
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
Added: incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/AbstractIOHandler.java
URL: http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/AbstractIOHandler.java?view=auto&rev=467721
==============================================================================
--- incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/AbstractIOHandler.java (added)
+++ incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/AbstractIOHandler.java Wed Oct 25 10:50:44 2006
@@ -0,0 +1,33 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl;
+
+public abstract class AbstractIOHandler implements IOHandler {
+
+ private boolean beingProcessed = false;
+
+ public boolean isBeingProcessed() {
+ return beingProcessed;
+ }
+
+ public synchronized void lock() {
+ beingProcessed = true;
+ }
+
+ public synchronized void unlock() {
+ beingProcessed = false;
+ }
+}
Modified: incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/GenericIOHandler.java
URL: http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/GenericIOHandler.java?view=diff&rev=467721&r1=467720&r2=467721
==============================================================================
--- incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/GenericIOHandler.java (original)
+++ incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/GenericIOHandler.java Wed Oct 25 10:50:44 2006
@@ -24,7 +24,7 @@
import java.io.IOException;
import java.io.InputStream;
-public abstract class GenericIOHandler implements Runnable {
+public abstract class GenericIOHandler extends AbstractIOHandler {
private static final Log log = LogFactory.getLog(GenericIOHandler.class);
@@ -32,82 +32,95 @@
protected SocketChannel socket;
protected MessageWriter msgWriter;
- protected HttpService httpService;
+ protected MessageReader msgReader;
+ protected HttpService httpService;
protected ByteBuffer nwReadBuffer = ByteBuffer.allocate(4096);
protected ByteBuffer nwWriteBuffer = ByteBuffer.allocate(4096);
protected ByteBuffer appReadBuffer = ByteBuffer.allocate(4096);
protected ByteBuffer appWriteBuffer = ByteBuffer.allocate(4096);
+
protected int nwReadPos, nwWritePos;
protected int appReadPos, appWritePos;
- private boolean beingProcessed = false;
+ /** acp
+ * Process a ready read event, and reject it if we are unable to process.
+ * If content was read in, then it would be processed as well - ie. fire event
+ * for received message header, or stream in content received
+ */
+ protected void processReadyRead() {
+
+ if (msgReader.availableForWrite() == 0) {
+ log.trace("reject available read as message reader is full");
+ return;
+ }
- public boolean isBeingProcessed() {
- return beingProcessed;
- }
+ log.trace("attempting to read into NW a maximum of "
+ + msgReader.availableForWrite() + " bytes");
- public synchronized void lock() {
- beingProcessed = true;
- }
+ if (readFromPhysicalNetwork() > 0) {
- public synchronized void unlock() {
- beingProcessed = false;
- }
+ if (log.isTraceEnabled()) {
+ log.trace("read into NW buffer : \n" +
+ Util.dumpAsHex(nwReadBuffer.array(), nwReadPos));
+ }
- public GenericIOHandler() {}
+ log.trace("attempt to read NW buffer into App buffer");
+ readFromNetworkBuffer();
- protected int readNetworkBuffer(int maxBytes) {
+ if (log.isTraceEnabled()) {
+ log.trace("read into App buffer : \n" +
+ Util.dumpAsHex(appReadBuffer.array(), appReadPos));
+ }
- try {
- // set position within buffer to read from channel
- nwReadBuffer.position(nwReadPos);
+ log.trace("attempt to process App buffer contents");
+ processAppReadBuffer();
+ }
+ }
- if (nwReadPos == nwReadBuffer.capacity()) {
- ByteBuffer newBuf = ByteBuffer.allocate(nwReadBuffer.capacity() * 2);
- log.debug("Expanding ByteBuffer to " + newBuf.capacity() + " bytes");
- nwReadBuffer.flip();
- nwReadBuffer = newBuf.put(nwReadBuffer);
- }
-
- // perform read from channel to this location
- // *** Read a maximum of maxBytes ***
- ByteBuffer temp = ByteBuffer.allocate(maxBytes);
- int bytesRead = socket.read(temp);
- //System.out.println("Read : " + maxBytes + " bytes\n" + Util.dumpAsHex(temp.array(), bytesRead));
- temp.flip();
- nwReadBuffer.put(temp);
-
- if (bytesRead != -1) {
- if (log.isDebugEnabled()) {
- log.debug("Read from socket to buffer position: " + nwReadPos + " to: " + nwReadBuffer.position());
- log.debug(Util.dumpAsHex(nwReadBuffer.array(), nwReadPos));
- }
+ /** acp
+ * Read from the physical NW until the NW read buffer is filled up
+ * @return the number of bytes actually read
+ */
+ protected int readFromPhysicalNetwork() {
- // save position for next read
- nwReadPos = nwReadBuffer.position();
- return bytesRead;
+ log.debug("attempting to read from the physical NW");
- } else {
- // end of stream reached
- log.info("end-of-stream detected and socket closed");
- socket.close();
- sk.cancel();
- }
+ // set position within buffer to read from channel
+ nwReadBuffer.position(nwReadPos);
+ int bytesRead = 0;
+ try {
+ bytesRead = socket.read(nwReadBuffer);
} catch (IOException e) {
- log.warn(e.getMessage() + " Closing socket: " + socket);
- try {
- socket.close();
- } catch (IOException e1) {
- e1.printStackTrace();
+ handleException("Error reading into NW buffer from socket : " + e.getMessage(), e);
+ }
+
+ if (bytesRead >= 0) {
+ log.debug("read " + bytesRead + " bytes from the physical NW");
+ nwReadPos += bytesRead;
+ if (log.isTraceEnabled() && bytesRead > 0) {
+ log.trace("current nwReadBuffer\n" +
+ Util.dumpAsHex(nwReadBuffer.array(), bytesRead));
}
+ return bytesRead;
+
+ } else {
+ // end of stream reached
+ log.debug("end-of-stream detected and socket closed");
sk.cancel();
+ try {
+ socket.close();
+ } catch (IOException e) {}
+
}
return 0;
}
- protected void readApplicationBuffer() {
+ /** acp
+ * Read from the NW buffer into the App buffer, the complete NW buffer contents
+ */
+ protected void readFromNetworkBuffer() {
nwReadBuffer.position(nwReadPos);
nwReadBuffer.flip();
@@ -115,93 +128,95 @@
appReadBuffer.position(appReadPos);
while (appReadBuffer.remaining() < nwReadBuffer.limit()) {
ByteBuffer newBuf = ByteBuffer.allocate(appReadBuffer.capacity() * 2);
- log.debug("Expanding ByteBuffer to " + newBuf.capacity() + " bytes");
+ log.debug("expanding appReadBuffer to " + newBuf.capacity() + " bytes");
appReadBuffer.flip();
appReadBuffer = newBuf.put(appReadBuffer);
}
appReadBuffer.put(nwReadBuffer);
appReadPos = appReadBuffer.position();
- appReadBuffer.flip();
-
+ appReadBuffer.flip();
+
nwReadBuffer.clear();
nwReadPos = 0;
}
-
- protected void writeNetworkBuffer() {
- nwWriteBuffer.position(nwWritePos);
- if (nwWritePos > 0) {
- nwWriteBuffer.compact();
- }
-
- appWriteBuffer.position(appWritePos);
- appWriteBuffer.flip();
-
- while (nwWriteBuffer.remaining() < appWriteBuffer.limit()) {
- ByteBuffer newBuf = ByteBuffer.allocate(nwWriteBuffer.capacity() * 2);
- log.debug("Expanding ByteBuffer to " + newBuf.capacity() + " bytes");
- nwWriteBuffer.flip();
- nwWriteBuffer = newBuf.put(nwWriteBuffer);
+
+ /** acp
+ * The Incoming and Outgoing Handlers should implement this method, and provide
+ * thier custom implementations to handle received http messages
+ */
+ abstract protected void processAppReadBuffer();
+
+ /** acp
+ * Process a ready write event
+ */
+ protected void processReadyWrite(boolean closeConnectionIfDone) {
+
+ log.debug("attempt to write to the App buffer from the message writer");
+ writeToApplicationBuffer();
+
+ log.debug("attempt to write to the NW buffer from the App buffer");
+ writeToNetworkBuffer();
+
+ log.debug("attempt to write to the physical NW from the NW buffer");
+ writeToPhysicalNetwork();
+
+ if (nwWritePos == 0 && appWritePos == 0 && !msgWriter.isStreamingBody()) {
+ // if both our App & NW buffers are empty, and body stream has ended
+ log.debug("message written completely to the wire");
+ if (closeConnectionIfDone && msgWriter.isConnectionClose()) {
+ log.debug("closing connection normally as connection-close requested");
+ sk.cancel();
+ try {
+ socket.close();
+ } catch (IOException e) {}
+ } else {
+ // response has been written completely
+ // now read response or result code
+ sk.interestOps(SelectionKey.OP_READ);
+ }
}
-
- nwWriteBuffer.put(appWriteBuffer);
- nwWritePos = nwWriteBuffer.position();
- appWritePos = 0;
-
- System.out.println("nwWriteBuffer : \n" + Util.dumpAsHex(nwWriteBuffer.array(), nwWritePos));
}
- protected void writeToNetwork() {
- // write as much as we can
- nwWriteBuffer.position(nwWritePos);
- nwWriteBuffer.flip();
- int total = 0;
- int write = 0;
- do {
- try {
- write = socket.write(nwWriteBuffer);
- } catch (IOException e) {
- e.printStackTrace();
- }
- if (write > 0) {
- total += write;
- }
- } while (write > 0);
-
- // compact the left overs
- nwWriteBuffer.compact();
- nwWritePos = nwWriteBuffer.position();
- }
+ /** acp
+ * Reads from the msgWriter and writes to the application buffer
+ * If the message header has not been read, the complete header is read (which
+ * is known not to block) and written at once, expanding the application buffer
+ * as required. If the header has been written, the msgWriter would be in the
+ * body-streaming mode, in which case, it is attempted to read until the application
+ * buffer is filled up - or the end of stream is reached
+ */
+ protected void writeToApplicationBuffer() {
- protected void writeApplicationBuffer() {
if (!msgWriter.isStreamingBody()) {
+ log.debug("attempting to write message header into App buffer");
byte[] header = msgWriter.getMessageHeader();
+ appWriteBuffer.position(appWritePos);
while (appWriteBuffer.remaining() < header.length) {
ByteBuffer newBuf = ByteBuffer.allocate(appWriteBuffer.capacity() * 2);
- log.debug("Expanding ByteBuffer to " + newBuf.capacity() + " bytes");
+ log.debug("expanding App buffer to " + newBuf.capacity() + " bytes");
appWriteBuffer.flip();
appWriteBuffer = newBuf.put(appWriteBuffer);
}
appWriteBuffer.put(header);
appWritePos = appWriteBuffer.position();
- msgWriter.setStreamingBody(true);
+ msgWriter.setStreamingBody(true); // switch into body streaming mode
} else {
- // fill the appWriteBuffer from the message body
+ log.debug("attempting to write from message body into App buffer");
+
InputStream is = msgWriter.getInputStream();
appWriteBuffer.position(appWritePos);
- if (appWritePos > 0) {
- appWriteBuffer.compact();
- }
try {
while (appWriteBuffer.remaining() > 0) {
int c = is.read();
if (c == -1) {
msgWriter.setStreamingBody(false);
+ log.debug("end of message body stream detected");
break;
}
appWriteBuffer.put((byte) c);
@@ -209,11 +224,88 @@
appWritePos = appWriteBuffer.position();
- System.out.println("appWriteBuffer : \n" + Util.dumpAsHex(appWriteBuffer.array(), appWritePos));
+ } catch (IOException e) {
+ handleException("Error reading message body stream : " + e.getMessage(), e);
+ }
+ }
+
+ if (log.isTraceEnabled()) {
+ log.trace("current appWriteBuffer : \n" +
+ Util.dumpAsHex(appWriteBuffer.array(), appWritePos));
+ }
+ }
+
+ /** acp
+ * Write all of the contents of the App buffer into the NW buffer, expanding
+ * it as required
+ * TODO handle SSL later
+ */
+ protected void writeToNetworkBuffer() {
+
+ appWriteBuffer.position(appWritePos);
+ appWriteBuffer.flip();
+
+ // expand NW buffer to hold App buffer contents
+ while (nwWriteBuffer.remaining() < appWriteBuffer.limit()) {
+ ByteBuffer newBuf = ByteBuffer.allocate(nwWriteBuffer.capacity() * 2);
+ log.debug("expanding NW buffer to " + newBuf.capacity() + " bytes");
+ nwWriteBuffer.flip();
+ nwWriteBuffer = newBuf.put(nwWriteBuffer);
+ }
+
+ nwWriteBuffer.position(nwWritePos);
+ nwWriteBuffer.put(appWriteBuffer);
+ nwWritePos = nwWriteBuffer.position();
+ // clear App buffer as we wrote all of it to the NW buffer
+ appWritePos = 0;
+ appWriteBuffer.clear();
+
+ if (log.isTraceEnabled()) {
+ log.trace("current nwWriteBuffer : \n" +
+ Util.dumpAsHex(nwWriteBuffer.array(), nwWritePos));
+ }
+ }
+
+ /** acp
+ * Write the contents of the NW buffer into the physical socket
+ */
+ protected void writeToPhysicalNetwork() {
+ // write as much as we can from our NW buffer, without blocking of course!
+ nwWriteBuffer.position(nwWritePos);
+ nwWriteBuffer.flip();
+ int write = 0;
+ do {
+ try {
+ write = socket.write(nwWriteBuffer);
} catch (IOException e) {
- e.printStackTrace();
+ handleException("Error writing to socket : " + e.getMessage(), e);
}
+ } while (write > 0 && nwWriteBuffer.remaining() > 0);
+
+ // compact any left overs and capture new position
+ nwWriteBuffer.compact();
+ nwWritePos = nwWriteBuffer.position();
+ }
+
+ /**
+ * Handle an exception encountered. TODO need to streamline error handling and cleanup
+ * @param msg a message indicating the exception
+ * @param e the Exception thrown
+ */
+ protected void handleException(String msg, Exception e) {
+ log.error(msg, e);
+
+ if (sk.isValid()) {
+ sk.cancel();
}
+ try {
+ if (socket != null && socket.isOpen()) {
+ socket.close();
+ }
+ } catch (IOException e1) {}
+
+ //TODO throw new ();
}
+
}
Modified: incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IOHandler.java
URL: http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IOHandler.java?view=diff&rev=467721&r1=467720&r2=467721
==============================================================================
--- incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IOHandler.java (original)
+++ incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IOHandler.java Wed Oct 25 10:50:44 2006
@@ -20,86 +20,13 @@
import java.io.IOException;
import java.util.Date;
-public class IOHandler {
+public interface IOHandler extends Runnable {
- class Message {
- Message(String msg) {
- write(msg.getBytes());
- }
+ public boolean isBeingProcessed();
- private ByteBuffer buffer = ByteBuffer.allocate(1024);
+ public void lock();
- public InputStream getInputStream() {
- buffer.position(0);
- return new InputStream() {
- public synchronized int read() throws IOException {
- if (!buffer.hasRemaining()) {
- return -1;
- }
- return buffer.get();
- }
- public synchronized int read(byte[] bytes, int off, int len) throws IOException {
- len = Math.min(len, buffer.remaining());
- buffer.get(bytes, off, len);
- return len;
- }
- };
- }
+ public void unlock();
- private int writePos;
-
- public void write(byte[] data) {
- buffer.position(writePos);
- buffer.put(data);
- writePos = buffer.position();
- }
- }
-
- class B implements Runnable {
- Message m;
- B(Message m) {
- this.m = m;
- }
- public void run() {
- try {
- while (true) {
- Thread.sleep(1000);
- m.write(("Message : " + new Date()).getBytes());
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- class C implements Runnable {
- InputStream is = null;
- byte[] data = new byte[1024];
- C(InputStream is) {
- this.is = is;
- }
- public void run() {
- try {
- while (true) {
- Thread.sleep(1000);
- int r = is.read(data);
- if (r > 0) {
- System.out.println("data : \n" + Util.dumpAsHex(data, r));
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- public void test() {
- Message m = new Message("Hello World");
- new Thread(new B(m)).start();
- new Thread(new C(m.getInputStream())).start();
- }
-
- public static void main(String[] args) {
- new IOHandler().test();
- }
+ public void run();
}
Modified: incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java
URL: http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java?view=diff&rev=467721&r1=467720&r2=467721
==============================================================================
--- incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java (original)
+++ incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java Wed Oct 25 10:50:44 2006
@@ -38,15 +38,13 @@
private static final Log log = LogFactory.getLog(IncomingHandler.class);
- private MessageReader msgReader = new MessageReader(true);
-
public IncomingHandler(SocketChannel socket, Selector selector,
HttpService httpService) throws IOException {
+ this.msgReader = new MessageReader(true); // set up in 'request' parse mode
this.httpService = httpService;
this.socket = socket;
- socket.configureBlocking(false);
- // Optionally try first read now
+ this.socket.configureBlocking(false);
sk = socket.register(selector, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ); // we are only interested to read
@@ -54,87 +52,43 @@
}
public void setResponse(HttpResponse response) {
- // TODO writeHandler.setMessage(response.getWireBuffer(), response.isConnectionClose());
sk.interestOps(SelectionKey.OP_WRITE);
sk.selector().wakeup();
- log.debug("\tIncomingHandler.setResponse()");
}
- /**
- * The main handler routine
+ /** acp
+ * The main handler routine for incoming requests and responses
*/
public void run() {
- if (sk.isReadable()) {
- log.debug("\tIncomingHandler run() - READABLE");
-
- if (msgReader.availableForWrite() == 0) {
- //System.out.println("Reject read");
- return;
- } else {
- //System.out.println("Accept read");
- }
-
- if (readNetworkBuffer(msgReader.availableForWrite()) > 0) {
- //System.out.println("NW Buffer read : \n" + Util.dumpAsHex(nwReadBuffer.array(), nwReadPos));
- readApplicationBuffer();
-
- //System.out.println(Thread.currentThread().getName() + " Processing App Buffer : \n" + Util.dumpAsHex(appReadBuffer.array(), appReadPos));
- processAppReadBuffer();
- }
-
- } else if (sk.isWritable()) {
- log.debug("\tIncomingHandler run() - WRITEABLE");
-
- log.debug("\tIncomingHandler run() - WRITEABLE");
-
- writeApplicationBuffer();
- writeNetworkBuffer();
- writeToNetwork();
-
- if (nwWritePos == 0 && appWritePos == 0 && !msgWriter.isStreamingBody()) {
- log.debug("\tRequest written completely");
- if (msgWriter.isConnectionClose()) {
- log.debug("\tClosing connection normally");
- sk.cancel();
- try {
- socket.close();
- } catch (IOException e) {
- log.warn("Error during socket close : " + e.getMessage(), e);
- }
- } else {
- // response has been written completely
- // now read response or at least result code
- sk.interestOps(SelectionKey.OP_READ);
- }
+ try {
+ if (sk.isReadable()) {
+ log.debug("readable");
+ processReadyRead();
+
+ } else if (sk.isWritable()) {
+ log.debug("writable");
+ processReadyWrite(true);
}
- /*if (writeHandler.handle(socket)) {
- log.debug("\tThe response has been written completely");
- // response has been written completely
- if (writeHandler.isConnectionClose()) {
- log.debug("\tClosing connection normally");
- sk.cancel();
- try {
- socket.close();
- } catch (IOException e) {
- log.warn("Error during socket close : " + e.getMessage(), e);
- }
- } else {
- // now we are again interested to read
- sk.interestOps(SelectionKey.OP_READ);
- }
- }*/
- } else {
- log.warn("IncomingHandler run(!!!unknown event!!!) : " + sk.readyOps());
+ } finally{
+ if (isBeingProcessed())
+ unlock();
}
}
- private void processAppReadBuffer() {
+ /** acp
+ * Process the App buffer which has been read. If we just read the message
+ * header right now, then this method fires the handleRequest() call on the
+ * http service passing. However, at this point, the body may not have been
+ * read completely.
+ */
+ protected void processAppReadBuffer() {
boolean readHeader = msgReader.isStreamingBody();
try {
int pos = msgReader.process(appReadBuffer);
+
// if the handler digested any bytes, discard and compact the buffer
if (pos > 0) {
appReadBuffer.position(pos);
@@ -146,18 +100,14 @@
if (!readHeader && msgReader.isStreamingBody()) {
HttpRequest request = (HttpRequest) msgReader.getHttpMessage();
request.setHandler(this);
- log.debug("\tFire event for received HttpRequest");
+ log.debug("fire event for received HttpResponse");
httpService.handleRequest(request);
}
- /*socket.close();
- sk.cancel();
- log.debug("Socket closed and SelectionKey cancelled");*/
-
} catch (IOException e) {
- e.printStackTrace();
+ handleException("Error piping the received App buffer : " + e. getMessage(), e);
} catch (NHttpException e) {
- e.printStackTrace();
+ handleException(e.getMessage(), e);
}
}
Modified: incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/MessageReader.java
URL: http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/MessageReader.java?view=diff&rev=467721&r1=467720&r2=467721
==============================================================================
--- incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/MessageReader.java (original)
+++ incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/MessageReader.java Wed Oct 25 10:50:44 2006
@@ -25,6 +25,7 @@
import java.nio.ByteBuffer;
import java.io.IOException;
import java.io.InputStream;
+import java.util.StringTokenizer;
/**
* TODO
@@ -156,11 +157,19 @@
// this effectively skips any blank lines
messageLine = getNextLine(buffer);
}
- String[] parts = messageLine.split("\\s");
- if (parts.length != 3) {
- throw new NHttpException("Invalid " +
- (requestMode ? "request" : "response") + " line : " + messageLine);
+
+ String[] parts = new String[3];
+ for (int i=0; i<2; i++) {
+ int sep = messageLine.indexOf(' ');
+ if (sep == -1) {
+ throw new NHttpException("Invalid " +
+ (requestMode ? "request" : "response") + " line : " + messageLine);
+ } else {
+ parts[i] = messageLine.substring(0, sep);
+ messageLine = messageLine.substring(sep+1);
+ }
}
+ parts[2] = messageLine;
if (requestMode) {
HttpRequest req = (HttpRequest) httpMessage;
@@ -295,6 +304,14 @@
*/
public boolean isStreamingBody() {
return streamingBody;
+ }
+
+ /**
+ * Should we close the connection?
+ * @return true if we should close the connection
+ */
+ public boolean isConnectionClose() {
+ return httpMessage.isConnectionClose();
}
/**
Modified: incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java
URL: http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java?view=diff&rev=467721&r1=467720&r2=467721
==============================================================================
--- incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java (original)
+++ incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java Wed Oct 25 10:50:44 2006
@@ -19,32 +19,37 @@
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
-import java.io.InputStream;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
+import java.nio.channels.Selector;
+import java.nio.channels.ClosedChannelException;
/**
- * This handler owns sending of a httpMessage to an external endpoint using a WriteHandler
- * and reading back the response. This does not handle persistent/pipelined connections
+ * This handler owns the asynchronous sending of a httpMessage to an external endpoint
+ * and reading back the response. This does not handle persistent/pipelined connections yet
*/
-public class OutgoingHandler extends GenericIOHandler implements Runnable {
+public class OutgoingHandler extends GenericIOHandler {
private static final Log log = LogFactory.getLog(OutgoingHandler.class);
+ /** A runnable callback object that would be invoked once a reply to the message is received */
private Runnable callback = null;
- private MessageReader msgReader = new MessageReader(false);
- OutgoingHandler(SocketChannel socket, SelectionKey sk, HttpRequest request, HttpService httpService) {
+ OutgoingHandler(SocketChannel socket, Selector selector, HttpService httpService)
+ throws ClosedChannelException {
this.httpService = httpService;
this.socket = socket;
- this.sk = sk;
- request.getWireBuffer().position(0);
+ this.msgReader = new MessageReader(false);
+ this.sk = socket.register(selector, SelectionKey.OP_CONNECT);
+ sk.attach(this);
+ }
+
+ public void setRequest(HttpRequest request) {
if (!request.isChunked()) {
- request.addHeader(Constants.CONTENT_LENGTH, Integer.toString(request.getBuffer().limit()));
+ request.addHeader(Constants.CONTENT_LENGTH,
+ Integer.toString(request.getBuffer().limit()));
}
-
msgWriter = new MessageWriter(true, request);
- //writeHandler.setMessage(request.getWireBuffer(), true /* connection close */);
}
public Runnable getCallback() {
@@ -55,57 +60,49 @@
this.callback = callback;
}
+ /** acp
+ * The main handler routing for outgoing messages and responses
+ */
public void run() {
try {
if (sk.isConnectable() && socket.finishConnect()) {
- log.debug("\tIncomingHandler run() - CONNECTABLE and CONNECTED");
+ log.debug("socket was connectable and now connected");
sk.interestOps(SelectionKey.OP_WRITE);
} else if (sk.isWritable()) {
- log.debug("\tIncomingHandler run() - WRITEABLE");
-
- writeApplicationBuffer();
- writeNetworkBuffer();
- writeToNetwork();
-
- if (nwWritePos == 0 && appWritePos == 0 && !msgWriter.isStreamingBody()) {
- log.debug("\tRequest written completely");
- // response has been written completely
- // now read response or at least result code
- sk.interestOps(SelectionKey.OP_READ);
- }
+ log.debug("writable");
+ processReadyWrite(false);
} else if (sk.isReadable()) {
- log.debug("\tOutgoingHandler run() - READABLE");
-
- if (msgReader.availableForWrite() == 0) {
- return; // reject read
- }
-
- if (readNetworkBuffer(msgReader.availableForWrite()) > 0) {
- //System.out.println("NW Buffer read : \n" + Util.dumpAsHex(nwReadBuffer.array(), nwReadPos));
- readApplicationBuffer();
-
- //System.out.println(Thread.currentThread().getName() + " Processing App Buffer : \n" + Util.dumpAsHex(appReadBuffer.array(), appReadPos));
- processAppReadBuffer();
- }
+ log.debug("readable");
+ processReadyRead();
}
- }
- catch (IOException e) {
- log.error("Error in OutGoingHandler : " + e.getMessage(), e);
+
+ } catch (IOException e) {
+ handleException("Error connecting to socket : " + e.getMessage(), e);
} finally{
if (isBeingProcessed())
unlock();
}
}
- private void processAppReadBuffer() {
+ /** acp
+ * Process the App buffer which has been read. If we just read the message
+ * header right now, then this method fires the handleResponse() callback
+ * to the http service passing the registered callback object as well. However,
+ * at this point, the body may not have been read completely.
+ */
+ protected void processAppReadBuffer() {
+
boolean readHeader = msgReader.isStreamingBody();
try {
+ // let MessageReader digest what it can. The returned position is where
+ // it digested the buffer, which we can then discard
int pos = msgReader.process(appReadBuffer);
- // if the handler digested any bytes, discard and compact the buffer
+
if (pos > 0) {
+ // if the handler digested any bytes, discard and compact the buffer
appReadBuffer.position(pos);
appReadBuffer.compact();
appReadPos = appReadBuffer.position();
@@ -113,19 +110,27 @@
// if we hadn't read the full header earlier, and read it just now
if (!readHeader && msgReader.isStreamingBody()) {
- log.debug("\tFire event for received HttpResponse");
+ log.debug("fire event for received HttpResponse");
unlock();
httpService.handleResponse((HttpResponse) msgReader.getHttpMessage(), callback);
}
- /*socket.close();
- sk.cancel();
- log.debug("Socket closed and SelectionKey cancelled");*/
+ // if we had read the header before, and finished receiving the streamed
+ // body just now, we want to close the stream and cancel the SK if
+ // a connection close was mentioned
+ if (readHeader && !msgReader.isStreamingBody() && msgReader.isConnectionClose()) {
+ log.debug("closing the connection as the body has been received completely" +
+ " and the received response indicated a connection close");
+ try {
+ socket.close();
+ } catch (IOException e) {}
+ sk.cancel();
+ }
} catch (IOException e) {
- e.printStackTrace();
+ handleException("Error piping the received App buffer : " + e. getMessage(), e);
} catch (NHttpException e) {
- e.printStackTrace();
+ handleException(e.getMessage(), e);
}
}
Modified: incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java
URL: http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java?view=diff&rev=467721&r1=467720&r2=467721
==============================================================================
--- incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java (original)
+++ incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java Wed Oct 25 10:50:44 2006
@@ -54,7 +54,7 @@
/**
* The maximum number of threads used for the worker thread pool
*/
- private static final int WORKERS_MAX_THREADS = 100;
+ private static final int WORKERS_MAX_THREADS = 4;
/**
* The keep alive time of an idle worker thread
*/
@@ -142,7 +142,7 @@
// create thread pool of workers
workerPool = new ThreadPoolExecutor(
- 10,
+ 4,
WORKERS_MAX_THREADS, WORKER_KEEP_ALIVE, TIME_UNIT,
new LinkedBlockingQueue(),
new org.apache.axis2.util.threadpool.DefaultThreadFactory(
@@ -181,25 +181,21 @@
*/
void dispatch(SelectionKey k) {
Runnable r = (Runnable) k.attachment();
- if (r != null && r instanceof GenericIOHandler) {
- GenericIOHandler h = (GenericIOHandler) r;
+ if (r != null && r instanceof IOHandler) {
+ IOHandler h = (IOHandler) r;
if (!h.isBeingProcessed()) {
h.lock();
//r.run();
workerPool.execute(r);
//lfPool.submit(r);
}
- /*} else {
- if (r != null) {
- workerPool.execute(r);
- }*/
}
}
/**
* Accepts a new connection and hands it off to a new IncomingHandler instance
*/
- class Acceptor extends GenericIOHandler implements Runnable {
+ class Acceptor extends AbstractIOHandler {
private HttpService httpService = null;
public Acceptor(HttpService httpService) {
@@ -221,33 +217,52 @@
}
}
+ /**
+ * Queue this HttpRequest to be sent to its destination, and invoke the Callback
+ * on recepit of a response
+ * @param request the HttpRequest which is to be sent. The body of this message could
+ * be streamed before or after the invocation of this method, and the end of the
+ * body would be determined when the output stream is closed.
+ * @param callback an optional call back to be invoked when a response is received
+ * for this request
+ */
public void send(HttpRequest request, Runnable callback) {
+ SocketChannel socket = null;
try {
InetSocketAddress addr = new InetSocketAddress(
request.getHost(), request.getPort());
- SocketChannel socket = SocketChannel.open();
+ socket = SocketChannel.open();
socket.configureBlocking(false);
socket.connect(addr);
- SelectionKey sk = socket.register(selector, SelectionKey.OP_CONNECT);
- OutgoingHandler outHandler = new OutgoingHandler(socket, sk, request, httpService);
- if (callback != null) {
- outHandler.setCallback(callback);
- }
- sk.attach(outHandler);
+ OutgoingHandler outHandler = new OutgoingHandler(socket, selector, httpService);
+
+ outHandler.setRequest(request);
+ outHandler.setCallback(callback);
} catch (IOException e) {
- e.printStackTrace();
+ // close the socket if we opened it, selection key would be cancelled if invoked
+ if (socket != null && socket.isOpen()) {
+ try {
+ socket.close();
+ } catch (IOException ioe) {}
+ }
+ handleException("IO Exception : " + e.getMessage() +
+ " sending request : " + request + " : " , e);
}
}
private static void handleException(String msg, Exception e) {
+ // todo decide how to handle exceptions and cleanup resources etc
log.error(msg, e);
- e.printStackTrace(); // TODO
- // throw new xxxx TODO
+ e.printStackTrace();
}
+ /**
+ * Request to shutdown the reactor
+ * @param shutdownRequested if true, will request the reactor to shutdown
+ */
public void setShutdownRequested(boolean shutdownRequested) {
this.shutdownRequested = shutdownRequested;
}
Modified: incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java
URL: http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java?view=diff&rev=467721&r1=467720&r2=467721
==============================================================================
--- incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java (original)
+++ incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java Wed Oct 25 10:50:44 2006
@@ -81,7 +81,7 @@
}
});
new Thread(r).start();
- r.send(request, null);
+ //r.send(request, null);
byte[] bodyBytes = body.getBytes();
int incr = 32;
@@ -91,6 +91,8 @@
}
os.flush();
os.close();
+
+ r.send(request, null);
}
private void simpleGet() throws IOException {
---------------------------------------------------------------------
To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: synapse-dev-help@ws.apache.org