You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2010/06/29 23:24:41 UTC
svn commit: r959109 - in /servicemix/smx4/features/trunk: ./
camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/
camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/
Author: gertv
Date: Tue Jun 29 21:24:41 2010
New Revision: 959109
URL: http://svn.apache.org/viewvc?rev=959109&view=rev
Log:
SMX4-543: Improve Camel NMR component to leverage Camel async routing engine
Added:
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/MultipleProducersTest.java
- copied, changed from r953282, servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/SimpleNmrTest.java
Modified:
servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixConsumer.java
servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixEndpoint.java
servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixProducer.java
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/AbstractComponentTest.java
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/CamelAsyncRouteTest.java
servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/SimpleNmrTest.java
servicemix/smx4/features/trunk/pom.xml
Modified: servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixConsumer.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixConsumer.java?rev=959109&r1=959108&r2=959109&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixConsumer.java (original)
+++ servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixConsumer.java Tue Jun 29 21:24:41 2010
@@ -18,6 +18,7 @@ package org.apache.servicemix.camel.nmr;
import java.util.Map;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
@@ -29,7 +30,7 @@ import org.apache.servicemix.nmr.api.Sta
import org.apache.servicemix.nmr.api.service.ServiceHelper;
/**
- * A {@link Consumer} that receives Camel {@link org.apache.camel.Exchange}s and sends them into the ServiceMix NMR
+ * A {@link Consumer} that receives NMR {@link org.apache.servicemix.nmr.api.Exchange}s and invokes the Camel route
*/
public class ServiceMixConsumer extends DefaultConsumer implements org.apache.servicemix.nmr.api.Endpoint, Synchronization {
@@ -64,13 +65,22 @@ public class ServiceMixConsumer extends
this.channel = channel;
}
+ /**
+ * Process an NMR {@link org.apache.servicemix.nmr.api.Exchange} by creating and sending a Camel {@link org.apache.servicemix.nmr.api.Exchange}
+ * through the defined route
+ */
public void process(Exchange exchange) {
if (exchange.getStatus() == Status.Active) {
try {
org.apache.camel.Exchange camelExchange = getEndpoint().createExchange(exchange);
camelExchange.addOnCompletion(this);
- getProcessor().process(camelExchange);
+ getAsyncProcessor().process(camelExchange, new AsyncCallback() {
+
+ public void done(boolean doneSync) {
+ // this is handled by the onComplete/onFailure method
+ }
+ });
} catch (Exception e) {
exchange.setError(e);
exchange.setStatus(Status.Error);
@@ -79,6 +89,10 @@ public class ServiceMixConsumer extends
}
}
+ /**
+ * Handle the Camel {@link org.apache.camel.Exchange) response by updating the matching NMR {@link org.apache.servicemix.nmr.api.Exchange}
+ * and finishing the NMR MEP
+ */
private void handleCamelResponse(Exchange exchange, org.apache.camel.Exchange camelExchange) {
// just copy the camelExchange back to the nmr exchange
exchange.getProperties().putAll(camelExchange.getProperties());
@@ -97,11 +111,17 @@ public class ServiceMixConsumer extends
channel.send(exchange);
}
+ /*
+ * Handle a successfully completed Camel Exchange
+ */
public void onComplete(org.apache.camel.Exchange exchange) {
Exchange nmr = getEndpoint().getComponent().getBinding().extractNmrExchange(exchange);
handleCamelResponse(nmr, exchange);
}
+ /*
+ * Handle a Caml exchange failure
+ */
public void onFailure(org.apache.camel.Exchange exchange) {
Exchange nmr = getEndpoint().getComponent().getBinding().extractNmrExchange(exchange);
handleCamelResponse(nmr, exchange);
Modified: servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixEndpoint.java?rev=959109&r1=959108&r2=959109&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixEndpoint.java (original)
+++ servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixEndpoint.java Tue Jun 29 21:24:41 2010
@@ -60,7 +60,7 @@ public class ServiceMixEndpoint extends
}
public Producer createProducer() throws Exception {
- return new ServiceMixProducer(this);
+ return new ServiceMixProducer(this, getComponent().getNmr());
}
public Consumer createConsumer(Processor processor) throws Exception {
Modified: servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixProducer.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixProducer.java?rev=959109&r1=959108&r2=959109&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixProducer.java (original)
+++ servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixProducer.java Tue Jun 29 21:24:41 2010
@@ -16,80 +16,179 @@
*/
package org.apache.servicemix.camel.nmr;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.Exchange;
-import org.apache.servicemix.nmr.api.Channel;
-import org.apache.servicemix.nmr.api.NMR;
-import org.apache.servicemix.nmr.api.Pattern;
-import org.apache.servicemix.nmr.api.Status;
+import org.apache.servicemix.nmr.api.*;
import org.apache.servicemix.nmr.api.service.ServiceHelper;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
/**
- * A {@link Producer} that receives exchanges from the ServiceMix NMR and sends {@link org.apache.camel.Exchange}s into the Camel route
+ * A {@link org.apache.camel.Producer} that handles incoming Camel exchanges by sending/receiving an NMR {@link org.apache.camel.Exchange}
*/
-public class ServiceMixProducer extends DefaultProducer {
-
- private static final String OPERATION_NAME = "operationName";
- private Channel client;
+public class ServiceMixProducer extends DefaultProducer implements Endpoint, AsyncProcessor {
- public ServiceMixProducer(ServiceMixEndpoint endpoint) {
- super(endpoint);
- }
+ private static final String TARGET_ENDPOINT_NAME = "TARGET_ENDPOINT_NAME";
- public ServiceMixEndpoint getEndpoint() {
- return (ServiceMixEndpoint) super.getEndpoint();
+ private final Map<String, Continuation> continuations = new ConcurrentHashMap<String, Continuation>();
+ private final NMR nmr;
+
+ private Channel channel;
+
+ public ServiceMixProducer(ServiceMixEndpoint endpoint, NMR nmr) {
+ super(endpoint);
+ this.nmr = nmr;
}
+ /*
+ * Synchronously process the Camel exchange (using sendSync to send and receive the NMR Exchange)
+ */
public void process(Exchange exchange) throws Exception {
-
NMR nmr = getEndpoint().getComponent().getNmr();
- org.apache.servicemix.nmr.api.Exchange e
- = getEndpoint().getComponent().getBinding().populateNmrExchangeFromCamelExchange(exchange, client);
-
+ org.apache.servicemix.nmr.api.Exchange e =
+ getEndpoint().getComponent().getBinding().populateNmrExchangeFromCamelExchange(exchange, channel);
+
try {
e.setTarget(nmr.getEndpointRegistry().lookup(
- ServiceHelper.createMap(org.apache.servicemix.nmr.api.Endpoint.NAME,
- getEndpoint().getEndpointName())));
+ ServiceHelper.createMap(org.apache.servicemix.nmr.api.Endpoint.NAME,
+ getEndpoint().getEndpointName())));
} catch (Exception ex) {
ex.printStackTrace();
}
-
- client.sendSync(e);
-
- handleResponse(exchange, client, e);
+
+ channel.sendSync(e);
+
+ handleResponse(exchange, channel, e);
+ }
+
+ /*
+ * Asynchronously process the Camel exchange (using send to send the NMR Exchange)
+ * (NMR responses will be handled by the {@link #process(org.apache.servicemix.nmr.api.Exchange)} method
+ */
+ public boolean process(Exchange exchange, AsyncCallback asyncCallback) {
+ NMR nmr = getEndpoint().getComponent().getNmr();
+
+ org.apache.servicemix.nmr.api.Exchange e
+ = getEndpoint().getComponent().getBinding().populateNmrExchangeFromCamelExchange(exchange, channel);
+
+ try {
+ e.setTarget(nmr.getEndpointRegistry().lookup(
+ ServiceHelper.createMap(org.apache.servicemix.nmr.api.Endpoint.NAME,
+ getEndpoint().getEndpointName())));
+
+ continuations.put(e.getId(), new Continuation(exchange, asyncCallback));
+ channel.send(e);
+
+ return false;
+ } catch (Exception ex) {
+ log.warn("Error occured while sending NMR exchange", ex);
+
+ continuations.remove(e.getId());
+
+ exchange.setException(ex);
+ asyncCallback.done(true);
+ return true;
+ }
+ }
+
+ /**
+ * Handle incoming NMR exchanges (responses to the exchanges sent in {@link #process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)}
+ */
+ public void process(org.apache.servicemix.nmr.api.Exchange exchange) {
+ Continuation continuation = continuations.remove(exchange.getId());
+
+ if (continuation == null) {
+ log.error("Unknown exchange received: " + exchange);
+ } else {
+ handleResponse(continuation.exchange, channel, exchange);
+ continuation.callback.done(false);
+ }
}
- private void handleResponse(Exchange exchange, Channel client,
- org.apache.servicemix.nmr.api.Exchange e) {
- if (e.getPattern() != Pattern.InOnly) {
+ /*
+ * Handle the NMR Exchange by:
+ * - updating the corresponding Camel Exchange
+ * - finishing the NMR Exchange MEP
+ */
+ private void handleResponse(Exchange exchange, Channel client, org.apache.servicemix.nmr.api.Exchange e) {
+ if (e.getPattern() != Pattern.InOnly) {
if (e.getError() != null) {
exchange.setException(e.getError());
} else {
- exchange.getProperties().putAll(e.getProperties());
- if (e.getFault().getBody() != null) {
+ exchange.getProperties().putAll(e.getProperties());
+ if (e.getFault().getBody() != null) {
exchange.getOut().setFault(true);
getEndpoint().getComponent().getBinding().copyNmrMessageToCamelMessage(e.getFault(), exchange.getOut());
- } else {
- getEndpoint().getComponent().getBinding().copyNmrMessageToCamelMessage(e.getOut(), exchange.getOut());
+ } else {
+ getEndpoint().getComponent().getBinding().copyNmrMessageToCamelMessage(e.getOut(), exchange.getOut());
}
- e.setStatus(Status.Done);
- client.send(e);
+ e.setStatus(Status.Done);
+ channel.send(e);
}
-
- }
- }
+
+ }
+ }
@Override
protected void doStart() throws Exception {
- NMR nmr = getEndpoint().getComponent().getNmr();
- client = nmr.createChannel();
+ nmr.getEndpointRegistry().register(this, createEndpointMap());
+ super.doStart();
}
@Override
protected void doStop() throws Exception {
- client.close();
- client = null;
+ super.doStop();
+ nmr.getEndpointRegistry().unregister(this, createEndpointMap());
+ }
+
+ /**
+ * Access the matching {@link org.apache.servicemix.camel.nmr.ServiceMixEndpoint}
+ */
+ public ServiceMixEndpoint getEndpoint() {
+ return (ServiceMixEndpoint) super.getEndpoint();
+ }
+
+ public void setChannel(Channel channel) {
+ this.channel = channel;
+ }
+
+ /*
+ * Creates the default endpoint map, containing the endpoint name as well as a property referring to the
+ * target endpoint name
+ */
+ private Map<String,Object> createEndpointMap() {
+ return ServiceHelper.createMap(org.apache.servicemix.nmr.api.Endpoint.NAME,
+ ServiceMixProducer.class.getName() + "-" + UUID.randomUUID(),
+ TARGET_ENDPOINT_NAME,
+ getEndpoint().getEndpointName());
+
+ }
+
+ /*
+ * Access an unmodifiable copy of the pending continuations map
+ */
+ protected Map<String, Continuation> getContinuations() {
+ return Collections.unmodifiableMap(continuations);
+ }
+
+ /*
+ * Encapsulates all the information required to continue a Camel {@link Exchange}
+ */
+ private final class Continuation {
+
+ private final Exchange exchange;
+ private final AsyncCallback callback;
+
+ private Continuation(Exchange exchange, AsyncCallback callback) {
+ super();
+ this.exchange = exchange;
+ this.callback = callback;
+ }
}
-
}
Modified: servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/AbstractComponentTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/AbstractComponentTest.java?rev=959109&r1=959108&r2=959109&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/AbstractComponentTest.java (original)
+++ servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/AbstractComponentTest.java Tue Jun 29 21:24:41 2010
@@ -22,11 +22,16 @@ import org.apache.servicemix.executors.E
import org.apache.servicemix.executors.impl.ExecutorConfig;
import org.apache.servicemix.executors.impl.ExecutorFactoryImpl;
import org.apache.servicemix.nmr.api.Channel;
+import org.apache.servicemix.nmr.api.Endpoint;
import org.apache.servicemix.nmr.api.Exchange;
import org.apache.servicemix.nmr.api.event.ExchangeListener;
import org.apache.servicemix.nmr.api.service.ServiceHelper;
+import org.apache.servicemix.nmr.core.InternalEndpointWrapper;
import org.apache.servicemix.nmr.core.ServiceMix;
+import java.util.LinkedList;
+import java.util.List;
+
/**
* Abstract base class for building NMR component unit tests
* - the NMR component is available with URI prefix nmr:
@@ -52,6 +57,35 @@ public abstract class AbstractComponentT
super.setUp();
}
+ @Override
+ protected void tearDown() throws Exception {
+ for (ServiceMixProducer producer : findEndpoints(ServiceMixProducer.class)) {
+ if (producer.getContinuations().size() > 0) {
+ // let's wait for a moment to give the last exchanges the time to get Done
+ Thread.sleep(500);
+ }
+ assertEquals("There should be no more pending Camel exchanges in the producer endpoints",
+ 0, producer.getContinuations().size());
+ }
+
+ nmr.shutdown();
+ super.tearDown();
+ }
+
+ private<E extends Endpoint> List<E> findEndpoints(Class<E> type) {
+ List result = new LinkedList();
+
+ for (Endpoint endpoint : nmr.getEndpointRegistry().getServices()) {
+ if (endpoint instanceof InternalEndpointWrapper) {
+ InternalEndpointWrapper wrapper = (InternalEndpointWrapper) endpoint;
+ if (type.isAssignableFrom(wrapper.getEndpoint().getClass())) {
+ result.add(wrapper.getEndpoint());
+ }
+ }
+ }
+ return result;
+ }
+
/*
* Create the ExecutorFactory for the unit test
* based on the default configuration used in ServiceMix 4
Modified: servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/CamelAsyncRouteTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/CamelAsyncRouteTest.java?rev=959109&r1=959108&r2=959109&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/CamelAsyncRouteTest.java (original)
+++ servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/CamelAsyncRouteTest.java Tue Jun 29 21:24:41 2010
@@ -56,14 +56,15 @@ public class CamelAsyncRouteTest extends
assertMockEndpointsSatisfied();
+ assertTrue("All NMR exchanges should have been marked DONE",
+ done.await(20, TimeUnit.SECONDS));
+
for (Exchange exchange : mock.getExchanges()) {
Thread thread = exchange.getProperty(HANDLED_BY_THREAD, Thread.class);
- assertTrue("onCompletion should have been called from the Camel 'threads' thread pool",
- thread.getName().contains("Camel") && thread.getName().contains("Threads"));
+ assertTrue("processor should have been called from the Camel 'threads' thread pool instead of " + thread.getName(),
+ thread.getName().contains("Camel") && thread.getName().contains("Thread"));
}
- assertTrue("All NMR exchanges should have been marked DONE",
- done.await(20, TimeUnit.SECONDS));
}
public void testCamelSeda() throws InterruptedException {
@@ -88,12 +89,13 @@ public class CamelAsyncRouteTest extends
public void configure() throws Exception {
from("direct:threads").to("mock:sent").to("nmr:threads");
from("nmr:threads")
- .onCompletion().process(new Processor() {
+ .threads(5)
+ .process(new Processor() {
public void process(Exchange exchange) throws Exception {
exchange.setProperty(HANDLED_BY_THREAD, Thread.currentThread());
}
})
- .threads(5).to("mock:threads");
+ .to("mock:threads");
from("seda:seda?concurrentConsumers=10").to("mock:sent").to("nmr:seda");
from("nmr:seda").to("seda:seda-internal?waitForTaskToComplete=Never");
Copied: servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/MultipleProducersTest.java (from r953282, servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/SimpleNmrTest.java)
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/MultipleProducersTest.java?p2=servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/MultipleProducersTest.java&p1=servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/SimpleNmrTest.java&r1=953282&r2=959109&rev=959109&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/SimpleNmrTest.java (original)
+++ servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/MultipleProducersTest.java Tue Jun 29 21:24:41 2010
@@ -16,20 +16,42 @@
*/
package org.apache.servicemix.camel.nmr;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
/**
- * A very basic NMR test, just testing if the Exchange can flow through the NMR
- * from one Camel route to the next one
+ * Test case to ensure that the component can deal with multiple {@link org.apache.servicemix.camel.nmr.ServiceMixProducer}
+ * instances for the same endpoint name being used concurrently.
*/
-public class SimpleNmrTest extends AbstractComponentTest {
+public class MultipleProducersTest extends AbstractComponentTest {
+
+ private static final int COUNT = 100;
- public void testSimpleExchange() throws InterruptedException {
- MockEndpoint mock = getMockEndpoint("mock:simple");
- mock.expectedBodiesReceived("Simple message body");
+ public void testConcurrentlyUsingTheSameProducerName() throws InterruptedException {
+ getMockEndpoint("mock:handler").expectedMessageCount(2 * COUNT);
- template.sendBody("direct:simple", "Simple message body");
+ ExecutorService executor = Executors.newFixedThreadPool(10);
+
+ for (int i = 0 ; i < 100 ; i++) {
+ executor.execute(new Runnable() {
+ public void run() {
+ assertEquals("Replying to Guillaume",
+ template.requestBody("direct:a", "Guillaume"));
+ }
+ });
+ executor.execute(new Runnable() {
+ public void run() {
+ assertEquals("Replying to Chris",
+ template.requestBody("direct:a", "Chris"));
+ }
+ });
+ }
assertMockEndpointsSatisfied();
}
@@ -40,9 +62,11 @@ public class SimpleNmrTest extends Abstr
@Override
public void configure() throws Exception {
- from("direct:simple").to("nmr:simple");
- from("nmr:simple").to("mock:simple");
+ from("direct:a").to("nmr:handler");
+ from("direct:b").to("nmr:handler");
+
+ from("nmr:handler").setBody(simple("Replying to ${body}")).to("mock:handler");
}
};
}
-}
+}
\ No newline at end of file
Modified: servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/SimpleNmrTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/SimpleNmrTest.java?rev=959109&r1=959108&r2=959109&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/SimpleNmrTest.java (original)
+++ servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/SimpleNmrTest.java Tue Jun 29 21:24:41 2010
@@ -16,6 +16,8 @@
*/
package org.apache.servicemix.camel.nmr;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
@@ -25,15 +27,60 @@ import org.apache.camel.component.mock.M
*/
public class SimpleNmrTest extends AbstractComponentTest {
- public void testSimpleExchange() throws InterruptedException {
+ private static final String REQUEST_MESSAGE = "Simple message body";
+ private static final String RESPONSE_MESSAGE = "Simple message reply";
+
+ public void testSimpleInOnly() throws InterruptedException {
+ MockEndpoint mock = getMockEndpoint("mock:simple");
+ mock.expectedBodiesReceived(REQUEST_MESSAGE);
+
+ template.sendBody("direct:simple", REQUEST_MESSAGE);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testSimpleInOnlyWithMultipleHops() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:hops");
+ mock.expectedBodiesReceived(REQUEST_MESSAGE);
+
+ template.sendBody("direct:hops", REQUEST_MESSAGE);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testSimpleInOut() throws InterruptedException {
MockEndpoint mock = getMockEndpoint("mock:simple");
- mock.expectedBodiesReceived("Simple message body");
+ mock.expectedBodiesReceived(REQUEST_MESSAGE);
- template.sendBody("direct:simple", "Simple message body");
+ final String response = template.requestBody("direct:simple", REQUEST_MESSAGE, String.class);
assertMockEndpointsSatisfied();
+ assertEquals("Receiving back the reply set by the second route",
+ RESPONSE_MESSAGE, response);
}
+ public void testSimpleInOutWithMultipleHops() throws InterruptedException {
+ MockEndpoint mock = getMockEndpoint("mock:hops");
+ mock.expectedBodiesReceived(REQUEST_MESSAGE);
+
+ final String response = template.requestBody("direct:hops", REQUEST_MESSAGE, String.class);
+
+ assertMockEndpointsSatisfied();
+ assertEquals("Receiving back the reply set by the second route",
+ RESPONSE_MESSAGE, response);
+ }
+
+ public void testSimpleInvalidEndpoint() throws InterruptedException {
+ Exchange exchange = template.send("direct:error", new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setBody(REQUEST_MESSAGE);
+ }
+ });
+
+ assertTrue("Sending to an invalid NMR endpoint should have failed", exchange.isFailed());
+ }
+
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@@ -41,7 +88,13 @@ public class SimpleNmrTest extends Abstr
@Override
public void configure() throws Exception {
from("direct:simple").to("nmr:simple");
- from("nmr:simple").to("mock:simple");
+ from("nmr:simple").to("mock:simple").setBody(constant(RESPONSE_MESSAGE));
+
+ from("direct:hops").to("nmr:hop1");
+ from("nmr:hop1").to("nmr:hop2");
+ from("nmr:hop2").to("mock:hops").setBody(constant(RESPONSE_MESSAGE));
+
+ from("direct:error").to("nmr:invalid-endpoint-name");
}
};
}
Modified: servicemix/smx4/features/trunk/pom.xml
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/pom.xml?rev=959109&r1=959108&r2=959109&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/pom.xml (original)
+++ servicemix/smx4/features/trunk/pom.xml Tue Jun 29 21:24:41 2010
@@ -74,7 +74,7 @@
<bcel.version>5.2_2</bcel.version>
<bnd.version>0.0.384</bnd.version>
<cglib.version>2.1_3_4</cglib.version>
- <camel.version>2.2.0</camel.version>
+ <camel.version>2.4-SNAPSHOT</camel.version>
<commons-management.version>1.0</commons-management.version>
<commons-beanutils.version>1.7.0_3</commons-beanutils.version>
<commons-collections.version>3.2.1</commons-collections.version>