You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2009/05/27 11:38:16 UTC
svn commit: r779072 - in /servicemix/components/engines/servicemix-camel/trunk/src: main/java/org/apache/servicemix/camel/ test/java/org/apache/servicemix/camel/
Author: gertv
Date: Wed May 27 09:38:16 2009
New Revision: 779072
URL: http://svn.apache.org/viewvc?rev=779072&view=rev
Log:
SMXCOMP-495: Allow Camel Exchange to outlive the underlying JBI MessageExchange
Modified:
servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java
servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiExchange.java
servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiExchangeTest.java
servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOnlyTest.java
servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOptionalOutCamelTest.java
Modified: servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java?rev=779072&r1=779071&r2=779072&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java (original)
+++ servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java Wed May 27 09:38:16 2009
@@ -20,6 +20,7 @@
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.RobustInOnly;
import javax.xml.namespace.QName;
@@ -103,9 +104,9 @@
} else {
e = new Exception(t);
}
- fail(exchange, e);
+ fail(camelExchange, e);
} else {
- done(exchange);
+ done(camelExchange);
}
} else {
if (logger.isDebugEnabled()) {
@@ -124,16 +125,16 @@
} else {
e = new Exception(t);
}
- fail(exchange, e);
+ fail(camelExchange, e);
} else {
boolean txSync = exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
// if (camelExchange.getOut(false) == null) {
// camelExchange.getOut().copyFrom(camelExchange.getIn());
// }
if (txSync) {
- sendSync(exchange);
+ sendSync(camelExchange);
} else {
- send(exchange);
+ send(camelExchange);
}
}
}
@@ -143,4 +144,31 @@
}
}
+ /*
+ * Send the underlying JBI MessageExchange and detach it from the Camel JbiExchange
+ */
+ private void send(JbiExchange camelExchange) throws MessagingException {
+ send(camelExchange.detach());
+ }
+
+ /*
+ * Synchronously send the underlying JBI MessageExchange and detach it from the Camel JbiExchange
+ */
+ private void sendSync(JbiExchange camelExchange) throws MessagingException {
+ sendSync(camelExchange.detach());
+ }
+
+ /*
+ * Send a DONE status for the underlying JBI MessageExchange and detach it from the Camel JbiExchange
+ */
+ private void done(JbiExchange camelExchange) throws MessagingException {
+ done(camelExchange.detach());
+ }
+
+ /*
+ * Make the underlying JBI MessageExchange fail and detach it from the Camel JbiExchange
+ */
+ private void fail(JbiExchange camelExchange, Exception e) throws MessagingException {
+ fail(camelExchange.detach(), e);
+ }
}
Modified: servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiExchange.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiExchange.java?rev=779072&r1=779071&r2=779072&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiExchange.java (original)
+++ servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiExchange.java Wed May 27 09:38:16 2009
@@ -45,13 +45,10 @@
populateProperties();
}
-
-
public JbiExchange(CamelContext context, JbiBinding binding, MessageExchange messageExchange) {
super(context);
this.binding = binding;
this.messageExchange = messageExchange;
-
setPattern(ExchangePattern.fromWsdlUri(messageExchange.getPattern().toString()));
populateProperties();
}
@@ -82,8 +79,12 @@
}
@Override
- public org.apache.camel.Exchange newInstance() {
- return new JbiExchange(this.getContext(), this.getBinding(), this.getMessageExchange());
+ public org.apache.camel.Exchange newInstance() {
+ if (messageExchange == null) {
+ return new JbiExchange(this.getContext(), this.getBinding());
+ } else {
+ return new JbiExchange(this.getContext(), this.getBinding(), this.getMessageExchange());
+ }
}
@Override
@@ -197,4 +198,11 @@
}
}
+ public MessageExchange detach() {
+ try {
+ return messageExchange;
+ } finally {
+ messageExchange = null;
+ }
+ }
}
Modified: servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiExchangeTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiExchangeTest.java?rev=779072&r1=779071&r2=779072&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiExchangeTest.java (original)
+++ servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiExchangeTest.java Wed May 27 09:38:16 2009
@@ -27,6 +27,7 @@
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.servicemix.jbi.helper.MessageExchangePattern;
+import org.apache.servicemix.jbi.jaxp.StringSource;
import org.apache.servicemix.jbi.messaging.FaultImpl;
import org.apache.servicemix.tck.mock.MockMessageExchange;
import org.apache.servicemix.tck.mock.MockNormalizedMessage;
@@ -87,6 +88,22 @@
assertSame(fault, exchange.getFaultMessage());
}
+ public void testDetach() throws Exception {
+ MessageExchange exchange = createMockExchange();
+ StringSource body = new StringSource("<question>Will this still be there?</question>");
+ exchange.setMessage(exchange.createMessage(), "in");
+ exchange.getMessage("in").setContent(body);
+ exchange.getMessage("in").setProperty("key", "value");
+ JbiExchange camelExchange = new JbiExchange(new DefaultCamelContext(), new JbiBinding(), exchange);
+ assertEquals(body, camelExchange.getIn().getBody());
+ // now detach the Camel Exchange from the underlying JBI MessageExchange
+ assertSame(exchange, camelExchange.detach());
+ assertNull(camelExchange.getMessageExchange());
+ // and make sure that all the data is still there
+ assertEquals(body, camelExchange.getIn().getBody());
+ assertEquals("value", camelExchange.getIn().getHeader("key"));
+ }
+
private MessageExchange createMockExchange() {
return new MockMessageExchange() {
@Override
Modified: servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOnlyTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOnlyTest.java?rev=779072&r1=779071&r2=779072&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOnlyTest.java (original)
+++ servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOnlyTest.java Wed May 27 09:38:16 2009
@@ -25,12 +25,12 @@
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.converter.jaxp.StringSource;
import org.apache.camel.Processor;
import org.apache.camel.Exchange;
import org.apache.servicemix.client.DefaultServiceMixClient;
import org.apache.servicemix.client.ServiceMixClient;
import org.apache.servicemix.jbi.container.ActivationSpec;
+import org.apache.servicemix.jbi.jaxp.StringSource;
/**
* Tests on handling JBI InOnly exchanges by Camel
@@ -48,6 +48,20 @@
exchange.setService(new QName("urn:test", "in-only"));
exchange.getInMessage().setContent(new StringSource(MESSAGE));
client.send(exchange);
+ client.receive(1000);
+ assertEquals(ExchangeStatus.DONE, exchange.getStatus());
+ done.assertIsSatisfied();
+ }
+
+ public void testInOnlyExchangeForwardAndConvertBody() throws Exception {
+ MockEndpoint done = getMockEndpoint("mock:done");
+ done.expectedBodiesReceived(MESSAGE);
+
+ ServiceMixClient client = new DefaultServiceMixClient(jbiContainer);
+ InOnly exchange = client.createInOnlyExchange();
+ exchange.setService(new QName("urn:test", "forward"));
+ exchange.getInMessage().setContent(new StringSource(MESSAGE));
+ client.sendSync(exchange);
done.assertIsSatisfied();
}
@@ -69,6 +83,19 @@
client.sendSync(exchange);
assertEquals(ExchangeStatus.ERROR, exchange.getStatus());
}
+
+ public void testInOnlyToAggregator() throws Exception {
+ ServiceMixClient smxClient = getServicemixClient();
+ getMockEndpoint("mock:aggregated").expectedMessageCount(1);
+ for (int i = 0; i < 50; i++) {
+ InOnly exchange = smxClient.createInOnlyExchange();
+ exchange.setService(new QName("urn:test", "in-only-aggregator"));
+ exchange.getInMessage().setProperty("key", "aggregate-this");
+ exchange.getInMessage().setContent(new StringSource("<request>Could you please aggregate this?</request>"));
+ smxClient.send(exchange);
+ }
+ getMockEndpoint("mock:aggregated").assertIsSatisfied();
+ }
@Override
protected void appendJbiActivationSpecs(List<ActivationSpec> activationSpecList) {
@@ -81,14 +108,18 @@
@Override
public void configure() throws Exception {
+ from("jbi:service:urn:test:forward").to("jbi:service:urn:test:in-only?mep=in-only");
from("jbi:service:urn:test:in-only").convertBodyTo(String.class).to("mock:done");
from("jbi:service:urn:test:in-only-error").process(new Processor() {
public void process(Exchange exchange) throws Exception {
throw new Exception("Error");
}
});
+ from("jbi:service:urn:test:in-only-aggregator")
+ .aggregator(header("key"))
+ .setHeader("aggregated").constant(true)
+ .to("mock:aggregated");
}
-
};
}
}
Modified: servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOptionalOutCamelTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOptionalOutCamelTest.java?rev=779072&r1=779071&r2=779072&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOptionalOutCamelTest.java (original)
+++ servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOptionalOutCamelTest.java Wed May 27 09:38:16 2009
@@ -39,6 +39,8 @@
client.sendBody("direct:in-optional-out", new StringSource("<request>Does this MEP confuse you?</request>"));
inonly.assertIsSatisfied();
+ // let's wait for a moment to ensure that all pending Exchanges are handled
+ Thread.sleep(500);
}
public void testInOptionalOutFromJbiWithDone() throws Exception {