You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@synapse.apache.org by as...@apache.org on 2006/10/11 20:02:10 UTC

svn commit: r462889 [1/2] - in /incubator/synapse/trunk/java/modules/niohttp: ./ src/ src/org/ src/org/apache/ src/org/apache/axis2/ src/org/apache/axis2/transport/ src/org/apache/axis2/transport/niohttp/ src/org/apache/axis2/transport/niohttp/impl/

Author: asankha
Date: Wed Oct 11 11:02:08 2006
New Revision: 462889

URL: http://svn.apache.org/viewvc?view=rev&rev=462889
Log:
adding the first cut of the non-blocking http transport
needs clean up and lots of refactoring - see ReactorTester.java for a hint for how this will be used in synapse
checking in for review and comments - and is not included into the build yet

Added:
    incubator/synapse/trunk/java/modules/niohttp/
    incubator/synapse/trunk/java/modules/niohttp/src/
    incubator/synapse/trunk/java/modules/niohttp/src/log4j.properties
    incubator/synapse/trunk/java/modules/niohttp/src/org/
    incubator/synapse/trunk/java/modules/niohttp/src/org/apache/
    incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/
    incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/
    incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/
    incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/Axis2CallbackImpl.java
    incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/NHttpListener.java
    incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/NHttpSender.java
    incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/Worker.java
    incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/
    incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Callback.java
    incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Constants.java
    incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpMessage.java
    incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpRequest.java
    incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpResponse.java
    incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpService.java
    incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java
    incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java
    incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java
    incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java
    incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReadHandler.java
    incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ResponseStatus.java
    incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Util.java
    incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/WriteHandler.java

Added: incubator/synapse/trunk/java/modules/niohttp/src/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/log4j.properties?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/log4j.properties (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/log4j.properties Wed Oct 11 11:02:08 2006
@@ -0,0 +1,25 @@
+log4j.rootCategory=DEBUG,logfile,stdout
+
+# Uncomment the next line to enable DEBUG logging for all of Axis2 (org.apache.axis2)
+#log4j.category.org.apache.axis2=DEBUG
+log4j.category.org.apache.synapse=DEBUG
+log4j.category.org.apache.axis2.transport.niohttp=DEBUG
+
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+
+log4j.appender.stdout.layout.ConversionPattern=[%t] %-5p %C{1} - %m %n
+
+####  appender writes to a file
+log4j.appender.logfile=org.apache.log4j.RollingFileAppender
+#log4j.appender.logfile.File=logs/synapse.log
+
+
+# Control the maximum log file size
+log4j.appender.logfile.MaxFileSize=1000KB
+# Archive log files (up to ten backup files here)
+log4j.appender.logfile.MaxBackupIndex=10
+
+log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
+log4j.appender.logfile.layout.ConversionPattern=%6r [%t] %5p %C{1} (%F:%L) - %m%n
\ No newline at end of file

Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/Axis2CallbackImpl.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/Axis2CallbackImpl.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/Axis2CallbackImpl.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/Axis2CallbackImpl.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,62 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp;
+
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.transport.niohttp.impl.HttpRequest;
+import org.apache.axis2.transport.niohttp.impl.HttpResponse;
+import org.apache.axis2.transport.niohttp.impl.Callback;
+
+/**
+ * Callback that is handed to the reactor together with an outgoing {@link HttpRequest}.
+ * It keeps the request, the Axis2 message context the request was created for and,
+ * once someone sets it, the matching {@link HttpResponse}.
+ */
+public class Axis2CallbackImpl implements Callback {
+
+    /** the request this callback was registered for */
+    private HttpRequest request;
+    /** the response; stays null until {@link #setResponse(HttpResponse)} is called */
+    private HttpResponse response;
+    /** the Axis2 message context the request was built from */
+    private MessageContext msgCtx;
+
+    /**
+     * @param request the outgoing HTTP request
+     * @param msgCtx  the Axis2 message context the request was built from
+     */
+    public Axis2CallbackImpl(HttpRequest request, MessageContext msgCtx) {
+        this.request = request;
+        this.msgCtx = msgCtx;
+    }
+
+    /** @return the request this callback was registered for */
+    public HttpRequest getRequest() {
+        return request;
+    }
+
+    /** @param request the request this callback belongs to */
+    public void setRequest(HttpRequest request) {
+        this.request = request;
+    }
+
+    /** @return the response, or null if none has been set */
+    public HttpResponse getResponse() {
+        return response;
+    }
+
+    /** @param response the response received for the request */
+    public void setResponse(HttpResponse response) {
+        this.response = response;
+    }
+
+    /** @return the Axis2 message context associated with the request */
+    public MessageContext getMsgCtx() {
+        return msgCtx;
+    }
+
+    /** @param msgCtx the Axis2 message context associated with the request */
+    public void setMsgCtx(MessageContext msgCtx) {
+        this.msgCtx = msgCtx;
+    }
+
+    /**
+     * Prints the request and the SOAP envelope of the message context to standard out.
+     * NOTE(review): presumably invoked once the response has arrived - confirm against
+     * the reactor; the response itself is not used here.
+     */
+    public void run() {
+        System.out.println("Response received for HttpRequest : " + request +
+            "\nAxis message  :" + msgCtx.getEnvelope());
+    }
+}

Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/NHttpListener.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/NHttpListener.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/NHttpListener.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/NHttpListener.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,201 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.TransportInDescription;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.engine.ListenerManager;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.transport.TransportListener;
+import org.apache.axis2.transport.niohttp.impl.*;
+import org.apache.axis2.util.UUIDGenerator;
+import org.apache.axis2.util.threadpool.DefaultThreadFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.xml.namespace.QName;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+
+import java.io.OutputStreamWriter;
+import java.io.IOException;
+
+public class NHttpListener implements TransportListener, HttpService {
+
+    private static final Log log = LogFactory.getLog(NHttpListener.class);
+
+    private static final int WORKERS_MAX_THREADS = 40;
+    private static final long WORKER_KEEP_ALIVE = 100L;
+    private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
+
+    private ConfigurationContext cfgCtx;
+    private String serviceEPRPrefix;
+    private Reactor _reactor;
+    private Executor workerPool = null;
+
+    public NHttpListener() {}
+    
+    /**
+     * Creates a listener and registers it as the receiver of the HTTP transport with the
+     * ListenerManager of the given configuration context. If the context does not have a
+     * ListenerManager yet, a new one is created and initialised with the context.
+     *
+     * @param cfgCtx the Axis2 configuration context to register with
+     * @param port   not used by this constructor at present
+     * @throws AxisFault if registering the listener fails
+     */
+    public NHttpListener(ConfigurationContext cfgCtx, int port) throws AxisFault {
+
+        this.cfgCtx = cfgCtx;
+        TransportInDescription httpDescription = new TransportInDescription(
+            new QName(org.apache.axis2.Constants.TRANSPORT_HTTP));
+        httpDescription.setReceiver(this);
+
+        ListenerManager listenerManager = cfgCtx.getListenerManager();
+        if (listenerManager == null) {
+            listenerManager = new ListenerManager();
+            listenerManager.init(cfgCtx);
+        }
+        // register with the manager resolved above; asking the context for it again made
+        // the freshly created instance pointless and risked a NullPointerException
+        listenerManager.addListener(httpDescription, true);
+    }
+
+    public void init(ConfigurationContext cfgCtx, TransportInDescription transprtIn)
+        throws AxisFault {
+
+        this.cfgCtx = cfgCtx;
+        try {
+            int port = 8080;
+            String host;
+            Parameter param = transprtIn.getParameter(PARAM_PORT);
+            if (param != null)
+                port = Integer.parseInt((String) param.getValue());
+
+            param = transprtIn.getParameter(HOST_ADDRESS);
+            if (param != null)
+                host = ((String) param.getValue()).trim();
+            else
+                host = java.net.InetAddress.getLocalHost().getHostName();
+
+            serviceEPRPrefix = "http://" + host + (port == 80 ? "" : ":" + port) +
+                cfgCtx.getServiceContextPath() + "/";
+
+            _reactor = Reactor.createReactor(host, port, false, this);
+
+            workerPool = new ThreadPoolExecutor(
+            1,
+            WORKERS_MAX_THREADS, WORKER_KEEP_ALIVE, TIME_UNIT,
+            new LinkedBlockingQueue(),
+            new DefaultThreadFactory(
+                    new ThreadGroup("HTTP Worker thread group"),
+                    "HTTPWorker"));
+
+        } catch (Exception e) {
+            throw new AxisFault(e);
+        }
+    }
+
+    public void start() throws AxisFault {
+        log.debug("Starting IO Reactor...");
+        new Thread(_reactor).start();
+        log.info("IO Reactor started, accepting connections...");
+    }
+
+    /**
+     * Asks the reactor to shut down by setting its shutdown-requested flag.
+     * NOTE(review): the "shut down" message is logged right after the flag is set; whether
+     * the reactor has really stopped by then cannot be told from here - confirm.
+     *
+     * @throws AxisFault never thrown by the code in this method
+     */
+    public void stop() throws AxisFault {
+        _reactor.setShutdownRequested(true);
+        log.info("IO Reactor shut down");
+    }
+
+    /**
+     * Builds the endpoint reference of a service by appending its name to the EPR prefix
+     * computed in init().
+     *
+     * @param serviceName name of the service
+     * @param ip          not used by this implementation
+     * @return the endpoint reference for the service
+     * @throws AxisFault never thrown by the code in this method
+     */
+    public EndpointReference getEPRForService(String serviceName, String ip) throws AxisFault {
+        return new EndpointReference(serviceEPRPrefix + serviceName);
+    }
+
+    //TODO This should handle other endpoints too. Ex: If a rest call has made
+    public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault {
+        EndpointReference[] endpointReferences = new EndpointReference[1];
+        endpointReferences[0] = new EndpointReference(serviceEPRPrefix + serviceName);
+        return endpointReferences;
+    }
+
+    public void handleRequest(HttpRequest request) {
+
+        log.debug("@@@@ Got new HTTP request : " + request.toStringLine());
+
+        MessageContext msgContext = new MessageContext();
+        msgContext.setIncomingTransportName(Constants.TRANSPORT_HTTP);
+        try {
+            TransportOutDescription transportOut = cfgCtx.getAxisConfiguration()
+                .getTransportOut(new QName(Constants.TRANSPORT_HTTP));
+            TransportInDescription transportIn = cfgCtx.getAxisConfiguration()
+                .getTransportIn(new QName(Constants.TRANSPORT_HTTP));
+
+            msgContext.setConfigurationContext(cfgCtx);
+
+            /* TODO session handling
+            String sessionKey = request.getSession(true).getId();
+            if (cfgCtx.getAxisConfiguration().isManageTransportSession()) {
+                SessionContext sessionContext = sessionManager.getSessionContext(sessionKey);
+                msgContext.setSessionContext(sessionContext);
+            }*/
+
+            msgContext.setTransportIn(transportIn);
+            msgContext.setTransportOut(transportOut);
+            msgContext.setServiceGroupContextId(UUIDGenerator.getUUID());
+            msgContext.setServerSide(true);
+            msgContext.setProperty(Constants.Configuration.TRANSPORT_IN_URL, request.getPath());
+
+            msgContext.setProperty(MessageContext.TRANSPORT_HEADERS, request.getHeaders());
+            msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, request);
+
+            workerPool.execute(new Worker(cfgCtx, msgContext, request));
+
+        } catch (AxisFault e) {
+            HttpResponse response = request.createResponse();
+
+            try {
+                AxisEngine engine = new AxisEngine(cfgCtx);
+                msgContext.setProperty(MessageContext.TRANSPORT_OUT, response.getOutputStream());
+                msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, response.getOutputStream());
+
+                MessageContext faultContext = engine.createFaultMessageContext(msgContext, e);
+                engine.sendFault(faultContext);
+
+                response.setStatus(ResponseStatus.INTERNAL_SERVER_ERROR);
+
+            } catch (Exception ex) {
+                response.setStatus(ResponseStatus.INTERNAL_SERVER_ERROR);
+                response.addHeader(org.apache.axis2.transport.niohttp.impl.Constants.CONTENT_TYPE,
+                    org.apache.axis2.transport.niohttp.impl.Constants.TEXT_PLAIN);
+                OutputStreamWriter out = new OutputStreamWriter(
+                    response.getOutputStream());
+                try {
+                    out.write(ex.getMessage());
+                    out.close();
+                } catch (IOException ee) {
+                }
+            }
+            response.commit();
+            return;
+        }
+    }
+
+    /**
+     * Handles an incoming HTTP response by logging it and running the supplied callback
+     * on the calling thread. The response is not passed on to the callback yet (see the
+     * commented-out call below).
+     *
+     * @param response the HTTP response that was received
+     * @param callback the callback to run for this response
+     */
+    public void handleResponse(HttpResponse response, Runnable callback) {
+
+        log.debug("@@@@ Got new HTTP response : " + response.toStringLine());
+        //callback.setResponse(response);
+        callback.run();
+    }
+
+}

Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/NHttpSender.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/NHttpSender.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/NHttpSender.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/NHttpSender.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,246 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMOutputFormat;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.handlers.AbstractHandler;
+import org.apache.axis2.transport.OutTransportInfo;
+import org.apache.axis2.transport.TransportSender;
+import org.apache.axis2.transport.niohttp.impl.*;
+import org.apache.axis2.transport.http.HTTPConstants;
+import org.apache.axis2.transport.http.HTTPTransportUtils;
+
+import javax.xml.stream.XMLStreamException;
+import java.io.OutputStream;
+import java.net.URL;
+import java.net.MalformedURLException;
+
+public class NHttpSender extends AbstractHandler implements TransportSender {
+
+    public void invoke(MessageContext msgContext) throws AxisFault {
+
+        OMOutputFormat format = getOMOutputFormat(msgContext);
+
+        // Trasnport URL maybe different from the WSA-To
+        EndpointReference epr = null;
+        String transportURL = (String)
+            msgContext.getProperty(Constants.Configuration.TRANSPORT_URL);
+
+        if (transportURL != null) {
+            epr = new EndpointReference(transportURL);
+        } else if
+            ((msgContext.getTo() != null) &&
+                !AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(
+                    msgContext.getTo().getAddress()) &&
+                !AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(
+                    msgContext.getTo().getAddress())) {
+            epr = msgContext.getTo();
+        }
+
+        // select the OMElement of the message to be sent - checking if using REST
+        OMElement dataOut = null;
+        if (msgContext.isDoingREST()) {
+            dataOut = msgContext.getEnvelope().getBody().getFirstElement();
+        } else {
+            dataOut = msgContext.getEnvelope();
+        }
+
+        if (epr != null) {
+            // this is a new message being sent to the above EPR
+            if (!epr.getAddress().equals(AddressingConstants.Final.WSA_NONE_URI)) {
+                Reactor reactor = null;
+                if (epr.getAddress().startsWith("http")) {
+                    reactor = Reactor.getInstance(false);
+                } else if (epr.getAddress().startsWith("https")) {
+                    reactor = Reactor.getInstance(true);
+                }
+
+                if (reactor != null) {
+                    HttpRequest req = null;
+                    try {
+                        req = new HttpRequest(new URL(epr.getAddress()));
+                    } catch (MalformedURLException e) {
+                        e.printStackTrace();
+                        // TODO
+                    }
+                    populateHttpMessage(req, msgContext, format, dataOut);
+
+                    Axis2CallbackImpl cb = new Axis2CallbackImpl(req, msgContext);
+                    reactor.send(req, cb);
+
+                } else {
+                    // TODO
+                }
+
+                //new CommonsHTTPTransportSender().writeMessageWithCommons(
+                //    msgContext, epr, dataOut, format);
+
+            } else {
+                // TODO handle
+            }
+        } else {
+            // this is a response being sent to a request already received
+            if (msgContext.getProperty(Constants.OUT_TRANSPORT_INFO) != null) {
+                if (msgContext.getProperty(Constants.OUT_TRANSPORT_INFO) 
+                    instanceof HttpRequest) {
+                    sendAsyncResponse(msgContext, format, dataOut);
+                } else {
+                    sendUsingOutputStream(msgContext, format, dataOut);
+                }
+            } else {
+                throw new AxisFault("Both message 'TO' and Property MessageContext.TRANSPORT_OUT" +
+                    " is Null, Do not know where to send");
+            }
+        }
+
+        if (msgContext.getOperationContext() != null) {
+            msgContext.getOperationContext().setProperty(
+                Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
+        }
+    }
+
+    /**
+     * Builds the output format used to serialize the message. The character set encoding
+     * is taken from the message context, then from its operation context, and finally
+     * falls back to the default encoding. The MTOM, SwA and REST flags of the message
+     * context are refreshed on the way and mirrored into the format.
+     *
+     * @param msgContext the message context of the message being sent
+     * @return the output format to serialize the message with
+     */
+    private OMOutputFormat getOMOutputFormat(MessageContext msgContext) {
+        OMOutputFormat outputFormat = new OMOutputFormat();
+
+        String encoding = (String) msgContext.getProperty(
+            Constants.Configuration.CHARACTER_SET_ENCODING);
+        if (encoding == null) {
+            // nothing on the message context, try the operation context
+            OperationContext operationContext = msgContext.getOperationContext();
+            if (operationContext != null) {
+                encoding = (String) operationContext.getProperty(
+                    Constants.Configuration.CHARACTER_SET_ENCODING);
+            }
+        } else {
+            outputFormat.setCharSetEncoding(encoding);
+        }
+
+        // still nothing - use the default
+        if (encoding == null) {
+            encoding = MessageContext.DEFAULT_CHAR_SET_ENCODING;
+        }
+
+        msgContext.setDoingMTOM(HTTPTransportUtils.doWriteMTOM(msgContext));
+        msgContext.setDoingSwA(HTTPTransportUtils.doWriteSwA(msgContext));
+        msgContext.setDoingREST(HTTPTransportUtils.isDoingREST(msgContext));
+
+        outputFormat.setSOAP11(msgContext.isSOAP11());
+        outputFormat.setDoOptimize(msgContext.isDoingMTOM());
+        outputFormat.setDoingSWA(msgContext.isDoingSwA());
+        outputFormat.setCharSetEncoding(encoding);
+        return outputFormat;
+    }
+
+    /**
+     * Sends the message as the response to the HttpRequest stored under
+     * OUT_TRANSPORT_INFO: creates the response from that request, sets status OK, fills
+     * in the content type header and body, and commits the response.
+     *
+     * @param msgContext the message context of the message being sent
+     * @param format     the output format to serialize with
+     * @param dataOut    the element to write as the response body
+     * @throws AxisFault if populating the response fails
+     */
+    private void sendAsyncResponse(MessageContext msgContext, OMOutputFormat format, OMElement dataOut) throws AxisFault {
+
+        HttpRequest request = (HttpRequest) msgContext.getProperty(Constants.OUT_TRANSPORT_INFO);
+        HttpResponse response = request.createResponse();
+        response.setStatus(ResponseStatus.OK);
+        populateHttpMessage(response, msgContext, format, dataOut);
+        response.commit();
+    }
+
+    /**
+     * Sets the content type header of the given HTTP message and serializes the payload
+     * into the message's output stream, which is closed after a successful write.
+     * The content type is the CONTENT_TYPE property of the message context if set,
+     * application/xml for REST, and otherwise whatever the output format reports.
+     *
+     * @param message    the HTTP request or response to fill in
+     * @param msgContext the message context of the message being sent
+     * @param format     the output format to serialize with
+     * @param dataOut    the element to write as the message body
+     * @throws AxisFault wrapping any exception raised while serializing
+     */
+    private void populateHttpMessage(HttpMessage message, MessageContext msgContext, OMOutputFormat format, OMElement dataOut) throws AxisFault {
+
+        String contentType =
+            (String) msgContext.getProperty(Constants.Configuration.CONTENT_TYPE);
+        if (contentType == null) {
+            if (msgContext.isDoingREST()) {
+                contentType = HTTPConstants.MEDIA_TYPE_APPLICATION_XML;
+            } else {
+                contentType = format.getContentType();
+                format.setSOAP11(msgContext.isSOAP11());
+            }
+        }
+
+        String headerValue = contentType + "; charset=" + format.getCharSetEncoding();
+        message.addHeader(
+            org.apache.axis2.transport.niohttp.impl.Constants.CONTENT_TYPE, headerValue);
+
+        OutputStream body = message.getOutputStream();
+        format.setDoOptimize(msgContext.isDoingMTOM());
+        try {
+            dataOut.serializeAndConsume(body, format);
+            body.close();
+        } catch (Exception e) {
+            throw new AxisFault(e);
+        }
+    }
+
+    /**
+     * Serializes the message into the output stream stored under
+     * MessageContext.TRANSPORT_OUT. On the server side the content type, including the
+     * charset, is first set on the OUT_TRANSPORT_INFO of the message context.
+     *
+     * @param msgContext the message context of the message being sent
+     * @param format     the output format to serialize with
+     * @param dataOut    the element to write
+     * @throws AxisFault if OUT_TRANSPORT_INFO is missing on the server side, or if
+     *                   serializing the message fails
+     */
+    private void sendUsingOutputStream(MessageContext msgContext,
+        OMOutputFormat format,
+        OMElement dataOut) throws AxisFault {
+
+        OutputStream target =
+            (OutputStream) msgContext.getProperty(MessageContext.TRANSPORT_OUT);
+
+        if (msgContext.isServerSide()) {
+            OutTransportInfo outInfo =
+                (OutTransportInfo) msgContext.getProperty(Constants.OUT_TRANSPORT_INFO);
+            if (outInfo == null) {
+                throw new AxisFault(Constants.OUT_TRANSPORT_INFO +
+                    " has not been set");
+            }
+
+            String contentType =
+                (String) msgContext.getProperty(Constants.Configuration.CONTENT_TYPE);
+            if (contentType == null) {
+                if (msgContext.isDoingREST()) {
+                    contentType = HTTPConstants.MEDIA_TYPE_APPLICATION_XML;
+                } else {
+                    contentType = format.getContentType();
+                    format.setSOAP11(msgContext.isSOAP11());
+                }
+            }
+
+            outInfo.setContentType(contentType + "; charset=" + format.getCharSetEncoding());
+        }
+
+        format.setDoOptimize(msgContext.isDoingMTOM());
+        try {
+            dataOut.serializeAndConsume(target, format);
+        } catch (XMLStreamException e) {
+            throw new AxisFault(e);
+        }
+    }
+
+    /**
+     * Does nothing in this implementation.
+     *
+     * @param msgContext ignored
+     * @throws AxisFault never thrown by this implementation
+     */
+    public void cleanup(MessageContext msgContext) throws AxisFault {
+        // do nothing
+    }
+
+    /**
+     * Does nothing in this implementation.
+     *
+     * @param confContext  ignored
+     * @param transportOut ignored
+     * @throws AxisFault never thrown by this implementation
+     */
+    public void init(ConfigurationContext confContext, TransportOutDescription transportOut) throws AxisFault {
+        // do nothing
+    }
+
+    /**
+     * Does nothing in this implementation.
+     */
+    public void stop() {
+        // do nothing
+    }
+
+}

Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/Worker.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/Worker.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/Worker.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/Worker.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,271 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp;
+
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.transport.niohttp.impl.*;
+import org.apache.axis2.transport.http.HTTPTransportReceiver;
+import org.apache.axis2.transport.http.HTTPTransportUtils;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.ws.commons.schema.XmlSchema;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.HashMap;
+import java.net.SocketException;
+import java.io.OutputStreamWriter;
+import java.io.IOException;
+
+public class Worker implements Runnable {
+
+    private static final Log log = LogFactory.getLog(Worker.class);
+
+    private MessageContext msgContext = null;
+    private ConfigurationContext cfgCtx = null;
+    private HttpRequest request = null;
+    private String contextPath = null;
+    private String servicePath = null;
+    private static final String SOAPACTION = "SOAPAction";
+
+    /**
+     * Create a worker that will process a single request.
+     * The context path ("/" + context root + "/") and the service context path
+     * are read from the configuration context once, up front.
+     *
+     * @param cfgCtx     the configuration context
+     * @param msgContext the message context to use while processing the request
+     * @param request    the request to be processed
+     */
+    Worker(ConfigurationContext cfgCtx, MessageContext msgContext, HttpRequest request) {
+        this.request = request;
+        this.msgContext = msgContext;
+        this.cfgCtx = cfgCtx;
+        this.contextPath = "/" + cfgCtx.getContextRoot() + "/";
+        this.servicePath = cfgCtx.getServiceContextPath();
+    }
+
+    /**
+     * Create the response for the request and dispatch on the HTTP method:
+     * GET goes to processGet(), POST to processPost(), and anything else is
+     * reported through handleException().
+     * This method never commits the response itself; processGet() and
+     * handleException() do so.
+     */
+    public void run() {
+
+        final HttpResponse response = request.createResponse();
+        final String method = request.getMethod();
+
+        if (Constants.GET.equals(method)) {
+            processGet(response);
+            return;
+        }
+
+        if (Constants.POST.equals(method)) {
+            processPost(response);
+            return;
+        }
+
+        handleException("Unsupported method : " + method, null, response);
+    }
+
+    /**
+     * Serve a GET request and commit the response.
+     * <p>
+     * Depending on the request this sends a redirect (for the favicon and for
+     * anything outside the context path), the ?wsdl / ?wsdl2 / ?xsd description
+     * of the addressed service, the HTML page of a service, or delegates to
+     * processAxisGet(). Service descriptions are XML and are sent as text/xml;
+     * asking for the description of an unknown service yields a 404.
+     *
+     * @param response the response to populate. It is committed before this
+     *                 method returns, except where an error has already been
+     *                 reported (and committed) through handleException()
+     */
+    private void processGet(HttpResponse response) {
+
+        String uri = request.getPath();
+
+        // the service name is the last path segment; cut the query string off
+        // first so that a "/" inside a parameter value cannot confuse the lookup
+        int qPos = uri.indexOf("?");
+        String pathOnly = (qPos == -1) ? uri : uri.substring(0, qPos);
+        String serviceName = pathOnly.substring(pathOnly.lastIndexOf("/") + 1);
+
+        // snapshot the request parameters into a plain map
+        Map parameters = new HashMap();
+        Iterator iter = request.getParameterNames();
+        while (iter.hasNext()) {
+            String name = (String) iter.next();
+            parameters.put(name, request.getParameter(name));
+        }
+
+        if (uri.equals("/favicon.ico")) {
+            response.setStatus(ResponseStatus.MOVED_PERMANENTLY, "Redirect");
+            response.addHeader(Constants.LOCATION, "http://ws.apache.org/favicon.ico");
+
+        } else if (!uri.startsWith(contextPath)) {
+            response.setStatus(ResponseStatus.MOVED_PERMANENTLY, "Redirect");
+            response.addHeader(Constants.LOCATION, contextPath);
+
+        } else if (parameters.containsKey("wsdl")) {
+            AxisService service = findService(serviceName);
+            if (service == null) {
+                response.setStatus(ResponseStatus.NOT_FOUND, "Service Not Found");
+            } else {
+                try {
+                    service.printWSDL(response.getOutputStream(),
+                        Util.getIpAddress(), servicePath);
+                    response.addHeader(Constants.CONTENT_TYPE, Constants.TEXT_XML);
+                    response.setStatus(ResponseStatus.OK);
+
+                } catch (AxisFault e) {
+                    handleException("Axis2 fault writing ?wsdl output", e, response);
+                    return;
+                } catch (SocketException e) {
+                    handleException("Error getting ip address for ?wsdl output", e, response);
+                    return;
+                }
+            }
+
+        } else if (parameters.containsKey("wsdl2")) {
+            AxisService service = findService(serviceName);
+            if (service == null) {
+                response.setStatus(ResponseStatus.NOT_FOUND, "Service Not Found");
+            } else {
+                try {
+                    service.printWSDL2(response.getOutputStream(),
+                        Util.getIpAddress(), servicePath);
+                    response.addHeader(Constants.CONTENT_TYPE, Constants.TEXT_XML);
+                    response.setStatus(ResponseStatus.OK);
+
+                } catch (AxisFault e) {
+                    handleException("Axis2 fault writing ?wsdl2 output", e, response);
+                    return;
+                } catch (SocketException e) {
+                    handleException("Error getting ip address for ?wsdl2 output", e, response);
+                    return;
+                }
+            }
+
+        } else if (parameters.containsKey("xsd")) {
+            AxisService service = findService(serviceName);
+            String schemaName = (String) parameters.get("xsd");
+
+            if (service == null) {
+                response.setStatus(ResponseStatus.NOT_FOUND, "Service Not Found");
+
+            } else if (schemaName == null || "".equals(schemaName)) {
+                // plain ?xsd - write out the schema of the service
+                try {
+                    service.printSchema(response.getOutputStream());
+                    response.addHeader(Constants.CONTENT_TYPE, Constants.TEXT_XML);
+                    response.setStatus(ResponseStatus.OK);
+
+                } catch (AxisFault axisFault) {
+                    handleException("Error writing ?xsd output to client", axisFault, response);
+                    return;
+                }
+
+            } else {
+                // cater for named xsds - run the population logic just to be sure
+                service.populateSchemaMappings();
+                Map schemaTable = service.getSchemaMappingTable();
+                final XmlSchema schema = (XmlSchema) schemaTable.get(schemaName);
+                if (schema != null) {
+                    // schema found - write it to the stream
+                    schema.write(response.getOutputStream());
+                    response.addHeader(Constants.CONTENT_TYPE, Constants.TEXT_XML);
+                    response.setStatus(ResponseStatus.OK);
+                } else {
+                    // no schema available by that name  - send 404
+                    response.setStatus(ResponseStatus.NOT_FOUND, "Schema Not Found");
+                }
+            }
+
+        } else if (parameters.isEmpty()) {
+
+            // request is for a service over GET without params, send service HTML
+            if (!(uri.endsWith(contextPath) || uri.endsWith(contextPath + "/"))) {
+
+                OutputStreamWriter out = new OutputStreamWriter(
+                    response.getOutputStream());
+                try {
+                    out.write(
+                        HTTPTransportReceiver.printServiceHTML(
+                            serviceName, cfgCtx));
+                    out.close();
+                    response.addHeader(Constants.CONTENT_TYPE, Constants.TEXT_HTML);
+                    response.setStatus(ResponseStatus.OK);
+
+                } catch (IOException e) {
+                    handleException("Error writing service HTML to client", e, response);
+                    return;
+                }
+            } else {
+                processAxisGet(response, parameters);
+            }
+        }
+        // NOTE(review): a GET carrying parameters other than wsdl/wsdl2/xsd falls
+        // through all branches and is committed without any content - confirm
+        // whether such requests should be passed to processAxisGet() instead
+
+        response.commit();
+    }
+
+    /**
+     * Look up a service by name in the Axis configuration.
+     *
+     * @param serviceName the name of the service
+     * @return the service registered under that name, or null if there is none
+     */
+    private AxisService findService(String serviceName) {
+        return (AxisService) cfgCtx.getAxisConfiguration().getServices().get(serviceName);
+    }
+
+    /**
+     * Hand a POST request over to HTTPTransportUtils.processHTTPPostRequest(),
+     * passing the request body, the response output stream, the Content-Type
+     * and SOAPAction headers and the request path. An AxisFault is reported
+     * through handleException().
+     *
+     * @param response the response whose output stream receives the result
+     */
+    public void processPost(HttpResponse response) {
+
+        final String contentType = request.getHeader(Constants.CONTENT_TYPE);
+        final String soapAction = request.getHeader(SOAPACTION);
+        final String requestPath = request.getPath();
+
+        try {
+            HTTPTransportUtils.processHTTPPostRequest(
+                msgContext,
+                request.getInputStream(),
+                response.getOutputStream(),
+                contentType,
+                soapAction,
+                requestPath);
+        } catch (AxisFault e) {
+            handleException("Error processing POST request ", e, response);
+        }
+    }
+
+    /**
+     * Let HTTPTransportUtils.processHTTPGetRequest() try to serve the GET
+     * request; if it reports the request as not processed, write the services
+     * HTML page instead. Failures are reported through handleException().
+     *
+     * @param response   the response to write to
+     * @param parameters the request parameters, keyed by name
+     */
+    private void processAxisGet(HttpResponse response, Map parameters) {
+        try {
+            // deal with GET request
+            if (HTTPTransportUtils.processHTTPGetRequest(
+                msgContext,
+                response.getOutputStream(),
+                request.getHeader(SOAPACTION),
+                request.getPath(),
+                cfgCtx,
+                parameters)) {
+                return;
+            }
+
+            // not handled as a GET invocation - list the available services
+            OutputStreamWriter writer = new OutputStreamWriter(
+                response.getOutputStream());
+            try {
+                writer.write(HTTPTransportReceiver.getServicesHTML(cfgCtx));
+                writer.close();
+                response.addHeader(Constants.CONTENT_TYPE, Constants.TEXT_HTML);
+                response.setStatus(ResponseStatus.OK);
+
+            } catch (IOException e) {
+                handleException("Error writing ? output to client", e, response);
+            }
+
+        } catch (AxisFault e) {
+            handleException("Axis fault while serving GET request", e, response);
+        }
+    }
+
+    /**
+     * Log an error and report it to the client.
+     * <p>
+     * A fault is first sent through the AxisEngine. If that fails for any
+     * reason, a plain text description is written to the response instead.
+     * In every case the response status is set to INTERNAL_SERVER_ERROR and
+     * the response is committed.
+     *
+     * @param msg      a description of what went wrong
+     * @param e        the causing exception, may be null (e.g. unsupported method)
+     * @param response the response used to report the error
+     */
+    private void handleException(String msg, Exception e, HttpResponse response) {
+        log.error(msg, e);
+
+        try {
+            AxisEngine engine = new AxisEngine(cfgCtx);
+            msgContext.setProperty(MessageContext.TRANSPORT_OUT, response.getOutputStream());
+            // NOTE(review): OUT_TRANSPORT_INFO is given the raw output stream here;
+            // confirm that the readers of this property accept that type
+            msgContext.setProperty(org.apache.axis2.Constants.OUT_TRANSPORT_INFO, response.getOutputStream());
+            MessageContext faultContext = engine.createFaultMessageContext(msgContext, e);
+            engine.sendFault(faultContext);
+
+        } catch (Exception ex) {
+            // the fault could not be sent - do not lose the reason why
+            log.error("Unable to send a fault for : " + msg, ex);
+
+            response.addHeader(Constants.CONTENT_TYPE, Constants.TEXT_PLAIN);
+            OutputStreamWriter out = new OutputStreamWriter(
+                response.getOutputStream());
+            try {
+                // Writer.write(String) throws a NullPointerException on null and
+                // many exceptions carry no message, so fall back to the description
+                String text = ex.getMessage();
+                if (text == null) {
+                    text = String.valueOf(msg);
+                }
+                out.write(text);
+                out.close();
+            } catch (IOException ee) {
+                // nothing more can be sent to the client; the status is still set below
+                log.warn("Unable to write the error message to the response", ee);
+            }
+
+        } finally {
+            response.setStatus(ResponseStatus.INTERNAL_SERVER_ERROR);
+            response.commit();
+        }
+    }
+}

Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Callback.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Callback.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Callback.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Callback.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,21 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl;
+
+/**
+ * A Runnable that can be handed the HttpResponse it is meant to work with.
+ * NOTE(review): presumably setResponse() is called before run() so that run()
+ * can process the response - confirm against the code that invokes callbacks.
+ */
+public interface Callback extends Runnable {
+
+    /**
+     * Supply the response this callback should operate on.
+     *
+     * @param r the response
+     */
+    public void setResponse(HttpResponse r);
+}

Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Constants.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Constants.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Constants.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Constants.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,55 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl;
+
+/**
+ * TODO clean up!!!
+ */
+public final class Constants {
+
+    public static final int DEFAULT_CONNECTION_LINGER = -1;
+    public static final int DEFAULT_CONNECTION_TIMEOUT = 60000;
+    public static final int DEFAULT_CONNECTION_UPLOAD_TIMEOUT = 300000;
+    public static final int DEFAULT_SERVER_SOCKET_TIMEOUT = 0;
+    public static final boolean DEFAULT_TCP_NO_DELAY = true;
+
+    public static final String CRLF = "\r\n";
+    public static final byte CR = (byte) '\r';
+    public static final byte LF = (byte) '\n';
+    public static final byte SP = (byte) ' ';
+    public static final String STRING_SP = " ";
+    public static final byte HT = (byte) '\t';
+    public static final byte COLON = (byte) ':';
+    public static final String STRING_COLON = ":";
+
+    public static final String HTTP_10 = "HTTP/1.0";
+    public static final String HTTP_11 = "HTTP/1.1";
+
+    public static final String GET = "GET";
+    public static final String POST = "POST";
+
+    public static final String TRANSFER_ENCODING = "Transfer-Encoding";
+    public static final String CONTENT_LENGTH = "Content-Length";
+    public static final String CONTENT_TYPE = "Content-Type";
+    public static final String TEXT_PLAIN = "text/plain";
+    public static final String TEXT_HTML = "text/html";
+    public static final String TEXT_XML = "text/xml";
+    public static final String CHUNKED = "chunked";
+    public static final String CONNECTION = "Connection";
+    public static final String CLOSE = "close";
+    public static final String OK = "OK";
+    public static final String LOCATION = "Location";
+}

Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpMessage.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpMessage.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpMessage.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpMessage.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,243 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.BufferOverflowException;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * This represents an abstract HttpMessage - which may be a HttpRequest or a
+ * HttpResponse. This class defines the headers and the ByteBuffer that holds
+ * the message body in memory, along with an optional starting position within
+ * the buffer.
+ */
+public abstract class HttpMessage {
+
+    private static final Log log = LogFactory.getLog(HttpMessage.class);
+
+    /**
+     * http headers of this message
+     */
+    protected Map headers = new HashMap();
+    /**
+     * holder of the body content of this message
+     */
+    protected ByteBuffer buffer = ByteBuffer.allocate(4096);
+    /**
+     * position at the main buffer where the body starts (e.g. for requests)
+     */
+    protected int bodyStart;
+
+    /**
+     * A flag to detect if the getOutputStream() caller did not properly close() the stream
+     */
+    protected boolean outputStreamOpened = false;
+
+    /**
+     * Return the value of the named header.
+     * The lookup is an exact match on the name the header was stored under.
+     * NOTE(review): HTTP header names are case-insensitive - confirm that
+     * headers are always stored using the same spelling they are looked up with.
+     *
+     * @param name the header name
+     * @return the header value, or null if no such header is set
+     */
+    public String getHeader(String name) {
+        return (String) headers.get(name);
+    }
+
+    /**
+     * Return the headers of this message.
+     * This is the internal map itself, not a copy, so changes made to it
+     * change the headers of this message.
+     *
+     * @return the map of header names to header values
+     */
+    public Map getHeaders() {
+        return headers;
+    }
+
+    /**
+     * Set a header on this message. A header already stored under the same
+     * name is replaced, i.e. only one value is kept per name.
+     *
+     * @param name  the header name
+     * @param value the header value
+     */
+    public void addHeader(String name, String value) {
+        headers.put(name, value);
+    }
+
+    /**
+     * Does this message use chunked transfer encoding?
+     *
+     * @return true if the Transfer-Encoding header is exactly "chunked"
+     */
+    public boolean isChunked() {
+        return Constants.CHUNKED.equals(headers.get(Constants.TRANSFER_ENCODING));
+    }
+
+    /**
+     * Does this message specify a connection-close?
+     * @return true if the Connection header is exactly "close", i.e. the
+     * connection should be closed
+     */
+    public boolean isConnectionClose() {
+        return Constants.CLOSE.equals(headers.get(Constants.CONNECTION));
+    }
+
+    /**
+     * Mark this message as connection-close by setting its Connection header
+     * to "close"
+     */
+    public void setConnectionClose() {
+        headers.put(Constants.CONNECTION, Constants.CLOSE);
+    }
+
+    /**
+     * Return the length of the content as stated by the Content-Length header
+     * @return the value of the Content-Length header, or -1 if it is not set
+     * @throws NumberFormatException if the header value is not a valid integer
+     */
+    public int getContentLength() {
+        final Object declared = headers.get(Constants.CONTENT_LENGTH);
+        if (declared == null) {
+            return -1;
+        }
+        return Integer.parseInt((String) declared);
+    }
+
+    /**
+     * Return an InputStream to read the body from the main ByteBuffer of this message.
+     * Calling this method positions the buffer at the start of the body; the
+     * stream then reads directly from the buffer and advances its position.
+     *
+     * @return an InputStream into the main ByteBuffer starting at the body
+     */
+    public InputStream getInputStream() {
+        // position to the start of the body
+        buffer.position(bodyStart);
+
+        // Returns an input stream for a ByteBuffer.
+        // The read() methods use the relative ByteBuffer get() methods.
+        return new InputStream() {
+            public synchronized int read() throws IOException {
+                if (!buffer.hasRemaining()) {
+                    return -1;
+                }
+                // read() must return 0..255; an unmasked byte would turn 0xFF into
+                // -1 (a false end of stream) and any byte >= 0x80 into a negative value
+                return buffer.get() & 0xFF;
+            }
+
+            public synchronized int read(byte[] bytes, int off, int len) throws IOException {
+                if (len == 0) {
+                    return 0;
+                }
+                // signal end of stream with -1; returning 0 here would make
+                // callers that loop until -1 spin forever
+                if (!buffer.hasRemaining()) {
+                    return -1;
+                }
+                // Read only what's left
+                len = Math.min(len, buffer.remaining());
+                buffer.get(bytes, off, len);
+                return len;
+            }
+        };
+    }
+
+    /**
+     * Get an OutputStream to write the body of this httpMessage.
+     * The buffer is cleared and positioned at the start of the body, and the
+     * message is flagged as having an open output stream. The stream grows the
+     * buffer whenever a write does not fit. Closing the stream flips the
+     * buffer and resets the flag.
+     *
+     * @return an OutputStream to write the body
+     */
+    public OutputStream getOutputStream() {
+        // start over, writing from where the body begins
+        buffer.clear();
+        outputStreamOpened = true;
+        buffer.position(bodyStart);
+
+        // An output stream over the ByteBuffer, built on the relative put() methods
+        return new OutputStream() {
+            public synchronized void write(int b) throws IOException {
+                final byte value = (byte) b;
+                for (boolean stored = false; !stored; ) {
+                    try {
+                        buffer.put(value);
+                        stored = true;
+                    } catch (BufferOverflowException full) {
+                        // no room left - grow and try again
+                        expandBuffer();
+                    }
+                }
+            }
+
+            public synchronized void write(byte[] bytes, int off, int len) throws IOException {
+                for (boolean stored = false; !stored; ) {
+                    try {
+                        buffer.put(bytes, off, len);
+                        stored = true;
+                    } catch (BufferOverflowException full) {
+                        // no room left - grow and try again
+                        expandBuffer();
+                    }
+                }
+            }
+
+            public void close() throws IOException {
+                buffer.flip();
+                outputStreamOpened = false;
+            }
+        };
+    }
+
+    /**
+     * Expand (double) the main ByteBuffer of this message, keeping everything
+     * written so far. An empty (zero capacity) buffer is replaced by one of
+     * 4096 bytes, as doubling zero would never create any room and the
+     * writers that call this method in a loop would never finish.
+     */
+    private void expandBuffer() {
+        int oldCapacity = buffer.capacity();
+        int newCapacity = (oldCapacity == 0) ? 4096 : oldCapacity * 2;
+
+        ByteBuffer newBuf = ByteBuffer.allocate(newCapacity);
+        if (log.isDebugEnabled()) {
+            log.debug("Expanding ByteBuffer to " + newBuf.capacity() + " bytes");
+        }
+        // flip so that exactly the bytes written so far are copied across;
+        // the new buffer is left positioned right after them
+        buffer.flip();
+        buffer = newBuf.put(buffer);
+    }
+
+    /**
+     * Use the given buffer, and the given start position within it, as the
+     * body of this httpMessage. The buffer is used as passed in, not copied.
+     *
+     * @param buffer an externally allocated [and populated] buffer containing the message body
+     * @param bodyStart the start position of the actual body content within the buffer (default 0)
+     */
+    public void setBuffer(ByteBuffer buffer, int bodyStart) {
+        final String description =
+            "HttpMessage.setBuffer() - buffer : " + buffer + " bodyStart: " + bodyStart;
+        log.debug(description);
+        this.bodyStart = bodyStart;
+        this.buffer = buffer;
+    }
+
+    /**
+     * Return a string representation of the message in HTTP wire-format:
+     * the first line, the headers, an empty line and the body decoded as
+     * us-ascii. The position and limit of the main buffer are left untouched,
+     * so this method may be called any number of times (e.g. for logging).
+     *
+     * @return a String representation of the message in HTTP wire-format
+     */
+    public String toString() {
+
+        StringBuffer sb = new StringBuffer();
+
+        // print httpMessage-line or status-line
+        sb.append(toStringLine());
+
+        // print headers
+        Iterator iter = headers.entrySet().iterator();
+        while (iter.hasNext()) {
+            Map.Entry header = (Map.Entry) iter.next();
+            sb.append(header.getKey()).append(Constants.STRING_COLON)
+                .append(header.getValue()).append(Constants.CRLF);
+        }
+        sb.append(Constants.CRLF);
+
+        if (buffer.limit() > 0) {
+            // decode from an independent view: decoding advances the position of
+            // the buffer it reads, which must not consume the body of the message
+            ByteBuffer bodyBuf = buffer.duplicate();
+            if (bodyStart > 0) {
+                bodyBuf.position(bodyStart);
+            }
+
+            Charset set = Charset.forName("us-ascii");
+            CharsetDecoder dec = set.newDecoder();
+            try {
+                sb.append(dec.decode(bodyBuf));
+            } catch (CharacterCodingException e) {
+                // NOTE(review): a body holding non us-ascii bytes is left out of the
+                // result altogether - confirm this is acceptable for callers that
+                // build the wire message from this string
+                log.warn("Unable to decode the message body as us-ascii", e);
+            }
+        }
+
+        sb.append(Constants.CRLF);
+
+        return sb.toString();
+    }
+
+    /**
+     * Return the first line of text for the toString() representation of this object. This would
+     * be a httpMessage-line or a status-line as per the RFC
+     *
+     * @return the first line of text for the toString()
+     */
+    public abstract String toStringLine();
+}

Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpRequest.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpRequest.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpRequest.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpRequest.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,185 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.StringTokenizer;
+
+/**
+ * Represents an HttpRequest message.
+ */
+public class HttpRequest extends HttpMessage {
+
+    private static final Log log = LogFactory.getLog(HttpRequest.class);
+
+    private IncomingHandler handler;
+
+    private String host;
+    private int port;
+    private String path;
+    private String method;
+    private String protocol;
+    private Map requestParams;
+
+    /** A pointer to a runnable to be executed once a response is received for this httpMessage */
+    private Runnable onResponse = null;
+
+    /**
+     * Create an outgoing request to the given URL. The request is a HTTP/1.1
+     * POST with connection-close set.
+     * <p>
+     * The request path is the path of the URL together with its query string
+     * (if any), and is "/" when the URL has no path. When the URL does not
+     * name a port, the default port of its protocol is used.
+     *
+     * @param url the URL the request is to be sent to
+     */
+    public HttpRequest(URL url) {
+        this.host = url.getHost();
+
+        // getPort() is -1 when the URL does not specify a port
+        int urlPort = url.getPort();
+        this.port = (urlPort != -1) ? urlPort : url.getDefaultPort();
+
+        // getFile() is the path plus the query string; getPath() alone would
+        // drop the query and leave the request without any parameters
+        String file = url.getFile();
+        if (file == null || !file.startsWith("/")) {
+            // the request line needs an absolute path
+            file = "/" + (file == null ? "" : file);
+        }
+        setPath(file); // populate requestParams as well...
+
+        this.method = Constants.POST;
+        this.protocol = Constants.HTTP_11;
+        setConnectionClose(); // use connection-close for outgoing requests by default
+    }
+
+    /**
+     * Get the Runnable code block that should be executed upon receiving a response to
+     * this HttpRequest
+     * @return a Runnable code block to execute on receipt of a response to this httpMessage
+     */
+    public Runnable getOnResponse() {
+        return onResponse;
+    }
+
+    /**
+     * Set the response handler Runnable code block
+     * @param onResponse the Runnable to be executed on receipt of a response to the httpMessage
+     */
+    public void setOnResponse(Runnable onResponse) {
+        this.onResponse = onResponse;
+    }
+
+    /**
+     * Set the incoming message handler
+     * @param handler the incoming message handler
+     */
+    public void setHandler(IncomingHandler handler) {
+        this.handler = handler;
+    }
+
+    /**
+     * Get the incoming message handler
+     * @return the handler last given to setHandler(), or null if none was set
+     */
+    public IncomingHandler getHandler() {
+        return handler;
+    }
+
+    //TODO
+    /**
+     * Create an empty request. Host, path, method and protocol are left unset;
+     * the set of request parameters starts out empty so that
+     * getParameterNames() and getParameter() can be used before setPath().
+     */
+    public HttpRequest() {
+        requestParams = new HashMap();
+    }
+
+    // TODO
+    /**
+     * Create a new response for this request
+     * @return a new HttpResponse constructed from this request
+     */
+    public HttpResponse createResponse() {
+        return new HttpResponse(this);
+    }
+
+    /**
+     * Get the host of this request
+     * @return the host taken from the URL this request was created with, or
+     * null for a request not created from a URL
+     */
+    public String getHost() {
+        return host;
+    }
+
+    /**
+     * Get the port of this request
+     * @return the port determined from the URL this request was created with;
+     * 0 for a request not created from a URL
+     */
+    public int getPort() {
+        return port;
+    }
+
+    /**
+     * Get the HTTP method of this request
+     * @return the method; a request created from a URL starts out as POST
+     */
+    public String getMethod() {
+        return method;
+    }
+
+    /**
+     * Set the HTTP method of this request
+     * @param method the method, as it should appear on the request line
+     */
+    public void setMethod(String method) {
+        this.method = method;
+    }
+
+    /**
+     * Get the path of this request
+     * @return the path exactly as it was last given to setPath()
+     */
+    public String getPath() {
+        return path;
+    }
+
+    /**
+     * Set the path of this request and rebuild the request parameters from
+     * its query string (the part after the first "?", split on "&amp;").
+     * A parameter given as "name=value" is stored with its value - which is
+     * the empty string for "name=" - and a parameter given as just "name" is
+     * stored with a null value. Names and values are stored as found; no URL
+     * decoding takes place.
+     *
+     * @param path the request path, optionally followed by a query string
+     */
+    public void setPath(String path) {
+        this.path = path;
+        requestParams = new HashMap();
+        if (path == null) {
+            // nothing to parse
+            return;
+        }
+
+        int qPos = path.indexOf("?");
+        if (qPos != -1) {
+            String params = path.substring(qPos+1);
+            StringTokenizer st = new StringTokenizer(params, "&");
+            while (st.hasMoreTokens()) {
+                String kvPair = st.nextToken();
+                int eqPos = kvPair.indexOf("=");
+                if (eqPos != -1) {
+                    // the value starts after the "=", which is not a part of it
+                    requestParams.put(kvPair.substring(0, eqPos), kvPair.substring(eqPos + 1));
+                } else {
+                    requestParams.put(kvPair, null);
+                }
+            }
+        }
+    }
+
+    /**
+     * Get the names of the request parameters
+     * @return an Iterator over the names of the parameters that setPath()
+     * found in the query string of the path
+     */
+    public Iterator getParameterNames() {
+        return requestParams.keySet().iterator();
+    }
+
+    /**
+     * Get the value of a request parameter
+     * @param name the parameter name
+     * @return the value stored for the parameter by setPath(); null if there
+     * is no such parameter, or if it was given without a value
+     */
+    public String getParameter(String name) {
+        return (String) requestParams.get(name); 
+    }
+
+    /**
+     * Get the protocol (HTTP version) of this request
+     * @return the protocol; a request created from a URL starts out as HTTP/1.1
+     */
+    public String getProtocol() {
+        return protocol;
+    }
+
+    /**
+     * Set the protocol (HTTP version) of this request
+     * @param protocol the protocol, as it should appear on the request line
+     */
+    public void setProtocol(String protocol) {
+        this.protocol = protocol;
+    }
+
+    /**
+     * Return the request line of this request
+     * @return the method, path and protocol separated by single spaces and
+     * terminated by CRLF
+     */
+    public String toStringLine() {
+        return method + Constants.STRING_SP + path +
+            Constants.STRING_SP + protocol + Constants.CRLF;
+    }
+
+    /**
+     * Return a ByteBuffer representation of this message in HTTP wire-format.
+     * The buffer wraps the bytes of toString(), encoded with the default
+     * charset of the platform, and is created anew on every call.
+     * NOTE(review): the encoding therefore depends on the platform default -
+     * confirm that this is intended for the bytes that go on the wire.
+     * @return the ByteBuffer representation of this message
+     */
+    public ByteBuffer getBuffer() {
+        return ByteBuffer.wrap(toString().getBytes());
+    }
+
+    /**
+     * Does this message specify a connection-close?
+     * Without a Connection header the answer depends on the protocol alone:
+     * a HTTP/1.0 request closes the connection, any other does not.
+     * @return true if connection should be closed
+     */
+    public boolean isConnectionClose() {
+        final String connection = (String) headers.get(Constants.CONNECTION);
+        if (connection != null) {
+            return Constants.CLOSE.equals(connection);
+        }
+        return Constants.HTTP_10.equals(protocol);
+    }
+
+    //------------------------------ TESTING CODE ------------------------
+    /**
+     * Convenience method for testing etc
+     * @param body
+     */
+    public void setBody(String body) {
+        buffer.position(bodyStart);
+        buffer.put(body.getBytes());
+        buffer.flip();
+    }
+}

Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpResponse.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpResponse.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpResponse.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpResponse.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,115 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Represents an HttpResponse to a httpMessage received or sent.
+ */
+public class HttpResponse extends HttpMessage {
+
+    private static final Log log = LogFactory.getLog(HttpResponse.class);
+
+    /** The associated HttpRequest (if any) for this response */
+    private HttpRequest request;
+    /** The http version used */
+    private String version = null;
+    /** The response status - HTTP response */
+    private ResponseStatus status = new ResponseStatus();
+
+    /**
+     * Create a HttpResponse for the given HttpRequest. Sets connection-close
+     * if the httpMessage specified it.
+     * @param request the HttpRequest for which this reponse will apply
+     */
+    public HttpResponse(HttpRequest request) {
+        this.request = request;
+        this.version = Constants.HTTP_11;
+        if (request.isConnectionClose()) {
+            addHeader(Constants.CONNECTION, Constants.CLOSE);
+        }
+        buffer.flip();
+    }
+
+    /**
+     * Create a new HttpResponse
+     */
+    public HttpResponse() {}
+
+    public String getVersion() {
+        return version;
+    }
+
+    public void setResponseCode(int code) {
+        status.setCode(code);
+    }
+
+    public void setResponseMessage(String msg) {
+        status.setMessage(msg);
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+
+    /**
+     * Perform actual commit of this response out to the wire. Calls on the message
+     * handler to process the message as required
+     */
+    public void commit() {
+        if (outputStreamOpened) {
+            // if someone didnt properly close the OutputStream after writing, flip buffer
+            buffer.flip();
+            outputStreamOpened = false;
+        }
+        if (request != null) {
+            request.getHandler().setResponse(this);
+        }
+    }
+
+    public void setStatus(ResponseStatus status) {
+        this.status = status;
+    }
+
+    public void setStatus(ResponseStatus status, String msg) {
+        this.status = status;
+        status.setMessage(msg);
+    }
+
+    public ResponseStatus getStatus() {
+        return status;
+    }
+
+    /**
+     * Return a ByteBuffer representation of this message in HTTP wire-format for transmission
+     * @return the ByteBuffer representation of this message
+     */
+    public ByteBuffer getBuffer() {
+        if (buffer.limit() > 0) {
+            headers.put(Constants.CONTENT_LENGTH, Integer.toString(buffer.limit()));
+        }
+        return ByteBuffer.wrap(toString().getBytes());
+    }
+
+    public String toStringLine() {
+        return version + Constants.STRING_SP + status + Constants.CRLF;
+    }
+
+}

Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpService.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpService.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpService.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpService.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,23 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl;
+
+/**
+ * The contract through which the transport hands over the messages it has read.
+ */
+public interface HttpService {
+
+    /**
+     * Invoked when a request has been read.
+     * @param request the request to be processed
+     */
+    void handleRequest(HttpRequest request);
+
+    /**
+     * Invoked when a response has been read.
+     * @param response the response that was read
+     * @param callback the callback associated with the exchange, may be null
+     */
+    void handleResponse(HttpResponse response, Runnable callback);
+}

Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,114 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+
+/**
+ * The handler dedicated to a single accepted HTTP connection.
+ *
+ * On construction it registers the channel with the selector for reading. Each
+ * time a request has been read completely, it is passed to the HttpService. Once
+ * a response is set on this handler (directly, or through the commit of the
+ * corresponding HttpResponse) the handler switches its interest to writing and
+ * owns sending that response back. After the response has been written, the
+ * connection is either closed or switched back to reading.
+ */
+public class IncomingHandler implements Runnable {
+
+    private static final Log log = LogFactory.getLog(IncomingHandler.class);
+
+    /** the selection key of this connection */
+    private SelectionKey sk;
+    /** the channel of this connection */
+    private SocketChannel socket;
+
+    /** reads requests from the channel */
+    private ReadHandler requestReader = new ReadHandler(true);
+    /** writes the response to the channel */
+    private WriteHandler responseWriter = new WriteHandler();
+
+    /** receives every request read over this connection */
+    private HttpService httpService;
+
+    /**
+     * Takes over the given channel: switches it to non-blocking mode, registers
+     * it with the selector, attaches this handler to the key and asks for read
+     * events.
+     * @param socket the channel of the accepted connection
+     * @param selector the selector to register with
+     * @param httpService the service to be invoked with the requests read
+     * @throws IOException if the channel cannot be configured or registered
+     */
+    public IncomingHandler(SocketChannel socket, Selector selector,
+        HttpService httpService) throws IOException {
+
+        this.socket = socket;
+        this.httpService = httpService;
+        socket.configureBlocking(false);
+        // register with an empty interest set first, so that this handler is
+        // attached before any event can be selected for the key
+        this.sk = socket.register(selector, 0);
+        this.sk.attach(this);
+        this.sk.interestOps(SelectionKey.OP_READ);
+        selector.wakeup();
+    }
+
+    /**
+     * Hands the response to the write handler and switches this connection to
+     * wait for write readiness.
+     * @param response the response to be sent back over this connection
+     */
+    public void setResponse(HttpResponse response) {
+        responseWriter.setMessage(response.getBuffer(), response.isConnectionClose());
+        sk.interestOps(SelectionKey.OP_WRITE);
+        sk.selector().wakeup();
+        log.debug("\tIncomingHandler.setResponse()");
+    }
+
+    /**
+     * The main handler routine: a readable key is served first, otherwise a
+     * writable one; anything else is logged as an unknown event.
+     */
+    public void run() {
+        if (sk.isReadable()) {
+            onReadable();
+        } else if (sk.isWritable()) {
+            onWritable();
+        } else {
+            log.warn("IncomingHandler run(!!!unknown event!!!) : " + sk.readyOps());
+        }
+    }
+
+    /**
+     * Reads from the channel and fires the request event once the read handler
+     * reports a complete message.
+     */
+    private void onReadable() {
+        log.debug("\tIncomingHandler run() - READABLE");
+        if (!requestReader.handle(socket, sk)) {
+            // the read handler did not report a complete message yet
+            return;
+        }
+
+        log.debug("\tA httpMessage has been read completely");
+        HttpRequest request = (HttpRequest) requestReader.getHttpMessage();
+        request.setHandler(this);
+        log.debug("\tFire event for received httpMessage");
+        httpService.handleRequest(request);
+
+        if (requestReader.isConnectionClose()) {
+            return;
+        }
+        // the connection stays open: reset and reuse the read handler for the
+        // next (pipelined) message
+        requestReader.reset();
+        log.debug("\tReset read handler to read next pipelined httpMessage");
+    }
+
+    /**
+     * Writes to the channel; once the response is out, closes the connection or
+     * goes back to reading.
+     */
+    private void onWritable() {
+        log.debug("\tIncomingHandler run() - WRITEABLE");
+        if (!responseWriter.handle(socket)) {
+            // the write handler did not report completion yet
+            return;
+        }
+
+        log.debug("\tThe response has been written completely");
+        if (!responseWriter.isConnectionClose()) {
+            // now we are again interested to read
+            sk.interestOps(SelectionKey.OP_READ);
+            return;
+        }
+
+        log.debug("\tClosing connection normally");
+        sk.cancel();
+        try {
+            socket.close();
+        } catch (IOException e) {
+            log.warn("Error during socket close : " + e.getMessage(), e);
+        }
+    }
+
+}

Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,95 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+/**
+ * This handler owns sending of a httpMessage to an external endpoint using a WriteHandler
+ * and reading back the response. This does not handle persistent/pipelined connections
+ */
+public class OutgoingHandler implements Runnable {
+
+    private static final Log log = LogFactory.getLog(OutgoingHandler.class);
+
+    private SelectionKey sk;
+    private SocketChannel socket;
+    private HttpService httpService;
+
+    private WriteHandler writeHandler = new WriteHandler();
+    private ReadHandler readHandler = new ReadHandler(false); /* no response from this TODO*/
+    private Runnable callback = null;
+
+    OutgoingHandler(SocketChannel socket, SelectionKey sk, HttpRequest request, HttpService httpService) {
+        this.httpService = httpService;
+        this.socket = socket;
+        this.sk = sk;
+        writeHandler.setMessage(request.getBuffer(), true /* connection close */);
+    }
+
+    public Runnable getCallback() {
+        return callback;
+    }
+
+    public void setCallback(Runnable callback) {
+        this.callback = callback;
+    }
+
+    public void run() {
+        try {
+            if (sk.isConnectable() && socket.finishConnect()) {
+                log.debug("\tIncomingHandler run() - CONNECTABLE and CONNECTED");
+                sk.interestOps(SelectionKey.OP_WRITE);
+
+            } else if (sk.isWritable()) {
+                log.debug("\tIncomingHandler run() - WRITEABLE");
+                if (writeHandler.handle(socket)) {
+                    log.debug("\tRequest written completely");
+                    // response has been written completely
+                    // now read response or at least result code
+                    sk.interestOps(SelectionKey.OP_READ);
+                }
+
+            } else if (sk.isReadable()) {
+                log.debug("\tIncomingHandler run() - READABLE");
+                if (readHandler.handle(socket, sk)) {
+                    log.debug("\tResponse read completely");
+                    // if httpMessage processing is complete
+                    log.debug("\tFire event for response read");
+                    httpService.handleResponse((HttpResponse) readHandler.getHttpMessage(), callback);
+
+                    // if pipelining is used
+                    /*if (!readHandler.isConnectionClose()) {
+                        // prepare to read another httpMessage
+                        readHandler.reset(this);
+                        log.debug("\thandler reset");
+                    }*/
+                    socket.close();
+                    sk.cancel();
+                    log.debug("Socket closed and SelectionKey cancelled");
+                }
+            }
+        }
+        catch (IOException e) {
+            log.error("Error in OutGoingHandler : " + e.getMessage(), e);
+        }
+    }
+}
\ No newline at end of file

Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,200 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * dynamic buffer expansion on need - done
+ * TODO socket timeouts, 100 continue, 202 processing - asap
+ * TODO ompression/mtom & mime/SSL - sometime soon
+ * TODO http sessions - do we need?
+ * TODO ByteBuffer pools and reuse of buffers, multiple selectors/IO threads - advanced
+ */
+public class Reactor implements Runnable {
+
+    private static final Log log = LogFactory.getLog(Reactor.class);
+
+    /**
+     * The [single] main selector
+     */
+    final Selector selector;
+    /**
+     * The server socket channel
+     */
+    final ServerSocketChannel serverSocketChannel;
+
+    /**
+     * The maximum number of milli secs a select call would block for
+     */
+    private int selectTimeout = 500;
+    /**
+     * variable to be set when a shutdown is required
+     */
+    private boolean shutdownRequested = false;
+    /**
+     * The HttpService on which events are fired upon when new messages are received
+     */
+    private HttpService httpService;
+
+    private static Reactor _httpReactor, _httpsReactor;
+
+    public static synchronized Reactor getInstance(boolean secure) {
+        if (secure) {
+            return _httpsReactor;
+        } else {
+            return _httpReactor;
+        }
+    }
+
+    public static synchronized Reactor createReactor(
+        String host, int port, boolean secure, HttpService httpService) throws IOException {
+        if (secure) {
+            if (_httpsReactor != null) {
+                _httpsReactor.setShutdownRequested(true);
+            }
+            _httpsReactor = new Reactor(host, port, secure, httpService);
+            return _httpsReactor;
+        } else {
+            if (_httpReactor != null) {
+                _httpReactor.setShutdownRequested(true);
+            }
+            _httpReactor = new Reactor(host, port, secure, httpService);
+            return _httpReactor;
+        }
+    }
+
+    /**
+     * Starts a new Reactor on the specified port and prepares to
+     * accept new connections
+     *
+     * @param port the server listen port
+     * @throws IOException
+     */
+    private Reactor(String host, int port, boolean secure, HttpService httpService) throws IOException {
+
+        this.httpService = httpService;
+        selector = Selector.open();
+        serverSocketChannel = ServerSocketChannel.open();
+        serverSocketChannel.socket().bind(
+            new InetSocketAddress(port));
+            /*new InetSocketAddress(
+                host == null ? InetAddress.getLocalHost() : InetAddress.getByName(host), port));*/
+        serverSocketChannel.configureBlocking(false);
+
+        SelectionKey sk = serverSocketChannel.register(
+            selector, SelectionKey.OP_ACCEPT);
+        sk.attach(new Acceptor());
+        log.info("Reactor Created for host : " + host + " on port : " + port);
+    }
+
+    /**
+     * This is the main routine of the Reactor, which will
+     */
+    public void run() {
+        try {
+            while (!shutdownRequested) {
+                int keys = selector.select(selectTimeout);
+                if (keys > 0) {
+                    Set selected = selector.selectedKeys();
+                    Iterator it = selected.iterator();
+                    while (it.hasNext())
+                        dispatch((SelectionKey) (it.next()));
+                    selected.clear();
+                }
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * The main dispatch routine. This currently does not delegate to a thread pool
+     * and leave this to the user
+     *
+     * @param k the selection key for the event
+     */
+    void dispatch(SelectionKey k) {
+        Runnable r = (Runnable) (k.attachment());
+        if (r != null)
+            r.run();
+    }
+
+    /**
+     * Accepts a new connection and hands it off to a new IncomingHandler instance
+     */
+    class Acceptor implements Runnable {
+
+        public void run() {
+            SocketChannel socket;
+            try {
+                socket = serverSocketChannel.accept();
+                if (socket != null) {
+                    log.info("Accepting new connection...");
+                    // we have a *new* HTTP connection here
+                    new IncomingHandler(socket, selector, httpService);
+                }
+            } catch (IOException e) {
+                handleException("Exception while accepting a connection : " + e.getMessage(), e);
+            }
+        }
+    }
+
+    public void send(HttpRequest request, Runnable callback) {
+        try {
+            InetSocketAddress addr = new InetSocketAddress(
+                request.getHost(), request.getPort());
+
+            SocketChannel socket = SocketChannel.open();
+            socket.configureBlocking(false);
+            socket.connect(addr);
+
+            SelectionKey sk = socket.register(selector, SelectionKey.OP_CONNECT);
+            OutgoingHandler outHandler = new OutgoingHandler(socket, sk, request, httpService);
+            if (callback != null) {
+                outHandler.setCallback(callback);
+            }
+            sk.attach(outHandler);
+
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    private static void handleException(String msg, Exception e) {
+        log.error(msg, e);
+        e.printStackTrace(); // TODO
+        // throw new xxxx TODO
+    }
+
+    public void setShutdownRequested(boolean shutdownRequested) {
+        this.shutdownRequested = shutdownRequested;
+    }
+}

Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,92 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl;
+
+import java.net.URL;
+import java.io.IOException;
+
+/**
+ * A small demo: starts a Reactor on port 9001 which forwards the body of every
+ * request received to a fixed service URL on localhost:9000, and sends the body
+ * of the response it gets back as the reply to the original request.
+ */
+public class ReactorTester {
+
+    /** the reactor of the demo, used by the service below to send the forwarded request */
+    private Reactor r = null;
+
+    public static void main(String[] args) throws Exception {
+        new ReactorTester().runDemo();
+    }
+
+    /**
+     * Creates the reactor with the forwarding service and starts it on a new thread.
+     * @throws IOException if the reactor cannot be created
+     */
+    private void runDemo() throws IOException {
+
+        HttpService forwarder = new HttpService() {
+
+            /** forwards the body of the request received to the target service */
+            public void handleRequest(HttpRequest request) {
+                try {
+                    System.out.println("Processing Request : " + request);
+                    // the request to be sent on to the real service
+                    URL target = new URL(
+                        "http://localhost:9000/axis2/services/SimpleStockQuoteService");
+                    HttpRequest forwardReq = new HttpRequest(target);
+
+                    Util.copyStreams(request.getInputStream(), forwardReq.getOutputStream());
+                    // the callback keeps the original request, to reply to it later
+                    SimpleCallback replyTask = new SimpleCallback(request);
+                    r.send(forwardReq, replyTask);
+
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+
+            /** passes the response read to the callback and runs it */
+            public void handleResponse(HttpResponse response, Runnable callback) {
+                System.out.println("Received Response : " + response);
+                SimpleCallback replyTask = (SimpleCallback) callback;
+                replyTask.setResponse(response);
+                replyTask.run();
+            }
+        };
+
+        r = Reactor.createReactor(null, 9001, false, forwarder);
+        new Thread(r).start();
+    }
+
+
+    /**
+     * Holds the original request and, once set, the response of the target
+     * service; run() replies to the original request with that response.
+     */
+    public class SimpleCallback implements Callback {
+        HttpResponse response;
+        HttpRequest request;
+
+        SimpleCallback(HttpRequest request) {
+            this.request = request;
+        }
+
+        public void setResponse(HttpResponse response) {
+            this.response = response;
+        }
+
+        public void setRequest(HttpRequest request) {
+            this.request = request;
+        }
+
+        /**
+         * Creates the response for the original request, copies the body of the
+         * response received into it and commits it. The commit takes place even
+         * when copying failed.
+         */
+        public void run() {
+            HttpResponse reply = request.createResponse();
+            try {
+                Util.copyStreams(response.getInputStream(), reply.getOutputStream());
+                reply.setStatus(ResponseStatus.OK);
+                reply.addHeader(Constants.CONTENT_TYPE, Constants.TEXT_XML);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            reply.commit();
+        }
+    }
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: synapse-dev-help@ws.apache.org