You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by su...@apache.org on 2012/09/25 00:05:41 UTC

svn commit: r1389620 - in /camel/trunk/components/camel-sjms/src: main/java/org/apache/camel/component/sjms/ test/java/org/apache/camel/component/sjms/tx/

Author: sully6768
Date: Mon Sep 24 22:05:40 2012
New Revision: 1389620

URL: http://svn.apache.org/viewvc?rev=1389620&view=rev
Log:
Document clean up and test fix for BatchTransactions

Modified:
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/BatchMessage.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueConsumerTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueProducerTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicConsumerTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicProducerTest.java

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/BatchMessage.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/BatchMessage.java?rev=1389620&r1=1389619&r2=1389620&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/BatchMessage.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/BatchMessage.java Mon Sep 24 22:05:40 2012
@@ -20,7 +20,7 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * A {@link List} of these objects can be used to batch a collection of body and
+ * A {@link List} of these objects can be used to batch a collection of bodies and
  * header pairs in one exchange.
  * 
  */

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java?rev=1389620&r1=1389619&r2=1389620&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java Mon Sep 24 22:05:40 2012
@@ -44,7 +44,7 @@ public class SjmsComponent extends Defau
     private HeaderFilterStrategy headerFilterStrategy = new SjmsHeaderFilterStrategy();
     private KeyFormatStrategy keyFormatStrategy;
     private Integer maxConnections = 1;
-    private TransactionCommitStrategy commitStrategy = new DefaultTransactionCommitStrategy();
+    private TransactionCommitStrategy transactionCommitStrategy = new DefaultTransactionCommitStrategy();
 
     /**
      * @see
@@ -204,23 +204,23 @@ public class SjmsComponent extends Defau
     }
 
     /**
-     * Gets the TransactionCommitStrategy value of commitStrategy for this
+     * Gets the TransactionCommitStrategy value of transactionCommitStrategy for this
      * instance of SjmsComponent.
      * 
-     * @return the commitStrategy
+     * @return the transactionCommitStrategy
      */
-    public TransactionCommitStrategy getCommitStrategy() {
-        return commitStrategy;
+    public TransactionCommitStrategy getTransactionCommitStrategy() {
+        return transactionCommitStrategy;
     }
 
     /**
-     * Sets the TransactionCommitStrategy value of commitStrategy for this
+     * Sets the TransactionCommitStrategy value of transactionCommitStrategy for this
      * instance of SjmsComponent.
      * 
-     * @param commitStrategy Sets TransactionCommitStrategy, default is TODO add
+     * @param transactionCommitStrategy Sets TransactionCommitStrategy, default is TODO add
      *            default
      */
-    public void setCommitStrategy(TransactionCommitStrategy commitStrategy) {
-        this.commitStrategy = commitStrategy;
+    public void setTransactionCommitStrategy(TransactionCommitStrategy commitStrategy) {
+        this.transactionCommitStrategy = commitStrategy;
     }
 }

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueConsumerTest.java?rev=1389620&r1=1389619&r2=1389620&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueConsumerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueConsumerTest.java Mon Sep 24 22:05:40 2012
@@ -26,18 +26,24 @@ import org.apache.camel.test.junit4.Came
 import org.junit.Test;
 
 /**
- * Verify the ability to batch transactions.
+ * Verify the ability to batch transactions to the consumer.
  *
  */
 public class BatchTransactedQueueConsumerTest extends CamelTestSupport {
-    
+
     /**
-     * Verify that messages are being redelivered
+     * Verify that after only sending 10 messages that 10 are delivered to the
+     * processor and upon the 10th message throwing an Exception which causes
+     * the messages deliveries to be rolled back. The messages should then be
+     * redelivered with the JMSRedelivered flag set to true for a total of 10
+     * delivered messages.
+     * 
      * @throws Exception
      */
     @Test
     public void testEndpointConfiguredBatchTransaction() throws Exception {
-        // We should get two sets of 10 messages.  10 before the rollback and 10 after the rollback.
+        // We should get two sets of 10 messages. 10 before the rollback and 10
+        // after the rollback.
         getMockEndpoint("mock:test.before").expectedMessageCount(10);
         getMockEndpoint("mock:test.after").expectedMessageCount(10);
 
@@ -88,7 +94,7 @@ public class BatchTransactedQueueConsume
                                         String body = exchange.getIn().getBody(String.class);
                                         
                                         // If the message ends with 10, throw the exception
-                                        if (body.endsWith("10")) {
+                                        if (body.endsWith("4") || body.endsWith("6")) {
                                             log.info("10th message received.  Rolling back.");
                                             exchange.getOut().setFault(true);
                                             exchange.getOut().setBody("10th message received.  Rolling back.");

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueProducerTest.java?rev=1389620&r1=1389619&r2=1389620&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueProducerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueProducerTest.java Mon Sep 24 22:05:40 2012
@@ -21,6 +21,8 @@ import java.util.List;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.Produce;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
@@ -29,15 +31,30 @@ import org.apache.camel.component.sjms.S
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 
+/**
+ * Test used to verify the batch transaction capability of the SJMS Component
+ * for a Queue Producer.
+ */
 public class BatchTransactedQueueProducerTest extends CamelTestSupport {
 
     @Produce
     protected ProducerTemplate template;
 
+    /**
+     * Verify that after processing a {@link BatchMessage} twice with 30
+     * messages in for a total of 60 delivery attempts that we only see 30
+     * messages end up at the final consumer. This is due to an exception being
+     * thrown during the processing of the first 30 messages which causes a
+     * redelivery.
+     * 
+     * @throws Exception
+     */
     @Test
     public void testEndpointConfiguredBatchTransaction() throws Exception {
-        // We should see the World message twice, once for the exception
+        // We should see the BatchMessage once in the prebatch and once in the
+        // redelivery. Then we should see 30 messages arrive in the postbatch.
         getMockEndpoint("mock:test.prebatch").expectedMessageCount(1);
+        getMockEndpoint("mock:test.redelivery").expectedMessageCount(1);
         getMockEndpoint("mock:test.postbatch").expectedMessageCount(30);
 
         List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>();
@@ -48,8 +65,7 @@ public class BatchTransactedQueueProduce
         }
         template.sendBody("direct:start", messages);
 
-        getMockEndpoint("mock:test.prebatch").assertIsSatisfied();
-        getMockEndpoint("mock:test.postbatch").assertIsSatisfied();
+        assertMockEndpointsSatisfied();
 
     }
 
@@ -68,9 +84,37 @@ public class BatchTransactedQueueProduce
         return new RouteBuilder() {
             @Override
             public void configure() {
+                onException(Exception.class)
+                    .handled(true)
+                    .setHeader("redeliveryAttempt")
+                        .constant("1")
+                    .log("Redelivery attempt 1")
+                    .to("mock:test.redelivery")
+                    .to("direct:start");
+                
                 from("direct:start")
                     .to("log:test-before?showAll=true")
                     .to("sjms:queue:batch.queue?transacted=true")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            // This will force an exception to occur on the exchange
+                            // which will invoke our onException handler to
+                            // redeliver our batch message
+                            
+                            // Get the redelivery header 
+                            String redeliveryAttempt = exchange.getIn().getHeader("redeliveryAttempt", String.class);
+                            
+                            // Verify that it isn't empty
+                            // if it is do nothing and force the Exception
+                            if(redeliveryAttempt != null && redeliveryAttempt.equals("1")) {
+                                // do nothing and allow it to proceed
+                            } else {
+                                log.info("BatchMessage received without redelivery. Rolling back.");
+                                exchange.setException(new Exception());
+                            }
+                        }
+                    })
                     .to("mock:test.prebatch");
 
                 from("sjms:queue:batch.queue")

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicConsumerTest.java?rev=1389620&r1=1389619&r2=1389620&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicConsumerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicConsumerTest.java Mon Sep 24 22:05:40 2012
@@ -26,28 +26,38 @@ import org.apache.camel.test.junit4.Came
 import org.junit.Test;
 
 /**
- * Verify the ability to batch transactions.
+ * Verify the ability to batch transactions to the consumer.
  *
  */
 public class BatchTransactedTopicConsumerTest extends CamelTestSupport {
-    
+
+
     /**
-     * Verify that messages are being redelivered
+     * Verify that after only sending 10 messages that 10 are delivered to the
+     * processor and upon the 10th message throwing an Exception which causes
+     * the messages deliveries to be rolled back. The messages should then be
+     * redelivered with the JMSRedelivered flag set to true for a total of 20
+     * delivered messages, 10 to each topic.
+     * 
      * @throws Exception
      */
     @Test
     public void testEndpointConfiguredBatchTransaction() throws Exception {
         // We should get two sets of 10 messages.  10 before the rollback and 10 after the rollback.
-        getMockEndpoint("mock:test.before").expectedMessageCount(10);
-        getMockEndpoint("mock:test.after").expectedMessageCount(10);
+        getMockEndpoint("mock:test.before.1").expectedMessageCount(10);
+        getMockEndpoint("mock:test.before.2").expectedMessageCount(10);
+        getMockEndpoint("mock:test.after.1").expectedMinimumMessageCount(10);
+        getMockEndpoint("mock:test.after.2").expectedMessageCount(10);
 
         // Send only 10 messages
         for (int i = 1; i <= 10; i++) {
             template.sendBody("direct:start", "Hello World " + i);
         }
 
-        getMockEndpoint("mock:test.before").assertIsSatisfied();
-        getMockEndpoint("mock:test.after").assertIsSatisfied();
+        getMockEndpoint("mock:test.before.1").assertIsSatisfied();
+        getMockEndpoint("mock:test.before.2").assertIsSatisfied();
+        getMockEndpoint("mock:test.after.1").assertIsSatisfied();
+        getMockEndpoint("mock:test.after.2").assertIsSatisfied();
 
     }
 
@@ -58,6 +68,7 @@ public class BatchTransactedTopicConsume
         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=true");
         SjmsComponent component = new SjmsComponent();
         component.setConnectionFactory(connectionFactory);
+        component.setMaxConnections(1);
         camelContext.addComponent("sjms", component);
 
         return camelContext;
@@ -78,8 +89,8 @@ public class BatchTransactedTopicConsume
                     // first consume all the messages that are not redelivered
                     .choice()
                         .when(header("JMSRedelivered").isEqualTo("false"))
-                            .to("log:before_log?showAll=true")
-                            .to("mock:test.before")
+                            .log("Consumer 1 Message Before Received: ${body}")
+                            .to("mock:test.before.1")
                             // This is where we will cause the rollback after 10 messages have been sent.
                             .process(new Processor() {
                                     @Override
@@ -87,17 +98,47 @@ public class BatchTransactedTopicConsume
                                         // Get the body
                                         String body = exchange.getIn().getBody(String.class);
                                         
-                                        // If the message ends with 10, throw the exception
-                                        if (body.endsWith("10")) {
-                                            log.info("10th message received.  Rolling back.");
+                                        // Try failing in two places to
+                                        // ensure we still get the number of messages that
+                                        // we expect across the topics
+                                        if (body.endsWith("6")) {
+                                            log.info("5th message received.  Rolling back.");
+                                            exchange.getOut().setFault(true);
+                                            exchange.getOut().setBody("5th message received.  Rolling back.");
+                                        }
+                                    }
+                                })
+                        .otherwise()
+                            .log("Consumer 1 Message After Received: ${body}")
+                            .to("mock:test.after.1");
+
+                // Our test consumer route
+                from("sjms:topic:transacted.consumer.test?transacted=true&transactionBatchCount=10")
+                    // first consume all the messages that are not redelivered
+                    .choice()
+                        .when(header("JMSRedelivered").isEqualTo("false"))
+                            .log("Consumer 2 Message Before Received: ${body}")
+                            .to("mock:test.before.2")
+                            // This is where we will cause the rollback after 10 messages have been sent.
+                            .process(new Processor() {
+                                    @Override
+                                    public void process(Exchange exchange) throws Exception {
+                                        // Get the body
+                                        String body = exchange.getIn().getBody(String.class);
+
+                                        // Try failing in two places to
+                                        // ensure we still get the number of messages that
+                                        // we expect across the topics
+                                        if (body.endsWith("3") || body.endsWith("7")) {
+                                            log.info("5th message received.  Rolling back.");
                                             exchange.getOut().setFault(true);
-                                            exchange.getOut().setBody("10th message received.  Rolling back.");
+                                            exchange.getOut().setBody("5th message received.  Rolling back.");
                                         }
                                     }
                                 })
                         .otherwise()
-                            .to("log:after_log?showAll=true")
-                            .to("mock:test.after");
+                            .log("Consumer 2 Message After Received: ${body}")
+                            .to("mock:test.after.2");
             }
         };
     }

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicProducerTest.java?rev=1389620&r1=1389619&r2=1389620&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicProducerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicProducerTest.java Mon Sep 24 22:05:40 2012
@@ -21,6 +21,8 @@ import java.util.List;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.Produce;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
@@ -29,16 +31,32 @@ import org.apache.camel.component.sjms.S
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 
+/**
+ * Test used to verify the batch transaction capability of the SJMS Component
+ * for a Topic Producer.
+ */
 public class BatchTransactedTopicProducerTest extends CamelTestSupport {
 
     @Produce
     protected ProducerTemplate template;
 
+    /**
+     * Verify that after processing a {@link BatchMessage} twice with 30
+     * messages in for a total of 120 delivery attempts that we only see 60
+     * messages end up at the final consumers. This is due to an exception being
+     * thrown during the processing of the first 30 messages which causes a
+     * redelivery.
+     * 
+     * @throws Exception
+     */
     @Test
     public void testEndpointConfiguredBatchTransaction() throws Exception {
-        // We should see the World message twice, once for the exception
+        // We should see the BatchMessage once in the prebatch and once in the
+        // redelivery. Then we should see 30 messages arrive in the postbatch.
         getMockEndpoint("mock:test.prebatch").expectedMessageCount(1);
-        getMockEndpoint("mock:test.postbatch").expectedMessageCount(30);
+        getMockEndpoint("mock:test.redelivery").expectedMessageCount(1);
+        getMockEndpoint("mock:test.postbatch.1").expectedMessageCount(30);
+        getMockEndpoint("mock:test.postbatch.2").expectedMessageCount(30);
 
         List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>();
         for (int i = 1; i <= 30; i++) {
@@ -48,8 +66,7 @@ public class BatchTransactedTopicProduce
         }
         template.sendBody("direct:start", messages);
 
-        getMockEndpoint("mock:test.prebatch").assertIsSatisfied();
-        getMockEndpoint("mock:test.postbatch").assertIsSatisfied();
+        assertMockEndpointsSatisfied();
 
     }
 
@@ -68,14 +85,47 @@ public class BatchTransactedTopicProduce
         return new RouteBuilder() {
             @Override
             public void configure() {
+                // Used to force the redelivery of the messages to the producer
+                onException(Exception.class)
+                    .handled(true)
+                    .setHeader("redeliveryAttempt")
+                        .constant("1")
+                    .log("Redelivery attempt 1")
+                    .to("mock:test.redelivery")
+                    .to("direct:start");
+                
                 from("direct:start")
                     .to("log:test-before?showAll=true")
                     .to("sjms:topic:batch.topic?transacted=true")
+                                        .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            // This will force an exception to occur on the exchange
+                            // which will invoke our onException handler to
+                            // redeliver our batch message
+                            
+                            // Get the redelivery header 
+                            String redeliveryAttempt = exchange.getIn().getHeader("redeliveryAttempt", String.class);
+                            
+                            // Verify that it isn't empty
+                            // if it is do nothing and force the Exception
+                            if(redeliveryAttempt != null && redeliveryAttempt.equals("1")) {
+                                // do nothing and allow it to proceed
+                            } else {
+                                log.info("BatchMessage received without redelivery. Rolling back.");
+                                exchange.setException(new Exception());
+                            }
+                        }
+                    })
                     .to("mock:test.prebatch");
             
                 from("sjms:topic:batch.topic")
                     .to("log:test-after?showAll=true")
-                    .to("mock:test.postbatch");
+                    .to("mock:test.postbatch.1");
+                
+                from("sjms:topic:batch.topic")
+                    .to("log:test-after?showAll=true")
+                    .to("mock:test.postbatch.2");
             }
         };
     }