You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@synapse.apache.org by as...@apache.org on 2007/03/07 13:32:42 UTC
svn commit: r515552 - in /webservices/synapse/trunk/java/modules:
core/src/main/java/org/apache/synapse/core/axis2/
nhttp/src/org/apache/axis2/transport/nhttp/
Author: asankha
Date: Wed Mar 7 04:32:41 2007
New Revision: 515552
URL: http://svn.apache.org/viewvc?view=rev&rev=515552
Log:
Allow Synapse to be notified of connection and send errors (e.g. for load balancing) from the non-blocking transport
Modified:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AsyncCallback.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/HttpCoreNIOSender.java
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AsyncCallback.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AsyncCallback.java?view=diff&rev=515552&r1=515551&r2=515552
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AsyncCallback.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AsyncCallback.java Wed Mar 7 04:32:41 2007
@@ -21,87 +21,29 @@
import org.apache.axis2.client.async.Callback;
import org.apache.axis2.client.async.AsyncResult;
-import org.apache.axis2.context.MessageContext;
-import org.apache.synapse.Constants;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.Iterator;
+import org.apache.synapse.MessageContext;
+/**
+ * This class only "holds" the Synapse out message context for the Synapse callback message
+ * receiver when a response is received or an error is encountered
+ */
public class AsyncCallback extends Callback {
- private static final Log log = LogFactory.getLog(AsyncCallback.class);
-
- org.apache.synapse.MessageContext synapseOutMsgCtx = null;
+ MessageContext synapseOutMsgCtx = null;
public AsyncCallback(org.apache.synapse.MessageContext synapseOutMsgCtx) {
this.synapseOutMsgCtx = synapseOutMsgCtx;
}
- public void onComplete(AsyncResult result) {
-
- MessageContext response = result.getResponseMessageContext();
+ public void onComplete(AsyncResult result) {}
- log.debug("Synapse received an asynchronous response message");
- log.debug("Received To: " +
- (response.getTo() != null ? response.getTo().getAddress() : "null"));
- log.debug("SOAPAction: " +
- (response.getSoapAction() != null ? response.getSoapAction() : "null"));
- if (log.isDebugEnabled()) {
- log.debug("Body : \n" + response.getEnvelope());
- }
-
- MessageContext axisOutMsgCtx =
- ((Axis2MessageContext)synapseOutMsgCtx).getAxis2MessageContext();
-
- response.setOperationContext(axisOutMsgCtx.getOperationContext());
- response.setAxisService(axisOutMsgCtx.getAxisService());
-
- // set properties on response
- response.setServerSide(true);
- response.setProperty(Constants.ISRESPONSE_PROPERTY, Boolean.TRUE);
- response.setProperty(MessageContext.TRANSPORT_OUT,
- axisOutMsgCtx.getProperty(MessageContext.TRANSPORT_OUT));
- response.setProperty(org.apache.axis2.Constants.OUT_TRANSPORT_INFO,
- axisOutMsgCtx.getProperty(org.apache.axis2.Constants.OUT_TRANSPORT_INFO));
- response.setTransportIn(axisOutMsgCtx.getTransportIn());
- response.setTransportOut(axisOutMsgCtx.getTransportOut());
-
- // If request is REST assume that the response is REST too
- response.setDoingREST(axisOutMsgCtx.isDoingREST());
-
- // create the synapse message context for the response
- Axis2MessageContext synapseInMessageContext =
- new Axis2MessageContext(
- response,
- synapseOutMsgCtx.getConfiguration(),
- synapseOutMsgCtx.getEnvironment());
-
- synapseInMessageContext.setResponse(true);
- synapseInMessageContext.setTo(null);
-
- // set the properties of the original MC to the new MC
- Iterator iter = synapseOutMsgCtx.getLocalPropertyKeySet().iterator();
-
- while (iter.hasNext()) {
- Object key = iter.next();
- synapseInMessageContext.setProperty(
- (String) key, synapseOutMsgCtx.getLocalProperty((String) key));
- }
-
- // send the response message through the synapse mediation flow
- synapseOutMsgCtx.getEnvironment().
- injectMessage(synapseInMessageContext);
- }
+ public void onError(Exception e) {}
- public void onError(Exception e) {
- // this will never be called as our custom SynapseCallbackReceiver will push
- // faults as well through the onComplete()
- log.warn(e);
- e.printStackTrace();
+ public void setSynapseOutMshCtx(MessageContext synapseOutMsgCtx) {
+ this.synapseOutMsgCtx = synapseOutMsgCtx;
}
- public void setSynapseOutMshCtx(org.apache.synapse.MessageContext synapseOutMsgCtx) {
- this.synapseOutMsgCtx = synapseOutMsgCtx;
+ public MessageContext getSynapseOutMsgCtx() {
+ return synapseOutMsgCtx;
}
}
Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java?view=diff&rev=515552&r1=515551&r2=515552
==============================================================================
--- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java (original)
+++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java Wed Mar 7 04:32:41 2007
@@ -21,18 +21,16 @@
import org.apache.axis2.engine.MessageReceiver;
import org.apache.axis2.client.async.Callback;
-import org.apache.axis2.client.async.AsyncResult;
import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.addressing.RelatesTo;
import org.apache.axis2.AxisFault;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.synapse.SynapseException;
-import org.apache.synapse.FaultHandler;
+import org.apache.synapse.Constants;
import java.util.Map;
import java.util.HashMap;
import java.util.Collections;
+import java.util.Iterator;
public class SynapseCallbackReceiver implements MessageReceiver {
@@ -52,11 +50,10 @@
if (messageCtx.getOptions() != null && messageCtx.getOptions().getRelatesTo() != null) {
String messageID = messageCtx.getOptions().getRelatesTo().getValue();
- Callback callback = (Callback) callbackStore.get(messageID);
+ Callback callback = (Callback) callbackStore.remove(messageID);
if (callback != null) {
- callbackStore.remove(messageID);
- callback.onComplete(new AsyncResult(messageCtx));
+ handleMessage(messageCtx, ((AsyncCallback) callback).getSynapseOutMsgCtx());
} else {
// TODO invoke a generic synapse error handler for this message
log.warn("Synapse received a response for the request with message Id : " +
@@ -66,6 +63,77 @@
} else {
// TODO invoke a generic synapse error handler for this message
log.warn("Synapse received a response message without a message Id");
+ }
+ }
+
+ /**
+ * Handle the response or error (during a failed send) message received for an outgoing request
+ *
+ * @param response the Axis2 MessageContext that has been received and has to be handled
+ * @param synapseOutMsgCtx the corresponding (outgoing) Synapse MessageContext for the above
+ * Axis2 MC, that holds Synapse specific information such as the error handler stack and
+ * local properties etc.
+ */
+ private void handleMessage(MessageContext response,
+ org.apache.synapse.MessageContext synapseOutMsgCtx) {
+
+ if (response.getEnvelope().getBody().hasFault()) {
+ // synapseOutMsgCtx.getFaultStack().pop(); and handle the
+ // response.getEnvelope().getBody().getFault().getException()
+ // TODO chathura
+
+ } else {
+
+ if (log.isDebugEnabled()) {
+ log.debug("Synapse received an asynchronous response message");
+ log.debug("Received To: " +
+ (response.getTo() != null ? response.getTo().getAddress() : "null"));
+ log.debug("SOAPAction: " +
+ (response.getSoapAction() != null ? response.getSoapAction() : "null"));
+ log.debug("Body : \n" + response.getEnvelope());
+ }
+
+ MessageContext axisOutMsgCtx =
+ ((Axis2MessageContext)synapseOutMsgCtx).getAxis2MessageContext();
+
+ response.setOperationContext(axisOutMsgCtx.getOperationContext());
+ response.setAxisService(axisOutMsgCtx.getAxisService());
+
+ // set properties on response
+ response.setServerSide(true);
+ response.setProperty(Constants.ISRESPONSE_PROPERTY, Boolean.TRUE);
+ response.setProperty(MessageContext.TRANSPORT_OUT,
+ axisOutMsgCtx.getProperty(MessageContext.TRANSPORT_OUT));
+ response.setProperty(org.apache.axis2.Constants.OUT_TRANSPORT_INFO,
+ axisOutMsgCtx.getProperty(org.apache.axis2.Constants.OUT_TRANSPORT_INFO));
+ response.setTransportIn(axisOutMsgCtx.getTransportIn());
+ response.setTransportOut(axisOutMsgCtx.getTransportOut());
+
+ // If request is REST assume that the response is REST too
+ response.setDoingREST(axisOutMsgCtx.isDoingREST());
+
+ // create the synapse message context for the response
+ Axis2MessageContext synapseInMessageContext =
+ new Axis2MessageContext(
+ response,
+ synapseOutMsgCtx.getConfiguration(),
+ synapseOutMsgCtx.getEnvironment());
+
+ synapseInMessageContext.setResponse(true);
+ synapseInMessageContext.setTo(null);
+
+ // set the properties of the original MC to the new MC
+ Iterator iter = synapseOutMsgCtx.getLocalPropertyKeySet().iterator();
+
+ while (iter.hasNext()) {
+ Object key = iter.next();
+ synapseInMessageContext.setProperty(
+ (String) key, synapseOutMsgCtx.getLocalProperty((String) key));
+ }
+
+ // send the response message through the synapse mediation flow
+ synapseOutMsgCtx.getEnvironment().
+ injectMessage(synapseInMessageContext);
}
}
}
Modified: webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/HttpCoreNIOSender.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/HttpCoreNIOSender.java?view=diff&rev=515552&r1=515551&r2=515552
==============================================================================
--- webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/HttpCoreNIOSender.java (original)
+++ webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/HttpCoreNIOSender.java Wed Mar 7 04:32:41 2007
@@ -17,8 +17,11 @@
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
+import org.apache.axis2.engine.MessageReceiver;
+import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.axis2.addressing.RelatesTo;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.TransportOutDescription;
@@ -32,6 +35,7 @@
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.SessionRequest;
+import org.apache.http.nio.reactor.SessionRequestCallback;
import org.apache.http.HttpResponse;
import org.apache.http.HttpHost;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
@@ -68,6 +72,8 @@
private ConnectingIOReactor ioReactor = null;
/** The client handler */
private NHttpClientHandler handler = null;
+ /** The session request callback that calls back to the message receiver with errors */
+ private final SessionRequestCallback sessionRequestCallback = getSessionRequestCallback();
/** The SSL Context to be used */
SSLContext sslContext = null;
@@ -240,6 +246,7 @@
if (conn == null) {
SessionRequest req = ioReactor.connect(
new InetSocketAddress(url.getHost(), port), null, axis2Req);
+ req.setCallback(sessionRequestCallback);
log.debug("A new connection established");
} else {
((ClientHandler) handler).submitRequest(conn, axis2Req);
@@ -340,6 +347,47 @@
} catch (IOException e) {
log.warn("Error shutting down IOReactor", e);
}
+ }
+
+ /**
+ * Return a SessionRequestCallback which gets notified of a connection failure
+ * or an error during a send operation. The callback finds the corresponding
+ * Axis2 message context for the outgoing request, finds its message receiver,
+ * and sends a fault message back to that message receiver, marked as
+ * related to the outgoing request.
+ * @return a Session request callback
+ */
+ private static SessionRequestCallback getSessionRequestCallback() {
+ return new SessionRequestCallback() {
+ public void completed(SessionRequest request) {
+ }
+
+ public void failed(SessionRequest request) {
+ handleError(request);
+ }
+
+ public void timeout(SessionRequest request) {
+ handleError(request);
+ }
+
+ private void handleError(SessionRequest request) {
+ if (request.getAttachment() != null &&
+ request.getAttachment() instanceof Axis2HttpRequest) {
+
+ Axis2HttpRequest axis2Request = (Axis2HttpRequest) request.getAttachment();
+ MessageContext mc = axis2Request.getMsgContext();
+ MessageReceiver mr = mc.getAxisOperation().getMessageReceiver();
+
+ try {
+ mr.receive(
+ new AxisEngine(mc.getConfigurationContext()).
+ createFaultMessageContext(mc, request.getException()));
+ } catch (AxisFault af) {
+ log.error("Unable to report back failure to the message receiver", af);
+ }
+ }
+ }
+ };
}
// -------------- utility methods -------------
---------------------------------------------------------------------
To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: synapse-dev-help@ws.apache.org