You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2010/06/29 23:24:41 UTC

svn commit: r959109 - in /servicemix/smx4/features/trunk: ./ camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/

Author: gertv
Date: Tue Jun 29 21:24:41 2010
New Revision: 959109

URL: http://svn.apache.org/viewvc?rev=959109&view=rev
Log:
SMX4-543: Improve Camel NMR component to leverage Camel async routing engine

Added:
    servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/MultipleProducersTest.java
      - copied, changed from r953282, servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/SimpleNmrTest.java
Modified:
    servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixConsumer.java
    servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixEndpoint.java
    servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixProducer.java
    servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/AbstractComponentTest.java
    servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/CamelAsyncRouteTest.java
    servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/SimpleNmrTest.java
    servicemix/smx4/features/trunk/pom.xml

Modified: servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixConsumer.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixConsumer.java?rev=959109&r1=959108&r2=959109&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixConsumer.java (original)
+++ servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixConsumer.java Tue Jun 29 21:24:41 2010
@@ -18,6 +18,7 @@ package org.apache.servicemix.camel.nmr;
 
 import java.util.Map;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
@@ -29,7 +30,7 @@ import org.apache.servicemix.nmr.api.Sta
 import org.apache.servicemix.nmr.api.service.ServiceHelper;
 
 /**
- * A {@link Consumer} that receives Camel {@link org.apache.camel.Exchange}s and sends them into the ServiceMix NMR
+ * A {@link Consumer} that receives NMR {@link org.apache.servicemix.nmr.api.Exchange}s and invokes the Camel route
  */
 public class ServiceMixConsumer extends DefaultConsumer implements org.apache.servicemix.nmr.api.Endpoint, Synchronization {
 
@@ -64,13 +65,22 @@ public class ServiceMixConsumer extends 
         this.channel = channel;
     }
 
+    /**
+     * Process an NMR {@link org.apache.servicemix.nmr.api.Exchange} by creating and sending a Camel {@link org.apache.camel.Exchange}
+     * through the defined route
+     */
     public void process(Exchange exchange) {
     	if (exchange.getStatus() == Status.Active) {
             try {
             	org.apache.camel.Exchange camelExchange = getEndpoint().createExchange(exchange);
                 camelExchange.addOnCompletion(this);
 
-                getProcessor().process(camelExchange);
+                getAsyncProcessor().process(camelExchange, new AsyncCallback() {
+
+                    public void done(boolean doneSync) {
+                        // this is handled by the onComplete/onFailure method
+                    }
+                });
             } catch (Exception e) {
                 exchange.setError(e);
                 exchange.setStatus(Status.Error);
@@ -79,6 +89,10 @@ public class ServiceMixConsumer extends 
         }
     }
 
+    /**
+     * Handle the Camel {@link org.apache.camel.Exchange} response by updating the matching NMR {@link org.apache.servicemix.nmr.api.Exchange}
+     * and finishing the NMR MEP
+     */
     private void handleCamelResponse(Exchange exchange, org.apache.camel.Exchange camelExchange) {
         // just copy the camelExchange back to the nmr exchange
         exchange.getProperties().putAll(camelExchange.getProperties());
@@ -97,11 +111,17 @@ public class ServiceMixConsumer extends 
         channel.send(exchange);
     }
 
+    /*
+     * Handle a successfully completed Camel Exchange
+     */
     public void onComplete(org.apache.camel.Exchange exchange) {
         Exchange nmr = getEndpoint().getComponent().getBinding().extractNmrExchange(exchange);
         handleCamelResponse(nmr, exchange);
     }
 
+    /*
+     * Handle a Camel exchange failure
+     */
     public void onFailure(org.apache.camel.Exchange exchange) {
         Exchange nmr = getEndpoint().getComponent().getBinding().extractNmrExchange(exchange);
         handleCamelResponse(nmr, exchange);

Modified: servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixEndpoint.java?rev=959109&r1=959108&r2=959109&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixEndpoint.java (original)
+++ servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixEndpoint.java Tue Jun 29 21:24:41 2010
@@ -60,7 +60,7 @@ public class ServiceMixEndpoint extends 
     }
 
     public Producer createProducer() throws Exception {
-        return new ServiceMixProducer(this);
+        return new ServiceMixProducer(this, getComponent().getNmr());
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {

Modified: servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixProducer.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixProducer.java?rev=959109&r1=959108&r2=959109&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixProducer.java (original)
+++ servicemix/smx4/features/trunk/camel/servicemix-camel/src/main/java/org/apache/servicemix/camel/nmr/ServiceMixProducer.java Tue Jun 29 21:24:41 2010
@@ -16,80 +16,179 @@
  */
 package org.apache.servicemix.camel.nmr;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.Exchange;
-import org.apache.servicemix.nmr.api.Channel;
-import org.apache.servicemix.nmr.api.NMR;
-import org.apache.servicemix.nmr.api.Pattern;
-import org.apache.servicemix.nmr.api.Status;
+import org.apache.servicemix.nmr.api.*;
 import org.apache.servicemix.nmr.api.service.ServiceHelper;
 
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
 /**
- * A {@link Producer} that receives exchanges from the ServiceMix NMR and sends {@link org.apache.camel.Exchange}s into the Camel route
+ * A {@link org.apache.camel.Producer} that handles incoming Camel exchanges by sending/receiving an NMR {@link org.apache.servicemix.nmr.api.Exchange}
  */
-public class ServiceMixProducer extends DefaultProducer {
-	
-    private static final String OPERATION_NAME = "operationName"; 
-    private Channel client;
+public class ServiceMixProducer extends DefaultProducer implements Endpoint, AsyncProcessor {
 
-    public ServiceMixProducer(ServiceMixEndpoint endpoint) {
-        super(endpoint);
-    }
+    private static final String TARGET_ENDPOINT_NAME = "TARGET_ENDPOINT_NAME";
 
-    public ServiceMixEndpoint getEndpoint() {
-        return (ServiceMixEndpoint) super.getEndpoint();
+    private final Map<String, Continuation> continuations = new ConcurrentHashMap<String, Continuation>();
+    private final NMR nmr;
+
+    private Channel channel;
+
+    public ServiceMixProducer(ServiceMixEndpoint endpoint, NMR nmr) {
+        super(endpoint);
+        this.nmr = nmr;
     }
 
+    /*
+     * Synchronously process the Camel exchange (using sendSync to send and receive the NMR Exchange)
+     */
     public void process(Exchange exchange) throws Exception {
-
         NMR nmr = getEndpoint().getComponent().getNmr();
 
-        org.apache.servicemix.nmr.api.Exchange e 
-        	= getEndpoint().getComponent().getBinding().populateNmrExchangeFromCamelExchange(exchange, client);
-            
+        org.apache.servicemix.nmr.api.Exchange e =
+            getEndpoint().getComponent().getBinding().populateNmrExchangeFromCamelExchange(exchange, channel);
+
         try {
             e.setTarget(nmr.getEndpointRegistry().lookup(
-                ServiceHelper.createMap(org.apache.servicemix.nmr.api.Endpoint.NAME, 
-                getEndpoint().getEndpointName())));
+                            ServiceHelper.createMap(org.apache.servicemix.nmr.api.Endpoint.NAME,
+                                                    getEndpoint().getEndpointName())));
         } catch (Exception ex) {
             ex.printStackTrace();
         }
-                       
-        client.sendSync(e);
-                
-        handleResponse(exchange, client, e);
+
+        channel.sendSync(e);
+
+        handleResponse(exchange, channel, e);
+    }
+
+    /*
+     * Asynchronously process the Camel exchange (using send to send the NMR Exchange)
+     * (NMR responses will be handled by the {@link #process(org.apache.servicemix.nmr.api.Exchange)} method)
+     */
+    public boolean process(Exchange exchange, AsyncCallback asyncCallback) {
+        NMR nmr = getEndpoint().getComponent().getNmr();
+
+        org.apache.servicemix.nmr.api.Exchange e
+                = getEndpoint().getComponent().getBinding().populateNmrExchangeFromCamelExchange(exchange, channel);
+
+        try {
+            e.setTarget(nmr.getEndpointRegistry().lookup(
+                    ServiceHelper.createMap(org.apache.servicemix.nmr.api.Endpoint.NAME,
+                            getEndpoint().getEndpointName())));
+
+            continuations.put(e.getId(), new Continuation(exchange, asyncCallback));
+            channel.send(e);
+
+            return false;
+        } catch (Exception ex) {
+            log.warn("Error occured while sending NMR exchange", ex);
+
+            continuations.remove(e.getId());
+
+            exchange.setException(ex);
+            asyncCallback.done(true);
+            return true;
+        }
+    }
+
+    /**
+     * Handle incoming NMR exchanges (responses to the exchanges sent in {@link #process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)})
+     */
+    public void process(org.apache.servicemix.nmr.api.Exchange exchange) {
+        Continuation continuation = continuations.remove(exchange.getId());
+
+        if (continuation == null) {
+            log.error("Unknown exchange received: " + exchange);
+        } else {
+            handleResponse(continuation.exchange, channel, exchange);
+            continuation.callback.done(false);
+        }
     }
 
-	private void handleResponse(Exchange exchange, Channel client,
-			org.apache.servicemix.nmr.api.Exchange e) {
-		if (e.getPattern() != Pattern.InOnly) {
+    /*
+     * Handle the NMR Exchange by:
+     * - updating the corresponding Camel Exchange
+     * - finishing the NMR Exchange MEP
+     */
+    private void handleResponse(Exchange exchange, Channel client, org.apache.servicemix.nmr.api.Exchange e) {
+        if (e.getPattern() != Pattern.InOnly) {
             if (e.getError() != null) {
                 exchange.setException(e.getError());
             } else {
-            	exchange.getProperties().putAll(e.getProperties());
-            	if (e.getFault().getBody() != null) {
+                exchange.getProperties().putAll(e.getProperties());
+                if (e.getFault().getBody() != null) {
                     exchange.getOut().setFault(true);
                     getEndpoint().getComponent().getBinding().copyNmrMessageToCamelMessage(e.getFault(), exchange.getOut());
-            	} else {
-            		getEndpoint().getComponent().getBinding().copyNmrMessageToCamelMessage(e.getOut(), exchange.getOut());
+                } else {
+                    getEndpoint().getComponent().getBinding().copyNmrMessageToCamelMessage(e.getOut(), exchange.getOut());
                 }
-            	e.setStatus(Status.Done);
-            	client.send(e);
+                e.setStatus(Status.Done);
+                channel.send(e);
             }
-            	
-    	}
-	}
+
+        }
+    }
 
     @Override
     protected void doStart() throws Exception {
-        NMR nmr = getEndpoint().getComponent().getNmr();
-        client = nmr.createChannel();
+        nmr.getEndpointRegistry().register(this, createEndpointMap());
+        super.doStart();
     }
 
     @Override
     protected void doStop() throws Exception {
-        client.close();
-        client = null;
+        super.doStop();
+        nmr.getEndpointRegistry().unregister(this, createEndpointMap());
+    }
+
+    /**
+     * Access the matching {@link org.apache.servicemix.camel.nmr.ServiceMixEndpoint}
+     */
+    public ServiceMixEndpoint getEndpoint() {
+        return (ServiceMixEndpoint) super.getEndpoint();
+    }
+
+    public void setChannel(Channel channel) {
+        this.channel = channel;
+    }
+
+    /*
+     * Creates the default endpoint map, containing the endpoint name as well as a property referring to the
+     * target endpoint name
+     */
+    private Map<String,Object> createEndpointMap() {
+        return ServiceHelper.createMap(org.apache.servicemix.nmr.api.Endpoint.NAME,
+                ServiceMixProducer.class.getName() + "-" + UUID.randomUUID(),
+                TARGET_ENDPOINT_NAME,
+                getEndpoint().getEndpointName());
+
+    }
+
+    /*
+     * Access an unmodifiable view of the pending continuations map
+     */
+    protected Map<String, Continuation> getContinuations() {
+        return Collections.unmodifiableMap(continuations);
+    }
+
+    /*
+     * Encapsulates all the information required to continue a Camel {@link Exchange} 
+     */
+    private final class Continuation {
+
+        private final Exchange exchange;
+        private final AsyncCallback callback;
+
+        private Continuation(Exchange exchange, AsyncCallback callback) {
+            super();
+            this.exchange = exchange;
+            this.callback = callback;
+        }
     }
-    
 }

Modified: servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/AbstractComponentTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/AbstractComponentTest.java?rev=959109&r1=959108&r2=959109&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/AbstractComponentTest.java (original)
+++ servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/AbstractComponentTest.java Tue Jun 29 21:24:41 2010
@@ -22,11 +22,16 @@ import org.apache.servicemix.executors.E
 import org.apache.servicemix.executors.impl.ExecutorConfig;
 import org.apache.servicemix.executors.impl.ExecutorFactoryImpl;
 import org.apache.servicemix.nmr.api.Channel;
+import org.apache.servicemix.nmr.api.Endpoint;
 import org.apache.servicemix.nmr.api.Exchange;
 import org.apache.servicemix.nmr.api.event.ExchangeListener;
 import org.apache.servicemix.nmr.api.service.ServiceHelper;
+import org.apache.servicemix.nmr.core.InternalEndpointWrapper;
 import org.apache.servicemix.nmr.core.ServiceMix;
 
+import java.util.LinkedList;
+import java.util.List;
+
 /**
  * Abstract base class for building NMR component unit tests
  * - the NMR component is available with URI prefix nmr:
@@ -52,6 +57,35 @@ public abstract class AbstractComponentT
         super.setUp();
     }
 
+    @Override
+    protected void tearDown() throws Exception {
+        for (ServiceMixProducer producer : findEndpoints(ServiceMixProducer.class)) {
+            if (producer.getContinuations().size() > 0) {
+                // let's wait for a moment to give the last exchanges the time to get Done
+                Thread.sleep(500);
+            }
+            assertEquals("There should be no more pending Camel exchanges in the producer endpoints",
+                         0, producer.getContinuations().size());
+        }
+
+        nmr.shutdown();
+        super.tearDown();
+    }
+
+    private<E extends Endpoint> List<E> findEndpoints(Class<E> type) {
+        List result = new LinkedList();
+
+        for (Endpoint endpoint : nmr.getEndpointRegistry().getServices()) {
+            if (endpoint instanceof InternalEndpointWrapper) {
+                InternalEndpointWrapper wrapper = (InternalEndpointWrapper) endpoint;
+                if (type.isAssignableFrom(wrapper.getEndpoint().getClass())) {
+                    result.add(wrapper.getEndpoint());
+                }
+            }
+        }
+        return result;
+    }
+
     /*
      * Create the ExecutorFactory for the unit test
      * based on the default configuration used in ServiceMix 4

Modified: servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/CamelAsyncRouteTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/CamelAsyncRouteTest.java?rev=959109&r1=959108&r2=959109&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/CamelAsyncRouteTest.java (original)
+++ servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/CamelAsyncRouteTest.java Tue Jun 29 21:24:41 2010
@@ -56,14 +56,15 @@ public class CamelAsyncRouteTest extends
 
         assertMockEndpointsSatisfied();
 
+        assertTrue("All NMR exchanges should have been marked DONE",
+                   done.await(20, TimeUnit.SECONDS));                
+        
         for (Exchange exchange : mock.getExchanges()) {
             Thread thread = exchange.getProperty(HANDLED_BY_THREAD, Thread.class);
-            assertTrue("onCompletion should have been called from the Camel 'threads' thread pool",
-                       thread.getName().contains("Camel") && thread.getName().contains("Threads"));
+            assertTrue("processor should have been called from the Camel 'threads' thread pool instead of " + thread.getName(),
+                       thread.getName().contains("Camel") && thread.getName().contains("Thread"));
         }
 
-        assertTrue("All NMR exchanges should have been marked DONE",
-                   done.await(20, TimeUnit.SECONDS));        
     }
 
     public void testCamelSeda() throws InterruptedException {       
@@ -88,12 +89,13 @@ public class CamelAsyncRouteTest extends
             public void configure() throws Exception {
                 from("direct:threads").to("mock:sent").to("nmr:threads");
                 from("nmr:threads")
-                    .onCompletion().process(new Processor() {
+                    .threads(5)
+                    .process(new Processor() {
                         public void process(Exchange exchange) throws Exception {
                             exchange.setProperty(HANDLED_BY_THREAD, Thread.currentThread());
                         }
                     })
-                    .threads(5).to("mock:threads");
+                    .to("mock:threads");
 
                 from("seda:seda?concurrentConsumers=10").to("mock:sent").to("nmr:seda");
                 from("nmr:seda").to("seda:seda-internal?waitForTaskToComplete=Never");

Copied: servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/MultipleProducersTest.java (from r953282, servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/SimpleNmrTest.java)
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/MultipleProducersTest.java?p2=servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/MultipleProducersTest.java&p1=servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/SimpleNmrTest.java&r1=953282&r2=959109&rev=959109&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/SimpleNmrTest.java (original)
+++ servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/MultipleProducersTest.java Tue Jun 29 21:24:41 2010
@@ -16,20 +16,42 @@
  */
 package org.apache.servicemix.camel.nmr;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 /**
- * A very basic NMR test, just testing if the Exchange can flow through the NMR
- * from one Camel route to the next one
+ * Test case to ensure that the component can deal with multiple {@link org.apache.servicemix.camel.nmr.ServiceMixProducer}
+ * instances for the same endpoint name being used concurrently.
  */
-public class SimpleNmrTest extends AbstractComponentTest {
+public class MultipleProducersTest extends AbstractComponentTest {
+
+    private static final int COUNT = 100;
 
-    public void testSimpleExchange() throws InterruptedException {
-        MockEndpoint mock = getMockEndpoint("mock:simple");
-        mock.expectedBodiesReceived("Simple message body");
+    public void testConcurrentlyUsingTheSameProducerName() throws InterruptedException {
+        getMockEndpoint("mock:handler").expectedMessageCount(2 * COUNT);
 
-        template.sendBody("direct:simple", "Simple message body");
+        ExecutorService executor = Executors.newFixedThreadPool(10);
+
+        for (int i = 0 ; i < 100 ; i++) {
+            executor.execute(new Runnable() {
+                public void run() {
+                    assertEquals("Replying to Guillaume",
+                                 template.requestBody("direct:a", "Guillaume"));
+                }
+            });
+            executor.execute(new Runnable() {
+                public void run() {
+                    assertEquals("Replying to Chris",
+                                 template.requestBody("direct:a", "Chris"));
+                }
+            });
+        }
 
         assertMockEndpointsSatisfied();
     }
@@ -40,9 +62,11 @@ public class SimpleNmrTest extends Abstr
 
             @Override
             public void configure() throws Exception {
-                from("direct:simple").to("nmr:simple");
-                from("nmr:simple").to("mock:simple");
+                from("direct:a").to("nmr:handler");
+                from("direct:b").to("nmr:handler");
+
+                from("nmr:handler").setBody(simple("Replying to ${body}")).to("mock:handler");
             }
         };
     }
-}
+}
\ No newline at end of file

Modified: servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/SimpleNmrTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/SimpleNmrTest.java?rev=959109&r1=959108&r2=959109&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/SimpleNmrTest.java (original)
+++ servicemix/smx4/features/trunk/camel/servicemix-camel/src/test/java/org/apache/servicemix/camel/nmr/SimpleNmrTest.java Tue Jun 29 21:24:41 2010
@@ -16,6 +16,8 @@
  */
 package org.apache.servicemix.camel.nmr;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 
@@ -25,15 +27,60 @@ import org.apache.camel.component.mock.M
  */
 public class SimpleNmrTest extends AbstractComponentTest {
 
-    public void testSimpleExchange() throws InterruptedException {
+    private static final String REQUEST_MESSAGE = "Simple message body";
+    private static final String RESPONSE_MESSAGE = "Simple message reply";
+
+    public void testSimpleInOnly() throws InterruptedException {
+        MockEndpoint mock = getMockEndpoint("mock:simple");
+        mock.expectedBodiesReceived(REQUEST_MESSAGE);
+
+        template.sendBody("direct:simple", REQUEST_MESSAGE);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testSimpleInOnlyWithMultipleHops() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:hops");
+        mock.expectedBodiesReceived(REQUEST_MESSAGE);
+
+        template.sendBody("direct:hops", REQUEST_MESSAGE);
+
+        assertMockEndpointsSatisfied();        
+    }
+
+    public void testSimpleInOut() throws InterruptedException {
         MockEndpoint mock = getMockEndpoint("mock:simple");
-        mock.expectedBodiesReceived("Simple message body");
+        mock.expectedBodiesReceived(REQUEST_MESSAGE);
 
-        template.sendBody("direct:simple", "Simple message body");
+        final String response = template.requestBody("direct:simple", REQUEST_MESSAGE, String.class);
 
         assertMockEndpointsSatisfied();
+        assertEquals("Receiving back the reply set by the second route",
+                     RESPONSE_MESSAGE, response);
     }
 
+    public void testSimpleInOutWithMultipleHops() throws InterruptedException {
+        MockEndpoint mock = getMockEndpoint("mock:hops");
+        mock.expectedBodiesReceived(REQUEST_MESSAGE);
+
+        final String response = template.requestBody("direct:hops", REQUEST_MESSAGE, String.class);
+
+        assertMockEndpointsSatisfied();
+        assertEquals("Receiving back the reply set by the second route",
+                     RESPONSE_MESSAGE, response);
+    }
+
+    public void testSimpleInvalidEndpoint() throws InterruptedException {
+        Exchange exchange = template.send("direct:error", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody(REQUEST_MESSAGE);
+            }
+        });
+
+        assertTrue("Sending to an invalid NMR endpoint should have failed", exchange.isFailed());
+    }
+
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
@@ -41,7 +88,13 @@ public class SimpleNmrTest extends Abstr
             @Override
             public void configure() throws Exception {
                 from("direct:simple").to("nmr:simple");
-                from("nmr:simple").to("mock:simple");
+                from("nmr:simple").to("mock:simple").setBody(constant(RESPONSE_MESSAGE));
+
+                from("direct:hops").to("nmr:hop1");
+                from("nmr:hop1").to("nmr:hop2");
+                from("nmr:hop2").to("mock:hops").setBody(constant(RESPONSE_MESSAGE));
+
+                from("direct:error").to("nmr:invalid-endpoint-name");
             }
         };
     }

Modified: servicemix/smx4/features/trunk/pom.xml
URL: http://svn.apache.org/viewvc/servicemix/smx4/features/trunk/pom.xml?rev=959109&r1=959108&r2=959109&view=diff
==============================================================================
--- servicemix/smx4/features/trunk/pom.xml (original)
+++ servicemix/smx4/features/trunk/pom.xml Tue Jun 29 21:24:41 2010
@@ -74,7 +74,7 @@
       <bcel.version>5.2_2</bcel.version>
       <bnd.version>0.0.384</bnd.version>
       <cglib.version>2.1_3_4</cglib.version>
-      <camel.version>2.2.0</camel.version>
+      <camel.version>2.4-SNAPSHOT</camel.version>
       <commons-management.version>1.0</commons-management.version>
       <commons-beanutils.version>1.7.0_3</commons-beanutils.version>
       <commons-collections.version>3.2.1</commons-collections.version>