You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/11/12 10:32:08 UTC
svn commit: r713330 - in
/servicemix/components/bindings/servicemix-jms/trunk: ./
src/main/java/org/apache/servicemix/jms/endpoints/
src/test/java/org/apache/servicemix/jms/
Author: gnodet
Date: Wed Nov 12 01:32:04 2008
New Revision: 713330
URL: http://svn.apache.org/viewvc?rev=713330&view=rev
Log:
SM-1680, SM-1681, SM-1682: handle attachments, faults and errors on the new JMS endpoints
Added:
servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
Modified:
servicemix/components/bindings/servicemix-jms/trunk/pom.xml
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractJmsMarshaler.java
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultProviderMarshaler.java
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/AbstractJmsTestSupport.java
servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JMSComponentTest.java
Modified: servicemix/components/bindings/servicemix-jms/trunk/pom.xml
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/pom.xml?rev=713330&r1=713329&r2=713330&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/pom.xml (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/pom.xml Wed Nov 12 01:32:04 2008
@@ -155,12 +155,24 @@
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-javamail_1.3.1_spec</artifactId>
- <version>1.2</version>
- <scope>provided</scope>
- </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-javamail_1.4_spec</artifactId>
+ <version>1.5</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.javamail</groupId>
+ <artifactId>geronimo-javamail_1.4_mail</artifactId>
+ <version>1.5</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.javamail</groupId>
+ <artifactId>geronimo-javamail_1.4_provider</artifactId>
+ <version>1.5</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
@@ -181,7 +193,7 @@
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-jta_1.0.1B_spec</artifactId>
+ <artifactId>geronimo-jta_1.1_spec</artifactId>
<version>1.1</version>
<scope>provided</scope>
</dependency>
@@ -254,18 +266,6 @@
</dependency>
<!-- for unit/integration testing -->
<dependency>
- <groupId>org.apache.servicemix</groupId>
- <artifactId>servicemix-core</artifactId>
- <version>${servicemix-version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.springframework</groupId>
- <artifactId>spring-jmx</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring-version}</version>
Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java?rev=713330&r1=713329&r2=713330&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java Wed Nov 12 01:32:04 2008
@@ -426,8 +426,8 @@
}
protected void processExchange(final MessageExchange exchange, final Session session, final JmsContext context) throws Exception {
- // Ignore DONE exchanges
- if (exchange.getStatus() == ExchangeStatus.DONE) {
+ // Ignore InOnly exchanges which are currently handled in fire-and-forget mode
+ if (exchange instanceof InOnly) {
return;
}
// Create session if needed
@@ -475,6 +475,10 @@
dest = getReplyDestination(exchange, error, session, context);
setCorrelationId(context.getMessage(), msg);
send(msg, session, dest);
+ } else if (exchange.getStatus() == ExchangeStatus.DONE) {
+ msg = session.createMessage();
+ msg.setBooleanProperty(AbstractJmsMarshaler.DONE_JMS_PROPERTY, true);
+ send(msg, session, dest);
} else {
throw new IllegalStateException("Unrecognized exchange status");
}
@@ -541,9 +545,7 @@
} catch (Exception e) {
handleException(exchange, e, session, context);
}
- if (exchange.getStatus() != ExchangeStatus.DONE) {
- processExchange(exchange, session, context);
- }
+ processExchange(exchange, session, context);
} else {
if (stateless) {
exchange.setProperty(PROP_JMS_CONTEXT, context);
Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractJmsMarshaler.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractJmsMarshaler.java?rev=713330&r1=713329&r2=713330&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractJmsMarshaler.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/AbstractJmsMarshaler.java Wed Nov 12 01:32:04 2008
@@ -33,6 +33,12 @@
*/
public abstract class AbstractJmsMarshaler {
+ public static final String DONE_JMS_PROPERTY = "JBIDone";
+
+ public static final String FAULT_JMS_PROPERTY = "JBIFault";
+
+ public static final String ERROR_JMS_PROPERTY = "JBIError";
+
/**
* Should marshaler copy properties set in messages?
*/
@@ -111,7 +117,7 @@
* @param value the property value
* @return true if it should be copied
*/
- private boolean shouldIncludeHeader(String name, Object value) {
+ protected boolean shouldIncludeHeader(String name, Object value) {
return (value instanceof String || value instanceof Number || value instanceof Date)
&& (!isNeedJavaIdentifiers() || isJavaIdentifier(name));
}
Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java?rev=713330&r1=713329&r2=713330&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultConsumerMarshaler.java Wed Nov 12 01:32:04 2008
@@ -20,7 +20,13 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
+import java.io.InputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
import java.net.URI;
+import java.util.Map;
+import java.util.Set;
import javax.jbi.component.ComponentContext;
import javax.jbi.messaging.Fault;
@@ -29,11 +35,22 @@
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.jms.ObjectMessage;
import javax.xml.transform.Source;
+import javax.xml.stream.XMLStreamReader;
+import javax.activation.DataHandler;
import org.apache.servicemix.common.JbiConstants;
import org.apache.servicemix.jbi.jaxp.SourceTransformer;
import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.soap.core.PhaseInterceptorChain;
+import org.apache.servicemix.soap.core.MessageImpl;
+import org.apache.servicemix.soap.interceptors.mime.AttachmentsInInterceptor;
+import org.apache.servicemix.soap.interceptors.mime.AttachmentsOutInterceptor;
+import org.apache.servicemix.soap.interceptors.xml.StaxInInterceptor;
+import org.apache.servicemix.soap.interceptors.xml.StaxOutInterceptor;
+import org.apache.servicemix.soap.interceptors.xml.BodyOutInterceptor;
+import org.apache.servicemix.soap.util.stax.StaxSource;
public class DefaultConsumerMarshaler extends AbstractJmsMarshaler implements JmsConsumerMarshaler {
@@ -78,28 +95,73 @@
}
public Message createOut(MessageExchange exchange, NormalizedMessage outMsg, Session session, JmsContext context) throws Exception {
- String text = new SourceTransformer().contentToString(outMsg);
- TextMessage textMessage = session.createTextMessage(text);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PhaseInterceptorChain chain = new PhaseInterceptorChain();
+ chain.add(new AttachmentsOutInterceptor());
+ chain.add(new StaxOutInterceptor());
+ chain.add(new BodyOutInterceptor());
+ org.apache.servicemix.soap.api.Message msg = new MessageImpl();
+ msg.setContent(Source.class, outMsg.getContent());
+ msg.setContent(OutputStream.class, baos);
+ for (String attId : (Set<String>) outMsg.getAttachmentNames()) {
+ msg. getAttachments().put(attId, outMsg.getAttachment(attId));
+ }
+ chain.doIntercept(msg);
+ TextMessage text = session.createTextMessage(baos.toString());
+ text.setStringProperty(org.apache.servicemix.soap.api.Message.CONTENT_TYPE,
+ (String) msg.get(org.apache.servicemix.soap.api.Message.CONTENT_TYPE));
if (isCopyProperties()) {
- copyPropertiesFromNM(outMsg, textMessage);
+ copyPropertiesFromNM(outMsg, text);
}
- return textMessage;
+ return text;
}
public Message createFault(MessageExchange exchange, Fault fault, Session session, JmsContext context) throws Exception {
- String text = new SourceTransformer().contentToString(fault);
- return session.createTextMessage(text);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PhaseInterceptorChain chain = new PhaseInterceptorChain();
+ chain.add(new AttachmentsOutInterceptor());
+ chain.add(new StaxOutInterceptor());
+ chain.add(new BodyOutInterceptor());
+ org.apache.servicemix.soap.api.Message msg = new MessageImpl();
+ msg.setContent(Source.class, fault.getContent());
+ msg.setContent(OutputStream.class, baos);
+ for (String attId : (Set<String>) fault.getAttachmentNames()) {
+ msg. getAttachments().put(attId, fault.getAttachment(attId));
+ }
+ chain.doIntercept(msg);
+ TextMessage text = session.createTextMessage(baos.toString());
+ text.setStringProperty(org.apache.servicemix.soap.api.Message.CONTENT_TYPE,
+ (String) msg.get(org.apache.servicemix.soap.api.Message.CONTENT_TYPE));
+ text.setBooleanProperty(FAULT_JMS_PROPERTY, true);
+ if (isCopyProperties()) {
+ copyPropertiesFromNM(fault, text);
+ }
+ return text;
}
public Message createError(MessageExchange exchange, Exception error, Session session, JmsContext context) throws Exception {
- throw error;
+ ObjectMessage message = session.createObjectMessage(error);
+ message.setBooleanProperty(ERROR_JMS_PROPERTY, true);
+ return message;
}
protected void populateMessage(Message message, NormalizedMessage normalizedMessage) throws Exception {
if (message instanceof TextMessage) {
- TextMessage textMessage = (TextMessage) message;
- Source source = new StringSource(textMessage.getText());
- normalizedMessage.setContent(source);
+ PhaseInterceptorChain chain = new PhaseInterceptorChain();
+ chain.add(new AttachmentsInInterceptor());
+ chain.add(new StaxInInterceptor());
+ org.apache.servicemix.soap.api.Message msg = new MessageImpl();
+ msg.setContent(InputStream.class, new ByteArrayInputStream(((TextMessage) message).getText().getBytes()));
+ String contentType = message.getStringProperty(org.apache.servicemix.soap.api.Message.CONTENT_TYPE);
+ if (contentType != null) {
+ msg.put(org.apache.servicemix.soap.api.Message.CONTENT_TYPE, contentType);
+ }
+ chain.doIntercept(msg);
+ XMLStreamReader xmlReader = msg.getContent(XMLStreamReader.class);
+ normalizedMessage.setContent(new StaxSource(xmlReader));
+ for (Map.Entry<String, DataHandler> attachment : msg.getAttachments().entrySet()) {
+ normalizedMessage.addAttachment(attachment.getKey(), attachment.getValue());
+ }
} else {
throw new UnsupportedOperationException("JMS message is not a TextMessage");
}
Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultProviderMarshaler.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultProviderMarshaler.java?rev=713330&r1=713329&r2=713330&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultProviderMarshaler.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/DefaultProviderMarshaler.java Wed Nov 12 01:32:04 2008
@@ -17,6 +17,12 @@
package org.apache.servicemix.jms.endpoints;
import java.util.Map;
+import java.util.List;
+import java.util.Set;
+import java.io.InputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.NormalizedMessage;
@@ -24,9 +30,19 @@
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.xml.transform.Source;
+import javax.xml.stream.XMLStreamReader;
+import javax.activation.DataHandler;
import org.apache.servicemix.jbi.jaxp.SourceTransformer;
import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.soap.core.PhaseInterceptorChain;
+import org.apache.servicemix.soap.core.MessageImpl;
+import org.apache.servicemix.soap.interceptors.mime.AttachmentsInInterceptor;
+import org.apache.servicemix.soap.interceptors.mime.AttachmentsOutInterceptor;
+import org.apache.servicemix.soap.interceptors.xml.StaxInInterceptor;
+import org.apache.servicemix.soap.interceptors.xml.StaxOutInterceptor;
+import org.apache.servicemix.soap.interceptors.xml.BodyOutInterceptor;
+import org.apache.servicemix.soap.util.stax.StaxSource;
public class DefaultProviderMarshaler extends AbstractJmsMarshaler implements
JmsProviderMarshaler {
@@ -49,8 +65,21 @@
}
public Message createMessage(MessageExchange exchange, NormalizedMessage in, Session session) throws Exception {
- TextMessage text = session.createTextMessage();
- text.setText(transformer.contentToString(in));
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PhaseInterceptorChain chain = new PhaseInterceptorChain();
+ chain.add(new AttachmentsOutInterceptor());
+ chain.add(new StaxOutInterceptor());
+ chain.add(new BodyOutInterceptor());
+ org.apache.servicemix.soap.api.Message msg = new MessageImpl();
+ msg.setContent(Source.class, in.getContent());
+ msg.setContent(OutputStream.class, baos);
+ for (String attId : (Set<String>) in.getAttachmentNames()) {
+ msg. getAttachments().put(attId, in.getAttachment(attId));
+ }
+ chain.doIntercept(msg);
+ TextMessage text = session.createTextMessage(baos.toString());
+ text.setStringProperty(org.apache.servicemix.soap.api.Message.CONTENT_TYPE,
+ (String) msg.get(org.apache.servicemix.soap.api.Message.CONTENT_TYPE));
if (jmsProperties != null) {
for (Map.Entry<String, Object> e : jmsProperties.entrySet()) {
text.setObjectProperty(e.getKey(), e.getValue());
@@ -66,12 +95,23 @@
public void populateMessage(Message message, MessageExchange exchange, NormalizedMessage normalizedMessage) throws Exception {
if (message instanceof TextMessage) {
- TextMessage textMessage = (TextMessage) message;
- Source source = new StringSource(textMessage.getText());
- normalizedMessage.setContent(source);
-
+ PhaseInterceptorChain chain = new PhaseInterceptorChain();
+ chain.add(new AttachmentsInInterceptor());
+ chain.add(new StaxInInterceptor());
+ org.apache.servicemix.soap.api.Message msg = new MessageImpl();
+ msg.setContent(InputStream.class, new ByteArrayInputStream(((TextMessage) message).getText().getBytes()));
+ String contentType = message.getStringProperty(org.apache.servicemix.soap.api.Message.CONTENT_TYPE);
+ if (contentType != null) {
+ msg.put(org.apache.servicemix.soap.api.Message.CONTENT_TYPE, contentType);
+ }
+ chain.doIntercept(msg);
+ XMLStreamReader xmlReader = msg.getContent(XMLStreamReader.class);
+ normalizedMessage.setContent(new StaxSource(xmlReader));
+ for (Map.Entry<String, DataHandler> attachment : msg.getAttachments().entrySet()) {
+ normalizedMessage.addAttachment(attachment.getKey(), attachment.getValue());
+ }
if (isCopyProperties()) {
- copyPropertiesFromJMS(textMessage, normalizedMessage);
+ copyPropertiesFromJMS(message, normalizedMessage);
}
} else {
throw new UnsupportedOperationException("JMS message is not a TextMessage");
Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java?rev=713330&r1=713329&r2=713330&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java Wed Nov 12 01:32:04 2008
@@ -23,6 +23,7 @@
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.RobustInOnly;
import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.Fault;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -30,6 +31,7 @@
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
import org.apache.servicemix.common.endpoints.ProviderEndpoint;
import org.apache.servicemix.common.JbiConstants;
@@ -239,7 +241,7 @@
* Specifies if the QoS values specified for the endpoint are explicitly
* used when a messages is sent. The default is <code>false</code>.
*
- * @param replyExplicitQosEnabled should the QoS values be sent?
+ * @param explicitQosEnabled should the QoS values be sent?
*/
public void setExplicitQosEnabled(boolean explicitQosEnabled) {
this.explicitQosEnabled = explicitQosEnabled;
@@ -476,7 +478,7 @@
done(exchange);
// In message
} else if ((in = exchange.getMessage("in")) != null) {
- if (exchange instanceof InOnly || exchange instanceof RobustInOnly) {
+ if (exchange instanceof InOnly) {
processInOnly(exchange, in);
done(exchange);
}
@@ -631,14 +633,30 @@
if (receiveJmsMsg == null) {
throw new IllegalStateException("Unable to receive response");
}
-
- NormalizedMessage out = exchange.getMessage("out");
- if (out == null) {
- out = exchange.createMessage();
- exchange.setMessage(out, "out");
+ if (receiveJmsMsg.getBooleanProperty(AbstractJmsMarshaler.DONE_JMS_PROPERTY)) {
+ exchange.setStatus(ExchangeStatus.DONE);
+ } else if (receiveJmsMsg.getBooleanProperty(AbstractJmsMarshaler.ERROR_JMS_PROPERTY)) {
+ Exception e = (Exception) ((ObjectMessage) receiveJmsMsg).getObject();
+ exchange.setError(e);
+ exchange.setStatus(ExchangeStatus.ERROR);
+ } else if (receiveJmsMsg.getBooleanProperty(AbstractJmsMarshaler.FAULT_JMS_PROPERTY)) {
+ Fault fault = exchange.getFault();
+ if (fault == null) {
+ fault = exchange.createFault();
+ exchange.setFault(fault);
+ }
+ marshaler.populateMessage(receiveJmsMsg, exchange, fault);
+ } else {
+ NormalizedMessage out = exchange.getMessage("out");
+ if (out == null) {
+ out = exchange.createMessage();
+ exchange.setMessage(out, "out");
+ }
+ marshaler.populateMessage(receiveJmsMsg, exchange, out);
}
- marshaler.populateMessage(receiveJmsMsg, exchange, out);
- boolean txSync = exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
+ boolean txSync = exchange.getStatus() == ExchangeStatus.ACTIVE
+ && exchange.isTransacted()
+ && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
if (txSync) {
sendSync(exchange);
} else {
@@ -665,12 +683,27 @@
logger.error("Unable to load exchange related to incoming JMS message " + message, e);
}
try {
- NormalizedMessage out = exchange.getMessage("out");
- if (out == null) {
- out = exchange.createMessage();
- exchange.setMessage(out, "out");
+ if (message.getBooleanProperty(AbstractJmsMarshaler.DONE_JMS_PROPERTY)) {
+ exchange.setStatus(ExchangeStatus.DONE);
+ } else if (message.getBooleanProperty(AbstractJmsMarshaler.ERROR_JMS_PROPERTY)) {
+ Exception e = (Exception) ((ObjectMessage) message).getObject();
+ exchange.setError(e);
+ exchange.setStatus(ExchangeStatus.ERROR);
+ } else if (message.getBooleanProperty(AbstractJmsMarshaler.FAULT_JMS_PROPERTY)) {
+ Fault fault = exchange.getFault();
+ if (fault == null) {
+ fault = exchange.createFault();
+ exchange.setFault(fault);
+ }
+ marshaler.populateMessage(message, exchange, fault);
+ } else {
+ NormalizedMessage out = exchange.getMessage("out");
+ if (out == null) {
+ out = exchange.createMessage();
+ exchange.setMessage(out, "out");
+ }
+ marshaler.populateMessage(message, exchange, out);
}
- marshaler.populateMessage(message, exchange, out);
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Error while populating JBI exchange " + exchange, e);
@@ -678,7 +711,9 @@
exchange.setError(e);
}
try {
- boolean txSync = exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
+ boolean txSync = exchange.getStatus() == ExchangeStatus.ACTIVE
+ && exchange.isTransacted()
+ && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
if (txSync) {
sendSync(exchange);
} else {
Modified: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/AbstractJmsTestSupport.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/AbstractJmsTestSupport.java?rev=713330&r1=713329&r2=713330&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/AbstractJmsTestSupport.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/AbstractJmsTestSupport.java Wed Nov 12 01:32:04 2008
@@ -16,6 +16,10 @@
*/
package org.apache.servicemix.jms;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
import javax.naming.Context;
import javax.naming.InitialContext;
@@ -26,9 +30,12 @@
import org.apache.activemq.jndi.ActiveMQInitialContextFactory;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.apache.servicemix.MessageExchangeListener;
import org.apache.servicemix.client.DefaultServiceMixClient;
import org.apache.servicemix.client.ServiceMixClient;
+import org.apache.servicemix.components.util.ComponentSupport;
import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
import org.jencks.GeronimoPlatformTransactionManager;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jms.core.JmsTemplate;
@@ -106,4 +113,29 @@
protected void configureJmsBroker() throws Exception {
}
+
+ protected static class ReturnErrorComponent extends ComponentSupport implements MessageExchangeListener {
+ private Exception exception;
+
+ public ReturnErrorComponent(Exception exception) {
+ this.exception = exception;
+ }
+
+ public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ fail(exchange, exception);
+ }
+ }
+ }
+
+ protected static class ReturnFaultComponent extends ComponentSupport implements MessageExchangeListener {
+ public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ Fault fault = exchange.createFault();
+ fault.setContent(new StringSource("<fault/>"));
+ fail(exchange, fault);
+ }
+ }
+ }
+
}
Modified: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JMSComponentTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JMSComponentTest.java?rev=713330&r1=713329&r2=713330&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JMSComponentTest.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JMSComponentTest.java Wed Nov 12 01:32:04 2008
@@ -20,28 +20,24 @@
import java.net.URI;
import java.net.URL;
+import javax.activation.DataHandler;
import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.Fault;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.InOut;
-import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.mail.util.ByteArrayDataSource;
import javax.xml.namespace.QName;
import javax.xml.transform.Source;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.servicemix.MessageExchangeListener;
import org.apache.servicemix.components.jms.JmsReceiverComponent;
import org.apache.servicemix.components.jms.JmsServiceComponent;
-import org.apache.servicemix.components.util.ComponentSupport;
import org.apache.servicemix.components.util.EchoComponent;
-import org.apache.servicemix.jbi.FaultException;
import org.apache.servicemix.jbi.container.ActivationSpec;
import org.apache.servicemix.jbi.jaxp.SourceTransformer;
import org.apache.servicemix.jbi.jaxp.StringSource;
@@ -199,27 +195,32 @@
InOut inout = null;
boolean result = false;
+ DataHandler dh = null;
// Test successful return
inout = client.createInOutExchange();
inout.setInterfaceName(new QName("http://jms.servicemix.org/Test", "ProviderInterface"));
inout.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+ dh = new DataHandler(new ByteArrayDataSource("myImage", "application/octet-stream"));
+ inout.getInMessage().addAttachment("myImage", dh);
result = client.sendSync(inout);
assertTrue(result);
NormalizedMessage out = inout.getOutMessage();
assertNotNull(out);
Source src = out.getContent();
assertNotNull(src);
+ dh = out.getAttachment("myImage");
+ assertNotNull(dh);
+
logger.info(new SourceTransformer().toString(src));
- // TODO
-// // Test fault return
+ // Test fault return
// container.deactivateComponent("receiver");
// ReturnFaultComponent fault = new ReturnFaultComponent();
// ActivationSpec asFault = new ActivationSpec("receiver", fault);
// asFault.setService(new QName("http://jms.servicemix.org/Test", "Echo"));
// container.activateComponent(asFault);
-//
+//
// inout = client.createInOutExchange();
// inout.setInterfaceName(new QName("http://jms.servicemix.org/Test", "ProviderInterface"));
// inout.getInMessage().setContent(new StringSource("<hello>world</hello>"));
@@ -233,7 +234,7 @@
ActivationSpec asError = new ActivationSpec("receiver", error);
asError.setService(new QName("http://jms.servicemix.org/Test", "Echo"));
container.activateComponent(asError);
-
+
inout = client.createInOutExchange();
inout.setInterfaceName(new QName("http://jms.servicemix.org/Test", "ProviderInterface"));
inout.getInMessage().setContent(new StringSource("<hello>world</hello>"));
@@ -242,29 +243,5 @@
assertTrue("An IllegalArgumentException was expected", inout.getError() instanceof IllegalArgumentException);
}
-
- protected static class ReturnErrorComponent extends ComponentSupport implements MessageExchangeListener {
- private Exception exception;
-
- public ReturnErrorComponent(Exception exception) {
- this.exception = exception;
- }
-
- public void onMessageExchange(MessageExchange exchange) throws MessagingException {
- if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
- fail(exchange, exception);
- }
- }
- }
-
- protected static class ReturnFaultComponent extends ComponentSupport implements MessageExchangeListener {
- public void onMessageExchange(MessageExchange exchange) throws MessagingException {
- if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
- Fault fault = exchange.createFault();
- fault.setContent(new StringSource("<fault/>"));
- fail(exchange, fault);
- }
- }
- }
}
Added: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java?rev=713330&view=auto
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java (added)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java Wed Nov 12 01:32:04 2008
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jms;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import javax.activation.DataHandler;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jms.ConnectionFactory;
+import javax.mail.util.ByteArrayDataSource;
+import javax.xml.namespace.QName;
+import javax.xml.transform.Source;
+
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.components.util.EchoComponent;
+import org.apache.servicemix.jbi.container.ActivationSpec;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jms.endpoints.DefaultConsumerMarshaler;
+import org.apache.servicemix.jms.endpoints.DefaultProviderMarshaler;
+import org.apache.servicemix.jms.endpoints.JmsConsumerEndpoint;
+import org.apache.servicemix.jms.endpoints.JmsProviderEndpoint;
+
+public class JmsProviderConsumerEndpointTest extends AbstractJmsTestSupport {
+
+ private static Log logger = LogFactory.getLog(JmsProviderConsumerEndpointTest.class);
+
+ public void testProviderConsumerInOut() throws Exception {
+ ConnectionFactory connFactory = new PooledConnectionFactory(connectionFactory);
+ JmsComponent jmsComponent = new JmsComponent();
+ JmsConsumerEndpoint consumerEndpoint = createConsumerEndpoint(connFactory);
+ JmsProviderEndpoint providerEndpoint = createProviderEndpoint(connFactory);
+ jmsComponent.setEndpoints(new JmsEndpointType[] {consumerEndpoint, providerEndpoint});
+ container.activateComponent(jmsComponent, "servicemix-jms");
+
+ // Add an echo component
+ EchoComponent echo = new EchoComponent();
+ ActivationSpec asEcho = new ActivationSpec("receiver", echo);
+ asEcho.setService(new QName("http://jms.servicemix.org/Test", "Echo"));
+ container.activateComponent(asEcho);
+
+ InOut inout = null;
+ boolean result = false;
+ DataHandler dh = null;
+
+ // Test successful return
+ inout = client.createInOutExchange();
+ inout.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
+ inout.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+ dh = new DataHandler(new ByteArrayDataSource("myImage", "application/octet-stream"));
+ inout.getInMessage().addAttachment("myImage", dh);
+ result = client.sendSync(inout);
+ assertTrue(result);
+ NormalizedMessage out = inout.getOutMessage();
+ assertNotNull(out);
+ Source src = out.getContent();
+ assertNotNull(src);
+ dh = out.getAttachment("myImage");
+ assertNotNull(dh);
+
+ logger.info(new SourceTransformer().toString(src));
+
+ // Test fault return
+ container.deactivateComponent("receiver");
+ ReturnFaultComponent fault = new ReturnFaultComponent();
+ ActivationSpec asFault = new ActivationSpec("receiver", fault);
+ asFault.setService(new QName("http://jms.servicemix.org/Test", "Echo"));
+ container.activateComponent(asFault);
+
+ inout = client.createInOutExchange();
+ inout.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
+ inout.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+ result = client.sendSync(inout);
+ assertTrue(result);
+ assertNotNull(inout.getFault());
+
+ // Test error return
+ container.deactivateComponent("receiver");
+ ReturnErrorComponent error = new ReturnErrorComponent(new IllegalArgumentException());
+ ActivationSpec asError = new ActivationSpec("receiver", error);
+ asError.setService(new QName("http://jms.servicemix.org/Test", "Echo"));
+ container.activateComponent(asError);
+
+ inout = client.createInOutExchange();
+ inout.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
+ inout.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+ client.sendSync(inout);
+ assertEquals(ExchangeStatus.ERROR, inout.getStatus());
+ assertTrue("An IllegalArgumentException was expected", inout.getError() instanceof IllegalArgumentException);
+
+ }
+
+ private JmsConsumerEndpoint createConsumerEndpoint(ConnectionFactory connFactory) throws URISyntaxException {
+ JmsConsumerEndpoint endpoint = new JmsConsumerEndpoint();
+ endpoint.setService(new QName("http://jms.servicemix.org/Test", "Consumer"));
+ endpoint.setEndpoint("endpoint");
+ DefaultConsumerMarshaler marshaler = new DefaultConsumerMarshaler();
+ marshaler.setMep(new URI("http://www.w3.org/2004/08/wsdl/in-out"));
+ endpoint.setMarshaler(marshaler);
+ endpoint.setListenerType("simple");
+ endpoint.setConnectionFactory(connFactory);
+ endpoint.setDestinationName("destination");
+ endpoint.setTargetService(new QName("http://jms.servicemix.org/Test", "Echo"));
+ return endpoint;
+ }
+
+ private JmsProviderEndpoint createProviderEndpoint(ConnectionFactory connFactory) {
+ JmsProviderEndpoint endpoint = new JmsProviderEndpoint();
+ endpoint.setService(new QName("http://jms.servicemix.org/Test", "Provider"));
+ endpoint.setEndpoint("endpoint");
+ DefaultProviderMarshaler marshaler = new DefaultProviderMarshaler();
+ endpoint.setMarshaler(marshaler);
+ endpoint.setConnectionFactory(connFactory);
+ endpoint.setDestinationName("destination");
+ return endpoint;
+ }
+}