You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/04/19 16:39:26 UTC

svn commit: r530429 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/component/mock/ main/java/org/apache/camel/processor/idempotent/ test/java/org/apache/camel/processor/

Author: jstrachan
Date: Thu Apr 19 07:39:21 2007
New Revision: 530429

URL: http://svn.apache.org/viewvc?view=rev&rev=530429
Log:
added more helper behaviour to the MockEndpoint for easier testing; also migrated the various processor tests to the new MockEndpoint approach to simplify testing

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ChoiceTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FilterTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/JoinRoutesTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?view=diff&rev=530429&r1=530428&r2=530429
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java Thu Apr 19 07:39:21 2007
@@ -26,6 +26,8 @@
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -41,13 +43,44 @@
  * @version $Revision: 1.1 $
  */
 public class MockEndpoint extends DefaultEndpoint<Exchange> {
+    private static final transient Log log = LogFactory.getLog(MockEndpoint.class);
     private int receivedCounter;
     private int expectedCount = -1;
     private Map<Integer, Processor<Exchange>> processors = new HashMap<Integer, Processor<Exchange>>();
     private List<Exchange> exchangesReceived = new ArrayList<Exchange>();
     private List<Throwable> failures = new ArrayList<Throwable>();
+    private List<Runnable> tests = new ArrayList<Runnable>();
     private CountDownLatch latch = new CountDownLatch(1);
 
+    public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException {
+        // lets only wait on the first empty endpoint
+        int count = 0;
+        for (MockEndpoint endpoint : endpoints) {
+            if (endpoint.getExpectedCount() != 0) {
+                endpoint.assertIsSatisfied();
+                count++;
+            }
+        }
+
+        for (MockEndpoint endpoint : endpoints) {
+            if (endpoint.getExpectedCount() == 0) {
+                if (count == 0) {
+                    endpoint.assertIsSatisfied();
+                    count++;
+                }
+                else {
+                    endpoint.assertIsSatisfied(0);
+                }
+            }
+        }
+    }
+
+    public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException {
+        for (MockEndpoint endpoint : endpoints) {
+            endpoint.expectedMessageCount(count); // must be the instance method: endpoint.expectsMessageCount(count) binds to this static varargs method with zero endpoints and sets nothing
+        }
+    }
+
     public MockEndpoint(String endpointUri, Component component) {
         super(endpointUri, component);
     }
@@ -70,31 +103,41 @@
 
     // Testing API
     //-------------------------------------------------------------------------
+
+    /**
+     * Validates that all the available expectations on this endpoint are satisfied; otherwise throws an exception
+     */
     public void assertIsSatisfied() throws InterruptedException {
+        assertIsSatisfied(1000);
+    }
+
+    /**
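+     * Validates that all the available expectations on this endpoint are satisfied, otherwise throws an exception; if no latch is pending and zero messages are expected, first sleeps for timeoutForEmptyEndpoints milliseconds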
+     */
+    public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
         if (latch != null) {
             // now lets wait for the results
             latch.await(10, TimeUnit.SECONDS);
         }
         else if (expectedCount == 0) {
             // lets wait a little bit just in case
-            Thread.sleep(1000);
+            Thread.sleep(timeoutForEmptyEndpoints);
         }
 
         if (expectedCount >= 0) {
             assertEquals("Expected message count", expectedCount, receivedCounter);
         }
-    }
 
-    protected void assertEquals(String message, Object expectedValue, Object actualValue) {
-        if (!ObjectHelper.equals(expectedValue, actualValue)) {
-            throw new AssertionError(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");
+        for (Runnable test : tests) {
+            test.run();
         }
     }
 
-    public void reset() {
-        receivedCounter = 0;
-    }
-
+    /**
+     * Specifies the expected number of message exchanges that should be received by this endpoint
+     *
+     * @param expectedCount the number of message exchanges that should be expected by this endpoint
+     */
     public void expectedMessageCount(int expectedCount) {
         this.expectedCount = expectedCount;
         if (expectedCount <= 0) {
@@ -105,6 +148,47 @@
         }
     }
 
+    /**
+     * Adds an expectation that the given body values are received by this endpoint
+     */
+    public void expectedBodiesReceived(final List bodies) {
+        expectedMessageCount(bodies.size());
+
+        expects(new Runnable() {
+            public void run() {
+                int counter = 0;
+                for (Object expectedBody : bodies) {
+                    Exchange exchange = getExchangesReceived().get(counter++);
+                    assertTrue("No exchange received for counter: " + counter, exchange != null);
+
+                    Object actualBody = exchange.getIn().getBody();
+
+                    assertEquals("Body of message: " + counter, expectedBody, actualBody);
+
+                    log.debug("Received message: " + counter + " with body: " + actualBody);
+                }
+            }
+        });
+    }
+
+    /**
+     * Adds the expectation which will be invoked when enough messages are received
+     */
+    public void expects(Runnable runnable) {
+        tests.add(runnable);
+    }
+
+    /**
+     * Adds an expectation that the given body values are received by this endpoint
+     */
+    public void expectedBodiesReceived(Object... bodies) {
+        List bodyList = new ArrayList();
+        for (Object body : bodies) {
+            bodyList.add(body);
+        }
+        expectedBodiesReceived(bodyList);
+    }
+
     // Properties
     //-------------------------------------------------------------------------
     public List<Throwable> getFailures() {
@@ -119,6 +203,10 @@
         return exchangesReceived;
     }
 
+    public int getExpectedCount() {
+        return expectedCount;
+    }
+
     // Implementation methods
     //-------------------------------------------------------------------------
     protected synchronized void onExchange(Exchange exchange) {
@@ -136,6 +224,18 @@
         }
         catch (Exception e) {
             failures.add(e);
+        }
+    }
+
+    protected void assertEquals(String message, Object expectedValue, Object actualValue) {
+        if (!ObjectHelper.equals(expectedValue, actualValue)) {
+            throw new AssertionError(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");
+        }
+    }
+
+    protected void assertTrue(String message, boolean predicate) {
+        if (!predicate) {
+            throw new AssertionError(message);
         }
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?view=diff&rev=530429&r1=530428&r2=530429
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java Thu Apr 19 07:39:21 2007
@@ -20,7 +20,9 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.util.ExpressionHelper;
+import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -30,7 +32,7 @@
  *
  * @version $Revision: 1.1 $
  */
-public class IdempotentConsumer<E extends Exchange> implements Processor<E> {
+public class IdempotentConsumer<E extends Exchange> extends ServiceSupport implements Processor<E> {
     private static final transient Log log = LogFactory.getLog(IdempotentConsumer.class);
     private Expression<E> messageIdExpression;
     private Processor<E> nextProcessor;
@@ -72,6 +74,18 @@
 
     public Processor<E> getNextProcessor() {
         return nextProcessor;
+    }
+
+
+    // Implementation methods
+    //-------------------------------------------------------------------------
+
+    protected void doStart() throws Exception {
+        ServiceHelper.startServices(nextProcessor);
+    }
+
+    protected void doStop() throws Exception {
+        ServiceHelper.stopServices(nextProcessor);
     }
 
     /**

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ChoiceTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ChoiceTest.java?view=diff&rev=530429&r1=530428&r2=530429
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ChoiceTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ChoiceTest.java Thu Apr 19 07:39:21 2007
@@ -16,92 +16,62 @@
  */
 package org.apache.camel.processor;
 
-import junit.framework.TestCase;
+import static org.apache.camel.component.mock.MockEndpoint.assertIsSatisfied;
+import static org.apache.camel.component.mock.MockEndpoint.expectsMessageCount;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.TestSupport;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.queue.QueueEndpoint;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.util.ProducerCache;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
 
 /**
  * @version $Revision: 1.1 $
  */
-public class ChoiceTest extends TestCase {
-    private static final transient Log log = LogFactory.getLog(ChoiceTest.class);
-
-    protected CamelContext container = new DefaultCamelContext();
-    protected CountDownLatch latch = new CountDownLatch(1);
-    protected Endpoint<Exchange> endpoint;
+public class ChoiceTest extends TestSupport {
+    protected CamelContext context = new DefaultCamelContext();
     protected ProducerCache<Exchange> client = new ProducerCache<Exchange>();
-    protected List<String> receivedBodies = new ArrayList<String>();
+    protected Endpoint<Exchange> startEndpoint;
+    protected MockEndpoint x, y, z;
 
     public void testSendToFirstWhen() throws Exception {
-        sendMessage("bar", "one");
-        waitForMessageInQueue("b");
-        assertQueueContains("b", "one");
-        assertQueueEmpty("c");
-        assertQueueEmpty("d");
-    }
+        x.expectedBodiesReceived("one");
+        expectsMessageCount(0, y, z);
 
-    public void testSendToSecondWhen() throws Exception {
-        sendMessage("cheese", "two");
-        waitForMessageInQueue("c");
-        assertQueueEmpty("b");
-        assertQueueContains("c", "two");
-        assertQueueEmpty("d");
-    }
+        sendMessage("bar", "one");
 
-    public void testSendToOtherwiseClause() throws Exception {
-        sendMessage("somethingUndefined", "two");
-        waitForMessageInQueue("d");
-        assertQueueEmpty("b");
-        assertQueueEmpty("c");
-        assertQueueContains("d", "two");
+        assertIsSatisfied(x, y, z);
     }
 
-    protected void assertQueueContains(String name, String expectedBody) {
-        QueueEndpoint endpoint = getQueue(name);
-        BlockingQueue queue = endpoint.getQueue();
-        assertEquals("Queue size for: " + name + " but was: " + queue, 1, queue.size());
+    public void testSendToSecondWhen() throws Exception {
+        y.expectedBodiesReceived("two");
+        expectsMessageCount(0, x, z);
 
-        Exchange exchange = (Exchange) queue.peek();
-        Object firstBody = exchange.getIn().getBody();
-        assertEquals("First body", expectedBody, firstBody);
-    }
+        sendMessage("cheese", "two");
 
-    protected void assertQueueEmpty(String name) {
-        QueueEndpoint queue = getQueue(name);
-        assertEquals("Queue size for: " + name, 0, queue.getQueue().size());
+        assertIsSatisfied(x, y, z);
     }
 
+    public void testSendToOtherwiseClause() throws Exception {
+        z.expectedBodiesReceived("three");
+        expectsMessageCount(0, x, y);
 
-    protected void waitForMessageInQueue(String name) throws InterruptedException {
-        // TODO we should replace with actual processors on each queue using a latch
-        QueueEndpoint endpoint = getQueue(name);
-        BlockingQueue queue = endpoint.getQueue();
-        for (int i = 0; i < 100 && queue.isEmpty(); i++) {
-            Thread.sleep(100);
-        }
-    }
+        sendMessage("somethingUndefined", "three");
 
-    protected QueueEndpoint getQueue(String name) {
-        return (QueueEndpoint) container.resolveEndpoint("queue:" + name);
+        assertIsSatisfied(x, y, z);
     }
 
-
     protected void sendMessage(final Object headerValue, final Object body) throws Exception {
-        client.send(endpoint, new Processor<Exchange>() {
+        client.send(startEndpoint, new Processor<Exchange>() {
             public void process(Exchange exchange) {
                 // now lets fire in a message
                 Message in = exchange.getIn();
@@ -113,34 +83,24 @@
 
     @Override
     protected void setUp() throws Exception {
-        final Processor<Exchange> processor = new Processor<Exchange>() {
-            public void process(Exchange e) {
-                Message in = e.getIn();
-                String body = in.getBody(String.class);
-
-                log.debug("Received body: " + body + " on exchange: " + e);
+        context.addRoutes(createRouteBuilder());
 
-                receivedBodies.add(body);
-                latch.countDown();
-            }
-        };
-        final String endpointUri = "queue:a";
+        startEndpoint = resolveMandatoryEndpoint(context, "queue:a");
 
-        // lets add some routes
-        container.addRoutes(createRouteBuilder(endpointUri, processor));
-        endpoint = container.resolveEndpoint(endpointUri);
-        assertNotNull("No endpoint found for URI: " + endpointUri, endpoint);
+        x = (MockEndpoint) resolveMandatoryEndpoint(context, "mock:x");
+        y = (MockEndpoint) resolveMandatoryEndpoint(context, "mock:y");
+        z = (MockEndpoint) resolveMandatoryEndpoint(context, "mock:z");
 
-        container.start();
+        context.start();
     }
 
-    protected RouteBuilder createRouteBuilder(final String endpointUri, final Processor<Exchange> processor) {
+    protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder<Exchange>() {
             public void configure() {
                 from("queue:a").choice()
-                        .when(header("foo").isEqualTo("bar")).to("queue:b")
-                        .when(header("foo").isEqualTo("cheese")).to("queue:c")
-                        .otherwise().to("queue:d");
+                        .when(header("foo").isEqualTo("bar")).to("mock:x")
+                        .when(header("foo").isEqualTo("cheese")).to("mock:y")
+                        .otherwise().to("mock:z");
             }
         };
     }
@@ -148,6 +108,6 @@
     @Override
     protected void tearDown() throws Exception {
         client.stop();
-        container.stop();
+        context.stop();
     }
 }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FilterTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FilterTest.java?view=diff&rev=530429&r1=530428&r2=530429
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FilterTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FilterTest.java Thu Apr 19 07:39:21 2007
@@ -17,25 +17,16 @@
  */
 package org.apache.camel.processor;
 
-import junit.framework.TestCase;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
 import org.apache.camel.Message;
-import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
 import org.apache.camel.TestSupport;
-import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.util.ProducerCache;
+import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.impl.DefaultCamelContext;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Arrays;
+import org.apache.camel.util.ProducerCache;
 
 /**
  * @version $Revision: 1.1 $
@@ -67,7 +58,7 @@
         context.addRoutes(createRouteBuilder());
 
         startEndpoint = resolveMandatoryEndpoint(context, "queue:a");
-        resultEndpoint = (MockEndpoint)resolveMandatoryEndpoint(context, "mock:result");
+        resultEndpoint = (MockEndpoint) resolveMandatoryEndpoint(context, "mock:result");
 
         context.start();
     }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java?view=diff&rev=530429&r1=530428&r2=530429
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java Thu Apr 19 07:39:21 2007
@@ -17,38 +17,37 @@
  */
 package org.apache.camel.processor;
 
-import static org.apache.camel.processor.idempotent.MemoryMessageIdRepository.memoryMessageIdRepository;
-import junit.framework.TestCase;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
-import org.apache.camel.Processor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.TestSupport;
+import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.util.ProducerCache;
 import org.apache.camel.impl.DefaultCamelContext;
+import static org.apache.camel.processor.idempotent.MemoryMessageIdRepository.memoryMessageIdRepository;
+import org.apache.camel.util.ProducerCache;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.List;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * @version $Revision: 1.1 $
  */
-public class IdempotentConsumerTest extends TestCase {
-    private static final transient Log log = LogFactory.getLog(IdempotentConsumerTest.class);
-
+public class IdempotentConsumerTest extends TestSupport {
     protected CamelContext context;
-    protected CountDownLatch latch = new CountDownLatch(3);
-    protected Endpoint<Exchange> endpoint;
     protected ProducerCache<Exchange> client = new ProducerCache<Exchange>();
-    protected List<String> receivedBodies = new ArrayList<String>();
+
+    protected Endpoint<Exchange> startEndpoint;
+    protected MockEndpoint resultEndpoint;
 
     public void testDuplicateMessagesAreFilteredOut() throws Exception {
+        resultEndpoint.expectedBodiesReceived("one", "two", "three");
+
         sendMessage("1", "one");
         sendMessage("2", "two");
         sendMessage("1", "one");
@@ -56,19 +55,11 @@
         sendMessage("1", "one");
         sendMessage("3", "three");
 
-        // lets wait on the message being received
-        boolean received = latch.await(20, TimeUnit.SECONDS);
-        assertTrue("Did not receive the message!", received);
-
-        assertEquals("Should have received 3 responses: " + receivedBodies, 3, receivedBodies.size());
-
-        assertEquals("received bodies", Arrays.asList(new Object[] { "one", "two", "three"}), receivedBodies);
-
-        log.debug("Received bodies: " + receivedBodies);
+        resultEndpoint.assertIsSatisfied();
     }
 
     protected void sendMessage(final Object messageId, final Object body) {
-        client.send(endpoint, new Processor<Exchange>() {
+        client.send(startEndpoint, new Processor<Exchange>() {
             public void process(Exchange exchange) {
                 // now lets fire in a message
                 Message in = exchange.getIn();
@@ -82,23 +73,14 @@
     protected void setUp() throws Exception {
         context = createContext();
 
-        final Processor<Exchange> processor = new Processor<Exchange>() {
-            public void process(Exchange e) {
-                Message in = e.getIn();
-                String body = in.getBody(String.class);
-
-                log.debug("Received body: " + body + " on exchange: " + e);
-
-                receivedBodies.add(body);
-                latch.countDown();
-            }
-        };
-        final String endpointUri = "queue:test.a";
+        String fromUri = "queue:test.a";
+        String toUri = "mock:result";
 
         // lets add some routes
-        context.addRoutes(createRouteBuilder(endpointUri, processor));
-        endpoint = context.resolveEndpoint(endpointUri);
-        assertNotNull("No endpoint found for URI: " + endpointUri, endpoint);
+        context.addRoutes(createRouteBuilder(fromUri, toUri));
+
+        startEndpoint = resolveMandatoryEndpoint(context, fromUri);
+        resultEndpoint = (MockEndpoint) resolveMandatoryEndpoint(context, toUri);
 
         context.start();
     }
@@ -107,10 +89,10 @@
         return new DefaultCamelContext();
     }
 
-    protected RouteBuilder createRouteBuilder(final String endpointUri, final Processor<Exchange> processor) {
+    protected RouteBuilder createRouteBuilder(final String fromUri, final String toUri) {
         return new RouteBuilder() {
             public void configure() {
-                from(endpointUri).idempotentConsumer(header("messageId"), memoryMessageIdRepository()).process(processor);
+                from(fromUri).idempotentConsumer(header("messageId"), memoryMessageIdRepository()).to(toUri);
             }
         };
     }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/JoinRoutesTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/JoinRoutesTest.java?view=diff&rev=530429&r1=530428&r2=530429
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/JoinRoutesTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/JoinRoutesTest.java Thu Apr 19 07:39:21 2007
@@ -16,52 +16,38 @@
  */
 package org.apache.camel.processor;
 
-import junit.framework.TestCase;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.TestSupport;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.util.ProducerCache;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
 /**
  * @version $Revision: 1.1 $
  */
-public class JoinRoutesTest extends TestCase {
-    private static final transient Log log = LogFactory.getLog(JoinRoutesTest.class);
-
-    protected CamelContext container = new DefaultCamelContext();
-    protected CountDownLatch latch = new CountDownLatch(3);
-    protected Endpoint<Exchange> endpoint;
+public class JoinRoutesTest extends TestSupport {
+    protected CamelContext context = new DefaultCamelContext();
+    protected Endpoint<Exchange> startEndpoint;
+    protected MockEndpoint resultEndpoint;
     protected ProducerCache<Exchange> client = new ProducerCache<Exchange>();
-    protected List<String> receivedBodies = new ArrayList<String>();
 
     public void testMessagesThroughDifferentRoutes() throws Exception {
+        resultEndpoint.expectedBodiesReceived("one", "two", "three");
+
         sendMessage("bar", "one");
         sendMessage("cheese", "two");
         sendMessage("somethingUndefined", "three");
 
-        // now lets wait for the results
-        latch.await(10, TimeUnit.SECONDS);
-
-        assertEquals("Number of receives: " + receivedBodies, 3, receivedBodies.size());
-        assertEquals("Received bodies", Arrays.asList(new Object[]{"one", "two", "three"}), receivedBodies);
-
-        log.debug("Received on queue:e the bodies: " + receivedBodies);
+        resultEndpoint.assertIsSatisfied();
     }
 
     protected void sendMessage(final Object headerValue, final Object body) throws Exception {
-        client.send(endpoint, new Processor<Exchange>() {
+        client.send(startEndpoint, new Processor<Exchange>() {
             public void process(Exchange exchange) {
                 // now lets fire in a message
                 Message in = exchange.getIn();
@@ -73,28 +59,15 @@
 
     @Override
     protected void setUp() throws Exception {
-        final Processor<Exchange> processor = new Processor<Exchange>() {
-            public void process(Exchange e) {
-                Message in = e.getIn();
-                String body = in.getBody(String.class);
+        context.addRoutes(createRouteBuilder());
 
-                log.debug("Received body: " + body + " on exchange: " + e);
+        startEndpoint = resolveMandatoryEndpoint(context, "queue:a");
+        resultEndpoint = (MockEndpoint) resolveMandatoryEndpoint(context, "mock:result");
 
-                receivedBodies.add(body);
-                latch.countDown();
-            }
-        };
-        final String endpointUri = "queue:a";
-
-        // lets add some routes
-        container.addRoutes(createRouteBuilder(endpointUri, processor));
-        endpoint = container.resolveEndpoint(endpointUri);
-        assertNotNull("No endpoint found for URI: " + endpointUri, endpoint);
-
-        container.start();
+        context.start();
     }
 
-    protected RouteBuilder createRouteBuilder(final String endpointUri, final Processor<Exchange> processor) {
+    protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder<Exchange>() {
             public void configure() {
                 from("queue:a").choice()
@@ -102,11 +75,9 @@
                         .when(header("foo").isEqualTo("cheese")).to("queue:c")
                         .otherwise().to("queue:d");
 
-                from("queue:b").to("queue:e");
-                from("queue:c").to("queue:e");
-                from("queue:d").to("queue:e");
-
-                from("queue:e").process(processor);
+                from("queue:b").to("mock:result");
+                from("queue:c").to("mock:result");
+                from("queue:d").to("mock:result");
             }
         };
     }
@@ -114,6 +85,6 @@
     @Override
     protected void tearDown() throws Exception {
         client.stop();
-        container.stop();
+        context.stop();
     }
 }