You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@synapse.apache.org by as...@apache.org on 2006/10/11 20:02:10 UTC
svn commit: r462889 [1/2] - in
/incubator/synapse/trunk/java/modules/niohttp: ./ src/ src/org/
src/org/apache/ src/org/apache/axis2/ src/org/apache/axis2/transport/
src/org/apache/axis2/transport/niohttp/
src/org/apache/axis2/transport/niohttp/impl/
Author: asankha
Date: Wed Oct 11 11:02:08 2006
New Revision: 462889
URL: http://svn.apache.org/viewvc?view=rev&rev=462889
Log:
adding the first cut of the non-blocking http transport
needs clean up and lots of refactoring - see ReactorTester.java for a hint for how this will be used in synapse
checking in for review and comments - and is not included into the build yet
Added:
incubator/synapse/trunk/java/modules/niohttp/
incubator/synapse/trunk/java/modules/niohttp/src/
incubator/synapse/trunk/java/modules/niohttp/src/log4j.properties
incubator/synapse/trunk/java/modules/niohttp/src/org/
incubator/synapse/trunk/java/modules/niohttp/src/org/apache/
incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/
incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/
incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/
incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/Axis2CallbackImpl.java
incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/NHttpListener.java
incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/NHttpSender.java
incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/Worker.java
incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/
incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Callback.java
incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Constants.java
incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpMessage.java
incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpRequest.java
incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpResponse.java
incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpService.java
incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java
incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java
incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java
incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java
incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReadHandler.java
incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ResponseStatus.java
incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Util.java
incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/WriteHandler.java
Added: incubator/synapse/trunk/java/modules/niohttp/src/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/log4j.properties?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/log4j.properties (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/log4j.properties Wed Oct 11 11:02:08 2006
@@ -0,0 +1,25 @@
+log4j.rootCategory=DEBUG,logfile,stdout
+
+# Per-category log levels; set a category to DEBUG for verbose output
+#log4j.category.org.apache.axis2=DEBUG
+log4j.category.org.apache.synapse=DEBUG
+log4j.category.org.apache.axis2.transport.niohttp=DEBUG
+
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+
+log4j.appender.stdout.layout.ConversionPattern=[%t] %-5p %C{1} - %m %n
+
+#### appender writes to a file
+log4j.appender.logfile=org.apache.log4j.RollingFileAppender
+#log4j.appender.logfile.File=logs/synapse.log
+
+
+# Control the maximum log file size
+log4j.appender.logfile.MaxFileSize=1000KB
+# Number of rolled-over backup log files to keep
+log4j.appender.logfile.MaxBackupIndex=10
+
+log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
+log4j.appender.logfile.layout.ConversionPattern=%6r [%t] %5p %C{1} (%F:%L) - %m%n
\ No newline at end of file
Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/Axis2CallbackImpl.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/Axis2CallbackImpl.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/Axis2CallbackImpl.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/Axis2CallbackImpl.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,62 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp;
+
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.transport.niohttp.impl.HttpRequest;
+import org.apache.axis2.transport.niohttp.impl.HttpResponse;
+import org.apache.axis2.transport.niohttp.impl.Callback;
+
+public class Axis2CallbackImpl implements Callback {
+
+ private HttpRequest request;
+ private HttpResponse response;
+ private MessageContext msgCtx;
+
+ public Axis2CallbackImpl(HttpRequest request, MessageContext msgCtx) {
+ this.request = request;
+ this.msgCtx = msgCtx;
+ }
+
+ public HttpRequest getRequest() {
+ return request;
+ }
+
+ public void setRequest(HttpRequest request) {
+ this.request = request;
+ }
+
+ public HttpResponse getResponse() {
+ return response;
+ }
+
+ public void setResponse(HttpResponse response) {
+ this.response = response;
+ }
+
+ public MessageContext getMsgCtx() {
+ return msgCtx;
+ }
+
+ public void setMsgCtx(MessageContext msgCtx) {
+ this.msgCtx = msgCtx;
+ }
+
+ public void run() {
+ System.out.println("Reponse received for HttpRequest : " + request +
+ "\nAxis message :" + msgCtx.getEnvelope());
+ }
+}
Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/NHttpListener.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/NHttpListener.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/NHttpListener.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/NHttpListener.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,201 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.TransportInDescription;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.engine.ListenerManager;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.transport.TransportListener;
+import org.apache.axis2.transport.niohttp.impl.*;
+import org.apache.axis2.util.UUIDGenerator;
+import org.apache.axis2.util.threadpool.DefaultThreadFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.xml.namespace.QName;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+
+import java.io.OutputStreamWriter;
+import java.io.IOException;
+
+public class NHttpListener implements TransportListener, HttpService {
+
+ private static final Log log = LogFactory.getLog(NHttpListener.class);
+
+ // maximum pool size handed to the worker ThreadPoolExecutor in init()
+ private static final int WORKERS_MAX_THREADS = 40;
+ // keep-alive time of the worker pool, expressed in TIME_UNIT
+ private static final long WORKER_KEEP_ALIVE = 100L;
+ private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
+
+ private ConfigurationContext cfgCtx;
+ // "http://host[:port]<service context path>/" - built in init(), prepended to service names
+ private String serviceEPRPrefix;
+ // created in init(), run on its own thread by start()
+ private Reactor _reactor;
+ // executes one Worker per request passed to handleRequest(); created in init()
+ private Executor workerPool = null;
+
+ // leaves all state unset; init() populates it
+ public NHttpListener() {}
+
+    /**
+     * Creates a listener and registers it as the receiver of an HTTP transport-in
+     * description with the listener manager of the given configuration context. A new
+     * listener manager is created and initialized when the context does not have one.
+     *
+     * @param cfgCtx the Axis2 configuration context
+     * @param port not used by this constructor; the port is read from the transport
+     *             parameters in init()
+     * @throws AxisFault if the listener cannot be registered
+     */
+    public NHttpListener(ConfigurationContext cfgCtx, int port) throws AxisFault {
+
+        this.cfgCtx = cfgCtx;
+        TransportInDescription httpDescription = new TransportInDescription(
+            new QName(org.apache.axis2.Constants.TRANSPORT_HTTP));
+        httpDescription.setReceiver(this);
+
+        ListenerManager listenerManager = cfgCtx.getListenerManager();
+        if (listenerManager == null) {
+            listenerManager = new ListenerManager();
+            listenerManager.init(cfgCtx);
+        }
+        // register on the manager resolved above instead of looking it up again, so a
+        // freshly created manager is the one that is actually used
+        listenerManager.addListener(httpDescription, true);
+    }
+
+ public void init(ConfigurationContext cfgCtx, TransportInDescription transprtIn)
+ throws AxisFault {
+
+ this.cfgCtx = cfgCtx;
+ try {
+ int port = 8080;
+ String host;
+ Parameter param = transprtIn.getParameter(PARAM_PORT);
+ if (param != null)
+ port = Integer.parseInt((String) param.getValue());
+
+ param = transprtIn.getParameter(HOST_ADDRESS);
+ if (param != null)
+ host = ((String) param.getValue()).trim();
+ else
+ host = java.net.InetAddress.getLocalHost().getHostName();
+
+ serviceEPRPrefix = "http://" + host + (port == 80 ? "" : ":" + port) +
+ cfgCtx.getServiceContextPath() + "/";
+
+ _reactor = Reactor.createReactor(host, port, false, this);
+
+ workerPool = new ThreadPoolExecutor(
+ 1,
+ WORKERS_MAX_THREADS, WORKER_KEEP_ALIVE, TIME_UNIT,
+ new LinkedBlockingQueue(),
+ new DefaultThreadFactory(
+ new ThreadGroup("HTTP Worker thread group"),
+ "HTTPWorker"));
+
+ } catch (Exception e) {
+ throw new AxisFault(e);
+ }
+ }
+
+ public void start() throws AxisFault {
+ log.debug("Starting IO Reactor...");
+ new Thread(_reactor).start();
+ log.info("IO Reactor started, accepting connections...");
+ }
+
+    /**
+     * Requests the reactor to shut down. Does nothing when the listener was never
+     * initialized, i.e. when no reactor has been created yet.
+     */
+    public void stop() throws AxisFault {
+        if (_reactor == null) {
+            // the no-arg constructor leaves the reactor unset until init() runs
+            log.debug("IO Reactor was never initialized, nothing to shut down");
+            return;
+        }
+        _reactor.setShutdownRequested(true);
+        log.info("IO Reactor shut down");
+    }
+
+ public EndpointReference getEPRForService(String serviceName, String ip) throws AxisFault {
+ return new EndpointReference(serviceEPRPrefix + serviceName);
+ }
+
+    /**
+     * Returns a one-element array holding the endpoint reference formed by appending the
+     * service name to the EPR prefix of this listener. The ip argument is not used.
+     */
+    //TODO This should handle other endpoints too, e.g. when a REST call has been made
+    public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault {
+        return new EndpointReference[] {
+            new EndpointReference(serviceEPRPrefix + serviceName)
+        };
+    }
+
+ public void handleRequest(HttpRequest request) {
+
+ log.debug("@@@@ Got new HTTP request : " + request.toStringLine());
+
+ MessageContext msgContext = new MessageContext();
+ msgContext.setIncomingTransportName(Constants.TRANSPORT_HTTP);
+ try {
+ TransportOutDescription transportOut = cfgCtx.getAxisConfiguration()
+ .getTransportOut(new QName(Constants.TRANSPORT_HTTP));
+ TransportInDescription transportIn = cfgCtx.getAxisConfiguration()
+ .getTransportIn(new QName(Constants.TRANSPORT_HTTP));
+
+ msgContext.setConfigurationContext(cfgCtx);
+
+ /* TODO session handling
+ String sessionKey = request.getSession(true).getId();
+ if (cfgCtx.getAxisConfiguration().isManageTransportSession()) {
+ SessionContext sessionContext = sessionManager.getSessionContext(sessionKey);
+ msgContext.setSessionContext(sessionContext);
+ }*/
+
+ msgContext.setTransportIn(transportIn);
+ msgContext.setTransportOut(transportOut);
+ msgContext.setServiceGroupContextId(UUIDGenerator.getUUID());
+ msgContext.setServerSide(true);
+ msgContext.setProperty(Constants.Configuration.TRANSPORT_IN_URL, request.getPath());
+
+ msgContext.setProperty(MessageContext.TRANSPORT_HEADERS, request.getHeaders());
+ msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, request);
+
+ workerPool.execute(new Worker(cfgCtx, msgContext, request));
+
+ } catch (AxisFault e) {
+ HttpResponse response = request.createResponse();
+
+ try {
+ AxisEngine engine = new AxisEngine(cfgCtx);
+ msgContext.setProperty(MessageContext.TRANSPORT_OUT, response.getOutputStream());
+ msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, response.getOutputStream());
+
+ MessageContext faultContext = engine.createFaultMessageContext(msgContext, e);
+ engine.sendFault(faultContext);
+
+ response.setStatus(ResponseStatus.INTERNAL_SERVER_ERROR);
+
+ } catch (Exception ex) {
+ response.setStatus(ResponseStatus.INTERNAL_SERVER_ERROR);
+ response.addHeader(org.apache.axis2.transport.niohttp.impl.Constants.CONTENT_TYPE,
+ org.apache.axis2.transport.niohttp.impl.Constants.TEXT_PLAIN);
+ OutputStreamWriter out = new OutputStreamWriter(
+ response.getOutputStream());
+ try {
+ out.write(ex.getMessage());
+ out.close();
+ } catch (IOException ee) {
+ }
+ }
+ response.commit();
+ return;
+ }
+ }
+
+ public void handleResponse(HttpResponse response, Runnable callback) {
+
+ log.debug("@@@@ Got new HTTP response : " + response.toStringLine());
+ //callback.setResponse(response);
+ callback.run();
+ }
+
+}
Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/NHttpSender.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/NHttpSender.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/NHttpSender.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/NHttpSender.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,246 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMOutputFormat;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.handlers.AbstractHandler;
+import org.apache.axis2.transport.OutTransportInfo;
+import org.apache.axis2.transport.TransportSender;
+import org.apache.axis2.transport.niohttp.impl.*;
+import org.apache.axis2.transport.http.HTTPConstants;
+import org.apache.axis2.transport.http.HTTPTransportUtils;
+
+import javax.xml.stream.XMLStreamException;
+import java.io.OutputStream;
+import java.net.URL;
+import java.net.MalformedURLException;
+
+public class NHttpSender extends AbstractHandler implements TransportSender {
+
+    /**
+     * Sends the message held by the given message context.
+     * <p>
+     * When a target EPR can be determined (the TRANSPORT_URL property, or a non-anonymous
+     * WSA To address) the message is sent as a new HTTP request through a Reactor.
+     * Otherwise it is written as the response to an already received request, using the
+     * OUT_TRANSPORT_INFO property of the message context.
+     *
+     * @param msgContext the message context to send
+     * @throws AxisFault if the endpoint URL is malformed, if there is no destination to
+     *                   send to, or if writing the message fails
+     */
+    public void invoke(MessageContext msgContext) throws AxisFault {
+
+        OMOutputFormat format = getOMOutputFormat(msgContext);
+
+        // Transport URL may be different from the WSA-To
+        EndpointReference epr = null;
+        String transportURL = (String)
+            msgContext.getProperty(Constants.Configuration.TRANSPORT_URL);
+
+        if (transportURL != null) {
+            epr = new EndpointReference(transportURL);
+        } else if
+            ((msgContext.getTo() != null) &&
+                !AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(
+                    msgContext.getTo().getAddress()) &&
+                !AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(
+                    msgContext.getTo().getAddress())) {
+            epr = msgContext.getTo();
+        }
+
+        // select the OMElement of the message to be sent - checking if using REST
+        OMElement dataOut;
+        if (msgContext.isDoingREST()) {
+            dataOut = msgContext.getEnvelope().getBody().getFirstElement();
+        } else {
+            dataOut = msgContext.getEnvelope();
+        }
+
+        if (epr != null) {
+            // this is a new message being sent to the above EPR
+            if (!epr.getAddress().equals(AddressingConstants.Final.WSA_NONE_URI)) {
+                // "https" has to be tested first: every https address also starts with
+                // "http", so the reverse order makes the https branch unreachable
+                Reactor reactor = null;
+                if (epr.getAddress().startsWith("https")) {
+                    reactor = Reactor.getInstance(true);
+                } else if (epr.getAddress().startsWith("http")) {
+                    reactor = Reactor.getInstance(false);
+                }
+
+                if (reactor != null) {
+                    HttpRequest req;
+                    try {
+                        req = new HttpRequest(new URL(epr.getAddress()));
+                    } catch (MalformedURLException e) {
+                        // fail with a fault instead of continuing with a null request
+                        throw new AxisFault(
+                            "Malformed endpoint URL : " + epr.getAddress(), e);
+                    }
+                    populateHttpMessage(req, msgContext, format, dataOut);
+
+                    Axis2CallbackImpl cb = new Axis2CallbackImpl(req, msgContext);
+                    reactor.send(req, cb);
+
+                } else {
+                    // TODO
+                }
+
+                //new CommonsHTTPTransportSender().writeMessageWithCommons(
+                //    msgContext, epr, dataOut, format);
+
+            } else {
+                // TODO handle
+            }
+        } else {
+            // this is a response being sent to a request already received
+            if (msgContext.getProperty(Constants.OUT_TRANSPORT_INFO) != null) {
+                if (msgContext.getProperty(Constants.OUT_TRANSPORT_INFO)
+                    instanceof HttpRequest) {
+                    sendAsyncResponse(msgContext, format, dataOut);
+                } else {
+                    sendUsingOutputStream(msgContext, format, dataOut);
+                }
+            } else {
+                throw new AxisFault("Both message 'TO' and Property MessageContext.TRANSPORT_OUT" +
+                    " is Null, Do not know where to send");
+            }
+        }
+
+        if (msgContext.getOperationContext() != null) {
+            msgContext.getOperationContext().setProperty(
+                Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
+        }
+    }
+
+    /**
+     * Builds the output format for a message. The character set encoding is taken from
+     * the message context, then from its operation context, and finally falls back to
+     * the default encoding. The MTOM, SwA and REST flags of the message context are
+     * refreshed here before being copied into the format.
+     */
+    private OMOutputFormat getOMOutputFormat(MessageContext msgContext) {
+        OMOutputFormat format = new OMOutputFormat();
+
+        String charSetEnc = (String) msgContext.getProperty(
+            Constants.Configuration.CHARACTER_SET_ENCODING);
+        if (charSetEnc == null) {
+            // not set on the message - try the operation context next
+            OperationContext opctx = msgContext.getOperationContext();
+            if (opctx != null) {
+                charSetEnc = (String) opctx.getProperty(
+                    Constants.Configuration.CHARACTER_SET_ENCODING);
+            }
+            // still not found - use the default
+            if (charSetEnc == null) {
+                charSetEnc = MessageContext.DEFAULT_CHAR_SET_ENCODING;
+            }
+        } else {
+            format.setCharSetEncoding(charSetEnc);
+        }
+
+        boolean writeMTOM = HTTPTransportUtils.doWriteMTOM(msgContext);
+        msgContext.setDoingMTOM(writeMTOM);
+        boolean writeSwA = HTTPTransportUtils.doWriteSwA(msgContext);
+        msgContext.setDoingSwA(writeSwA);
+        boolean doingREST = HTTPTransportUtils.isDoingREST(msgContext);
+        msgContext.setDoingREST(doingREST);
+
+        format.setSOAP11(msgContext.isSOAP11());
+        format.setDoOptimize(msgContext.isDoingMTOM());
+        format.setDoingSWA(msgContext.isDoingSwA());
+        format.setCharSetEncoding(charSetEnc);
+        return format;
+    }
+
+    /**
+     * Sends the message as the response to the HttpRequest stored under the
+     * OUT_TRANSPORT_INFO property: creates the response, sets its status to OK, fills it
+     * with the serialized message and commits it.
+     */
+    private void sendAsyncResponse(MessageContext msgContext, OMOutputFormat format, OMElement dataOut) throws AxisFault {
+
+        Object transportInfo = msgContext.getProperty(Constants.OUT_TRANSPORT_INFO);
+        HttpResponse response = ((HttpRequest) transportInfo).createResponse();
+        response.setStatus(ResponseStatus.OK);
+        populateHttpMessage(response, msgContext, format, dataOut);
+        response.commit();
+    }
+
+    /**
+     * Adds the content type header to the HTTP message and serializes the payload into
+     * the output stream of the message, closing the stream afterwards.
+     *
+     * @throws AxisFault wrapping any exception raised while serializing or closing
+     */
+    private void populateHttpMessage(HttpMessage message, MessageContext msgContext, OMOutputFormat format, OMElement dataOut) throws AxisFault {
+
+        // an explicitly configured content type wins, then REST, then the format default
+        Object configuredType = msgContext.getProperty(Constants.Configuration.CONTENT_TYPE);
+        String contentType;
+        if (configuredType != null) {
+            contentType = (String) configuredType;
+        } else if (msgContext.isDoingREST()) {
+            contentType = HTTPConstants.MEDIA_TYPE_APPLICATION_XML;
+        } else {
+            contentType = format.getContentType();
+            format.setSOAP11(msgContext.isSOAP11());
+        }
+
+        String headerValue = contentType + "; charset=" + format.getCharSetEncoding();
+        message.addHeader(
+            org.apache.axis2.transport.niohttp.impl.Constants.CONTENT_TYPE, headerValue);
+
+        OutputStream out = message.getOutputStream();
+        format.setDoOptimize(msgContext.isDoingMTOM());
+        try {
+            dataOut.serializeAndConsume(out, format);
+            out.close();
+        } catch (Exception e) {
+            throw new AxisFault(e);
+        }
+    }
+
+    /**
+     * Serializes the message into the stream stored under the TRANSPORT_OUT property.
+     * On the server side the content type is first set on the OutTransportInfo stored
+     * under the OUT_TRANSPORT_INFO property.
+     *
+     * @throws AxisFault if the out transport info is missing on the server side, or if
+     *                   serialization fails
+     */
+    private void sendUsingOutputStream(MessageContext msgContext,
+        OMOutputFormat format,
+        OMElement dataOut) throws AxisFault {
+
+        OutputStream out = (OutputStream) msgContext.getProperty(MessageContext.TRANSPORT_OUT);
+
+        if (msgContext.isServerSide()) {
+            OutTransportInfo transportInfo = (OutTransportInfo)
+                msgContext.getProperty(Constants.OUT_TRANSPORT_INFO);
+            if (transportInfo == null) {
+                throw new AxisFault(Constants.OUT_TRANSPORT_INFO +
+                    " has not been set");
+            }
+
+            // an explicitly configured content type wins, then REST, then the format default
+            Object configuredType = msgContext.getProperty(Constants.Configuration.CONTENT_TYPE);
+            String contentType;
+            if (configuredType != null) {
+                contentType = (String) configuredType;
+            } else if (msgContext.isDoingREST()) {
+                contentType = HTTPConstants.MEDIA_TYPE_APPLICATION_XML;
+            } else {
+                contentType = format.getContentType();
+                format.setSOAP11(msgContext.isSOAP11());
+            }
+
+            String typeWithCharset = contentType + "; charset="
+                + format.getCharSetEncoding();
+            transportInfo.setContentType(typeWithCharset);
+        }
+
+        format.setDoOptimize(msgContext.isDoingMTOM());
+        try {
+            dataOut.serializeAndConsume(out, format);
+        } catch (XMLStreamException e) {
+            throw new AxisFault(e);
+        }
+    }
+
+ /**
+ * No-op: this sender performs no per-message cleanup.
+ */
+ public void cleanup(MessageContext msgContext) throws AxisFault {
+ // do nothing
+ }
+
+ /**
+ * No-op: neither the configuration context nor the transport-out description is read.
+ */
+ public void init(ConfigurationContext confContext, TransportOutDescription transportOut) throws AxisFault {
+ // do nothing
+ }
+
+ /**
+ * No-op: this sender releases nothing when stopped.
+ */
+ public void stop() {
+ // do nothing
+ }
+
+}
Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/Worker.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/Worker.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/Worker.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/Worker.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,271 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp;
+
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.transport.niohttp.impl.*;
+import org.apache.axis2.transport.http.HTTPTransportReceiver;
+import org.apache.axis2.transport.http.HTTPTransportUtils;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.ws.commons.schema.XmlSchema;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.HashMap;
+import java.net.SocketException;
+import java.io.OutputStreamWriter;
+import java.io.IOException;
+
+public class Worker implements Runnable {
+
+ private static final Log log = LogFactory.getLog(Worker.class);
+
+ private MessageContext msgContext = null;
+ private ConfigurationContext cfgCtx = null;
+ private HttpRequest request = null;
+ // "/" + context root + "/"; GET requests outside this prefix are redirected to it
+ private String contextPath = null;
+ // service context path, passed on when printing WSDL
+ private String servicePath = null;
+ // name of the HTTP header read for the SOAP action
+ private static final String SOAPACTION = "SOAPAction";
+
+ /**
+ * Creates a worker for a single request.
+ *
+ * @param cfgCtx the Axis2 configuration context; also the source of the context and service paths
+ * @param msgContext the message context prepared for the request
+ * @param request the HTTP request to process
+ */
+ Worker(ConfigurationContext cfgCtx, MessageContext msgContext, HttpRequest request) {
+ this.cfgCtx = cfgCtx;
+ this.msgContext = msgContext;
+ this.request = request;
+ contextPath = "/" + cfgCtx.getContextRoot() + "/";
+ servicePath = cfgCtx.getServiceContextPath();
+ }
+
+    /**
+     * Creates the response for the request and dispatches on the HTTP method: GET and
+     * POST are processed, any other method is reported through handleException.
+     */
+    public void run() {
+
+        HttpResponse response = request.createResponse();
+        String method = request.getMethod();
+
+        if (Constants.GET.equals(method)) {
+            processGet(response);
+        } else if (Constants.POST.equals(method)) {
+            processPost(response);
+        } else {
+            handleException("Unsupported method : " + method, null, response);
+        }
+
+        //response.commit();
+    }
+
+    /**
+     * Serves a GET request and commits the response.
+     * <ul>
+     * <li>/favicon.ico and paths outside the context path are answered with a redirect</li>
+     * <li>?wsdl, ?wsdl2 and ?xsd write the WSDL or schema of the addressed service</li>
+     * <li>a request without parameters writes the service HTML, or is passed to
+     *     processAxisGet when the path ends with the context path</li>
+     * </ul>
+     * A ?wsdl, ?wsdl2 or ?xsd request for a service that is not deployed is answered with
+     * NOT_FOUND, in line with the existing handling of an unknown schema name.
+     *
+     * @param response the response to populate and commit
+     */
+    private void processGet(HttpResponse response) {
+
+        String uri = request.getPath();
+        String serviceName = uri.substring(uri.lastIndexOf("/") + 1);
+        int qPos = serviceName.indexOf("?");
+        if (qPos != -1) {
+            serviceName = serviceName.substring(0, qPos);
+        }
+
+        Map parameters = new HashMap();
+        Iterator iter = request.getParameterNames();
+        while (iter.hasNext()) {
+            String name = (String) iter.next();
+            parameters.put(name, request.getParameter(name));
+        }
+
+        if (uri.equals("/favicon.ico")) {
+            response.setStatus(ResponseStatus.MOVED_PERMANENTLY, "Redirect");
+            response.addHeader(Constants.LOCATION, "http://ws.apache.org/favicon.ico");
+
+        } else if (!uri.startsWith(contextPath)) {
+            response.setStatus(ResponseStatus.MOVED_PERMANENTLY, "Redirect");
+            response.addHeader(Constants.LOCATION, contextPath);
+
+        } else if (parameters.containsKey("wsdl")) {
+            AxisService service = findService(serviceName);
+            if (service == null) {
+                markServiceNotFound(response);
+            } else {
+                try {
+                    service.printWSDL(response.getOutputStream(),
+                        Util.getIpAddress(), servicePath);
+                    response.addHeader(Constants.CONTENT_TYPE, Constants.TEXT_HTML);
+                    response.setStatus(ResponseStatus.OK);
+
+                } catch (AxisFault e) {
+                    // handleException commits the response itself
+                    handleException("Axis2 fault writing ?wsdl output", e, response);
+                    return;
+                } catch (SocketException e) {
+                    handleException("Error getting ip address for ?wsdl output", e, response);
+                    return;
+                }
+            }
+
+        } else if (parameters.containsKey("wsdl2")) {
+            AxisService service = findService(serviceName);
+            if (service == null) {
+                markServiceNotFound(response);
+            } else {
+                try {
+                    service.printWSDL2(response.getOutputStream(),
+                        Util.getIpAddress(), servicePath);
+                    response.addHeader(Constants.CONTENT_TYPE, Constants.TEXT_HTML);
+                    response.setStatus(ResponseStatus.OK);
+
+                } catch (AxisFault e) {
+                    handleException("Axis2 fault writing ?wsdl2 output", e, response);
+                    return;
+                } catch (SocketException e) {
+                    handleException("Error getting ip address for ?wsdl2 output", e, response);
+                    return;
+                }
+            }
+
+        } else if (parameters.containsKey("xsd")) {
+            AxisService service = findService(serviceName);
+            String schemaName = (String) parameters.get("xsd");
+
+            if (service == null) {
+                markServiceNotFound(response);
+
+            } else if (schemaName == null || "".equals(schemaName)) {
+                // plain ?xsd - write the schema of the service
+                try {
+                    service.printSchema(response.getOutputStream());
+                    response.addHeader(Constants.CONTENT_TYPE, Constants.TEXT_HTML);
+                    response.setStatus(ResponseStatus.OK);
+
+                } catch (AxisFault axisFault) {
+                    handleException("Error writing ?xsd output to client", axisFault, response);
+                    return;
+                }
+
+            } else {
+                // cater for named xsds - run the population logic just to be sure
+                service.populateSchemaMappings();
+                Map schemaTable = service.getSchemaMappingTable();
+                final XmlSchema schema = (XmlSchema) schemaTable.get(schemaName);
+                if (schema != null) {
+                    // schema found - write it to the stream
+                    schema.write(response.getOutputStream());
+                    response.addHeader(Constants.CONTENT_TYPE, Constants.TEXT_HTML);
+                    response.setStatus(ResponseStatus.OK);
+                } else {
+                    // no schema available by that name - send 404
+                    response.setStatus(ResponseStatus.NOT_FOUND, "Schema Not Found");
+                }
+            }
+
+        } else if (parameters.isEmpty()) {
+
+            // request is for a service over GET without params, send service HTML
+            if (!(uri.endsWith(contextPath) || uri.endsWith(contextPath + "/"))) {
+
+                OutputStreamWriter out = new OutputStreamWriter(
+                    response.getOutputStream());
+                try {
+                    out.write(
+                        HTTPTransportReceiver.printServiceHTML(
+                            serviceName, cfgCtx));
+                    out.close();
+                    response.addHeader(Constants.CONTENT_TYPE, Constants.TEXT_HTML);
+                    response.setStatus(ResponseStatus.OK);
+
+                } catch (IOException e) {
+                    handleException("Error writing service HTML to client", e, response);
+                    return;
+                }
+            } else {
+                // NOTE(review): processAxisGet may already commit the response through
+                // handleException, in which case the commit below runs a second time -
+                // confirm that HttpResponse.commit() tolerates this
+                processAxisGet(response, parameters);
+            }
+        }
+        // NOTE(review): a request with parameters other than wsdl, wsdl2 or xsd reaches
+        // this point with nothing written to the response - confirm this is intended
+
+        response.commit();
+    }
+
+    /** Looks up the service registered under the given name; null when there is none. */
+    private AxisService findService(String serviceName) {
+        return (AxisService) cfgCtx.getAxisConfiguration().getServices().get(serviceName);
+    }
+
+    /** Sets a NOT_FOUND status on the response for a service that is not deployed. */
+    private void markServiceNotFound(HttpResponse response) {
+        response.setStatus(ResponseStatus.NOT_FOUND, "Service Not Found");
+    }
+
+ public void processPost(HttpResponse response) {
+
+ try {
+ HTTPTransportUtils.processHTTPPostRequest(
+ msgContext,
+ request.getInputStream(),
+ response.getOutputStream(),
+ request.getHeader(Constants.CONTENT_TYPE),
+ request.getHeader(SOAPACTION),
+ request.getPath());
+ } catch (AxisFault e) {
+ handleException("Error processing POST request ", e, response);
+ }
+ }
+
+    /**
+     * Passes a GET request to the Axis2 HTTP transport utilities. When they report the
+     * request as not processed, the services HTML page is written instead.
+     *
+     * @param response the response to write to
+     * @param parameters the request parameters
+     */
+    private void processAxisGet(HttpResponse response, Map parameters) {
+        try {
+            // deal with GET request
+            boolean processed = HTTPTransportUtils.processHTTPGetRequest(
+                msgContext,
+                response.getOutputStream(),
+                request.getHeader(SOAPACTION),
+                request.getPath(),
+                cfgCtx,
+                parameters);
+
+            if (processed) {
+                return;
+            }
+
+            // not handled by Axis2 - fall back to the list of services
+            OutputStreamWriter writer = new OutputStreamWriter(
+                response.getOutputStream());
+            try {
+                writer.write(HTTPTransportReceiver.getServicesHTML(cfgCtx));
+                writer.close();
+                response.addHeader(Constants.CONTENT_TYPE, Constants.TEXT_HTML);
+                response.setStatus(ResponseStatus.OK);
+
+            } catch (IOException e) {
+                handleException("Error writing ? output to client", e, response);
+            }
+        } catch (AxisFault e) {
+            handleException("Axis fault while serving GET request", e, response);
+        }
+    }
+
+    /**
+     * Logs an error and reports it to the client: a fault is sent through the Axis
+     * engine, and if that fails the failure is written to the response as plain text.
+     * The response status is always set to INTERNAL_SERVER_ERROR and the response is
+     * always committed.
+     *
+     * @param msg description of the error
+     * @param e the exception that caused the error; may be null, in which case the
+     *          fault is built from msg
+     * @param response the response to write the error to and commit
+     */
+    private void handleException(String msg, Exception e, HttpResponse response) {
+        log.error(msg, e);
+
+        // callers may have no exception to pass (e.g. an unsupported HTTP method)
+        Exception cause = (e != null) ? e : new AxisFault(msg);
+
+        try {
+            AxisEngine engine = new AxisEngine(cfgCtx);
+            msgContext.setProperty(MessageContext.TRANSPORT_OUT, response.getOutputStream());
+            msgContext.setProperty(org.apache.axis2.Constants.OUT_TRANSPORT_INFO, response.getOutputStream());
+            MessageContext faultContext = engine.createFaultMessageContext(msgContext, cause);
+            engine.sendFault(faultContext);
+
+        } catch (Exception ex) {
+            log.error("Error sending the fault, falling back to a plain text response", ex);
+            response.addHeader(Constants.CONTENT_TYPE, Constants.TEXT_PLAIN);
+            OutputStreamWriter out = new OutputStreamWriter(
+                response.getOutputStream());
+            try {
+                // getMessage() may be null, which Writer.write(String) does not accept
+                String text = (ex.getMessage() != null) ? ex.getMessage() : ex.toString();
+                out.write(text);
+                out.close();
+            } catch (IOException ee) {
+                log.error("Error writing the plain text error response", ee);
+            }
+
+        } finally {
+            response.setStatus(ResponseStatus.INTERNAL_SERVER_ERROR);
+            response.commit();
+        }
+    }
+}
Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Callback.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Callback.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Callback.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Callback.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,21 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl;
+
+/**
+ * A {@link Runnable} that can additionally be handed a {@link HttpResponse}
+ * through {@link #setResponse(HttpResponse)}.
+ */
+public interface Callback extends Runnable {
+
+    /**
+     * Supplies the response to this callback.
+     *
+     * @param r the HttpResponse made available to this callback
+     */
+    void setResponse(HttpResponse r);
+}
Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Constants.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Constants.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Constants.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Constants.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,55 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl;
+
+/**
+ * TODO clean up!!!
+ */
+public final class Constants {
+
+ public static final int DEFAULT_CONNECTION_LINGER = -1;
+ public static final int DEFAULT_CONNECTION_TIMEOUT = 60000;
+ public static final int DEFAULT_CONNECTION_UPLOAD_TIMEOUT = 300000;
+ public static final int DEFAULT_SERVER_SOCKET_TIMEOUT = 0;
+ public static final boolean DEFAULT_TCP_NO_DELAY = true;
+
+ public static final String CRLF = "\r\n";
+ public static final byte CR = (byte) '\r';
+ public static final byte LF = (byte) '\n';
+ public static final byte SP = (byte) ' ';
+ public static final String STRING_SP = " ";
+ public static final byte HT = (byte) '\t';
+ public static final byte COLON = (byte) ':';
+ public static final String STRING_COLON = ":";
+
+ public static final String HTTP_10 = "HTTP/1.0";
+ public static final String HTTP_11 = "HTTP/1.1";
+
+ public static final String GET = "GET";
+ public static final String POST = "POST";
+
+ public static final String TRANSFER_ENCODING = "Transfer-Encoding";
+ public static final String CONTENT_LENGTH = "Content-Length";
+ public static final String CONTENT_TYPE = "Content-Type";
+ public static final String TEXT_PLAIN = "text/plain";
+ public static final String TEXT_HTML = "text/html";
+ public static final String TEXT_XML = "text/xml";
+ public static final String CHUNKED = "chunked";
+ public static final String CONNECTION = "Connection";
+ public static final String CLOSE = "close";
+ public static final String OK = "OK";
+ public static final String LOCATION = "Location";
+}
Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpMessage.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpMessage.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpMessage.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpMessage.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,243 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.BufferOverflowException;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * This represents an abstract HttpMessage - which may be a HttpRequest or a
+ * HttpResponse. This class defines the headers and the ByteBuffer that holds
+ * the message body in memory, along with an optional starting position within
+ * the buffer.
+ */
+public abstract class HttpMessage {
+
+ private static final Log log = LogFactory.getLog(HttpMessage.class);
+
+ /**
+ * http headers of this message
+ */
+ protected Map headers = new HashMap();
+ /**
+ * holder of the body content of this message
+ */
+ protected ByteBuffer buffer = ByteBuffer.allocate(4096);
+ /**
+ * position at the main buffer where the body starts (e.g. for requests)
+ */
+ protected int bodyStart;
+
+ /**
+ * A flag to detect if the getOutputStream() caller did not properly close() the stream
+ */
+ protected boolean outputStreamOpened = false;
+
+ public String getHeader(String name) {
+ return (String) headers.get(name);
+ }
+
+ public Map getHeaders() {
+ return headers;
+ }
+
+ public void addHeader(String name, String value) {
+ headers.put(name, value);
+ }
+
+ public boolean isChunked() {
+ return Constants.CHUNKED.equals(headers.get(Constants.TRANSFER_ENCODING));
+ }
+
+ /**
+ * Does this message specify a connection-close?
+ * @return true if connection should be closed
+ */
+ public boolean isConnectionClose() {
+ return Constants.CLOSE.equals(headers.get(Constants.CONNECTION));
+ }
+
+ public void setConnectionClose() {
+ headers.put(Constants.CONNECTION, Constants.CLOSE);
+ }
+
+ /**
+ * Return the legth of the content
+ * @return content length
+ */
+ public int getContentLength() {
+ String len = (String) headers.get(Constants.CONTENT_LENGTH);
+ if (len != null) {
+ return Integer.parseInt(len);
+ } else {
+ return -1;
+ }
+ }
+
+ /**
+ * Return an InputStream to read the body from the main ByteBuffer of this message
+ *
+ * @return an InputStream into the main ByteBuffer starting at the body
+ */
+ public InputStream getInputStream() {
+ // position to the start of the body
+ buffer.position(bodyStart);
+
+ // Returns an input stream for a ByteBuffer.
+ // The read() methods use the relative ByteBuffer get() methods.
+ return new InputStream() {
+ public synchronized int read() throws IOException {
+ if (!buffer.hasRemaining()) {
+ return -1;
+ }
+ return buffer.get();
+ }
+
+ public synchronized int read(byte[] bytes, int off, int len) throws IOException {
+ // Read only what's left
+ len = Math.min(len, buffer.remaining());
+ buffer.get(bytes, off, len);
+ return len;
+ }
+ };
+ }
+
+ /**
+ * Get an OutputStream to write the body of this httpMessage
+ * @return an OutputStream to write the body
+ */
+ public OutputStream getOutputStream() {
+ // position for body
+ buffer.clear();
+ outputStreamOpened = true;
+ buffer.position(bodyStart);
+
+ // Returns an output stream for a ByteBuffer.
+ // The write() methods use the relative ByteBuffer put() methods.
+ return new OutputStream() {
+ public synchronized void write(int b) throws IOException {
+ while (true) {
+ try {
+ buffer.put((byte) b);
+ return;
+ } catch (BufferOverflowException bo) {
+ expandBuffer();
+ }
+ }
+ }
+
+ public synchronized void write(byte[] bytes, int off, int len) throws IOException {
+ while (true) {
+ try {
+ buffer.put(bytes, off, len);
+ return;
+ } catch (BufferOverflowException bo) {
+ expandBuffer();
+ }
+ }
+ }
+
+ public void close() throws IOException {
+ buffer.flip();
+ outputStreamOpened = false;
+ }
+ };
+ }
+
+ /**
+ * Expand (double) the main ByteBuffer of this message
+ */
+ private void expandBuffer() {
+ ByteBuffer newBuf = ByteBuffer.allocate(buffer.capacity() * 2);
+ log.debug("Expanding ByteBuffer to " + newBuf.capacity() + " bytes");
+ buffer.flip();
+ buffer = newBuf.put(buffer);
+ }
+
+ /**
+ * Set the given buffer and the start position within that buffer as the
+ * body of this httpMessage
+ *
+ * @param buffer an externally allocated [and populated] buffer containing the message body
+ * @param bodyStart the start position of the actual body content within the buffer (default 0)
+ */
+ public void setBuffer(ByteBuffer buffer, int bodyStart) {
+ log.debug("HttpMessage.setBuffer() - buffer : " + buffer + " bodyStart: " + bodyStart);
+ this.buffer = buffer;
+ this.bodyStart = bodyStart;
+ }
+
+ /**
+ * Return a string representation of the message in HTTP wire-format
+ * @return a String representation of the message in HTTP wire-format
+ */
+ public String toString() {
+
+ StringBuffer sb = new StringBuffer();
+
+ // print httpMessage-line or status-line
+ sb.append(toStringLine());
+
+ // print headers
+ Iterator iter = headers.keySet().iterator();
+ while (iter.hasNext()) {
+ String headerName = (String) iter.next();
+ sb.append(headerName + Constants.STRING_COLON +
+ headers.get(headerName) + Constants.CRLF);
+ }
+ sb.append(Constants.CRLF);
+
+ if (buffer.limit() > 0) {
+ ByteBuffer bodyBuf = buffer;
+ if (bodyStart > 0) {
+ buffer.position(bodyStart);
+ bodyBuf = buffer.slice();
+ }
+
+ Charset set = Charset.forName("us-ascii");
+ CharsetDecoder dec = set.newDecoder();
+ try {
+ sb.append(dec.decode(bodyBuf));
+ } catch (CharacterCodingException e) {
+ e.printStackTrace();
+ }
+ }
+
+ sb.append(Constants.CRLF);
+
+ return sb.toString();
+ }
+
+ /**
+ * Return the first line of text for the toString() representation of this object. This would
+ * be a httpMessage-line or a status-line as per the RFC
+ *
+ * @return the first line of text for the toString()
+ */
+ public abstract String toStringLine();
+}
Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpRequest.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpRequest.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpRequest.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpRequest.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,185 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.StringTokenizer;
+
+/**
+ * Represents an HttpRequest message. A request is either created for a target URL
+ * (an outgoing request), or created empty and populated through its setters.
+ */
+public class HttpRequest extends HttpMessage {
+
+    private static final Log log = LogFactory.getLog(HttpRequest.class);
+
+    private IncomingHandler handler;
+
+    private String host;
+    private int port;
+    private String path;
+    private String method;
+    private String protocol;
+    /** the parameters found in the query string of the path; never null */
+    private Map requestParams = new HashMap();
+
+    /** A pointer to a runnable to be executed once a response is received for this httpMessage */
+    private Runnable onResponse = null;
+
+    /**
+     * Create a POST request using HTTP/1.1 and connection-close for the given URL
+     * @param url the target URL, from which the host, port, path and query are taken
+     */
+    public HttpRequest(URL url) {
+        this.host = url.getHost();
+        // getPort() is -1 when the URL does not state a port - use the protocol default then
+        this.port = url.getPort() != -1 ? url.getPort() : url.getDefaultPort();
+        // getFile() is the path plus the query string (getPath() would drop the query);
+        // an empty path must be sent as "/" on the request line
+        String file = url.getFile();
+        setPath(file == null || file.length() == 0 ? "/" : file); // populate requestParams as well...
+        this.method = Constants.POST;
+        this.protocol = Constants.HTTP_11;
+        setConnectionClose(); // use connection-close for outgoing requests by default
+    }
+
+    /**
+     * Get the Runnable code block that should be executed upon receiving a response to
+     * this HttpRequest
+     * @return a Runnable code block to execute on receipt of a response to this httpMessage
+     */
+    public Runnable getOnResponse() {
+        return onResponse;
+    }
+
+    /**
+     * Set the response handler Runnable code block
+     * @param onResponse the Runnable to be executed on receipt of a response to the httpMessage
+     */
+    public void setOnResponse(Runnable onResponse) {
+        this.onResponse = onResponse;
+    }
+
+    /**
+     * Set the incoming message handler
+     * @param handler the incoming message handler
+     */
+    public void setHandler(IncomingHandler handler) {
+        this.handler = handler;
+    }
+
+    /**
+     * Get the incoming message handler
+     * @return the incoming message handler, or null if none has been set
+     */
+    public IncomingHandler getHandler() {
+        return handler;
+    }
+
+    //TODO
+    public HttpRequest() {
+    }
+
+    // TODO
+    /**
+     * Create a new response associated with this request
+     * @return a new HttpResponse for this request
+     */
+    public HttpResponse createResponse() {
+        return new HttpResponse(this);
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public String getMethod() {
+        return method;
+    }
+
+    public void setMethod(String method) {
+        this.method = method;
+    }
+
+    public String getPath() {
+        return path;
+    }
+
+    /**
+     * Set the request path, and re-populate the request parameters from its query
+     * string (i.e. the "&" separated name=value pairs after the first "?")
+     * @param path the request path, optionally followed by a query string
+     */
+    public void setPath(String path) {
+        this.path = path;
+        requestParams = new HashMap();
+        if (path == null) {
+            return;
+        }
+        int qPos = path.indexOf("?");
+        if (qPos != -1) {
+            String params = path.substring(qPos+1);
+            StringTokenizer st = new StringTokenizer(params, "&");
+            while (st.hasMoreTokens()) {
+                String kvPair = st.nextToken();
+                int eqPos = kvPair.indexOf("=");
+                if (eqPos != -1) {
+                    // the value starts after the "=" sign
+                    requestParams.put(kvPair.substring(0, eqPos), kvPair.substring(eqPos + 1));
+                } else {
+                    requestParams.put(kvPair, null);
+                }
+            }
+        }
+    }
+
+    /**
+     * Get the names of the request parameters
+     * @return an Iterator over the parameter names
+     */
+    public Iterator getParameterNames() {
+        return requestParams.keySet().iterator();
+    }
+
+    /**
+     * Get the value of the named request parameter
+     * @param name the parameter name
+     * @return the value, or null if the parameter is absent or was given without a value
+     */
+    public String getParameter(String name) {
+        return (String) requestParams.get(name);
+    }
+
+    public String getProtocol() {
+        return protocol;
+    }
+
+    public void setProtocol(String protocol) {
+        this.protocol = protocol;
+    }
+
+    /**
+     * Return the request line, i.e. method, path and protocol followed by CRLF
+     * @return the request line
+     */
+    public String toStringLine() {
+        return method + Constants.STRING_SP + path +
+            Constants.STRING_SP + protocol + Constants.CRLF;
+    }
+
+    /**
+     * Return a ByteBuffer representation of this message in HTTP wire-format
+     * @return the ByteBuffer representation of this message
+     */
+    public ByteBuffer getBuffer() {
+        return ByteBuffer.wrap(toString().getBytes());
+    }
+
+    /**
+     * Does this message specify a connection-close? A HTTP/1.0 request without a
+     * Connection header is treated as connection-close.
+     * @return true if connection should be closed
+     */
+    public boolean isConnectionClose() {
+        String connection = (String) headers.get(Constants.CONNECTION);
+        if (connection == null) {
+            return Constants.HTTP_10.equals(protocol);
+        }
+        return Constants.CLOSE.equalsIgnoreCase(connection);
+    }
+
+    //------------------------------ TESTING CODE ------------------------
+    /**
+     * Convenience method for testing etc. Replaces the body held in the main buffer
+     * with the bytes of the given string, growing the buffer if it is too small.
+     * @param body the new body content
+     */
+    public void setBody(String body) {
+        byte[] bytes = body.getBytes();
+        // undo the limit left behind by any earlier flip()
+        buffer.clear();
+        if (buffer.capacity() - bodyStart < bytes.length) {
+            ByteBuffer larger = ByteBuffer.allocate(bodyStart + bytes.length);
+            // keep whatever precedes the body (position is 0 after the clear())
+            buffer.limit(bodyStart);
+            larger.put(buffer);
+            buffer = larger;
+        }
+        buffer.position(bodyStart);
+        buffer.put(bytes);
+        buffer.flip();
+    }
+}
Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpResponse.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpResponse.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpResponse.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpResponse.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,115 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Represents an HttpResponse to a httpMessage received or sent.
+ */
+public class HttpResponse extends HttpMessage {
+
+ private static final Log log = LogFactory.getLog(HttpResponse.class);
+
+ /** The associated HttpRequest (if any) for this response */
+ private HttpRequest request;
+ /** The http version used */
+ private String version = null;
+ /** The response status - HTTP response */
+ private ResponseStatus status = new ResponseStatus();
+
+ /**
+ * Create a HttpResponse for the given HttpRequest. Sets connection-close
+ * if the httpMessage specified it.
+ * @param request the HttpRequest for which this reponse will apply
+ */
+ public HttpResponse(HttpRequest request) {
+ this.request = request;
+ this.version = Constants.HTTP_11;
+ if (request.isConnectionClose()) {
+ addHeader(Constants.CONNECTION, Constants.CLOSE);
+ }
+ buffer.flip();
+ }
+
+ /**
+ * Create a new HttpResponse
+ */
+ public HttpResponse() {}
+
+ public String getVersion() {
+ return version;
+ }
+
+ public void setResponseCode(int code) {
+ status.setCode(code);
+ }
+
+ public void setResponseMessage(String msg) {
+ status.setMessage(msg);
+ }
+
+ public void setVersion(String version) {
+ this.version = version;
+ }
+
+ /**
+ * Perform actual commit of this response out to the wire. Calls on the message
+ * handler to process the message as required
+ */
+ public void commit() {
+ if (outputStreamOpened) {
+ // if someone didnt properly close the OutputStream after writing, flip buffer
+ buffer.flip();
+ outputStreamOpened = false;
+ }
+ if (request != null) {
+ request.getHandler().setResponse(this);
+ }
+ }
+
+ public void setStatus(ResponseStatus status) {
+ this.status = status;
+ }
+
+ public void setStatus(ResponseStatus status, String msg) {
+ this.status = status;
+ status.setMessage(msg);
+ }
+
+ public ResponseStatus getStatus() {
+ return status;
+ }
+
+ /**
+ * Return a ByteBuffer representation of this message in HTTP wire-format for transmission
+ * @return the ByteBuffer representation of this message
+ */
+ public ByteBuffer getBuffer() {
+ if (buffer.limit() > 0) {
+ headers.put(Constants.CONTENT_LENGTH, Integer.toString(buffer.limit()));
+ }
+ return ByteBuffer.wrap(toString().getBytes());
+ }
+
+ public String toStringLine() {
+ return version + Constants.STRING_SP + status + Constants.CRLF;
+ }
+
+}
Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpService.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpService.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpService.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpService.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,23 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl;
+
+/**
+ * The receiver of the events fired by the connection handlers once a complete
+ * message has been read.
+ */
+public interface HttpService {
+
+    /**
+     * Invoked with a request that has been read completely.
+     *
+     * @param request the request received
+     */
+    void handleRequest(HttpRequest request);
+
+    /**
+     * Invoked with a response that has been read completely.
+     *
+     * @param response the response received
+     * @param callback the callback registered with the handler that read the response (may be null)
+     */
+    void handleResponse(HttpResponse response, Runnable callback);
+}
Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,114 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+
+/**
+ * The handler attached to the selection key of one accepted HTTP connection; an
+ * instance is expected to be dedicated to that single connection.
+ *
+ * It delegates reading from the channel to a ReadHandler and, once a complete
+ * request is available, fires it at the HttpService. When a response is set on
+ * this handler (e.g. by a HttpResponse committing itself), it switches the
+ * connection over to writing and delegates sending the response to a WriteHandler.
+ */
+public class IncomingHandler implements Runnable {
+
+    private static final Log log = LogFactory.getLog(IncomingHandler.class);
+
+    private SelectionKey sk;
+    private SocketChannel socket;
+
+    private ReadHandler incomingHandler = new ReadHandler(true);
+    private WriteHandler responseHandler = new WriteHandler();
+
+    private HttpService httpService;
+
+    /**
+     * Register the given channel with the selector, attach this handler to the
+     * resulting key and express interest in read events.
+     *
+     * @param socket the accepted channel; switched to non-blocking mode
+     * @param selector the selector the channel is registered with
+     * @param httpService the service requests are fired at
+     * @throws IOException if the channel cannot be configured or registered
+     */
+    public IncomingHandler(SocketChannel socket, Selector selector,
+        HttpService httpService) throws IOException {
+
+        this.httpService = httpService;
+        this.socket = socket;
+        socket.configureBlocking(false);
+        // Optionally try first read now
+        sk = socket.register(selector, 0);
+        sk.attach(this);
+        sk.interestOps(SelectionKey.OP_READ); // we are only interested to read
+        selector.wakeup();
+    }
+
+    /**
+     * Hand the response to the write handler and switch the key over to write events
+     *
+     * @param response the response to be sent over this connection
+     */
+    public void setResponse(HttpResponse response) {
+        responseHandler.setMessage(response.getBuffer(), response.isConnectionClose());
+        sk.interestOps(SelectionKey.OP_WRITE);
+        sk.selector().wakeup();
+        log.debug("\tIncomingHandler.setResponse()");
+    }
+
+    /**
+     * The main handler routine - dispatches on the readiness of the key. A readable
+     * key takes precedence over a writable one.
+     */
+    public void run() {
+        if (sk.isReadable()) {
+            onReadable();
+            return;
+        }
+        if (sk.isWritable()) {
+            onWritable();
+            return;
+        }
+        log.warn("IncomingHandler run(!!!unknown event!!!) : " + sk.readyOps());
+    }
+
+    /**
+     * Continue reading; once a request is complete fire it at the service
+     */
+    private void onReadable() {
+        log.debug("\tIncomingHandler run() - READABLE");
+        if (!incomingHandler.handle(socket, sk)) {
+            // the request is not complete as yet
+            return;
+        }
+
+        log.debug("\tA httpMessage has been read completely");
+        HttpRequest received = (HttpRequest) incomingHandler.getHttpMessage();
+        received.setHandler(this);
+        log.debug("\tFire event for received httpMessage");
+        httpService.handleRequest(received);
+
+        // unless the connection is to be closed, get ready for the next request on it
+        if (!incomingHandler.isConnectionClose()) {
+            incomingHandler.reset();
+            log.debug("\tReset read handler to read next pipelined httpMessage");
+        }
+    }
+
+    /**
+     * Continue writing; once the response is complete either close the connection
+     * or go back to reading
+     */
+    private void onWritable() {
+        log.debug("\tIncomingHandler run() - WRITEABLE");
+        if (!responseHandler.handle(socket)) {
+            // the response is not completely written as yet
+            return;
+        }
+
+        log.debug("\tThe response has been written completely");
+        if (!responseHandler.isConnectionClose()) {
+            // now we are again interested to read
+            sk.interestOps(SelectionKey.OP_READ);
+            return;
+        }
+
+        log.debug("\tClosing connection normally");
+        sk.cancel();
+        try {
+            socket.close();
+        } catch (IOException e) {
+            log.warn("Error during socket close : " + e.getMessage(), e);
+        }
+    }
+
+}
Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,95 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+/**
+ * This handler owns sending of a httpMessage to an external endpoint using a WriteHandler
+ * and reading back the response. This does not handle persistent/pipelined connections
+ */
+public class OutgoingHandler implements Runnable {
+
+ private static final Log log = LogFactory.getLog(OutgoingHandler.class);
+
+ private SelectionKey sk;
+ private SocketChannel socket;
+ private HttpService httpService;
+
+ private WriteHandler writeHandler = new WriteHandler();
+ private ReadHandler readHandler = new ReadHandler(false); /* no response from this TODO*/
+ private Runnable callback = null;
+
+ OutgoingHandler(SocketChannel socket, SelectionKey sk, HttpRequest request, HttpService httpService) {
+ this.httpService = httpService;
+ this.socket = socket;
+ this.sk = sk;
+ writeHandler.setMessage(request.getBuffer(), true /* connection close */);
+ }
+
+ public Runnable getCallback() {
+ return callback;
+ }
+
+ public void setCallback(Runnable callback) {
+ this.callback = callback;
+ }
+
+ public void run() {
+ try {
+ if (sk.isConnectable() && socket.finishConnect()) {
+ log.debug("\tIncomingHandler run() - CONNECTABLE and CONNECTED");
+ sk.interestOps(SelectionKey.OP_WRITE);
+
+ } else if (sk.isWritable()) {
+ log.debug("\tIncomingHandler run() - WRITEABLE");
+ if (writeHandler.handle(socket)) {
+ log.debug("\tRequest written completely");
+ // response has been written completely
+ // now read response or at least result code
+ sk.interestOps(SelectionKey.OP_READ);
+ }
+
+ } else if (sk.isReadable()) {
+ log.debug("\tIncomingHandler run() - READABLE");
+ if (readHandler.handle(socket, sk)) {
+ log.debug("\tResponse read completely");
+ // if httpMessage processing is complete
+ log.debug("\tFire event for response read");
+ httpService.handleResponse((HttpResponse) readHandler.getHttpMessage(), callback);
+
+ // if pipelining is used
+ /*if (!readHandler.isConnectionClose()) {
+ // prepare to read another httpMessage
+ readHandler.reset(this);
+ log.debug("\thandler reset");
+ }*/
+ socket.close();
+ sk.cancel();
+ log.debug("Socket closed and SelectionKey cancelled");
+ }
+ }
+ }
+ catch (IOException e) {
+ log.error("Error in OutGoingHandler : " + e.getMessage(), e);
+ }
+ }
+}
\ No newline at end of file
Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,200 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * A single-selector event loop for the non-blocking http transport.
+ * <p>
+ * The constructor binds a non-blocking server socket channel and registers it
+ * with the selector for OP_ACCEPT. {@link #run()} then repeatedly selects the
+ * ready keys and runs the Runnable attached to each of them on the calling
+ * thread. Outgoing requests are started with {@link #send(HttpRequest, Runnable)},
+ * which registers the new connection with the same selector.
+ * <p>
+ * At most one plain and one secure instance are kept in static fields; see
+ * {@link #createReactor(String, int, boolean, HttpService)} and
+ * {@link #getInstance(boolean)}.
+ * <p>
+ * dynamic buffer expansion on need - done
+ * TODO socket timeouts, 100 continue, 202 processing - asap
+ * TODO compression/mtom & mime/SSL - sometime soon
+ * TODO http sessions - do we need?
+ * TODO ByteBuffer pools and reuse of buffers, multiple selectors/IO threads - advanced
+ */
+public class Reactor implements Runnable {
+
+    private static final Log log = LogFactory.getLog(Reactor.class);
+
+    /**
+     * The [single] main selector
+     */
+    final Selector selector;
+    /**
+     * The server socket channel
+     */
+    final ServerSocketChannel serverSocketChannel;
+
+    /**
+     * The maximum number of milli secs a select call would block for
+     */
+    private int selectTimeout = 500;
+    /**
+     * variable to be set when a shutdown is required. It is written by threads
+     * other than the one executing run(), hence volatile.
+     */
+    private volatile boolean shutdownRequested = false;
+    /**
+     * The HttpService on which events are fired upon when new messages are received
+     */
+    private HttpService httpService;
+
+    private static Reactor _httpReactor, _httpsReactor;
+
+    /**
+     * Returns the reactor most recently created through createReactor().
+     *
+     * @param secure true for the secure reactor, false for the plain one
+     * @return the requested reactor, or null if none has been created yet
+     */
+    public static synchronized Reactor getInstance(boolean secure) {
+        if (secure) {
+            return _httpsReactor;
+        } else {
+            return _httpReactor;
+        }
+    }
+
+    /**
+     * Creates a new reactor and remembers it as the plain or secure instance.
+     * If an instance of the same kind already exists it is asked to shut down
+     * first. The previous instance releases its port only when its run() loop
+     * notices the request, so re-creating a reactor on the same port right away
+     * may fail to bind.
+     *
+     * @param host        the host name; see the constructor for how it is used
+     * @param port        the server listen port
+     * @param secure      selects which of the two static instances is replaced
+     * @param httpService the service notified of received messages
+     * @return the newly created reactor; the caller is expected to run it
+     * @throws IOException if the selector or server socket cannot be set up
+     */
+    public static synchronized Reactor createReactor(
+        String host, int port, boolean secure, HttpService httpService) throws IOException {
+        if (secure) {
+            if (_httpsReactor != null) {
+                _httpsReactor.setShutdownRequested(true);
+            }
+            _httpsReactor = new Reactor(host, port, secure, httpService);
+            return _httpsReactor;
+        } else {
+            if (_httpReactor != null) {
+                _httpReactor.setShutdownRequested(true);
+            }
+            _httpReactor = new Reactor(host, port, secure, httpService);
+            return _httpReactor;
+        }
+    }
+
+    /**
+     * Starts a new Reactor on the specified port and prepares to
+     * accept new connections. If any step of the set up fails, the selector and
+     * the server socket channel opened so far are closed again.
+     *
+     * @param host        currently used for the log message only; the socket is
+     *                    bound to the wildcard address
+     * @param port        the server listen port
+     * @param secure      not used by this constructor at present
+     * @param httpService the service notified of received messages
+     * @throws IOException if the selector or the server socket cannot be opened,
+     *                     bound or registered
+     */
+    private Reactor(String host, int port, boolean secure, HttpService httpService) throws IOException {
+
+        this.httpService = httpService;
+
+        Selector sel = null;
+        ServerSocketChannel ssc = null;
+        boolean initialised = false;
+        try {
+            sel = Selector.open();
+            ssc = ServerSocketChannel.open();
+            ssc.socket().bind(
+                new InetSocketAddress(port));
+            /*new InetSocketAddress(
+                host == null ? InetAddress.getLocalHost() : InetAddress.getByName(host), port));*/
+            ssc.configureBlocking(false);
+
+            SelectionKey sk = ssc.register(sel, SelectionKey.OP_ACCEPT);
+            sk.attach(new Acceptor());
+            initialised = true;
+        } finally {
+            if (!initialised) {
+                // do not leak the selector / port when construction fails
+                closeChannel(ssc);
+                closeSelector(sel);
+            }
+        }
+        selector = sel;
+        serverSocketChannel = ssc;
+        log.info("Reactor Created for host : " + host + " on port : " + port);
+    }
+
+    /**
+     * This is the main routine of the Reactor, which will keep selecting ready
+     * keys and dispatching them until a shutdown is requested or the selector
+     * fails. When the loop ends the server socket channel and the selector are
+     * closed, which releases the listen port; the instance cannot be run again
+     * afterwards.
+     */
+    public void run() {
+        try {
+            while (!shutdownRequested) {
+                int keys = selector.select(selectTimeout);
+                if (keys > 0) {
+                    Set selected = selector.selectedKeys();
+                    Iterator it = selected.iterator();
+                    while (it.hasNext()) {
+                        dispatch((SelectionKey) (it.next()));
+                    }
+                    selected.clear();
+                }
+            }
+        } catch (IOException e) {
+            log.error("Reactor terminated by an I/O error : " + e.getMessage(), e);
+        } finally {
+            closeChannel(serverSocketChannel);
+            closeSelector(selector);
+            log.info("Reactor stopped");
+        }
+    }
+
+    /**
+     * The main dispatch routine. This currently does not delegate to a thread pool
+     * and leave this to the user
+     *
+     * @param k the selection key for the event; keys without an attachment are ignored
+     */
+    void dispatch(SelectionKey k) {
+        Runnable r = (Runnable) (k.attachment());
+        if (r != null) {
+            r.run();
+        }
+    }
+
+    /**
+     * Accepts a new connection and hands it off to a new IncomingHandler instance
+     */
+    class Acceptor implements Runnable {
+
+        public void run() {
+            SocketChannel socket = null;
+            try {
+                socket = serverSocketChannel.accept();
+                if (socket != null) {
+                    log.info("Accepting new connection...");
+                    // we have a *new* HTTP connection here
+                    new IncomingHandler(socket, selector, httpService);
+                }
+            } catch (IOException e) {
+                handleException("Exception while accepting a connection : " + e.getMessage(), e);
+                // an accepted connection that could not be handed off is of no use
+                closeChannel(socket);
+            }
+        }
+    }
+
+    /**
+     * Opens a non-blocking connection to the host and port of the request and
+     * registers it with the selector, attaching an OutgoingHandler that carries
+     * the request. Failures are logged and not reported to the caller; in that
+     * case the socket opened for the request is closed again.
+     *
+     * @param request  the request to be sent
+     * @param callback set on the OutgoingHandler when not null
+     */
+    public void send(HttpRequest request, Runnable callback) {
+        SocketChannel socket = null;
+        boolean handedOver = false;
+        try {
+            InetSocketAddress addr = new InetSocketAddress(
+                request.getHost(), request.getPort());
+
+            socket = SocketChannel.open();
+            socket.configureBlocking(false);
+            socket.connect(addr);
+
+            SelectionKey sk = socket.register(selector, SelectionKey.OP_CONNECT);
+            OutgoingHandler outHandler = new OutgoingHandler(socket, sk, request, httpService);
+            if (callback != null) {
+                outHandler.setCallback(callback);
+            }
+            sk.attach(outHandler);
+            handedOver = true;
+
+        } catch (IOException e) {
+            log.error("Error sending request : " + e.getMessage(), e);
+        } catch (java.nio.channels.ClosedSelectorException e) {
+            // run() closes the selector once the reactor has stopped
+            log.error("Cannot send request as the Reactor has been shut down", e);
+        } finally {
+            if (!handedOver) {
+                // closing the channel also cancels any key registered for it
+                closeChannel(socket);
+            }
+        }
+    }
+
+    /**
+     * Closes a channel, logging instead of propagating any failure.
+     *
+     * @param channel the channel to close, may be null
+     */
+    private static void closeChannel(java.nio.channels.Channel channel) {
+        if (channel == null) {
+            return;
+        }
+        try {
+            channel.close();
+        } catch (IOException e) {
+            log.debug("Error closing channel : " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Closes a selector, logging instead of propagating any failure.
+     *
+     * @param sel the selector to close, may be null
+     */
+    private static void closeSelector(Selector sel) {
+        if (sel == null) {
+            return;
+        }
+        try {
+            sel.close();
+        } catch (IOException e) {
+            log.debug("Error closing selector : " + e.getMessage(), e);
+        }
+    }
+
+    private static void handleException(String msg, Exception e) {
+        // the logger already records the stack trace
+        log.error(msg, e);
+        // throw new xxxx TODO
+    }
+
+    /**
+     * Requests (or withdraws a request for) the termination of the run() loop.
+     * The loop checks the flag between select calls.
+     *
+     * @param shutdownRequested true to make run() exit
+     */
+    public void setShutdownRequested(boolean shutdownRequested) {
+        this.shutdownRequested = shutdownRequested;
+    }
+}
Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java
URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java?view=auto&rev=462889
==============================================================================
--- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java (added)
+++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java Wed Oct 11 11:02:08 2006
@@ -0,0 +1,92 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl;
+
+import java.net.URL;
+import java.io.IOException;
+
+/**
+ * A small demo driver for the Reactor. It creates a reactor for port 9001 whose
+ * HttpService copies the body of every incoming request into a new request for
+ * http://localhost:9000/axis2/services/SimpleStockQuoteService, sends that
+ * through the same reactor, and copies the response it receives back into a
+ * response created from the original request.
+ */
+public class ReactorTester {
+
+    /** The reactor created by runDemo(); the anonymous service uses it to forward requests */
+    private Reactor r = null;
+
+    public static void main(String[] args) throws Exception {
+        new ReactorTester().runDemo();
+    }
+
+    /**
+     * Creates the reactor with the forwarding service and starts it on a new thread.
+     *
+     * @throws IOException if the reactor cannot be created
+     */
+    private void runDemo() throws IOException {
+
+        HttpService forwardingService = new HttpService() {
+
+            /** Copies the incoming request body into a new request and sends it on */
+            public void handleRequest(HttpRequest request) {
+                try {
+                    System.out.println("Processing Request : " + request);
+                    // create new HttpRequest
+                    URL target =
+                        new URL("http://localhost:9000/axis2/services/SimpleStockQuoteService");
+                    HttpRequest outbound = new HttpRequest(target);
+
+                    Util.copyStreams(request.getInputStream(), outbound.getOutputStream());
+                    SimpleCallback relay = new SimpleCallback(request);
+                    r.send(outbound, relay);
+
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+
+            /** Gives the received response to the callback and runs it on this thread */
+            public void handleResponse(HttpResponse response, Runnable callback) {
+                System.out.println("Received Response : " + response);
+                SimpleCallback relay = (SimpleCallback) callback;
+                relay.setResponse(response);
+                relay.run();
+            }
+        };
+
+        r = Reactor.createReactor(null, 9001, false, forwardingService);
+
+        Thread reactorThread = new Thread(r);
+        reactorThread.start();
+    }
+
+
+    /**
+     * Remembers the original request and, when run, answers it with the body of
+     * the response that was set on this callback.
+     */
+    public class SimpleCallback implements Callback {
+        HttpResponse response;
+        HttpRequest request;
+
+        SimpleCallback(HttpRequest request) {
+            this.request = request;
+        }
+
+        public void setResponse(HttpResponse response) {
+            this.response = response;
+        }
+
+        public void setRequest(HttpRequest request) {
+            this.request = request;
+        }
+
+        /**
+         * Creates a response from the original request, copies the received
+         * response body into it, marks it OK with a text/xml content type and
+         * commits it. The commit happens even if the copy fails.
+         */
+        public void run() {
+            HttpResponse reply = request.createResponse();
+            try {
+                Util.copyStreams(response.getInputStream(), reply.getOutputStream());
+                reply.setStatus(ResponseStatus.OK);
+                reply.addHeader(Constants.CONTENT_TYPE, Constants.TEXT_XML);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            reply.commit();
+        }
+    }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: synapse-dev-help@ws.apache.org