You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/04/18 20:51:38 UTC

svn commit: r530128 - in /activemq/camel/trunk/camel-core/src/test/java/org/apache/camel: builder/IdempotentConsumerTest.java processor/ processor/IdempotentConsumerTest.java

Author: jstrachan
Date: Wed Apr 18 11:51:37 2007
New Revision: 530128

URL: http://svn.apache.org/viewvc?view=rev&rev=530128
Log:
added a functional test case for the IdempotentConsumer

Added:
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java   (contents, props changed)
      - copied, changed from r530102, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java
Removed:
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java

Copied: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java (from r530102, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java?view=diff&rev=530128&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java&r1=530102&p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java&r2=530128
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java Wed Apr 18 11:51:37 2007
@@ -15,10 +15,103 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.builder;
+package org.apache.camel.processor;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.processor.idempotent.MemoryMessageIdRepository;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.util.ProducerCache;
+import org.apache.camel.impl.DefaultCamelContext;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
 
 /**
  * @version $Revision: 1.1 $
  */
-public class IdempotentConsumerTest {
+public class IdempotentConsumerTest extends TestCase {
+    private static final transient Log log = LogFactory.getLog(IdempotentConsumerTest.class);
+
+    protected CamelContext container = new DefaultCamelContext();
+    protected CountDownLatch latch = new CountDownLatch(3);
+    protected Endpoint<Exchange> endpoint;
+    protected ProducerCache<Exchange> client = new ProducerCache<Exchange>();
+    protected List<String> receivedBodies = new ArrayList<String>();
+
+    public void testDuplicateMessagesAreFilteredOut() throws Exception {
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("3", "three");
+
+        // lets wait on the message being received
+        boolean received = latch.await(20, TimeUnit.SECONDS);
+        assertTrue("Did not receive the message!", received);
+
+        assertEquals("Should have received 3 responses: " + receivedBodies, 3, receivedBodies.size());
+
+        assertEquals("received bodies", Arrays.asList(new Object[] { "one", "two", "three"}), receivedBodies);
+
+        log.debug("Received bodies: " + receivedBodies);
+    }
+
+    protected void sendMessage(final Object messageId, final Object body) {
+        client.send(endpoint, new Processor<Exchange>() {
+            public void process(Exchange exchange) {
+                // now lets fire in a message
+                Message in = exchange.getIn();
+                in.setBody(body);
+                in.setHeader("messageId", messageId);
+            }
+        });
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        final Processor<Exchange> processor = new Processor<Exchange>() {
+            public void process(Exchange e) {
+                Message in = e.getIn();
+                String body = in.getBody(String.class);
+
+                log.debug("Received body: " + body + " on exchange: " + e);
+
+                receivedBodies.add(body);
+                latch.countDown();
+            }
+        };
+        final String endpointUri = "queue:test.a";
+
+        // lets add some routes
+        container.addRoutes(createRouteBuilder(endpointUri, processor));
+        endpoint = container.resolveEndpoint(endpointUri);
+        assertNotNull("No endpoint found for URI: " + endpointUri, endpoint);
+
+        container.start();
+    }
+
+    protected RouteBuilder createRouteBuilder(final String endpointUri, final Processor<Exchange> processor) {
+        return new RouteBuilder() {
+            public void configure() {
+                from(endpointUri).idempotentConsumer(header("messageId"), MemoryMessageIdRepository.memoryMessageIdRepository()).process(processor);
+            }
+        };
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        client.stop();
+        container.stop();
+    }
 }

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native