You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2008/08/07 13:22:25 UTC
svn commit: r683584 - in
/servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools:
DroolsComponent.java DroolsEndpoint.java model/JbiHelper.java
Author: gertv
Date: Thu Aug 7 04:22:24 2008
New Revision: 683584
URL: http://svn.apache.org/viewvc?rev=683584&view=rev
Log:
SM-1502: Fixing async support for servicemix-drools when the same endpoint is hit multiple times in the same flow
Modified:
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java
Modified: servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java?rev=683584&r1=683583&r2=683584&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java (original)
+++ servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java Thu Aug 7 04:22:24 2008
@@ -27,6 +27,11 @@
*/
@SuppressWarnings("unchecked")
public class DroolsComponent extends DefaultComponent {
+
+ /**
+ * Property to correlate servicemix-drools exchanges
+ */
+ public static final String DROOLS_CORRELATION_ID = "org.apache.servicemix.drools.correlation_id";
private DroolsEndpoint[] endpoints;
Modified: servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java?rev=683584&r1=683583&r2=683584&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java (original)
+++ servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java Thu Aug 7 04:22:24 2008
@@ -192,7 +192,7 @@
* Handle a consumer exchange
*/
private void handleConsumerExchange(MessageExchange exchange) throws MessagingException {
- String correlation = getCorrelationId(exchange);
+ String correlation = (String) exchange.getProperty(DroolsComponent.DROOLS_CORRELATION_ID);
JbiHelper helper = pending.get(correlation);
if (helper != null) {
MessageExchange original = helper.getExchange().getInternalExchange();
@@ -211,7 +211,7 @@
// update the rule engine's working memory to trigger post-done rules
helper.update();
} else {
- logger.debug("No matching exchange found for " + exchange.getExchangeId() + ", no additional rules will be triggered");
+ logger.debug("No pending exchange found for " + correlation + ", no additional rules will be triggered");
}
}
@@ -233,15 +233,15 @@
protected void drools(MessageExchange exchange) throws Exception {
WorkingMemory memory = createWorkingMemory(exchange);
JbiHelper helper = populateWorkingMemory(memory, exchange);
- pending.put(getCorrelationId(exchange), helper);
+ pending.put(exchange.getExchangeId(), helper);
memory.fireAllRules();
//no rules were fired --> must be config problem
if (helper.getRulesFired() < 1) {
fail(exchange, new Exception("No rules have handled the exchange. Check your rule base."));
} else {
- //a rule was triggered but no message was forwarded -> message has been handled by drools
- if (helper.getForwarded() < 1) {
+ //a rule was triggered and the message has been answered or faulted by the drools endpoint
+ if (helper.isExchangeHandled()) {
pending.remove(exchange);
}
}
@@ -306,7 +306,7 @@
@Override
protected void send(MessageExchange me) throws MessagingException {
//remove the exchange from the list of pending exchanges
- pending.remove(getCorrelationId(me));
+ pending.remove(me.getExchangeId());
super.send(me);
}
}
Modified: servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java?rev=683584&r1=683583&r2=683584&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java (original)
+++ servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java Thu Aug 7 04:22:24 2008
@@ -30,6 +30,7 @@
import org.apache.servicemix.common.EndpointSupport;
import org.apache.servicemix.common.JbiConstants;
import org.apache.servicemix.common.util.URIResolver;
+import org.apache.servicemix.drools.DroolsComponent;
import org.apache.servicemix.drools.DroolsEndpoint;
import org.apache.servicemix.jbi.jaxp.SourceTransformer;
import org.apache.servicemix.jbi.jaxp.StringSource;
@@ -51,7 +52,7 @@
private WorkingMemory memory;
private FactHandle exchangeFactHandle;
private int rulesFired;
- private int forwarded;
+ private boolean exchangeHandled = false;
public JbiHelper(DroolsEndpoint endpoint, MessageExchange exchange, WorkingMemory memory) {
this.endpoint = endpoint;
@@ -126,8 +127,8 @@
String key = EndpointSupport.getKey(endpoint);
newMe.setProperty(JbiConstants.SENDER_ENDPOINT, key);
newMe.setProperty(JbiConstants.CORRELATION_ID, DroolsEndpoint.getCorrelationId(this.exchange.getInternalExchange()));
+ newMe.setProperty(DroolsComponent.DROOLS_CORRELATION_ID, me.getExchangeId());
getChannel().send(newMe);
- forwarded++;
}
/**
@@ -161,6 +162,7 @@
me.setFault(fault);
getChannel().send(me);
}
+ exchangeHandled = true;
}
/**
@@ -180,6 +182,7 @@
me.setFault(fault);
getChannel().send(me);
}
+ exchangeHandled = true;
}
/**
@@ -201,6 +204,7 @@
out.setContent(content);
me.setMessage(out, "out");
getChannel().sendSync(me);
+ exchangeHandled = true;
update();
}
@@ -221,12 +225,12 @@
}
/**
- * Get the number of times a message has been forwarded
+ * Has the MessageExchange been handled by the drools endpoint?
*
- * @return the number of forwards
+ * @return true once the exchange has been answered or faulted by the drools endpoint
*/
- public int getForwarded() {
- return forwarded;
+ public boolean isExchangeHandled() {
+ return exchangeHandled;
}
// event handler callbacks