You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2008/08/07 13:22:25 UTC

svn commit: r683584 - in /servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools: DroolsComponent.java DroolsEndpoint.java model/JbiHelper.java

Author: gertv
Date: Thu Aug  7 04:22:24 2008
New Revision: 683584

URL: http://svn.apache.org/viewvc?rev=683584&view=rev
Log:
SM-1502: Fixing async support for servicemix-drools when the same endpoint is hit multiple times in the same flow

Modified:
    servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java
    servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java
    servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java

Modified: servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java?rev=683584&r1=683583&r2=683584&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java (original)
+++ servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java Thu Aug  7 04:22:24 2008
@@ -27,6 +27,11 @@
  */
 @SuppressWarnings("unchecked")
 public class DroolsComponent extends DefaultComponent {
+    
+    /**
+     * Property to correlate servicemix-drools exchanges
+     */
+    public static final String DROOLS_CORRELATION_ID = "org.apache.servicemix.drools.correlation_id";
 
     private DroolsEndpoint[] endpoints;
     

Modified: servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java?rev=683584&r1=683583&r2=683584&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java (original)
+++ servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java Thu Aug  7 04:22:24 2008
@@ -192,7 +192,7 @@
      * Handle a consumer exchange
      */
     private void handleConsumerExchange(MessageExchange exchange) throws MessagingException {
-        String correlation = getCorrelationId(exchange); 
+        String correlation = (String) exchange.getProperty(DroolsComponent.DROOLS_CORRELATION_ID); 
         JbiHelper helper = pending.get(correlation);
         if (helper != null) {
             MessageExchange original = helper.getExchange().getInternalExchange();
@@ -211,7 +211,7 @@
             // update the rule engine's working memory to trigger post-done rules
             helper.update();
         } else {
-            logger.debug("No matching exchange found for " + exchange.getExchangeId() + ", no additional rules will be triggered");
+            logger.debug("No pending exchange found for " + correlation + ", no additional rules will be triggered");
         }
     }
 
@@ -233,15 +233,15 @@
     protected void drools(MessageExchange exchange) throws Exception {
         WorkingMemory memory = createWorkingMemory(exchange);
         JbiHelper helper = populateWorkingMemory(memory, exchange);
-        pending.put(getCorrelationId(exchange), helper);
+        pending.put(exchange.getExchangeId(), helper);
         memory.fireAllRules();
         
         //no rules were fired --> must be config problem
         if (helper.getRulesFired() < 1) {
             fail(exchange, new Exception("No rules have handled the exchange. Check your rule base."));
         } else {
-            //a rule was triggered but no message was forwarded -> message has been handled by drools
-            if (helper.getForwarded() < 1) {
+            //a rule was triggered and the message has been answered or faulted by the drools endpoint
+            if (helper.isExchangeHandled()) {
                 pending.remove(exchange);
             }
         }
@@ -306,7 +306,7 @@
     @Override
     protected void send(MessageExchange me) throws MessagingException {
         //remove the exchange from the list of pending exchanges
-        pending.remove(getCorrelationId(me));
+        pending.remove(me.getExchangeId());
         super.send(me);
     }
  }

Modified: servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java?rev=683584&r1=683583&r2=683584&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java (original)
+++ servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java Thu Aug  7 04:22:24 2008
@@ -30,6 +30,7 @@
 import org.apache.servicemix.common.EndpointSupport;
 import org.apache.servicemix.common.JbiConstants;
 import org.apache.servicemix.common.util.URIResolver;
+import org.apache.servicemix.drools.DroolsComponent;
 import org.apache.servicemix.drools.DroolsEndpoint;
 import org.apache.servicemix.jbi.jaxp.SourceTransformer;
 import org.apache.servicemix.jbi.jaxp.StringSource;
@@ -51,7 +52,7 @@
     private WorkingMemory memory;
     private FactHandle exchangeFactHandle;
     private int rulesFired;
-    private int forwarded;
+    private boolean exchangeHandled = false;
 
     public JbiHelper(DroolsEndpoint endpoint, MessageExchange exchange, WorkingMemory memory) {
         this.endpoint = endpoint;
@@ -126,8 +127,8 @@
         String key = EndpointSupport.getKey(endpoint);
         newMe.setProperty(JbiConstants.SENDER_ENDPOINT, key);
         newMe.setProperty(JbiConstants.CORRELATION_ID, DroolsEndpoint.getCorrelationId(this.exchange.getInternalExchange()));
+        newMe.setProperty(DroolsComponent.DROOLS_CORRELATION_ID, me.getExchangeId());
         getChannel().send(newMe);
-        forwarded++;
     }
 
     /**
@@ -161,6 +162,7 @@
             me.setFault(fault);
             getChannel().send(me);
         }
+        exchangeHandled = true;
     }
 
     /**
@@ -180,6 +182,7 @@
             me.setFault(fault);
             getChannel().send(me);
         }
+        exchangeHandled = true;
     }
 
     /**
@@ -201,6 +204,7 @@
         out.setContent(content);
         me.setMessage(out, "out");
         getChannel().sendSync(me);
+        exchangeHandled = true;
         update();
     }
 
@@ -221,12 +225,12 @@
     }
     
     /**
-     * Get the number of times a message has been forwarded
+     * Has the MessageExchange been handled by the drools endpoint?
      * 
-     * @return the number of forwards
+     * @return
      */
-    public int getForwarded() {
-        return forwarded;
+    public boolean isExchangeHandled() {
+        return exchangeHandled;
     }
 
     // event handler callbacks