You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@synapse.apache.org by as...@apache.org on 2007/03/07 13:32:42 UTC

svn commit: r515552 - in /webservices/synapse/trunk/java/modules: core/src/main/java/org/apache/synapse/core/axis2/ nhttp/src/org/apache/axis2/transport/nhttp/

Author: asankha
Date: Wed Mar  7 04:32:41 2007
New Revision: 515552

URL: http://svn.apache.org/viewvc?view=rev&rev=515552
Log:
Allow Synapse to be notified by the non-blocking transport of connection and send errors (for load balancing, etc.)

Modified:
    webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AsyncCallback.java
    webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java
    webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/HttpCoreNIOSender.java

Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AsyncCallback.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AsyncCallback.java?view=diff&rev=515552&r1=515551&r2=515552
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AsyncCallback.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AsyncCallback.java Wed Mar  7 04:32:41 2007
@@ -21,87 +21,29 @@
 
 import org.apache.axis2.client.async.Callback;
 import org.apache.axis2.client.async.AsyncResult;
-import org.apache.axis2.context.MessageContext;
-import org.apache.synapse.Constants;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.Iterator;
+import org.apache.synapse.MessageContext;
 
+/**
+ * This class only "holds" the outgoing Synapse message context so that the Synapse callback
+ * message receiver can retrieve it when a response is received or an error is encountered.
+ */
 public class AsyncCallback extends Callback {
 
-    private static final Log log = LogFactory.getLog(AsyncCallback.class);
-
-    org.apache.synapse.MessageContext synapseOutMsgCtx = null;
+    MessageContext synapseOutMsgCtx = null;
 
     public AsyncCallback(org.apache.synapse.MessageContext synapseOutMsgCtx) {
         this.synapseOutMsgCtx = synapseOutMsgCtx;
     }
 
-    public void onComplete(AsyncResult result) {
-
-        MessageContext response = result.getResponseMessageContext();
+    public void onComplete(AsyncResult result) {}
 
-        log.debug("Synapse received an asynchronous response message");
-        log.debug("Received To: " +
-            (response.getTo() != null ? response.getTo().getAddress() : "null"));
-        log.debug("SOAPAction: " +
-            (response.getSoapAction() != null ? response.getSoapAction() : "null"));
-        if (log.isDebugEnabled()) {
-            log.debug("Body : \n" + response.getEnvelope());
-        }
-
-        MessageContext axisOutMsgCtx =
-            ((Axis2MessageContext)synapseOutMsgCtx).getAxis2MessageContext();
-
-        response.setOperationContext(axisOutMsgCtx.getOperationContext());
-        response.setAxisService(axisOutMsgCtx.getAxisService());
-
-        // set properties on response
-        response.setServerSide(true);
-        response.setProperty(Constants.ISRESPONSE_PROPERTY, Boolean.TRUE);
-        response.setProperty(MessageContext.TRANSPORT_OUT,
-            axisOutMsgCtx.getProperty(MessageContext.TRANSPORT_OUT));
-        response.setProperty(org.apache.axis2.Constants.OUT_TRANSPORT_INFO,
-            axisOutMsgCtx.getProperty(org.apache.axis2.Constants.OUT_TRANSPORT_INFO));
-        response.setTransportIn(axisOutMsgCtx.getTransportIn());
-        response.setTransportOut(axisOutMsgCtx.getTransportOut());
-
-        // If request is REST assume that the response is REST too
-        response.setDoingREST(axisOutMsgCtx.isDoingREST());
-
-        // create the synapse message context for the response
-        Axis2MessageContext synapseInMessageContext =
-            new Axis2MessageContext(
-                response,
-                synapseOutMsgCtx.getConfiguration(),
-                synapseOutMsgCtx.getEnvironment());
-
-        synapseInMessageContext.setResponse(true);
-        synapseInMessageContext.setTo(null);
-
-        // set the properties of the original MC to the new MC
-        Iterator iter = synapseOutMsgCtx.getLocalPropertyKeySet().iterator();
-
-        while (iter.hasNext()) {
-            Object key = iter.next();
-            synapseInMessageContext.setProperty(
-                (String) key, synapseOutMsgCtx.getLocalProperty((String) key));
-        }
-
-        // send the response message through the synapse mediation flow
-        synapseOutMsgCtx.getEnvironment().
-            injectMessage(synapseInMessageContext);
-    }
+    public void onError(Exception e) {}
 
-    public void onError(Exception e) {
-        // this will never be called as our custom SynapseCallbackReceiver will push
-        // faults as well through the onComplete()
-        log.warn(e);
-        e.printStackTrace();
+    public void setSynapseOutMshCtx(MessageContext synapseOutMsgCtx) {
+        this.synapseOutMsgCtx = synapseOutMsgCtx;
     }
 
-    public void setSynapseOutMshCtx(org.apache.synapse.MessageContext synapseOutMsgCtx) {
-        this.synapseOutMsgCtx = synapseOutMsgCtx;
+    public MessageContext getSynapseOutMsgCtx() {
+        return synapseOutMsgCtx;
     }
 }

Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java?view=diff&rev=515552&r1=515551&r2=515552
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java Wed Mar  7 04:32:41 2007
@@ -21,18 +21,16 @@
 
 import org.apache.axis2.engine.MessageReceiver;
 import org.apache.axis2.client.async.Callback;
-import org.apache.axis2.client.async.AsyncResult;
 import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.addressing.RelatesTo;
 import org.apache.axis2.AxisFault;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.synapse.SynapseException;
-import org.apache.synapse.FaultHandler;
+import org.apache.synapse.Constants;
 
 import java.util.Map;
 import java.util.HashMap;
 import java.util.Collections;
+import java.util.Iterator;
 
 public class SynapseCallbackReceiver implements MessageReceiver {
 
@@ -52,11 +50,10 @@
 
         if (messageCtx.getOptions() != null && messageCtx.getOptions().getRelatesTo() != null) {
             String messageID  = messageCtx.getOptions().getRelatesTo().getValue();
-            Callback callback = (Callback) callbackStore.get(messageID);
+            Callback callback = (Callback) callbackStore.remove(messageID);
 
             if (callback != null) {
-                callbackStore.remove(messageID);
-                callback.onComplete(new AsyncResult(messageCtx));
+                handleMessage(messageCtx, ((AsyncCallback) callback).getSynapseOutMsgCtx());
             } else {
                 // TODO invoke a generic synapse error handler for this message
                 log.warn("Synapse received a response for the request with message Id : " +
@@ -66,6 +63,77 @@
         } else {
             // TODO invoke a generic synapse error handler for this message
             log.warn("Synapse received a response message without a message Id");
+        }
+    }
+
+    /**
+     * Handles a response (or a fault produced by a failed send) received for an outgoing request.
+     *
+     * @param response the Axis2 MessageContext that has been received and has to be handled
+     * @param synapseOutMsgCtx the corresponding (outgoing) Synapse MessageContext for the above
+     * Axis2 MC; it supplies the configuration, environment and local properties that are
+     * carried over to the new Synapse message context built for the response.
+     */
+    private void handleMessage(MessageContext response,
+        org.apache.synapse.MessageContext synapseOutMsgCtx) {
+
+        if (response.getEnvelope().getBody().hasFault()) {
+            // NOTE: this branch is currently empty, so a fault response is dropped here. Planned:
+            // synapseOutMsgCtx.getFaultStack().pop(); and handle the
+            // response.getEnvelope().getBody().getFault().getException() -- TODO chathura
+
+        } else {
+
+            if (log.isDebugEnabled()) {
+                log.debug("Synapse received an asynchronous response message");
+                log.debug("Received To: " +
+                    (response.getTo() != null ? response.getTo().getAddress() : "null"));
+                log.debug("SOAPAction: " +
+                    (response.getSoapAction() != null ? response.getSoapAction() : "null"));
+                log.debug("Body : \n" + response.getEnvelope());
+            }
+
+            MessageContext axisOutMsgCtx =
+                ((Axis2MessageContext)synapseOutMsgCtx).getAxis2MessageContext();
+
+            response.setOperationContext(axisOutMsgCtx.getOperationContext());
+            response.setAxisService(axisOutMsgCtx.getAxisService());
+
+            // mark the response as server side and copy the transport details of the request
+            response.setServerSide(true);
+            response.setProperty(Constants.ISRESPONSE_PROPERTY, Boolean.TRUE);
+            response.setProperty(MessageContext.TRANSPORT_OUT,
+                axisOutMsgCtx.getProperty(MessageContext.TRANSPORT_OUT));
+            response.setProperty(org.apache.axis2.Constants.OUT_TRANSPORT_INFO,
+                axisOutMsgCtx.getProperty(org.apache.axis2.Constants.OUT_TRANSPORT_INFO));
+            response.setTransportIn(axisOutMsgCtx.getTransportIn());
+            response.setTransportOut(axisOutMsgCtx.getTransportOut());
+
+            // If the request was REST, assume that the response is REST too
+            response.setDoingREST(axisOutMsgCtx.isDoingREST());
+
+            // wrap the Axis2 response in a new Synapse message context
+            Axis2MessageContext synapseInMessageContext =
+                new Axis2MessageContext(
+                    response,
+                    synapseOutMsgCtx.getConfiguration(),
+                    synapseOutMsgCtx.getEnvironment());
+
+            synapseInMessageContext.setResponse(true);
+            synapseInMessageContext.setTo(null);
+
+            // copy every local property of the outgoing MC onto the new response MC
+            Iterator iter = synapseOutMsgCtx.getLocalPropertyKeySet().iterator();
+
+            while (iter.hasNext()) {
+                Object key = iter.next();
+                synapseInMessageContext.setProperty(
+                    (String) key, synapseOutMsgCtx.getLocalProperty((String) key));
+            }
+
+            // send the response message through the synapse mediation flow
+            synapseOutMsgCtx.getEnvironment().
+                injectMessage(synapseInMessageContext);
         }
     }
 }

Modified: webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/HttpCoreNIOSender.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/HttpCoreNIOSender.java?view=diff&rev=515552&r1=515551&r2=515552
==============================================================================
--- webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/HttpCoreNIOSender.java (original)
+++ webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/HttpCoreNIOSender.java Wed Mar  7 04:32:41 2007
@@ -17,8 +17,11 @@
 
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
+import org.apache.axis2.engine.MessageReceiver;
+import org.apache.axis2.engine.AxisEngine;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.axis2.addressing.RelatesTo;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.description.TransportOutDescription;
@@ -32,6 +35,7 @@
 import org.apache.http.nio.reactor.ConnectingIOReactor;
 import org.apache.http.nio.reactor.IOEventDispatch;
 import org.apache.http.nio.reactor.SessionRequest;
+import org.apache.http.nio.reactor.SessionRequestCallback;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpHost;
 import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
@@ -68,6 +72,8 @@
     private ConnectingIOReactor ioReactor = null;
     /** The client handler */
     private NHttpClientHandler handler = null;
+    /** The session request callback that calls back to the message receiver with errors */
+    private final SessionRequestCallback sessionRequestCallback = getSessionRequestCallback();
     /** The SSL Context to be used */
     SSLContext sslContext = null;
 
@@ -240,6 +246,7 @@
             if (conn == null) {
                 SessionRequest req = ioReactor.connect(
                     new InetSocketAddress(url.getHost(), port), null, axis2Req);
+                req.setCallback(sessionRequestCallback);
                 log.debug("A new connection established");
             } else {
                 ((ClientHandler) handler).submitRequest(conn, axis2Req);
@@ -340,6 +347,47 @@
         } catch (IOException e) {
             log.warn("Error shutting down IOReactor", e);
         }
+    }
+
+    /**
+     * Returns a SessionRequestCallback that is notified when a session request fails
+     * or times out. On either event the callback takes the Axis2 message context from the
+     * Axis2HttpRequest attached to the session request, obtains the message receiver of its
+     * Axis operation, and passes that receiver a fault message context created from the
+     * outgoing message context and the exception reported by the session request.
+     * @return a Session request callback
+     */
+    private static SessionRequestCallback getSessionRequestCallback() {
+        return new SessionRequestCallback() {
+            public void completed(SessionRequest request) {
+            }
+
+            public void failed(SessionRequest request) {
+                handleError(request);
+            }
+
+            public void timeout(SessionRequest request) {
+                handleError(request);
+            }
+
+            private void handleError(SessionRequest request) {
+                if (request.getAttachment() != null &&
+                    request.getAttachment() instanceof Axis2HttpRequest) {
+
+                    Axis2HttpRequest axis2Request = (Axis2HttpRequest) request.getAttachment();
+                    MessageContext mc = axis2Request.getMsgContext();
+                    MessageReceiver mr = mc.getAxisOperation().getMessageReceiver();
+
+                    try {
+                        mr.receive(
+                            new AxisEngine(mc.getConfigurationContext()).
+                                createFaultMessageContext(mc, request.getException()));
+                    } catch (AxisFault af) {
+                        log.error("Unable to report back failure to the message receiver", af);
+                    }
+                }
+            }
+        };
     }
 
     // -------------- utility methods -------------



---------------------------------------------------------------------
To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: synapse-dev-help@ws.apache.org