You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2008/08/07 09:03:07 UTC

svn commit: r683528 - in /servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools: DroolsComponent.java DroolsEndpoint.java model/JbiHelper.java

Author: gertv
Date: Thu Aug  7 00:03:07 2008
New Revision: 683528

URL: http://svn.apache.org/viewvc?rev=683528&view=rev
Log:
SM-1502: servicemix-drools should allow for asynchronous message handling

Modified:
    servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java
    servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java
    servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java

Modified: servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java?rev=683528&r1=683527&r2=683528&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java (original)
+++ servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java Thu Aug  7 00:03:07 2008
@@ -25,6 +25,7 @@
  * @author gnodet
  * @org.apache.xbean.XBean element="component"
  */
+@SuppressWarnings("unchecked")
 public class DroolsComponent extends DefaultComponent {
 
     private DroolsEndpoint[] endpoints;

Modified: servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java?rev=683528&r1=683527&r2=683528&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java (original)
+++ servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java Thu Aug  7 00:03:07 2008
@@ -21,18 +21,24 @@
 import java.net.URL;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import javax.jbi.JBIException;
 import javax.jbi.management.DeploymentException;
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.MessageExchange.Role;
 import javax.jbi.servicedesc.ServiceEndpoint;
 import javax.xml.namespace.NamespaceContext;
 import javax.xml.namespace.QName;
 
 import org.apache.servicemix.common.DefaultComponent;
+import org.apache.servicemix.common.JbiConstants;
 import org.apache.servicemix.common.ServiceUnit;
 import org.apache.servicemix.common.endpoints.ProviderEndpoint;
+import org.apache.servicemix.common.util.MessageUtil;
 import org.apache.servicemix.drools.model.JbiHelper;
 import org.drools.RuleBase;
 import org.drools.WorkingMemory;
@@ -54,6 +60,7 @@
     private String defaultTargetURI;
     private Map<String, Object> globals;
     private List<Object> assertedObjects;
+    private ConcurrentMap<String, JbiHelper> pending = new ConcurrentHashMap<String, JbiHelper>();
 
     public DroolsEndpoint() {
         super();
@@ -174,26 +181,69 @@
      *      javax.jbi.messaging.MessageExchange, javax.jbi.messaging.NormalizedMessage)
      */
     public void process(MessageExchange exchange) throws Exception {
-        drools(exchange);
+        if (exchange.getRole() == Role.PROVIDER) {
+            handleProviderExchange(exchange);
+        } else {
+            handleConsumerExchange(exchange);
+        }
     }
     
-    protected void drools(MessageExchange exchange) throws Exception {
-        WorkingMemory memory = createWorkingMemory(exchange);
-        populateWorkingMemory(memory, exchange);
-        memory.fireAllRules();
-        postProcess(exchange, memory);
+    /*
+     * Handle a consumer exchange
+     */
+    private void handleConsumerExchange(MessageExchange exchange) throws MessagingException {
+        String correlation = getCorrelationId(exchange); 
+        JbiHelper helper = pending.get(correlation);
+        if (helper != null) {
+            MessageExchange original = helper.getExchange().getInternalExchange();
+            if (exchange.getStatus() == ExchangeStatus.DONE) {
+                done(original);
+            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                fail(original, exchange.getError());
+            } else {
+                if (exchange.getFault() != null) {
+                    MessageUtil.transferFaultToFault(exchange, original);
+                } else {
+                    MessageUtil.transferOutToOut(exchange, original);
+                }
+                send(original);
+            }
+            // update the rule engine's working memory to trigger post-done rules
+            helper.update();
+        } else {
+            logger.debug("No matching exchange found for " + exchange.getExchangeId() + ", no additional rules will be triggered");
+        }
     }
 
-    protected void postProcess(MessageExchange exchange, WorkingMemory memory) throws Exception {
+    private void handleProviderExchange(MessageExchange exchange) throws Exception {
         if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
-            String uri = getDefaultRouteURI();
-            if (uri != null) {
-                JbiHelper helper = (JbiHelper) memory.getGlobal("jbi");
-                helper.route(uri);
-            }
+            drools(exchange);
         }
-        if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+    }
+
+    public static String getCorrelationId(MessageExchange exchange) {
+        Object correlation = exchange.getProperty(JbiConstants.CORRELATION_ID);
+        if (correlation == null) {
+            return exchange.getExchangeId();
+        } else {
+            return correlation.toString();
+        }
+    }
+
+    protected void drools(MessageExchange exchange) throws Exception {
+        WorkingMemory memory = createWorkingMemory(exchange);
+        JbiHelper helper = populateWorkingMemory(memory, exchange);
+        pending.put(getCorrelationId(exchange), helper);
+        memory.fireAllRules();
+        
+        //no rules were fired --> must be config problem
+        if (helper.getRulesFired() < 1) {
             fail(exchange, new Exception("No rules have handled the exchange. Check your rule base."));
+        } else {
+            //a rule was triggered but no message was forwarded -> message has been handled by drools
+            if (helper.getForwarded() < 1) {
+                pending.remove(getCorrelationId(exchange)); // pending is keyed by correlation id, not by the exchange object
+            }
         }
     }
     
@@ -201,8 +251,9 @@
         return ruleBase.newStatefulSession();
     }
 
-    protected void populateWorkingMemory(WorkingMemory memory, MessageExchange exchange) throws Exception {
-        memory.setGlobal("jbi", new JbiHelper(this, exchange, memory));
+    protected JbiHelper populateWorkingMemory(WorkingMemory memory, MessageExchange exchange) throws Exception {
+        JbiHelper helper = new JbiHelper(this, exchange, memory);
+        memory.setGlobal("jbi", helper);
         if (assertedObjects != null) {
             for (Object o : assertedObjects) {
                 memory.insert(o);
@@ -213,6 +264,7 @@
                 memory.setGlobal(e.getKey(), e.getValue());
             }
         }
+        return helper;
     }
 
     public QName getDefaultTargetService() {
@@ -250,4 +302,11 @@
             return null;
         }
     }
-}
+    
+    @Override
+    protected void send(MessageExchange me) throws MessagingException {
+        //remove the exchange from the list of pending exchanges
+        pending.remove(getCorrelationId(me));
+        super.send(me);
+    }
+ }

Modified: servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java?rev=683528&r1=683527&r2=683528&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java (original)
+++ servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java Thu Aug  7 00:03:07 2008
@@ -18,7 +18,6 @@
 
 import javax.jbi.component.ComponentContext;
 import javax.jbi.messaging.DeliveryChannel;
-import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.Fault;
 import javax.jbi.messaging.InOnly;
 import javax.jbi.messaging.MessageExchange;
@@ -30,32 +29,35 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.servicemix.common.EndpointSupport;
 import org.apache.servicemix.common.JbiConstants;
-import org.apache.servicemix.common.util.MessageUtil;
 import org.apache.servicemix.common.util.URIResolver;
 import org.apache.servicemix.drools.DroolsEndpoint;
 import org.apache.servicemix.jbi.jaxp.SourceTransformer;
 import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jbi.util.MessageUtil;
 import org.drools.FactHandle;
 import org.drools.WorkingMemory;
+import org.drools.event.ActivationCreatedEvent;
+import org.drools.event.DefaultAgendaEventListener;
 
 /**
  * A helper class for use inside a rule to forward a message to an endpoint
- *
+ * 
  * @version $Revision: 426415 $
  */
-public class JbiHelper {
+public class JbiHelper extends DefaultAgendaEventListener {
 
     private DroolsEndpoint endpoint;
     private Exchange exchange;
     private WorkingMemory memory;
     private FactHandle exchangeFactHandle;
+    private int rulesFired;
+    private int forwarded;
 
-    public JbiHelper(DroolsEndpoint endpoint, 
-                     MessageExchange exchange,
-                     WorkingMemory memory) {
+    public JbiHelper(DroolsEndpoint endpoint, MessageExchange exchange, WorkingMemory memory) {
         this.endpoint = endpoint;
         this.exchange = new Exchange(exchange, endpoint.getNamespaceContext());
         this.memory = memory;
+        this.memory.addEventListener(this);
         this.exchangeFactHandle = this.memory.insert(this.exchange);
     }
 
@@ -81,14 +83,17 @@
 
     /**
      * Forwards the inbound message to the given target
-     *
+     * 
      * @param uri
      */
     public void route(String uri) throws MessagingException {
         Source src = null;
         routeTo(src, uri);
     }
-    
+
+    /**
+     * @see #routeTo(Source, String)
+     */
     public void routeTo(String content, String uri) throws MessagingException {
         if (content == null) {
             routeTo(this.exchange.getInternalExchange().getMessage("in").getContent(), uri);
@@ -96,10 +101,17 @@
             routeTo(new StringSource(content), uri);
         }
     }
-    
+
+    /**
+     * Send a message to the uri
+     *  
+     * @param content the message content
+     * @param uri the target endpoint's uri
+     * @throws MessagingException
+     */
     public void routeTo(Source content, String uri) throws MessagingException {
         MessageExchange me = this.exchange.getInternalExchange();
-        String correlationId = (String)exchange.getProperty(JbiConstants.CORRELATION_ID);
+
         NormalizedMessage in = null;
         if (content == null) {
             in = me.getMessage("in");
@@ -113,35 +125,31 @@
         // Set the sender endpoint property
         String key = EndpointSupport.getKey(endpoint);
         newMe.setProperty(JbiConstants.SENDER_ENDPOINT, key);
-        newMe.setProperty(JbiConstants.CORRELATION_ID, correlationId);
-        getChannel().sendSync(newMe);
-        if (newMe.getStatus() == ExchangeStatus.DONE) {
-            me.setStatus(ExchangeStatus.DONE);
-            getChannel().send(me);
-        } else if (newMe.getStatus() == ExchangeStatus.ERROR) {
-            me.setStatus(ExchangeStatus.ERROR);
-            me.setError(newMe.getError());
-            getChannel().send(me);
-        } else {
-            if (newMe.getFault() != null) {
-                MessageUtil.transferFaultToFault(newMe, me);
-            } else {
-                MessageUtil.transferOutToOut(newMe, me);
-            }
-            getChannel().sendSync(me);
-        }
-        update();
+        newMe.setProperty(JbiConstants.CORRELATION_ID, DroolsEndpoint.getCorrelationId(this.exchange.getInternalExchange()));
+        getChannel().send(newMe);
+        forwarded++;
     }
-    
-    
+
+    /**
+     * @see #routeToDefault(Source)
+     */
     public void routeToDefault(String content) throws MessagingException {
         routeTo(content, endpoint.getDefaultRouteURI());
     }
-    
+
+    /**
+     * Send this content to the default routing URI ({@link DroolsEndpoint#getDefaultRouteURI()}) specified on the endpoint
+     * 
+     * @param content the message body
+     * @throws MessagingException
+     */
     public void routeToDefault(Source content) throws MessagingException {
         routeTo(content, endpoint.getDefaultRouteURI());
     }
 
+    /**
+     * @see #fault(Source)
+     */
     public void fault(String content) throws Exception {
         MessageExchange me = this.exchange.getInternalExchange();
         if (me instanceof InOnly) {
@@ -151,11 +159,16 @@
             Fault fault = me.createFault();
             fault.setContent(new StringSource(content));
             me.setFault(fault);
-            getChannel().sendSync(me);
+            getChannel().send(me);
         }
-        update();
     }
-    
+
+    /**
+     * Send a JBI Error message (for InOnly) or JBI Fault message (for the other MEPs)
+     * 
+     * @param content the error content
+     * @throws Exception
+     */
     public void fault(Source content) throws Exception {
         MessageExchange me = this.exchange.getInternalExchange();
         if (me instanceof InOnly) {
@@ -165,15 +178,23 @@
             Fault fault = me.createFault();
             fault.setContent(content);
             me.setFault(fault);
-            getChannel().sendSync(me);
+            getChannel().send(me);
         }
-        update();
     }
 
+    /**
+     * @see #answer(Source)
+     */
     public void answer(String content) throws Exception {
         answer(new StringSource(content));
     }
-    
+
+    /**
+     * Answer the exchange with the given response content
+     * 
+     * @param content the response
+     * @throws Exception
+     */    
     public void answer(Source content) throws Exception {
         MessageExchange me = this.exchange.getInternalExchange();
         NormalizedMessage out = me.createMessage();
@@ -183,8 +204,35 @@
         update();
     }
 
-    protected void update() {
+    /**
+     * Update the {@link MessageExchange} information in the rule engine's {@link WorkingMemory}
+     */
+    public void update() {
         this.memory.update(this.exchangeFactHandle, this.exchange);
     }
 
+    /**
+     * Get the number of rules that were fired
+     * 
+     * @return the number of rules
+     */
+    public int getRulesFired() {
+        return rulesFired;
+    }
+    
+    /**
+     * Get the number of times a message has been forwarded
+     * 
+     * @return the number of forwards
+     */
+    public int getForwarded() {
+        return forwarded;
+    }
+
+    // event handler callbacks
+    @Override
+    public void activationCreated(ActivationCreatedEvent event, WorkingMemory workingMemory) {
+        rulesFired++;
+    }
+
 }