You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/11/14 10:01:33 UTC
svn commit: r713955 - in
/servicemix/components/engines/servicemix-bean/trunk: pom.xml
src/main/java/org/apache/servicemix/bean/support/TransformBeanSupport.java
src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java
Author: gnodet
Date: Fri Nov 14 01:01:32 2008
New Revision: 713955
URL: http://svn.apache.org/viewvc?rev=713955&view=rev
Log:
SM-1668, SM-1604: Fix TransformBeanSupport processing of faults and RobustInOnly meps
Modified:
servicemix/components/engines/servicemix-bean/trunk/pom.xml
servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/support/TransformBeanSupport.java
servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java
Modified: servicemix/components/engines/servicemix-bean/trunk/pom.xml
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/pom.xml?rev=713955&r1=713954&r2=713955&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/pom.xml (original)
+++ servicemix/components/engines/servicemix-bean/trunk/pom.xml Fri Nov 14 01:01:32 2008
@@ -36,7 +36,7 @@
<properties>
<previous.releases>3.1.2,3.2,3.2.1</previous.releases>
- <servicemix-version>3.2.1</servicemix-version>
+ <servicemix-version>3.3</servicemix-version>
<servicemix-shared-version>2008.02-SNAPSHOT</servicemix-shared-version>
<servicemix.osgi.import>
org.apache.commons.jexl*;resolution:=optional,
Modified: servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/support/TransformBeanSupport.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/support/TransformBeanSupport.java?rev=713955&r1=713954&r2=713955&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/support/TransformBeanSupport.java (original)
+++ servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/support/TransformBeanSupport.java Fri Nov 14 01:01:32 2008
@@ -16,18 +16,21 @@
*/
package org.apache.servicemix.bean.support;
+import java.net.URI;
+
+import javax.annotation.PostConstruct;
import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.Fault;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
-import javax.annotation.PostConstruct;
import org.apache.servicemix.common.JbiConstants;
+import org.apache.servicemix.common.util.MessageUtil;
import org.apache.servicemix.jbi.listener.MessageExchangeListener;
import org.apache.servicemix.jbi.transformer.CopyTransformer;
-import org.apache.servicemix.store.StoreFactory;
import org.apache.servicemix.store.Store;
+import org.apache.servicemix.store.StoreFactory;
import org.apache.servicemix.store.memory.MemoryStoreFactory;
/**
@@ -37,6 +40,8 @@
*/
public abstract class TransformBeanSupport extends BeanSupport implements MessageExchangeListener {
+ private String correlation;
+
private ExchangeTarget target;
private boolean copyProperties = true;
@@ -110,25 +115,60 @@
}
store = storeFactory.open(getService().toString() + getEndpoint());
}
+ correlation = "TransformBeanSupport.Correlation." + getService() + "." + getEndpoint();
}
public void onMessageExchange(MessageExchange exchange) throws MessagingException {
- // Handle consumer exchanges
- if (exchange.getRole() == MessageExchange.Role.CONSUMER) {
+ // Handle consumer exchanges && non-active RobustInOnly provider exchanges
+ if (exchange.getRole() == MessageExchange.Role.CONSUMER
+ || exchange.getProperty(correlation) != null) {
MessageExchange original = null;
+ String id = null;
try {
- original = (MessageExchange) store.load(exchange.getExchangeId());
+ id = (String) exchange.getProperty(correlation);
+ original = (MessageExchange) store.load(id);
} catch (Exception e) {
// We can't do, so just return
return;
}
- if (exchange.getStatus() == ExchangeStatus.ERROR) {
- original.setStatus(ExchangeStatus.ERROR);
- original.setError(exchange.getError());
- send(original);
+ try {
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ done(original);
+ // Reproduce ERROR status to the other side
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ fail(original, exchange.getError());
+ // Reproduce faults to the other side and listeners
+ } else if (exchange.getFault() != null) {
+ store.store(exchange.getExchangeId(), exchange);
+ try {
+ MessageUtil.transferTo(exchange, original, "fault");
+ send(original);
+ } catch (Exception e) {
+ store.load(exchange.getExchangeId());
+ throw e;
+ }
+ // Reproduce answers to the other side
+ } else if (exchange.getMessage("out") != null) {
+ store.store(exchange.getExchangeId(), exchange);
+ try {
+ MessageUtil.transferTo(exchange, original, "out");
+ send(original);
+ } catch (Exception e) {
+ store.load(exchange.getExchangeId());
+ throw e;
+ }
+ } else {
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
+ + " but has no Out nor Fault message");
+ }
+ } catch (Exception e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Original error: " + e, e);
+ }
}
return;
}
+
// Skip done exchanges
if (exchange.getStatus() == ExchangeStatus.DONE) {
return;
@@ -137,18 +177,22 @@
return;
}
try {
- InOnly outExchange = null;
+ MessageExchange outExchange = null;
NormalizedMessage in = getInMessage(exchange);
NormalizedMessage out;
if (isInAndOut(exchange)) {
out = exchange.createMessage();
} else {
+ URI pattern = exchange.getPattern();
if (target == null) {
- throw new IllegalStateException("An IN-ONLY TransformBean has no Target specified");
+ throw new IllegalStateException("A TransformBean with MEP " + pattern + " has no Target specified");
}
- outExchange = getExchangeFactory().createInOnlyExchange();
+ outExchange = getExchangeFactory().createExchange(pattern);
target.configureTarget(outExchange, getContext());
outExchange.setProperty(JbiConstants.SENDER_ENDPOINT, getService() + ":" + getEndpoint());
+ // Set correlations
+ outExchange.setProperty(correlation, exchange.getExchangeId());
+ exchange.setProperty(correlation, outExchange.getExchangeId());
String processCorrelationId = (String)exchange.getProperty(JbiConstants.CORRELATION_ID);
if (processCorrelationId != null) {
outExchange.setProperty(JbiConstants.CORRELATION_ID, processCorrelationId);
@@ -169,17 +213,28 @@
outExchange.setMessage(out, "in");
if (txSync) {
sendSync(outExchange);
- if (outExchange.getStatus() == ExchangeStatus.ERROR) {
- exchange.setStatus(ExchangeStatus.ERROR);
- exchange.setError(outExchange.getError());
- send(exchange);
+ if (outExchange.getStatus() == ExchangeStatus.DONE) {
+ done(exchange);
+ } else if (outExchange.getStatus() == ExchangeStatus.ERROR) {
+ fail(exchange, outExchange.getError());
+ } else if (outExchange.getFault() != null) {
+ Fault fault = MessageUtil.copyFault(outExchange);
+ done(outExchange);
+ MessageUtil.transferToFault(fault, exchange);
+ sendSync(exchange);
} else {
- exchange.setStatus(ExchangeStatus.DONE);
- send(exchange);
+ done(outExchange);
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
+ + " but has no Out nor Fault message");
}
} else {
- store.store(outExchange.getExchangeId(), exchange);
- send(outExchange);
+ store.store(exchange.getExchangeId(), exchange);
+ try {
+ send(outExchange);
+ } catch (Exception e) {
+ store.load(exchange.getExchangeId());
+ throw e;
+ }
}
}
} else {
Modified: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java?rev=713955&r1=713954&r2=713955&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java (original)
+++ servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java Fri Nov 14 01:01:32 2008
@@ -16,72 +16,157 @@
*/
package org.apache.servicemix.bean;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.messaging.MessagingException;
-import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.InOnly;
-import javax.jbi.component.Component;
+import javax.jbi.messaging.NormalizedMessage;
import javax.xml.namespace.QName;
import junit.framework.TestCase;
+
+import org.apache.servicemix.bean.support.ExchangeTarget;
import org.apache.servicemix.bean.support.TransformBeanSupport;
-import org.apache.servicemix.bean.pojos.LoggingPojo;
+import org.apache.servicemix.client.DefaultServiceMixClient;
import org.apache.servicemix.common.util.MessageUtil;
+import org.apache.servicemix.components.util.ComponentSupport;
import org.apache.servicemix.jbi.container.JBIContainer;
-import org.apache.servicemix.jbi.listener.MessageExchangeListener;
import org.apache.servicemix.jbi.jaxp.StringSource;
-import org.apache.servicemix.components.util.EchoComponent;
-import org.apache.servicemix.components.util.TransformComponentSupport;
-import org.apache.servicemix.components.util.ComponentSupport;
-import org.apache.servicemix.bean.support.ExchangeTarget;
-import org.apache.servicemix.client.ServiceMixClient;
-import org.apache.servicemix.client.DefaultServiceMixClient;
+import org.apache.servicemix.MessageExchangeListener;
+import org.apache.servicemix.tck.ExchangeCompletedListener;
+import org.apache.servicemix.tck.ReceiverComponent;
public class TransformBeanSupportTest extends TestCase {
+ protected DefaultServiceMixClient client;
protected JBIContainer container;
+ protected ExchangeCompletedListener listener;
protected BeanComponent component;
protected void setUp() throws Exception {
container = new JBIContainer();
container.setEmbedded(true);
+ container.setUseMBeanServer(false);
+ container.setCreateMBeanServer(false);
+ configureContainer();
+ listener = new ExchangeCompletedListener();
+ container.addListener(listener);
+
container.init();
+ container.start();
component = new BeanComponent();
container.activateComponent(component, "servicemix-bean");
- container.start();
+ client = new DefaultServiceMixClient(container);
}
protected void tearDown() throws Exception {
container.shutDown();
+ listener.assertExchangeCompleted();
}
- public void testInOnlyWithError() throws Exception {
- MyTransformer transformer = new MyTransformer();
- ExchangeTarget target = new ExchangeTarget();
- target.setService(new QName("error"));
- transformer.setTarget(target);
- BeanEndpoint transformEndpoint = new BeanEndpoint();
- transformEndpoint.setBean(transformer);
- transformEndpoint.setService(new QName("transform"));
- transformEndpoint.setEndpoint("endpoint");
+ protected void configureContainer() throws Exception {
+ container.setFlowName("st");
+ }
+
+ ReceiverComponent receiver = new ReceiverComponent();
+
+ public void testInOnly() throws Exception {
+ TransformBeanSupport transformer = createTransformer("receiver");
+ BeanEndpoint transformEndpoint = createBeanEndpoint(transformer);
component.addEndpoint(transformEndpoint);
- SendErrorComponent sendErrorComponent = new SendErrorComponent();
- container.activateComponent(sendErrorComponent, "error");
+ ReceiverComponent receiver = new ReceiverComponent();
+ activateComponent(receiver, "receiver");
- ServiceMixClient client = new DefaultServiceMixClient(container);
MessageExchange io = client.createInOnlyExchange();
io.setService(new QName("transform"));
io.getMessage("in").setContent(new StringSource("<hello/>"));
client.send(io);
+
io = client.receive();
+ assertEquals(ExchangeStatus.DONE, io.getStatus());
+
+ receiver.getMessageList().assertMessagesReceived(1);
+ }
+
+ public void testInOnlyWithError() throws Exception {
+ TransformBeanSupport transformer = createTransformer("error");
+ BeanEndpoint transformEndpoint = createBeanEndpoint(transformer);
+ component.addEndpoint(transformEndpoint);
+
+ activateComponent(new ReturnErrorComponent(), "error");
+
+ MessageExchange io = client.createInOnlyExchange();
+ io.setService(new QName("transform"));
+ io.getMessage("in").setContent(new StringSource("<hello/>"));
+ client.send(io);
+
+ io = client.receive();
assertEquals(ExchangeStatus.ERROR, io.getStatus());
}
+ public void testRobustInOnly() throws Exception {
+ TransformBeanSupport transformer = createTransformer("receiver");
+ BeanEndpoint transformEndpoint = createBeanEndpoint(transformer);
+ component.addEndpoint(transformEndpoint);
+
+ ReceiverComponent receiver = new ReceiverComponent();
+ activateComponent(receiver, "receiver");
+
+ MessageExchange io = client.createRobustInOnlyExchange();
+ io.setService(new QName("transform"));
+ io.getMessage("in").setContent(new StringSource("<hello/>"));
+ client.send(io);
+
+ io = client.receive();
+ assertEquals(ExchangeStatus.DONE, io.getStatus());
+
+ receiver.getMessageList().assertMessagesReceived(1);
+ }
+
+ public void testRobustInOnlyWithFaultAndError() throws Exception {
+ TransformBeanSupport transformer = createTransformer("fault");
+ BeanEndpoint transformEndpoint = createBeanEndpoint(transformer);
+ component.addEndpoint(transformEndpoint);
+
+ activateComponent(new ReturnFaultComponent(), "fault");
+
+ MessageExchange io = client.createRobustInOnlyExchange();
+ io.setService(new QName("transform"));
+ io.getMessage("in").setContent(new StringSource("<hello/>"));
+ client.send(io);
+
+ io = client.receive();
+ assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
+ assertNotNull(io.getFault());
+ client.fail(io, new Exception("I do not like faults"));
+ }
+
+ private MyTransformer createTransformer(String targetService) {
+ MyTransformer transformer = new MyTransformer();
+ ExchangeTarget target = new ExchangeTarget();
+ target.setService(new QName(targetService));
+ transformer.setTarget(target);
+ return transformer;
+ }
+
+ private BeanEndpoint createBeanEndpoint(TransformBeanSupport transformer) {
+ BeanEndpoint transformEndpoint = new BeanEndpoint();
+ transformEndpoint.setBean(transformer);
+ transformEndpoint.setService(new QName("transform"));
+ transformEndpoint.setEndpoint("endpoint");
+ return transformEndpoint;
+ }
+
+ protected void activateComponent(ComponentSupport component, String name) throws Exception {
+ component.setService(new QName(name));
+ component.setEndpoint("endpoint");
+ container.activateComponent(component, name);
+ }
+
public static class MyTransformer extends TransformBeanSupport {
protected boolean transform(MessageExchange exchange, NormalizedMessage in, NormalizedMessage out) throws Exception {
MessageUtil.transfer(in, out);
@@ -89,15 +174,23 @@
}
}
- public static class SendErrorComponent extends ComponentSupport implements MessageExchangeListener {
- public SendErrorComponent() {
- setService(new QName("error"));
+ public static class ReturnErrorComponent extends ComponentSupport implements MessageExchangeListener {
+
+ public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ fail(exchange, new Exception());
+ }
}
+ }
+
+ public static class ReturnFaultComponent extends ComponentSupport implements MessageExchangeListener {
public void onMessageExchange(MessageExchange exchange) throws MessagingException {
- exchange.setStatus(ExchangeStatus.ERROR);
- exchange.setError(new Exception());
- send(exchange);
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ Fault fault = exchange.createFault();
+ fault.setContent(new StringSource("<fault/>"));
+ fail(exchange, fault);
+ }
}
}