You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by cc...@apache.org on 2009/01/11 20:19:58 UTC
svn commit: r733498 - in /servicemix/smx3/branches/servicemix-3.2:
common/servicemix-components/src/test/java/org/apache/servicemix/components/groovy/
core/servicemix-core/src/main/java/org/apache/servicemix/components/util/
core/servicemix-core/src/te...
Author: ccustine
Date: Sun Jan 11 11:19:58 2009
New Revision: 733498
URL: http://svn.apache.org/viewvc?rev=733498&view=rev
Log:
SM-1757 TransformComponentSupport does not handle errors nor does it support robust-in-only MEPs
Modified:
servicemix/smx3/branches/servicemix-3.2/common/servicemix-components/src/test/java/org/apache/servicemix/components/groovy/ServiceMixClientTest.java
servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/components/util/TransformComponentSupport.java
servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/framework/RegistryTest.java
Modified: servicemix/smx3/branches/servicemix-3.2/common/servicemix-components/src/test/java/org/apache/servicemix/components/groovy/ServiceMixClientTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/common/servicemix-components/src/test/java/org/apache/servicemix/components/groovy/ServiceMixClientTest.java?rev=733498&r1=733497&r2=733498&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/common/servicemix-components/src/test/java/org/apache/servicemix/components/groovy/ServiceMixClientTest.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/common/servicemix-components/src/test/java/org/apache/servicemix/components/groovy/ServiceMixClientTest.java Sun Jan 11 11:19:58 2009
@@ -22,8 +22,12 @@
import java.util.Map;
import javax.jbi.JBIException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.xml.namespace.QName;
import javax.xml.transform.stream.StreamSource;
@@ -32,10 +36,13 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.MessageExchangeListener;
import org.apache.servicemix.client.ServiceMixClient;
+import org.apache.servicemix.components.util.ComponentSupport;
import org.apache.servicemix.jbi.container.SpringJBIContainer;
import org.apache.servicemix.jbi.jaxp.SourceTransformer;
import org.apache.servicemix.jbi.resolver.EndpointResolver;
+import org.apache.servicemix.jbi.jaxp.StringSource;
import org.apache.servicemix.tck.Receiver;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.context.support.AbstractXmlApplicationContext;
@@ -47,6 +54,7 @@
private static transient Log log = LogFactory.getLog(ServiceMixClientTest.class);
protected AbstractXmlApplicationContext context;
+ protected SpringJBIContainer container;
protected ServiceMixClient client;
protected Receiver receiver;
@@ -70,6 +78,44 @@
receiver.getMessageList().assertMessagesReceived(1);
}
+ public void testSendWithErrorUsingJbiAPIs() throws Exception {
+
+ MessageExchange exchange = client.createInOnlyExchange();
+
+ NormalizedMessage message = exchange.getMessage("in");
+ message.setProperty("name", "James");
+ message.setContent(new StreamSource(new StringReader("<hello>world</hello>")));
+
+ activateComponent(new ReturnErrorComponent(), "error");
+
+ QName service = new QName("error");
+ exchange.setService(service);
+ client.send(exchange);
+
+ exchange = client.receive();
+ assertEquals(ExchangeStatus.ERROR, exchange.getStatus());
+ }
+
+ public void testSendWithFaultUsingJbiAPIs() throws Exception {
+
+ MessageExchange exchange = client.createRobustInOnlyExchange();
+
+ NormalizedMessage message = exchange.getMessage("in");
+ message.setProperty("name", "James");
+ message.setContent(new StreamSource(new StringReader("<hello>world</hello>")));
+
+ activateComponent(new ReturnFaultComponent(), "fault");
+
+ QName service = new QName("fault");
+ exchange.setService(service);
+ client.send(exchange);
+
+ exchange = client.receive();
+ assertEquals(ExchangeStatus.ACTIVE, exchange.getStatus());
+ assertNotNull(exchange.getFault());
+ client.done(exchange);
+ }
+
public void testSendUsingMapAndPOJOsByServiceName() throws Exception {
Map properties = new HashMap();
@@ -188,8 +234,8 @@
// TODO
//receiver = (Receiver) getBean("receiver");
- SpringJBIContainer jbi = (SpringJBIContainer) getBean("jbi");
- receiver = (Receiver) jbi.getBean("receiver");
+ container = (SpringJBIContainer) getBean("jbi");
+ receiver = (Receiver) container.getBean("receiver");
assertNotNull("receiver not found in JBI container", receiver);
}
@@ -211,4 +257,31 @@
return new ClassPathXmlApplicationContext("org/apache/servicemix/components/groovy/example.xml");
}
+
+ protected void activateComponent(ComponentSupport comp, String name) throws Exception {
+ comp.setService(new QName(name));
+ comp.setEndpoint("endpoint");
+ container.activateComponent(comp, name);
+ }
+
+ public static class ReturnErrorComponent extends ComponentSupport implements MessageExchangeListener {
+
+ public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ fail(exchange, new Exception());
+ }
+ }
+ }
+
+ public static class ReturnFaultComponent extends ComponentSupport implements MessageExchangeListener {
+
+ public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ Fault fault = exchange.createFault();
+ fault.setContent(new StringSource("<fault/>"));
+ fail(exchange, fault);
+ }
+ }
+ }
+
}
Modified: servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/components/util/TransformComponentSupport.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/components/util/TransformComponentSupport.java?rev=733498&r1=733497&r2=733498&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/components/util/TransformComponentSupport.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/components/util/TransformComponentSupport.java Sun Jan 11 11:19:58 2009
@@ -16,8 +16,12 @@
*/
package org.apache.servicemix.components.util;
+import java.io.IOException;
+import java.net.URI;
+
+import javax.jbi.JBIException;
import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.Fault;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
@@ -25,6 +29,10 @@
import org.apache.servicemix.JbiConstants;
import org.apache.servicemix.MessageExchangeListener;
+import org.apache.servicemix.jbi.util.MessageUtil;
+import org.apache.servicemix.store.Store;
+import org.apache.servicemix.store.StoreFactory;
+import org.apache.servicemix.store.memory.MemoryStoreFactory;
/**
* A useful base class for a transform component.
@@ -33,8 +41,12 @@
*/
public abstract class TransformComponentSupport extends ComponentSupport implements MessageExchangeListener {
+ private String correlation;
+
private boolean copyProperties = true;
private boolean copyAttachments = true;
+ private StoreFactory storeFactory;
+ private Store store;
protected TransformComponentSupport() {
}
@@ -43,23 +55,30 @@
super(service, endpoint);
}
- public void onMessageExchange(MessageExchange exchange) {
- // Skip done exchanges
- if (exchange.getStatus() == ExchangeStatus.DONE) {
- return;
- // Handle error exchanges
- } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
- return;
+ public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+ // Handle consumer exchanges && non-active RobustInOnly provider exchanges
+ if (exchange.getRole() == MessageExchange.Role.CONSUMER
+ || exchange.getProperty(correlation) != null) {
+ processOngoingExchange(exchange);
+ } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ processFirstExchange(exchange);
}
+ }
+
+ protected void processFirstExchange(MessageExchange exchange) {
try {
- InOnly outExchange = null;
+ MessageExchange outExchange = null;
NormalizedMessage in = getInMessage(exchange);
NormalizedMessage out;
if (isInAndOut(exchange)) {
out = exchange.createMessage();
} else {
- outExchange = getExchangeFactory().createInOnlyExchange();
+ URI pattern = exchange.getPattern();
+ outExchange = getExchangeFactory().createExchange(pattern);
outExchange.setProperty(JbiConstants.SENDER_ENDPOINT, getService() + ":" + getEndpoint());
+ // Set correlations
+ outExchange.setProperty(correlation, exchange.getExchangeId());
+ exchange.setProperty(correlation, outExchange.getExchangeId());
String processCorrelationId = (String)exchange.getProperty(JbiConstants.CORRELATION_ID);
if (processCorrelationId != null) {
outExchange.setProperty(JbiConstants.CORRELATION_ID, processCorrelationId);
@@ -72,23 +91,41 @@
if (isInAndOut(exchange)) {
exchange.setMessage(out, "out");
if (txSync) {
- getDeliveryChannel().sendSync(exchange);
+ sendSync(exchange);
} else {
- getDeliveryChannel().send(exchange);
+ send(exchange);
}
} else {
outExchange.setMessage(out, "in");
if (txSync) {
- getDeliveryChannel().sendSync(outExchange);
+ sendSync(outExchange);
+ if (outExchange.getStatus() == ExchangeStatus.DONE) {
+ done(exchange);
+ } else if (outExchange.getStatus() == ExchangeStatus.ERROR) {
+ fail(exchange, outExchange.getError());
+ } else if (outExchange.getFault() != null) {
+ Fault fault = MessageUtil.copyFault(outExchange);
+ done(outExchange);
+ MessageUtil.transferToFault(fault, exchange);
+ sendSync(exchange);
+ } else {
+ done(outExchange);
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
+ + " but has no Out nor Fault message");
+ }
} else {
- getDeliveryChannel().send(outExchange);
+ store.store(exchange.getExchangeId(), exchange);
+ try {
+ send(outExchange);
+ } catch (Exception e) {
+ store.load(exchange.getExchangeId());
+ throw e;
+ }
}
- exchange.setStatus(ExchangeStatus.DONE);
- getDeliveryChannel().send(exchange);
}
} else {
exchange.setStatus(ExchangeStatus.DONE);
- getDeliveryChannel().send(exchange);
+ send(exchange);
}
} catch (Exception e) {
try {
@@ -102,15 +139,79 @@
}
}
+ protected void processOngoingExchange(MessageExchange exchange) {
+ MessageExchange original = null;
+ String id = null;
+ try {
+ id = (String) exchange.getProperty(correlation);
+ original = (MessageExchange) store.load(id);
+ } catch (Exception e) {
+ // We can't do, so just return
+ return;
+ }
+ try {
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ done(original);
+ // Reproduce ERROR status to the other side
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ fail(original, exchange.getError());
+ // Reproduce faults to the other side and listeners
+ } else if (exchange.getFault() != null) {
+ store.store(exchange.getExchangeId(), exchange);
+ try {
+ MessageUtil.transferTo(exchange, original, "fault");
+ send(original);
+ } catch (Exception e) {
+ store.load(exchange.getExchangeId());
+ throw e;
+ }
+ // Reproduce answers to the other side
+ } else if (exchange.getMessage("out") != null) {
+ store.store(exchange.getExchangeId(), exchange);
+ try {
+ MessageUtil.transferTo(exchange, original, "out");
+ send(original);
+ } catch (Exception e) {
+ store.load(exchange.getExchangeId());
+ throw e;
+ }
+ } else {
+ throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
+ + " but has no Out nor Fault message");
+ }
+ } catch (Exception e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Original error: " + e, e);
+ }
+ }
+ }
+
// Implementation methods
//-------------------------------------------------------------------------
+
/**
* Transforms the given out message
*/
protected abstract boolean transform(MessageExchange exchange, NormalizedMessage in, NormalizedMessage out) throws Exception;
+ @Override
+ protected void init() throws JBIException {
+ super.init();
+ if (store == null) {
+ if (storeFactory == null) {
+ storeFactory = new MemoryStoreFactory();
+ }
+ try {
+ store = storeFactory.open(getService().toString() + getEndpoint());
+ } catch (IOException e) {
+ throw new JBIException("Unable to open storeFactory" + e.getMessage(), e);
+ }
+ }
+ correlation = "TransformComponentSupport.Correlation." + getService() + "." + getEndpoint();
+
+ }
public boolean isCopyProperties() {
return copyProperties;
@@ -150,4 +251,21 @@
CopyTransformer.copyAttachments(in, out);
}
}
+
+ public StoreFactory getStoreFactory() {
+ return storeFactory;
+ }
+
+ public void setStoreFactory(StoreFactory storeFactory) {
+ this.storeFactory = storeFactory;
+ }
+
+ public Store getStore() {
+ return store;
+ }
+
+ public void setStore(Store store) {
+ this.store = store;
+ }
+
}
Modified: servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/framework/RegistryTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/framework/RegistryTest.java?rev=733498&r1=733497&r2=733498&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/framework/RegistryTest.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/framework/RegistryTest.java Sun Jan 11 11:19:58 2009
@@ -36,6 +36,7 @@
container.start();
EchoComponent component = new EchoComponent();
+ component.setService(new QName("http://foo.bar.com", "myService"));
container.activateComponent(component, "component");
ServiceEndpoint ep = component.getContext().activateEndpoint(new QName("http://foo.bar.com", "myService"), "myEndpoint");
DocumentFragment epr = ep.getAsReference(null);
@@ -50,6 +51,7 @@
container.start();
EchoComponent component = new EchoComponent();
+ component.setService(new QName("http://foo.bar.com", "myService"));
container.activateComponent(component, "component");
ServiceEndpoint ep = component.getContext().activateEndpoint(new QName("http://foo.bar.com", "myService"), "myEndpoint");
DocumentFragment epr = URIResolver.createWSAEPR("endpoint:http://foo.bar.com/myService/myEndpoint");