You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2008/10/23 11:02:56 UTC
svn commit: r707319 - in
/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines:
servicemix-camel/src/main/java/org/apache/servicemix/camel/
servicemix-drools/src/main/java/org/apache/servicemix/drools/
servicemix-drools/src/main/java/org/...
Author: gertv
Date: Thu Oct 23 02:02:55 2008
New Revision: 707319
URL: http://svn.apache.org/viewvc?rev=707319&view=rev
Log:
SM-1640: Backporting SM-1502 to ServiceMix 3.2.x branch
Added:
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/test/resources/chained.drl
Modified:
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/JbiBinding.java
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/main/java/org/apache/servicemix/drools/DroolsComponent.java
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/test/java/org/apache/servicemix/drools/DroolsComponentTest.java
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/JbiBinding.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/JbiBinding.java?rev=707319&r1=707318&r2=707319&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/JbiBinding.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/JbiBinding.java Thu Oct 23 02:02:55 2008
@@ -36,6 +36,7 @@
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
+import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.util.ExchangeHelper;
/**
@@ -55,7 +56,11 @@
}
public Source convertBodyToJbi(Exchange exchange, Object body) {
- return ExchangeHelper.convertToType(exchange, Source.class, body);
+ try {
+ return ExchangeHelper.convertToType(exchange, Source.class, body);
+ } catch (NoTypeConversionAvailableException e) {
+ // the conversion failure is not propagated: the caller receives null
+ // instead of a Source when the body cannot be converted
+ return null;
+ }
}
public MessageExchange makeJbiMessageExchange(Exchange camelExchange,
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/main/java/org/apache/servicemix/drools/DroolsComponent.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/main/java/org/apache/servicemix/drools/DroolsComponent.java?rev=707319&r1=707318&r2=707319&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/main/java/org/apache/servicemix/drools/DroolsComponent.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/main/java/org/apache/servicemix/drools/DroolsComponent.java Thu Oct 23 02:02:55 2008
@@ -1,54 +1,59 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicemix.drools;
-
-import java.util.List;
-
-import org.apache.servicemix.common.DefaultComponent;
-
-/**
- *
- * @author gnodet
- * @org.apache.xbean.XBean element="component"
- */
-public class DroolsComponent extends DefaultComponent {
-
- private DroolsEndpoint[] endpoints;
-
- protected List getConfiguredEndpoints() {
- return asList(endpoints);
- }
-
- protected Class[] getEndpointClasses() {
- return new Class[] {DroolsEndpoint.class };
- }
-
- /**
- * @return the endpoints
- */
- public DroolsEndpoint[] getEndpoints() {
- return endpoints;
- }
-
- /**
- * @param endpoints the endpoints to set
- */
- public void setEndpoints(DroolsEndpoint[] endpoints) {
- this.endpoints = endpoints;
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.drools;
+
+import java.util.List;
+
+import org.apache.servicemix.common.DefaultComponent;
+
+/**
+ *
+ * @author gnodet
+ * @org.apache.xbean.XBean element="component"
+ */
+@SuppressWarnings("unchecked")
+public class DroolsComponent extends DefaultComponent {
+ /**
+ * Property to correlate servicemix-drools exchanges
+ */
+ public static final String DROOLS_CORRELATION_ID = "org.apache.servicemix.drools.correlation_id";
+
+ private DroolsEndpoint[] endpoints;
+
+ protected List getConfiguredEndpoints() {
+ return asList(endpoints);
+ }
+
+ protected Class[] getEndpointClasses() {
+ return new Class[] {DroolsEndpoint.class };
+ }
+
+ /**
+ * @return the endpoints
+ */
+ public DroolsEndpoint[] getEndpoints() {
+ return endpoints;
+ }
+
+ /**
+ * @param endpoints the endpoints to set
+ */
+ public void setEndpoints(DroolsEndpoint[] endpoints) {
+ this.endpoints = endpoints;
+ }
+
+}
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java?rev=707319&r1=707318&r2=707319&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java Thu Oct 23 02:02:55 2008
@@ -21,19 +21,27 @@
import java.net.URL;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import javax.jbi.JBIException;
import javax.jbi.management.DeploymentException;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.messaging.MessagingException;
+
+
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.xml.namespace.NamespaceContext;
import javax.xml.namespace.QName;
+import org.apache.servicemix.JbiConstants;
import org.apache.servicemix.common.DefaultComponent;
import org.apache.servicemix.common.ServiceUnit;
import org.apache.servicemix.common.endpoints.ProviderEndpoint;
import org.apache.servicemix.drools.model.JbiHelper;
+import org.apache.servicemix.jbi.util.MessageUtil;
import org.drools.RuleBase;
import org.drools.WorkingMemory;
import org.drools.compiler.RuleBaseLoader;
@@ -54,6 +62,7 @@
private String defaultTargetURI;
private Map<String, Object> globals;
private List<Object> assertedObjects;
+ private ConcurrentMap<String, JbiHelper> pending = new ConcurrentHashMap<String, JbiHelper>();
public DroolsEndpoint() {
super();
@@ -174,26 +183,76 @@
* javax.jbi.messaging.MessageExchange, javax.jbi.messaging.NormalizedMessage)
*/
public void process(MessageExchange exchange) throws Exception {
- drools(exchange);
+// drools(exchange);
+ if (exchange.getRole() == Role.PROVIDER) {
+ handleProviderExchange(exchange);
+ } else {
+ handleConsumerExchange(exchange);
+ }
+
+ }
+ /*
+ * Handle a consumer exchange, i.e. the outcome of an exchange that was sent out
+ * with the DROOLS_CORRELATION_ID property set. The helper registered in the
+ * pending map under that id is looked up and the outcome (DONE, ERROR, fault or
+ * out message) is relayed to the helper's original exchange.
+ */
+ private void handleConsumerExchange(MessageExchange exchange)
+ throws MessagingException {
+ String correlation = (String) exchange.getProperty(DroolsComponent.DROOLS_CORRELATION_ID);
+ // ConcurrentHashMap throws NullPointerException on a null key, so an exchange
+ // without the correlation property is treated as having no pending helper
+ JbiHelper helper = correlation == null ? null : pending.get(correlation);
+ if (helper != null) {
+ MessageExchange original = helper.getExchange()
+ .getInternalExchange();
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ done(original);
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ fail(original, exchange.getError());
+ } else {
+ if (exchange.getFault() != null) {
+ MessageUtil.transferFaultToFault(exchange, original);
+ } else {
+ MessageUtil.transferOutToOut(exchange, original);
+ }
+ send(original);
+ }
+ // update the rule engine's working memory to trigger post-done
+ // rules
+ helper.update();
+ } else {
+ logger.debug("No pending exchange found for "
+ + correlation
+ + ", no additional rules will be triggered");
+ }
+ }
+
+ //protected void postProcess(MessageExchange exchange, WorkingMemory memory) throws Exception {
+ private void handleProviderExchange(MessageExchange exchange) throws Exception {
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ drools(exchange);
+ }
}
+ /**
+ * Determine the correlation id for an exchange.
+ *
+ * @param exchange the exchange to inspect
+ * @return the string form of the JbiConstants.CORRELATION_ID property when it is set,
+ * otherwise the exchange's own id
+ */
+ public static String getCorrelationId(MessageExchange exchange) {
+ Object existing = exchange.getProperty(JbiConstants.CORRELATION_ID);
+ return existing != null ? existing.toString() : exchange.getExchangeId();
+ }
+
+ /**
+ * Run the rule base against the given exchange.
+ * <p>
+ * The helper created for the exchange is registered in the pending map under the
+ * exchange id before the rules are fired. When the helper reports that no rule
+ * fired, the exchange is failed. When a rule already answered or faulted the
+ * exchange, the pending entry is removed again.
+ *
+ * @param exchange the exchange to evaluate
+ * @throws Exception if creating or populating the working memory, or sending, fails
+ */
protected void drools(MessageExchange exchange) throws Exception {
WorkingMemory memory = createWorkingMemory(exchange);
- populateWorkingMemory(memory, exchange);
+ JbiHelper helper = populateWorkingMemory(memory, exchange);
+ pending.put(exchange.getExchangeId(), helper);
memory.fireAllRules();
- postProcess(exchange, memory);
- }
- protected void postProcess(MessageExchange exchange, WorkingMemory memory) throws Exception {
- if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
- String uri = getDefaultRouteURI();
- if (uri != null) {
- JbiHelper helper = (JbiHelper) memory.getGlobal("jbi");
- helper.route(uri);
- }
- }
- if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ if (helper.getRulesFired() < 1) {
fail(exchange, new Exception("No rules have handled the exchange. Check your rule base."));
+ } else {
+ //a rule was triggered and the message has been answered or faulted by the drools endpoint
+ if (helper.isExchangeHandled()) {
+ // the map is keyed by exchange id; removing by the exchange object itself
+ // would never match and would leave the helper in the map
+ pending.remove(exchange.getExchangeId());
+ }
+
}
}
@@ -201,8 +260,9 @@
return ruleBase.newWorkingMemory();
}
- protected void populateWorkingMemory(WorkingMemory memory, MessageExchange exchange) throws Exception {
- memory.setGlobal("jbi", new JbiHelper(this, exchange, memory));
+ protected JbiHelper populateWorkingMemory(WorkingMemory memory, MessageExchange exchange) throws Exception {
+ JbiHelper helper = new JbiHelper(this, exchange, memory);
+ memory.setGlobal("jbi", helper);
if (assertedObjects != null) {
for (Object o : assertedObjects) {
memory.assertObject(o);
@@ -213,6 +273,7 @@
memory.setGlobal(e.getKey(), e.getValue());
}
}
+ return helper;
}
public QName getDefaultTargetService() {
@@ -245,9 +306,18 @@
} else if (defaultTargetService != null) {
String nsURI = defaultTargetService.getNamespaceURI();
String sep = (nsURI.indexOf("/") > 0) ? "/" : ":";
- return "service:" + nsURI + sep + defaultTargetService.getLocalPart();
+ return "service:" + nsURI + sep
+ + defaultTargetService.getLocalPart();
} else {
return null;
}
}
+
+ @Override
+ protected void send(MessageExchange me) throws MessagingException {
+ // drop any pending context registered under this exchange id before the
+ // exchange is handed to the superclass for sending
+ // NOTE(review): this was originally described as "must be a DONE/ERROR", but
+ // handleConsumerExchange also sends out/fault answers through this method
+ pending.remove(me.getExchangeId());
+ super.send(me);
+ }
+
}
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java?rev=707319&r1=707318&r2=707319&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java Thu Oct 23 02:02:55 2008
@@ -25,6 +25,7 @@
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.messaging.RobustInOnly;
+import javax.xml.transform.Source;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,30 +33,40 @@
import org.apache.servicemix.client.ServiceMixClient;
import org.apache.servicemix.client.ServiceMixClientFacade;
import org.apache.servicemix.common.EndpointSupport;
+import org.apache.servicemix.drools.DroolsComponent;
import org.apache.servicemix.drools.DroolsEndpoint;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
import org.apache.servicemix.jbi.jaxp.StringSource;
import org.apache.servicemix.jbi.resolver.URIResolver;
import org.apache.servicemix.jbi.util.MessageUtil;
import org.drools.FactHandle;
import org.drools.WorkingMemory;
+import org.drools.event.ActivationCreatedEvent;
+import org.drools.event.DefaultAgendaEventListener;
/**
* A helper class for use inside a rule to forward a message to an endpoint
*
* @version $Revision: 426415 $
*/
-public class JbiHelper {
+public class JbiHelper extends DefaultAgendaEventListener {
private DroolsEndpoint endpoint;
private Exchange exchange;
private WorkingMemory memory;
private FactHandle exchangeFactHandle;
+ private int rulesFired;
+ private boolean exchangeHandled;
public JbiHelper(DroolsEndpoint endpoint, MessageExchange exchange, WorkingMemory memory) {
this.endpoint = endpoint;
this.exchange = new Exchange(exchange, endpoint.getNamespaceContext());
this.memory = memory;
+
+ this.memory.addEventListener(this);
this.exchangeFactHandle = this.memory.assertObject(this.exchange);
+
+
}
public DroolsEndpoint getEndpoint() {
@@ -105,7 +116,6 @@
public void routeTo(String content, String uri) throws MessagingException {
MessageExchange me = this.exchange.getInternalExchange();
- String correlationId = (String)exchange.getProperty(JbiConstants.CORRELATION_ID);
NormalizedMessage in = null;
if (content == null) {
in = me.getMessage("in");
@@ -119,24 +129,9 @@
// Set the sender endpoint property
String key = EndpointSupport.getKey(endpoint);
newMe.setProperty(JbiConstants.SENDER_ENDPOINT, key);
- newMe.setProperty(JbiConstants.CORRELATION_ID, correlationId);
- getChannel().sendSync(newMe);
- if (newMe.getStatus() == ExchangeStatus.DONE) {
- me.setStatus(ExchangeStatus.DONE);
- getChannel().send(me);
- } else if (newMe.getStatus() == ExchangeStatus.ERROR) {
- me.setStatus(ExchangeStatus.ERROR);
- me.setError(newMe.getError());
- getChannel().send(me);
- } else {
- if (newMe.getFault() != null) {
- MessageUtil.transferFaultToFault(newMe, me);
- } else {
- MessageUtil.transferOutToOut(newMe, me);
- }
- getChannel().sendSync(me);
- }
- update();
+ newMe.setProperty(JbiConstants.CORRELATION_ID, DroolsEndpoint.getCorrelationId(this.exchange.getInternalExchange()));
+ newMe.setProperty(DroolsComponent.DROOLS_CORRELATION_ID, me.getExchangeId());
+ getChannel().send(newMe);
}
public void routeToDefault(String content) throws MessagingException {
@@ -182,30 +177,76 @@
}
public void fault(String content) throws Exception {
+ fault(new StringSource(content));
+ }
+ /**
+ * Send a JBI Error message (for InOnly) or JBI Fault message (for the other MEPs)
+ *
+ * @param content the error content
+ * @throws Exception
+ */
+ public void fault(Source content) throws Exception {
MessageExchange me = this.exchange.getInternalExchange();
if (me instanceof InOnly) {
- me.setError(new Exception(content));
+ me.setError(new Exception(new SourceTransformer().toString(content)));
getChannel().send(me);
} else {
Fault fault = me.createFault();
- fault.setContent(new StringSource(content));
+ fault.setContent(content);
me.setFault(fault);
- getChannel().sendSync(me);
+ getChannel().send(me);
}
- update();
+ exchangeHandled = true;
}
public void answer(String content) throws Exception {
+ answer(new StringSource(content));
+ }
+
+ /**
+ * Answer the exchange with the given response content
+ *
+ * @param content the response
+ * @throws Exception
+ */
+ public void answer(Source content) throws Exception {
MessageExchange me = this.exchange.getInternalExchange();
NormalizedMessage out = me.createMessage();
- out.setContent(new StringSource(content));
+ out.setContent(content);
me.setMessage(out, "out");
getChannel().sendSync(me);
+ exchangeHandled = true;
update();
}
- protected void update() {
+ public void update() {
this.memory.modifyObject(this.exchangeFactHandle, this.exchange);
}
+
+ /**
+ * Get the number of rules that were fired
+ *
+ * @return the number of rules
+ */
+ public int getRulesFired() {
+ return rulesFired;
+ }
+
+ /**
+ * Has the MessageExchange been handled by the drools endpoint?
+ *
+ * @return <code>true</code> once {@link #fault(Source)} or {@link #answer(Source)}
+ * has sent the exchange through this helper, <code>false</code> otherwise
+ */
+
+ public boolean isExchangeHandled() {
+ return exchangeHandled;
+ }
+
+
+ // event handler callbacks
+ // Increments the counter returned by getRulesFired() each time the agenda
+ // reports that an activation was created.
+ // NOTE(review): an activation that is created can presumably still be cancelled
+ // before it fires, so this may count more than the rules actually fired - confirm
+ @Override
+ public void activationCreated(ActivationCreatedEvent event) {
+ rulesFired++;
+ }
}
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/test/java/org/apache/servicemix/drools/DroolsComponentTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/test/java/org/apache/servicemix/drools/DroolsComponentTest.java?rev=707319&r1=707318&r2=707319&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/test/java/org/apache/servicemix/drools/DroolsComponentTest.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/test/java/org/apache/servicemix/drools/DroolsComponentTest.java Thu Oct 23 02:02:55 2008
@@ -23,10 +23,10 @@
import javax.jbi.messaging.InOut;
import javax.xml.namespace.QName;
-import org.w3c.dom.Element;
+import org.w3c.dom.Element;
import junit.framework.TestCase;
-
+import org.apache.servicemix.JbiConstants;
import org.apache.servicemix.client.DefaultServiceMixClient;
import org.apache.servicemix.client.ServiceMixClient;
import org.apache.servicemix.components.util.MockServiceComponent;
@@ -54,6 +54,38 @@
jbi.shutDown();
}
+ public void testChainedRoutingInOnly() throws Exception {
+ drools = new DroolsComponent();
+
+ DroolsEndpoint endpoint = new DroolsEndpoint(drools.getServiceUnit(),
+ new QName("smx", "drools"), "endpoint");
+ endpoint.setRuleBaseResource(new ClassPathResource("chained.drl"));
+
+ drools.setEndpoints(new DroolsEndpoint[] {endpoint});
+ jbi.activateComponent(drools, "servicemix-drools");
+
+ ReceiverComponent target = new ReceiverComponent();
+ target.setService(new QName("smx", "target"));
+ target.setEndpoint("endpoint");
+
+ jbi.activateComponent(target, "target");
+
+ jbi.start();
+
+ InOnly me = client.createInOnlyExchange();
+ me.setService(new QName("smx", "drools"));
+ me.setOperation(new QName("smx", "process"));
+ me.getInMessage().setContent(new StringSource("<payload />"));
+ me.setProperty(JbiConstants.CORRELATION_ID, "TEST");
+ if (client.sendSync(me, 10000)) {
+ assertEquals(ExchangeStatus.DONE, me.getStatus());
+ } else {
+ fail("No response from drools in time...");
+ }
+
+ Thread.sleep(50);
+ }
+
public void testRouteInOnly() throws Exception {
drools = new DroolsComponent();
DroolsEndpoint endpoint = new DroolsEndpoint(drools.getServiceUnit(),
Added: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/test/resources/chained.drl
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/test/resources/chained.drl?rev=707319&view=auto
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/test/resources/chained.drl (added)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-drools/src/test/resources/chained.drl Thu Oct 23 02:02:55 2008
@@ -0,0 +1,30 @@
+package org.apache.servicemix.drools
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.servicemix.drools.model.Exchange;
+
+global org.apache.servicemix.drools.model.JbiHelper jbi;
+
+rule UnknownOperation
+ agenda-group "init"
+ auto-focus true
+ when
+ $me : Exchange( status == Exchange.ACTIVE, $in : in != null,
+ operation != "{smx}process")
+ then
+ jbi.route("service:smx/target");
+end
+
+rule Input
+ agenda-group "input"
+ auto-focus true
+ when
+ $me : Exchange( status == Exchange.ACTIVE, $in : in != null,
+ operation == "{smx}process" )
+ eval ( true )
+ then
+ jbi.getLogger().debug("[Rule: Input]: Message routed to drools2...");
+ jbi.route("service:smx/drools");
+end