You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by su...@apache.org on 2012/09/25 00:05:41 UTC
svn commit: r1389620 - in /camel/trunk/components/camel-sjms/src:
main/java/org/apache/camel/component/sjms/
test/java/org/apache/camel/component/sjms/tx/
Author: sully6768
Date: Mon Sep 24 22:05:40 2012
New Revision: 1389620
URL: http://svn.apache.org/viewvc?rev=1389620&view=rev
Log:
Document clean up and test fix for BatchTransactions
Modified:
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/BatchMessage.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueConsumerTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueProducerTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicConsumerTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicProducerTest.java
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/BatchMessage.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/BatchMessage.java?rev=1389620&r1=1389619&r2=1389620&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/BatchMessage.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/BatchMessage.java Mon Sep 24 22:05:40 2012
@@ -20,7 +20,7 @@ import java.util.List;
import java.util.Map;
/**
- * A {@link List} of these objects can be used to batch a collection of body and
+ * A {@link List} of these objects can be used to batch a collection of bodies and
* header pairs in one exchange.
*
*/
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java?rev=1389620&r1=1389619&r2=1389620&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java Mon Sep 24 22:05:40 2012
@@ -44,7 +44,7 @@ public class SjmsComponent extends Defau
private HeaderFilterStrategy headerFilterStrategy = new SjmsHeaderFilterStrategy();
private KeyFormatStrategy keyFormatStrategy;
private Integer maxConnections = 1;
- private TransactionCommitStrategy commitStrategy = new DefaultTransactionCommitStrategy();
+ private TransactionCommitStrategy transactionCommitStrategy = new DefaultTransactionCommitStrategy();
/**
* @see
@@ -204,23 +204,23 @@ public class SjmsComponent extends Defau
}
/**
- * Gets the TransactionCommitStrategy value of commitStrategy for this
+ * Gets the TransactionCommitStrategy value of transactionCommitStrategy for this
* instance of SjmsComponent.
*
- * @return the commitStrategy
+ * @return the transactionCommitStrategy
*/
- public TransactionCommitStrategy getCommitStrategy() {
- return commitStrategy;
+ public TransactionCommitStrategy getTransactionCommitStrategy() {
+ return transactionCommitStrategy;
}
/**
- * Sets the TransactionCommitStrategy value of commitStrategy for this
+ * Sets the TransactionCommitStrategy value of transactionCommitStrategy for this
* instance of SjmsComponent.
*
- * @param commitStrategy Sets TransactionCommitStrategy, default is TODO add
+ * @param transactionCommitStrategy Sets TransactionCommitStrategy, default is TODO add
* default
*/
- public void setCommitStrategy(TransactionCommitStrategy commitStrategy) {
- this.commitStrategy = commitStrategy;
+ public void setTransactionCommitStrategy(TransactionCommitStrategy commitStrategy) {
+ this.transactionCommitStrategy = commitStrategy;
}
}
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueConsumerTest.java?rev=1389620&r1=1389619&r2=1389620&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueConsumerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueConsumerTest.java Mon Sep 24 22:05:40 2012
@@ -26,18 +26,24 @@ import org.apache.camel.test.junit4.Came
import org.junit.Test;
/**
- * Verify the ability to batch transactions.
+ * Verify the ability to batch transactions to the consumer.
*
*/
public class BatchTransactedQueueConsumerTest extends CamelTestSupport {
-
+
/**
- * Verify that messages are being redelivered
+ * Verify that after only sending 10 messages that 10 are delivered to the
+ * processor and upon the 10th message throwing an Exception which causes
+ * the message deliveries to be rolled back. The messages should then be
+ * redelivered with the JMSRedelivered flag set to true for a total of 10
+ * delivered messages.
+ *
* @throws Exception
*/
@Test
public void testEndpointConfiguredBatchTransaction() throws Exception {
- // We should get two sets of 10 messages. 10 before the rollback and 10 after the rollback.
+ // We should get two sets of 10 messages. 10 before the rollback and 10
+ // after the rollback.
getMockEndpoint("mock:test.before").expectedMessageCount(10);
getMockEndpoint("mock:test.after").expectedMessageCount(10);
@@ -88,7 +94,7 @@ public class BatchTransactedQueueConsume
String body = exchange.getIn().getBody(String.class);
// If the message ends with 10, throw the exception
- if (body.endsWith("10")) {
+ if (body.endsWith("4") || body.endsWith("6")) {
log.info("10th message received. Rolling back.");
exchange.getOut().setFault(true);
exchange.getOut().setBody("10th message received. Rolling back.");
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueProducerTest.java?rev=1389620&r1=1389619&r2=1389620&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueProducerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueProducerTest.java Mon Sep 24 22:05:40 2012
@@ -21,6 +21,8 @@ import java.util.List;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
@@ -29,15 +31,30 @@ import org.apache.camel.component.sjms.S
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
+/**
+ * Test used to verify the batch transaction capability of the SJMS Component
+ * for a Queue Producer.
+ */
public class BatchTransactedQueueProducerTest extends CamelTestSupport {
@Produce
protected ProducerTemplate template;
+ /**
+ * Verify that after processing a {@link BatchMessage} twice with 30
+ * messages in it, for a total of 60 delivery attempts, we only see 30
+ * messages end up at the final consumer. This is due to an exception being
+ * thrown during the processing of the first 30 messages which causes a
+ * redelivery.
+ *
+ * @throws Exception
+ */
@Test
public void testEndpointConfiguredBatchTransaction() throws Exception {
- // We should see the World message twice, once for the exception
+ // We should see the BatchMessage once in the prebatch and once in the
+ // redelivery. Then we should see 30 messages arrive in the postbatch.
getMockEndpoint("mock:test.prebatch").expectedMessageCount(1);
+ getMockEndpoint("mock:test.redelivery").expectedMessageCount(1);
getMockEndpoint("mock:test.postbatch").expectedMessageCount(30);
List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>();
@@ -48,8 +65,7 @@ public class BatchTransactedQueueProduce
}
template.sendBody("direct:start", messages);
- getMockEndpoint("mock:test.prebatch").assertIsSatisfied();
- getMockEndpoint("mock:test.postbatch").assertIsSatisfied();
+ assertMockEndpointsSatisfied();
}
@@ -68,9 +84,37 @@ public class BatchTransactedQueueProduce
return new RouteBuilder() {
@Override
public void configure() {
+ onException(Exception.class)
+ .handled(true)
+ .setHeader("redeliveryAttempt")
+ .constant("1")
+ .log("Redelivery attempt 1")
+ .to("mock:test.redelivery")
+ .to("direct:start");
+
from("direct:start")
.to("log:test-before?showAll=true")
.to("sjms:queue:batch.queue?transacted=true")
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ // This will force an exception to occur on the exchange
+ // which will invoke our onException handler to
+ // redeliver our batch message
+
+ // Get the redelivery header
+ String redeliveryAttempt = exchange.getIn().getHeader("redeliveryAttempt", String.class);
+
+ // Check whether the redelivery header has been set to "1".
+ // If it has not, force the Exception so the batch is redelivered.
+ if(redeliveryAttempt != null && redeliveryAttempt.equals("1")) {
+ // do nothing and allow it to proceed
+ } else {
+ log.info("BatchMessage received without redelivery. Rolling back.");
+ exchange.setException(new Exception());
+ }
+ }
+ })
.to("mock:test.prebatch");
from("sjms:queue:batch.queue")
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicConsumerTest.java?rev=1389620&r1=1389619&r2=1389620&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicConsumerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicConsumerTest.java Mon Sep 24 22:05:40 2012
@@ -26,28 +26,38 @@ import org.apache.camel.test.junit4.Came
import org.junit.Test;
/**
- * Verify the ability to batch transactions.
+ * Verify the ability to batch transactions to the consumer.
*
*/
public class BatchTransactedTopicConsumerTest extends CamelTestSupport {
-
+
+
/**
- * Verify that messages are being redelivered
+ * Verify that after only sending 10 messages that 10 are delivered to the
+ * processor and upon the 10th message throwing an Exception which causes
+ * the message deliveries to be rolled back. The messages should then be
+ * redelivered with the JMSRedelivered flag set to true for a total of 20
+ * delivered messages, 10 to each topic.
+ *
* @throws Exception
*/
@Test
public void testEndpointConfiguredBatchTransaction() throws Exception {
// We should get two sets of 10 messages. 10 before the rollback and 10 after the rollback.
- getMockEndpoint("mock:test.before").expectedMessageCount(10);
- getMockEndpoint("mock:test.after").expectedMessageCount(10);
+ getMockEndpoint("mock:test.before.1").expectedMessageCount(10);
+ getMockEndpoint("mock:test.before.2").expectedMessageCount(10);
+ getMockEndpoint("mock:test.after.1").expectedMinimumMessageCount(10);
+ getMockEndpoint("mock:test.after.2").expectedMessageCount(10);
// Send only 10 messages
for (int i = 1; i <= 10; i++) {
template.sendBody("direct:start", "Hello World " + i);
}
- getMockEndpoint("mock:test.before").assertIsSatisfied();
- getMockEndpoint("mock:test.after").assertIsSatisfied();
+ getMockEndpoint("mock:test.before.1").assertIsSatisfied();
+ getMockEndpoint("mock:test.before.2").assertIsSatisfied();
+ getMockEndpoint("mock:test.after.1").assertIsSatisfied();
+ getMockEndpoint("mock:test.after.2").assertIsSatisfied();
}
@@ -58,6 +68,7 @@ public class BatchTransactedTopicConsume
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=true");
SjmsComponent component = new SjmsComponent();
component.setConnectionFactory(connectionFactory);
+ component.setMaxConnections(1);
camelContext.addComponent("sjms", component);
return camelContext;
@@ -78,8 +89,8 @@ public class BatchTransactedTopicConsume
// first consume all the messages that are not redelivered
.choice()
.when(header("JMSRedelivered").isEqualTo("false"))
- .to("log:before_log?showAll=true")
- .to("mock:test.before")
+ .log("Consumer 1 Message Before Received: ${body}")
+ .to("mock:test.before.1")
// This is where we will cause the rollback after 10 messages have been sent.
.process(new Processor() {
@Override
@@ -87,17 +98,47 @@ public class BatchTransactedTopicConsume
// Get the body
String body = exchange.getIn().getBody(String.class);
- // If the message ends with 10, throw the exception
- if (body.endsWith("10")) {
- log.info("10th message received. Rolling back.");
+ // Try failing in two places to
+ // ensure we still get the number of messages that
+ // we expect across the topics
+ if (body.endsWith("6")) {
+ log.info("5th message received. Rolling back.");
+ exchange.getOut().setFault(true);
+ exchange.getOut().setBody("5th message received. Rolling back.");
+ }
+ }
+ })
+ .otherwise()
+ .log("Consumer 1 Message After Received: ${body}")
+ .to("mock:test.after.1");
+
+ // Our test consumer route
+ from("sjms:topic:transacted.consumer.test?transacted=true&transactionBatchCount=10")
+ // first consume all the messages that are not redelivered
+ .choice()
+ .when(header("JMSRedelivered").isEqualTo("false"))
+ .log("Consumer 2 Message Before Received: ${body}")
+ .to("mock:test.before.2")
+ // This is where we will cause the rollback after 10 messages have been sent.
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ // Get the body
+ String body = exchange.getIn().getBody(String.class);
+
+ // Try failing in two places to
+ // ensure we still get the number of messages that
+ // we expect across the topics
+ if (body.endsWith("3") || body.endsWith("7")) {
+ log.info("5th message received. Rolling back.");
exchange.getOut().setFault(true);
- exchange.getOut().setBody("10th message received. Rolling back.");
+ exchange.getOut().setBody("5th message received. Rolling back.");
}
}
})
.otherwise()
- .to("log:after_log?showAll=true")
- .to("mock:test.after");
+ .log("Consumer 2 Message After Received: ${body}")
+ .to("mock:test.after.2");
}
};
}
Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicProducerTest.java?rev=1389620&r1=1389619&r2=1389620&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicProducerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicProducerTest.java Mon Sep 24 22:05:40 2012
@@ -21,6 +21,8 @@ import java.util.List;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
@@ -29,16 +31,32 @@ import org.apache.camel.component.sjms.S
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
+/**
+ * Test used to verify the batch transaction capability of the SJMS Component
+ * for a Topic Producer.
+ */
public class BatchTransactedTopicProducerTest extends CamelTestSupport {
@Produce
protected ProducerTemplate template;
+ /**
+ * Verify that after processing a {@link BatchMessage} twice with 30
+ * messages in it, for a total of 120 delivery attempts, we only see 60
+ * messages end up at the final consumers. This is due to an exception being
+ * thrown during the processing of the first 30 messages which causes a
+ * redelivery.
+ *
+ * @throws Exception
+ */
@Test
public void testEndpointConfiguredBatchTransaction() throws Exception {
- // We should see the World message twice, once for the exception
+ // We should see the BatchMessage once in the prebatch and once in the
+ // redelivery. Then we should see 30 messages arrive in each postbatch endpoint.
getMockEndpoint("mock:test.prebatch").expectedMessageCount(1);
- getMockEndpoint("mock:test.postbatch").expectedMessageCount(30);
+ getMockEndpoint("mock:test.redelivery").expectedMessageCount(1);
+ getMockEndpoint("mock:test.postbatch.1").expectedMessageCount(30);
+ getMockEndpoint("mock:test.postbatch.2").expectedMessageCount(30);
List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>();
for (int i = 1; i <= 30; i++) {
@@ -48,8 +66,7 @@ public class BatchTransactedTopicProduce
}
template.sendBody("direct:start", messages);
- getMockEndpoint("mock:test.prebatch").assertIsSatisfied();
- getMockEndpoint("mock:test.postbatch").assertIsSatisfied();
+ assertMockEndpointsSatisfied();
}
@@ -68,14 +85,47 @@ public class BatchTransactedTopicProduce
return new RouteBuilder() {
@Override
public void configure() {
+ // Used to force the redelivery of the messages to the producer
+ onException(Exception.class)
+ .handled(true)
+ .setHeader("redeliveryAttempt")
+ .constant("1")
+ .log("Redelivery attempt 1")
+ .to("mock:test.redelivery")
+ .to("direct:start");
+
from("direct:start")
.to("log:test-before?showAll=true")
.to("sjms:topic:batch.topic?transacted=true")
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ // This will force an exception to occur on the exchange
+ // which will invoke our onException handler to
+ // redeliver our batch message
+
+ // Get the redelivery header
+ String redeliveryAttempt = exchange.getIn().getHeader("redeliveryAttempt", String.class);
+
+ // Check whether the redelivery header has been set to "1".
+ // If it has not, force the Exception so the batch is redelivered.
+ if(redeliveryAttempt != null && redeliveryAttempt.equals("1")) {
+ // do nothing and allow it to proceed
+ } else {
+ log.info("BatchMessage received without redelivery. Rolling back.");
+ exchange.setException(new Exception());
+ }
+ }
+ })
.to("mock:test.prebatch");
from("sjms:topic:batch.topic")
.to("log:test-after?showAll=true")
- .to("mock:test.postbatch");
+ .to("mock:test.postbatch.1");
+
+ from("sjms:topic:batch.topic")
+ .to("log:test-after?showAll=true")
+ .to("mock:test.postbatch.2");
}
};
}