You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2008/08/07 09:03:07 UTC
svn commit: r683528 - in
/servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools:
DroolsComponent.java DroolsEndpoint.java model/JbiHelper.java
Author: gertv
Date: Thu Aug 7 00:03:07 2008
New Revision: 683528
URL: http://svn.apache.org/viewvc?rev=683528&view=rev
Log:
SM-1502: servicemix-drools should allow for asynchronous message handling
Modified:
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java
servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java
Modified: servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java?rev=683528&r1=683527&r2=683528&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java (original)
+++ servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsComponent.java Thu Aug 7 00:03:07 2008
@@ -25,6 +25,7 @@
* @author gnodet
* @org.apache.xbean.XBean element="component"
*/
+@SuppressWarnings("unchecked")
public class DroolsComponent extends DefaultComponent {
private DroolsEndpoint[] endpoints;
Modified: servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java?rev=683528&r1=683527&r2=683528&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java (original)
+++ servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java Thu Aug 7 00:03:07 2008
@@ -21,18 +21,24 @@
import java.net.URL;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import javax.jbi.JBIException;
import javax.jbi.management.DeploymentException;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.MessageExchange.Role;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.xml.namespace.NamespaceContext;
import javax.xml.namespace.QName;
import org.apache.servicemix.common.DefaultComponent;
+import org.apache.servicemix.common.JbiConstants;
import org.apache.servicemix.common.ServiceUnit;
import org.apache.servicemix.common.endpoints.ProviderEndpoint;
+import org.apache.servicemix.common.util.MessageUtil;
import org.apache.servicemix.drools.model.JbiHelper;
import org.drools.RuleBase;
import org.drools.WorkingMemory;
@@ -54,6 +60,7 @@
private String defaultTargetURI;
private Map<String, Object> globals;
private List<Object> assertedObjects;
+ private ConcurrentMap<String, JbiHelper> pending = new ConcurrentHashMap<String, JbiHelper>();
public DroolsEndpoint() {
super();
@@ -174,26 +181,69 @@
* javax.jbi.messaging.MessageExchange, javax.jbi.messaging.NormalizedMessage)
*/
public void process(MessageExchange exchange) throws Exception {
- drools(exchange);
+ if (exchange.getRole() == Role.PROVIDER) {
+ handleProviderExchange(exchange);
+ } else {
+ handleConsumerExchange(exchange);
+ }
}
- protected void drools(MessageExchange exchange) throws Exception {
- WorkingMemory memory = createWorkingMemory(exchange);
- populateWorkingMemory(memory, exchange);
- memory.fireAllRules();
- postProcess(exchange, memory);
+ /*
+ * Handle a consumer exchange
+ */
+ private void handleConsumerExchange(MessageExchange exchange) throws MessagingException {
+ String correlation = getCorrelationId(exchange);
+ JbiHelper helper = pending.get(correlation);
+ if (helper != null) {
+ MessageExchange original = helper.getExchange().getInternalExchange();
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ done(original);
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ fail(original, exchange.getError());
+ } else {
+ if (exchange.getFault() != null) {
+ MessageUtil.transferFaultToFault(exchange, original);
+ } else {
+ MessageUtil.transferOutToOut(exchange, original);
+ }
+ send(original);
+ }
+ // update the rule engine's working memory to trigger post-done rules
+ helper.update();
+ } else {
+ logger.debug("No matching exchange found for " + exchange.getExchangeId() + ", no additional rules will be triggered");
+ }
}
- protected void postProcess(MessageExchange exchange, WorkingMemory memory) throws Exception {
+ private void handleProviderExchange(MessageExchange exchange) throws Exception {
if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
- String uri = getDefaultRouteURI();
- if (uri != null) {
- JbiHelper helper = (JbiHelper) memory.getGlobal("jbi");
- helper.route(uri);
- }
+ drools(exchange);
}
- if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ }
+
+ public static String getCorrelationId(MessageExchange exchange) {
+ Object correlation = exchange.getProperty(JbiConstants.CORRELATION_ID);
+ if (correlation == null) {
+ return exchange.getExchangeId();
+ } else {
+ return correlation.toString();
+ }
+ }
+
+ protected void drools(MessageExchange exchange) throws Exception {
+ WorkingMemory memory = createWorkingMemory(exchange);
+ JbiHelper helper = populateWorkingMemory(memory, exchange);
+ pending.put(getCorrelationId(exchange), helper);
+ memory.fireAllRules();
+
+ //no rules were fired --> must be config problem
+ if (helper.getRulesFired() < 1) {
fail(exchange, new Exception("No rules have handled the exchange. Check your rule base."));
+ } else {
+ //a rule was triggered but no message was forwarded -> message has been handled by drools
+ if (helper.getForwarded() < 1) {
+ pending.remove(exchange);
+ }
}
}
@@ -201,8 +251,9 @@
return ruleBase.newStatefulSession();
}
- protected void populateWorkingMemory(WorkingMemory memory, MessageExchange exchange) throws Exception {
- memory.setGlobal("jbi", new JbiHelper(this, exchange, memory));
+ protected JbiHelper populateWorkingMemory(WorkingMemory memory, MessageExchange exchange) throws Exception {
+ JbiHelper helper = new JbiHelper(this, exchange, memory);
+ memory.setGlobal("jbi", helper);
if (assertedObjects != null) {
for (Object o : assertedObjects) {
memory.insert(o);
@@ -213,6 +264,7 @@
memory.setGlobal(e.getKey(), e.getValue());
}
}
+ return helper;
}
public QName getDefaultTargetService() {
@@ -250,4 +302,11 @@
return null;
}
}
-}
+
+ @Override
+ protected void send(MessageExchange me) throws MessagingException {
+ //remove the exchange from the list of pending exchanges
+ pending.remove(getCorrelationId(me));
+ super.send(me);
+ }
+ }
Modified: servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java?rev=683528&r1=683527&r2=683528&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java (original)
+++ servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java Thu Aug 7 00:03:07 2008
@@ -18,7 +18,6 @@
import javax.jbi.component.ComponentContext;
import javax.jbi.messaging.DeliveryChannel;
-import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.Fault;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.MessageExchange;
@@ -30,32 +29,35 @@
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.common.EndpointSupport;
import org.apache.servicemix.common.JbiConstants;
-import org.apache.servicemix.common.util.MessageUtil;
import org.apache.servicemix.common.util.URIResolver;
import org.apache.servicemix.drools.DroolsEndpoint;
import org.apache.servicemix.jbi.jaxp.SourceTransformer;
import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jbi.util.MessageUtil;
import org.drools.FactHandle;
import org.drools.WorkingMemory;
+import org.drools.event.ActivationCreatedEvent;
+import org.drools.event.DefaultAgendaEventListener;
/**
* A helper class for use inside a rule to forward a message to an endpoint
- *
+ *
* @version $Revision: 426415 $
*/
-public class JbiHelper {
+public class JbiHelper extends DefaultAgendaEventListener {
private DroolsEndpoint endpoint;
private Exchange exchange;
private WorkingMemory memory;
private FactHandle exchangeFactHandle;
+ private int rulesFired;
+ private int forwarded;
- public JbiHelper(DroolsEndpoint endpoint,
- MessageExchange exchange,
- WorkingMemory memory) {
+ public JbiHelper(DroolsEndpoint endpoint, MessageExchange exchange, WorkingMemory memory) {
this.endpoint = endpoint;
this.exchange = new Exchange(exchange, endpoint.getNamespaceContext());
this.memory = memory;
+ this.memory.addEventListener(this);
this.exchangeFactHandle = this.memory.insert(this.exchange);
}
@@ -81,14 +83,17 @@
/**
* Forwards the inbound message to the given target
- *
+ *
* @param uri
*/
public void route(String uri) throws MessagingException {
Source src = null;
routeTo(src, uri);
}
-
+
+ /**
+ * @see #routeTo(Source, String)
+ */
public void routeTo(String content, String uri) throws MessagingException {
if (content == null) {
routeTo(this.exchange.getInternalExchange().getMessage("in").getContent(), uri);
@@ -96,10 +101,17 @@
routeTo(new StringSource(content), uri);
}
}
-
+
+ /**
+ * Send a message to the uri
+ *
+ * @param content the message content
+ * @param uri the target endpoint's uri
+ * @throws MessagingException
+ */
public void routeTo(Source content, String uri) throws MessagingException {
MessageExchange me = this.exchange.getInternalExchange();
- String correlationId = (String)exchange.getProperty(JbiConstants.CORRELATION_ID);
+
NormalizedMessage in = null;
if (content == null) {
in = me.getMessage("in");
@@ -113,35 +125,31 @@
// Set the sender endpoint property
String key = EndpointSupport.getKey(endpoint);
newMe.setProperty(JbiConstants.SENDER_ENDPOINT, key);
- newMe.setProperty(JbiConstants.CORRELATION_ID, correlationId);
- getChannel().sendSync(newMe);
- if (newMe.getStatus() == ExchangeStatus.DONE) {
- me.setStatus(ExchangeStatus.DONE);
- getChannel().send(me);
- } else if (newMe.getStatus() == ExchangeStatus.ERROR) {
- me.setStatus(ExchangeStatus.ERROR);
- me.setError(newMe.getError());
- getChannel().send(me);
- } else {
- if (newMe.getFault() != null) {
- MessageUtil.transferFaultToFault(newMe, me);
- } else {
- MessageUtil.transferOutToOut(newMe, me);
- }
- getChannel().sendSync(me);
- }
- update();
+ newMe.setProperty(JbiConstants.CORRELATION_ID, DroolsEndpoint.getCorrelationId(this.exchange.getInternalExchange()));
+ getChannel().send(newMe);
+ forwarded++;
}
-
-
+
+ /**
+ * @see #routeToDefault(Source)
+ */
public void routeToDefault(String content) throws MessagingException {
routeTo(content, endpoint.getDefaultRouteURI());
}
-
+
+ /**
+ * Send this content to the default routing URI ({@link DroolsEndpoint#getDefaultRouteURI()} specified on the endpoint
+ *
+ * @param content the message body
+ * @throws MessagingException
+ */
public void routeToDefault(Source content) throws MessagingException {
routeTo(content, endpoint.getDefaultRouteURI());
}
+ /**
+ * @see #fault(Source)
+ */
public void fault(String content) throws Exception {
MessageExchange me = this.exchange.getInternalExchange();
if (me instanceof InOnly) {
@@ -151,11 +159,16 @@
Fault fault = me.createFault();
fault.setContent(new StringSource(content));
me.setFault(fault);
- getChannel().sendSync(me);
+ getChannel().send(me);
}
- update();
}
-
+
+ /**
+ * Send a JBI Error message (for InOnly) or JBI Fault message (for the other MEPs)
+ *
+ * @param content the error content
+ * @throws Exception
+ */
public void fault(Source content) throws Exception {
MessageExchange me = this.exchange.getInternalExchange();
if (me instanceof InOnly) {
@@ -165,15 +178,23 @@
Fault fault = me.createFault();
fault.setContent(content);
me.setFault(fault);
- getChannel().sendSync(me);
+ getChannel().send(me);
}
- update();
}
+ /**
+ * @see #answer(Source)
+ */
public void answer(String content) throws Exception {
answer(new StringSource(content));
}
-
+
+ /**
+ * Answer the exchange with the given response content
+ *
+ * @param content the response
+ * @throws Exception
+ */
public void answer(Source content) throws Exception {
MessageExchange me = this.exchange.getInternalExchange();
NormalizedMessage out = me.createMessage();
@@ -183,8 +204,35 @@
update();
}
- protected void update() {
+ /**
+ * Update the {@link MessageExchange} information in the rule engine's {@link WorkingMemory}
+ */
+ public void update() {
this.memory.update(this.exchangeFactHandle, this.exchange);
}
+ /**
+ * Get the number of rules that were fired
+ *
+ * @return the number of rules
+ */
+ public int getRulesFired() {
+ return rulesFired;
+ }
+
+ /**
+ * Get the number of times a message has been forwarded
+ *
+ * @return the number of forwards
+ */
+ public int getForwarded() {
+ return forwarded;
+ }
+
+ // event handler callbacks
+ @Override
+ public void activationCreated(ActivationCreatedEvent event, WorkingMemory workingMemory) {
+ rulesFired++;
+ }
+
}