You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/02/22 19:20:55 UTC

svn commit: r510613 - /incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java

Author: mszefler
Date: Thu Feb 22 10:20:54 2007
New Revision: 510613

URL: http://svn.apache.org/viewvc?view=rev&rev=510613
Log:
Don't send message in JBI until after the transaction commits.

Modified:
    incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java

Modified: incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java?view=diff&rev=510613&r1=510612&r2=510613
==============================================================================
--- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java (original)
+++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java Thu Feb 22 10:20:54 2007
@@ -21,6 +21,7 @@
 import org.apache.ode.bpel.iapi.ContextException;
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import org.apache.ode.bpel.iapi.Scheduler;
 import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
 import org.apache.ode.jbi.msgmap.Mapper;
 import org.apache.ode.jbi.msgmap.MessageTranslationException;
@@ -37,208 +38,225 @@
 import org.apache.commons.logging.LogFactory;
 
 /**
- * Bridge between ODE (consumers) and JBI (providers). An single object of this
- * type handles all communications initiated by ODE that is destined for other
- * JBI providers.
+ * Bridge between ODE (consumers) and JBI (providers). An single object of this type handles all communications initiated by ODE
+ * that is destined for other JBI providers.
  */
 class OdeConsumer extends ServiceBridge implements JbiMessageExchangeProcessor {
-  private static final Log __log = LogFactory.getLog(OdeConsumer.class);
+    private static final Log __log = LogFactory.getLog(OdeConsumer.class);
 
-  private OdeContext _ode;
+    private OdeContext _ode;
 
-  private Map<String, String> _outstandingExchanges = new ConcurrentHashMap<String, String>();
+    private Map<String, String> _outstandingExchanges = new ConcurrentHashMap<String, String>();
 
-  OdeConsumer(OdeContext ode) {
-    _ode = ode;
-  }
-
-  /**
-   * This is where we handle invocation where the ODE BPEL engine is the
-   * <em>client</em> and some other JBI service is the <em>provider</em>.
-   */
-  public void invokePartner(PartnerRoleMessageExchange odeMex)
-      throws ContextException {
-    // Cast the EndpointReference to a JbiEndpointReference. This is the
-    // only type it can be (since we control the creation of these things).
-    JbiEndpointReference targetEndpoint = (JbiEndpointReference) odeMex
-        .getEndpointReference();
-
-    if (targetEndpoint == null) {
-      String errmsg = "No endpoint for mex: " + odeMex;
-      __log.error(errmsg);
-      odeMex.replyWithFailure(FailureType.INVALID_ENDPOINT, errmsg, null);
-      return;
+    OdeConsumer(OdeContext ode) {
+        _ode = ode;
     }
 
-    ServiceEndpoint se = targetEndpoint.getServiceEndpoint();
+    /**
+     * This is where we handle invocation where the ODE BPEL engine is the <em>client</em> and some other JBI service is the
+     * <em>provider</em>.
+     */
+    public void invokePartner(final PartnerRoleMessageExchange odeMex) throws ContextException {
+        // Cast the EndpointReference to a JbiEndpointReference. This is the
+        // only type it can be (since we control the creation of these things).
+        JbiEndpointReference targetEndpoint = (JbiEndpointReference) odeMex.getEndpointReference();
 
-    boolean isTwoWay = odeMex.getMessageExchangePattern() == org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern.REQUEST_RESPONSE;
+        if (targetEndpoint == null) {
+            String errmsg = "No endpoint for mex: " + odeMex;
+            __log.error(errmsg);
+            odeMex.replyWithFailure(FailureType.INVALID_ENDPOINT, errmsg, null);
+            return;
+        }
 
-    QName opname = new QName(se.getServiceName().getNamespaceURI(), odeMex
-        .getOperation().getName());
+        ServiceEndpoint se = targetEndpoint.getServiceEndpoint();
 
-    MessageExchangeFactory mexf = _ode.getChannel().createExchangeFactory(se);
-    MessageExchange jbiMex;
-    try {
-      jbiMex = mexf.createExchange(isTwoWay ? MessageExchangePattern.IN_OUT
-          : MessageExchangePattern.IN_ONLY);
-      jbiMex.setEndpoint(se);
-      jbiMex.setService(se.getServiceName());
-      jbiMex.setOperation(opname);
-    } catch (MessagingException e) {
-      String errmsg = "Unable to create JBI message exchange for ODE message exchange "
-          + odeMex;
-      __log.error(errmsg, e);
-      odeMex.replyWithFailure(FailureType.COMMUNICATION_ERROR, errmsg, null);
-      return;
-    }
+        boolean isTwoWay = odeMex.getMessageExchangePattern() == org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern.REQUEST_RESPONSE;
 
-    Mapper mapper = _ode.getDefaultMapper();
-    odeMex.setProperty(Mapper.class.getName(),mapper.getClass().getName());
-    try {
-      if (!isTwoWay) {
-        InOnly inonly = ((InOnly) jbiMex);
-        NormalizedMessage nmsg = inonly.createMessage();
-        mapper.toNMS(nmsg,odeMex.getRequest(), odeMex.getOperation().getInput().getMessage(), null);
-        inonly.setInMessage(nmsg);
-        _ode.getChannel().send(inonly);
-        odeMex.replyOneWayOk();
-      } else {
-        InOut inout = (InOut) jbiMex;
-        NormalizedMessage nmsg = inout.createMessage();
-        mapper.toNMS(nmsg,odeMex.getRequest(), odeMex.getOperation().getInput().getMessage(), null);
-        inout.setInMessage(nmsg);
-        _ode.getChannel().send(inout);
-        _outstandingExchanges.put(inout.getExchangeId(), odeMex
-            .getMessageExchangeId());
-        odeMex.replyAsync();
-      }
-    } catch (MessagingException me) {
-      String errmsg = "Error sending message to JBI for ODE mex " + odeMex;
-      __log.error(errmsg, me);
-      odeMex.replyWithFailure(FailureType.COMMUNICATION_ERROR, errmsg, null);
-    } catch (MessageTranslationException e) {
-      String errmsg = "Error converting ODE message to JBI format for mex "
-          + odeMex;
-      __log.error(errmsg, e);
-      odeMex.replyWithFailure(FailureType.FORMAT_ERROR, errmsg, null);
-    }
+        QName opname = new QName(se.getServiceName().getNamespaceURI(), odeMex.getOperation().getName());
 
-  }
+        MessageExchangeFactory mexf = _ode.getChannel().createExchangeFactory(se);
+        MessageExchange jbiMex;
+        try {
+            jbiMex = mexf.createExchange(isTwoWay ? MessageExchangePattern.IN_OUT : MessageExchangePattern.IN_ONLY);
+            jbiMex.setEndpoint(se);
+            jbiMex.setService(se.getServiceName());
+            jbiMex.setOperation(opname);
+        } catch (MessagingException e) {
+            String errmsg = "Unable to create JBI message exchange for ODE message exchange " + odeMex;
+            __log.error(errmsg, e);
+            odeMex.replyWithFailure(FailureType.COMMUNICATION_ERROR, errmsg, null);
+            return;
+        }
 
-  public void onJbiMessageExchange(MessageExchange jbiMex)
-      throws MessagingException {
-    if (jbiMex.getPattern().equals(MessageExchangePattern.IN_ONLY)) {
-      // Ignore these, they're one way.
-    } else if (jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) {
-      if (jbiMex.getStatus() == ExchangeStatus.ACTIVE) {
-        outResponse((InOut) jbiMex);
-        jbiMex.setStatus(ExchangeStatus.DONE);
-        _ode.getChannel().send(jbiMex);
-      } else if (jbiMex.getStatus() == ExchangeStatus.ERROR)
-        outFailure((InOut) jbiMex);
-      else
-        __log.warn("Unexpected state for JBI message exchange: "
-            + jbiMex.getExchangeId());
-    } else {
-      __log.fatal("JBI MessageExchange " + jbiMex.getExchangeId()
-          + " is of an unsupported pattern " + jbiMex.getPattern());
-    }
+        Mapper mapper = _ode.getDefaultMapper();
+        odeMex.setProperty(Mapper.class.getName(), mapper.getClass().getName());
+        try {
+            if (!isTwoWay) {
+                final InOnly inonly = ((InOnly) jbiMex);
+                NormalizedMessage nmsg = inonly.createMessage();
+                mapper.toNMS(nmsg, odeMex.getRequest(), odeMex.getOperation().getInput().getMessage(), null);
+                inonly.setInMessage(nmsg);
+                _ode._scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
+                    public void afterCompletion(boolean success) {
+                        if (success)
+                            try {
+                                _ode.getChannel().send(inonly);
+                            } catch (MessagingException e) {
+                                // TODO Auto-generated catch block
+                                e.printStackTrace();
+                            }
+                    }
+
+                    public void beforeCompletion() {
+                    }
+
+                });
+                odeMex.replyOneWayOk();
+            } else {
+                final InOut inout = (InOut) jbiMex;
+                NormalizedMessage nmsg = inout.createMessage();
+                mapper.toNMS(nmsg, odeMex.getRequest(), odeMex.getOperation().getInput().getMessage(), null);
+                inout.setInMessage(nmsg);
+                _ode._scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
+                    public void afterCompletion(boolean success) {
+                        if (success)
+                            try {
+                                _outstandingExchanges.put(inout.getExchangeId(), odeMex.getMessageExchangeId());
+                                _ode.getChannel().send(inout);
+                            } catch (MessagingException e) {
+                                String errmsg = "Error sending request-only message to JBI for ODE mex " + odeMex;
+                                __log.error(errmsg, e);
+                            }
+                    }
+
+                    public void beforeCompletion() {
+                    }
+
+                });
 
-  }
+                odeMex.replyAsync();
+            }
+        } catch (MessagingException me) {
+            String errmsg = "JBI messaging error for ODE MEX " + odeMex;
+            __log.error(errmsg, me);
+            odeMex.replyWithFailure(FailureType.COMMUNICATION_ERROR, errmsg, null);
+        } catch (MessageTranslationException e) {
+            String errmsg = "Error converting ODE message to JBI format for mex " + odeMex;
+            __log.error(errmsg, e);
+            odeMex.replyWithFailure(FailureType.FORMAT_ERROR, errmsg, null);
+        }
 
-  private void outFailure(final InOut jbiMex) {
-    final String mexref = _outstandingExchanges.remove(jbiMex.getExchangeId());
-    if (mexref == null) {
-      __log.warn("Received a response for unkown JBI message exchange "
-          + jbiMex.getExchangeId());
-      return;
     }
 
-    try {
-      _ode._scheduler.execTransaction(new Callable<Boolean>() {
-        public Boolean call() throws Exception {
-          PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _ode._server
-              .getEngine().getMessageExchange(mexref);
-          
-          if (pmex == null) {
-              __log.warn("Cannot locate ODE message exchange: " + mexref + "; ignoring.");
-              return null;
-          }
-
-          pmex.replyWithFailure(FailureType.OTHER, "Error: "
-              + jbiMex.getError(), null);
-          return null;
-        }
-      });
-    } catch (Exception ex) {
-      __log.error("error delivering failure: " ,ex);
+    public void onJbiMessageExchange(MessageExchange jbiMex) throws MessagingException {
+        if (jbiMex.getPattern().equals(MessageExchangePattern.IN_ONLY)) {
+            // Ignore these, they're one way.
+        } else if (jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) {
+            if (jbiMex.getStatus() == ExchangeStatus.ACTIVE) {
+                outResponse((InOut) jbiMex);
+                jbiMex.setStatus(ExchangeStatus.DONE);
+                _ode.getChannel().send(jbiMex);
+            } else if (jbiMex.getStatus() == ExchangeStatus.ERROR)
+                outFailure((InOut) jbiMex);
+            else
+                __log.warn("Unexpected state for JBI message exchange: " + jbiMex.getExchangeId());
+        } else {
+            __log.fatal("JBI MessageExchange " + jbiMex.getExchangeId() + " is of an unsupported pattern " + jbiMex.getPattern());
+        }
+
     }
 
-  }
+    private void outFailure(final InOut jbiMex) {
+        final String mexref = _outstandingExchanges.remove(jbiMex.getExchangeId());
+        if (mexref == null) {
+            __log.warn("Received a response for unkown JBI message exchange " + jbiMex.getExchangeId());
+            return;
+        }
+
+        try {
+            _ode._scheduler.execTransaction(new Callable<Boolean>() {
+                public Boolean call() throws Exception {
+                    PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _ode._server.getEngine().getMessageExchange(
+                            mexref);
+
+                    if (pmex == null) {
+                        __log.warn("Cannot locate ODE message exchange: " + mexref + "; ignoring.");
+                        return null;
+                    }
+
+                    pmex.replyWithFailure(FailureType.OTHER, "Error: " + jbiMex.getError(), null);
+                    return null;
+                }
+            });
+        } catch (Exception ex) {
+            __log.error("error delivering failure: ", ex);
+        }
 
-  private void outResponse(final InOut jbiMex) {
-    final String mexref = _outstandingExchanges.remove(jbiMex.getExchangeId());
-    if (mexref == null) {
-      __log.warn("Received a response for unkown JBI message exchange "
-          + jbiMex.getExchangeId());
-      return;
     }
 
-    try {
-      _ode._scheduler.execTransaction(new Callable<Boolean>() {
-        @SuppressWarnings("unchecked")
-        public Boolean call() throws Exception {
-          PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _ode._server
-              .getEngine().getMessageExchange(mexref);
-          
-          if (pmex == null) {
-              // I'm a bit unclear as to why this would occur, but it appears to be possible.
-              __log.warn("Cannot locate ODE message exchange: " + mexref + "; ignoring.");
-              return null;
-          }
-
-          String mapperName = pmex.getProperty(Mapper.class.getName());
-          Mapper mapper = mapperName == null ? _ode.getDefaultMapper() : _ode.getMapper(mapperName);
-          if (mapper == null) {
-            String errmsg = "Mapper not found.";
-            __log.error(errmsg);
-            pmex.replyWithFailure(FailureType.FORMAT_ERROR, errmsg, null);
-          } else {
-            try {
-              Fault jbiFlt = jbiMex.getFault();
-              if (jbiFlt != null) {
-                javax.wsdl.Fault wsdlFlt = mapper.toFaultType(jbiFlt, (Collection<javax.wsdl.Fault>) pmex.getOperation().getFaults().values());
-                if (wsdlFlt == null) {
-                    pmex.replyWithFailure(FailureType.FORMAT_ERROR, "Unrecognized fault message.", null);
-                } else {
-                    if (wsdlFlt.getMessage() != null) {
-                        Message faultResponse = pmex.createMessage(wsdlFlt.getMessage().getQName());
-                        mapper.toODE(faultResponse,jbiFlt,wsdlFlt.getMessage());
-                        pmex.replyWithFault(new QName(pmex.getPortType().getQName().getNamespaceURI(), 
-                                                      wsdlFlt.getName()), faultResponse);
+    private void outResponse(final InOut jbiMex) {
+        final String mexref = _outstandingExchanges.remove(jbiMex.getExchangeId());
+        if (mexref == null) {
+            __log.warn("Received a response for unkown JBI message exchange " + jbiMex.getExchangeId());
+            return;
+        }
+
+        try {
+            _ode._scheduler.execTransaction(new Callable<Boolean>() {
+                @SuppressWarnings("unchecked")
+                public Boolean call() throws Exception {
+                    PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _ode._server.getEngine().getMessageExchange(
+                            mexref);
+
+                    if (pmex == null) {
+                        // I'm a bit unclear as to why this would occur, but it appears to be possible.
+                        __log.warn("Cannot locate ODE message exchange: " + mexref + "; ignoring.");
+                        return null;
+                    }
+
+                    String mapperName = pmex.getProperty(Mapper.class.getName());
+                    Mapper mapper = mapperName == null ? _ode.getDefaultMapper() : _ode.getMapper(mapperName);
+                    if (mapper == null) {
+                        String errmsg = "Mapper not found.";
+                        __log.error(errmsg);
+                        pmex.replyWithFailure(FailureType.FORMAT_ERROR, errmsg, null);
                     } else {
-                        // Can this even happen?
-                        __log.fatal("Internal Error: fault found without a message type: " + wsdlFlt); 
-                        pmex.replyWithFailure(FailureType.FORMAT_ERROR, "Fault has no message: " + wsdlFlt.getName(), null);
-                    }
-                }                    
-              } else {
-                Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName());
-                mapper.toODE(response,jbiMex.getOutMessage(),pmex.getOperation().getOutput().getMessage());
-                pmex.reply(response);
-              }
-            } catch (MessageTranslationException mte) {
-              __log.error("Error translating message.", mte);
-              pmex.replyWithFailure(FailureType.FORMAT_ERROR, mte.getMessage(), null);
-            }
-          }
-          return null;
+                        try {
+                            Fault jbiFlt = jbiMex.getFault();
+                            if (jbiFlt != null) {
+                                javax.wsdl.Fault wsdlFlt = mapper.toFaultType(jbiFlt, (Collection<javax.wsdl.Fault>) pmex
+                                        .getOperation().getFaults().values());
+                                if (wsdlFlt == null) {
+                                    pmex.replyWithFailure(FailureType.FORMAT_ERROR, "Unrecognized fault message.", null);
+                                } else {
+                                    if (wsdlFlt.getMessage() != null) {
+                                        Message faultResponse = pmex.createMessage(wsdlFlt.getMessage().getQName());
+                                        mapper.toODE(faultResponse, jbiFlt, wsdlFlt.getMessage());
+                                        pmex.replyWithFault(new QName(pmex.getPortType().getQName().getNamespaceURI(), wsdlFlt
+                                                .getName()), faultResponse);
+                                    } else {
+                                        // Can this even happen?
+                                        __log.fatal("Internal Error: fault found without a message type: " + wsdlFlt);
+                                        pmex.replyWithFailure(FailureType.FORMAT_ERROR, "Fault has no message: "
+                                                + wsdlFlt.getName(), null);
+                                    }
+                                }
+                            } else {
+                                Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName());
+                                mapper.toODE(response, jbiMex.getOutMessage(), pmex.getOperation().getOutput().getMessage());
+                                pmex.reply(response);
+                            }
+                        } catch (MessageTranslationException mte) {
+                            __log.error("Error translating message.", mte);
+                            pmex.replyWithFailure(FailureType.FORMAT_ERROR, mte.getMessage(), null);
+                        }
+                    }
+                    return null;
+                }
+            });
+        } catch (Exception ex) {
+            __log.error("error delivering RESPONSE: ", ex);
+
         }
-      });
-    } catch (Exception ex) {
-      __log.error("error delivering RESPONSE: " ,ex);
-      
     }
-  }
 }