You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2012/08/30 05:42:14 UTC
svn commit: r1378796 [3/3] - in /camel/trunk/components/camel-sjms: ./
src/main/java/org/apache/camel/component/sjms/
src/main/java/org/apache/camel/component/sjms/consumer/
src/main/java/org/apache/camel/component/sjms/jms/
src/main/java/org/apache/ca...
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueConsumerTest.java?rev=1378796&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueConsumerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueConsumerTest.java Thu Aug 30 03:42:12 2012
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.tx;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Verify the ability to batch transactions.
+ *
+ */
+public class BatchTransactedQueueConsumerTest extends CamelTestSupport {
+
+ /**
+ * Verify that messages are being redelivered
+ * @throws Exception
+ */
+ @Test
+ public void testEndpointConfiguredBatchTransaction() throws Exception {
+ // We should get two sets of 10 messages. 10 before the rollback and 10 after the rollback.
+ getMockEndpoint("mock:test.before").expectedMessageCount(10);
+ getMockEndpoint("mock:test.after").expectedMessageCount(10);
+
+ // Send only 10 messages
+ for (int i = 1; i <= 10; i++) {
+ template.sendBody("direct:start", "Hello World " + i);
+ }
+
+ getMockEndpoint("mock:test.before").assertIsSatisfied();
+ getMockEndpoint("mock:test.after").assertIsSatisfied();
+
+ }
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext camelContext = super.createCamelContext();
+
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=true");
+ SjmsComponent component = new SjmsComponent();
+ component.setConnectionFactory(connectionFactory);
+ camelContext.addComponent("sjms", component);
+
+ return camelContext;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+
+ // Having a producer route helps with debugging and logging
+ from("direct:start")
+ .to("sjms:queue:transacted.consumer.test");
+
+ // Our test consumer route
+ from("sjms:queue:transacted.consumer.test?transacted=true&transactionBatchCount=10")
+ // first consume all the messages that are not redelivered
+ .choice()
+ .when(header("JMSRedelivered").isEqualTo("false"))
+ .to("log:before_log?showAll=true")
+ .to("mock:test.before")
+ // This is where we will cause the rollback after 10 messages have been sent.
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ // Get the body
+ String body = exchange.getIn().getBody(String.class);
+
+ // If the message ends with 10, throw the exception
+ if (body.endsWith("10")) {
+ log.info("10th message received. Rolling back.");
+ exchange.getOut().setFault(true);
+ exchange.getOut().setBody("10th message received. Rolling back.");
+ }
+ }
+ })
+ .otherwise()
+ .to("log:after_log?showAll=true")
+ .to("mock:test.after");
+ }
+ };
+ }
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueProducerTest.java?rev=1378796&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueProducerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueProducerTest.java Thu Aug 30 03:42:12 2012
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.tx;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.BatchMessage;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class BatchTransactedQueueProducerTest extends CamelTestSupport {
+
+ @Produce
+ protected ProducerTemplate template;
+
+ @Test
+ public void testEndpointConfiguredBatchTransaction() throws Exception {
+ // We should see the World message twice, once for the exception
+ getMockEndpoint("mock:test.prebatch").expectedMessageCount(1);
+ getMockEndpoint("mock:test.postbatch").expectedMessageCount(30);
+
+ List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>();
+ for (int i = 1; i <= 30; i++) {
+ String body = "Hello World " + i;
+ BatchMessage<String> message = new BatchMessage<String>(body, null);
+ messages.add(message);
+ }
+ template.sendBody("direct:start", messages);
+
+ getMockEndpoint("mock:test.prebatch").assertIsSatisfied();
+ getMockEndpoint("mock:test.postbatch").assertIsSatisfied();
+
+ }
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext camelContext = super.createCamelContext();
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=true");
+ SjmsComponent sjms = new SjmsComponent();
+ sjms.setConnectionFactory(connectionFactory);
+ camelContext.addComponent("sjms", sjms);
+ return camelContext;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:start")
+ .to("log:test-before?showAll=true")
+ .to("sjms:queue:batch.queue?transacted=true")
+ .to("mock:test.prebatch");
+
+ from("sjms:queue:batch.queue")
+ .to("log:test-after?showAll=true")
+ .to("mock:test.postbatch");
+ }
+ };
+ }
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicConsumerTest.java?rev=1378796&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicConsumerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicConsumerTest.java Thu Aug 30 03:42:12 2012
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.tx;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Verify the ability to batch transactions.
+ *
+ */
+public class BatchTransactedTopicConsumerTest extends CamelTestSupport {
+
+ /**
+ * Verify that messages are being redelivered
+ * @throws Exception
+ */
+ @Test
+ public void testEndpointConfiguredBatchTransaction() throws Exception {
+ // We should get two sets of 10 messages. 10 before the rollback and 10 after the rollback.
+ getMockEndpoint("mock:test.before").expectedMessageCount(10);
+ getMockEndpoint("mock:test.after").expectedMessageCount(10);
+
+ // Send only 10 messages
+ for (int i = 1; i <= 10; i++) {
+ template.sendBody("direct:start", "Hello World " + i);
+ }
+
+ getMockEndpoint("mock:test.before").assertIsSatisfied();
+ getMockEndpoint("mock:test.after").assertIsSatisfied();
+
+ }
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext camelContext = super.createCamelContext();
+
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=true");
+ SjmsComponent component = new SjmsComponent();
+ component.setConnectionFactory(connectionFactory);
+ camelContext.addComponent("sjms", component);
+
+ return camelContext;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+
+ // Having a producer route helps with debugging and logging
+ from("direct:start")
+ .to("sjms:topic:transacted.consumer.test");
+
+ // Our test consumer route
+ from("sjms:topic:transacted.consumer.test?transacted=true&transactionBatchCount=10")
+ // first consume all the messages that are not redelivered
+ .choice()
+ .when(header("JMSRedelivered").isEqualTo("false"))
+ .to("log:before_log?showAll=true")
+ .to("mock:test.before")
+ // This is where we will cause the rollback after 10 messages have been sent.
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ // Get the body
+ String body = exchange.getIn().getBody(String.class);
+
+ // If the message ends with 10, throw the exception
+ if (body.endsWith("10")) {
+ log.info("10th message received. Rolling back.");
+ exchange.getOut().setFault(true);
+ exchange.getOut().setBody("10th message received. Rolling back.");
+ }
+ }
+ })
+ .otherwise()
+ .to("log:after_log?showAll=true")
+ .to("mock:test.after");
+ }
+ };
+ }
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicProducerTest.java?rev=1378796&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicProducerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicProducerTest.java Thu Aug 30 03:42:12 2012
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.tx;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.BatchMessage;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class BatchTransactedTopicProducerTest extends CamelTestSupport {
+
+ @Produce
+ protected ProducerTemplate template;
+
+ @Test
+ public void testEndpointConfiguredBatchTransaction() throws Exception {
+ // We should see the World message twice, once for the exception
+ getMockEndpoint("mock:test.prebatch").expectedMessageCount(1);
+ getMockEndpoint("mock:test.postbatch").expectedMessageCount(30);
+
+ List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>();
+ for (int i = 1; i <= 30; i++) {
+ String body = "Hello World " + i;
+ BatchMessage<String> message = new BatchMessage<String>(body, null);
+ messages.add(message);
+ }
+ template.sendBody("direct:start", messages);
+
+ getMockEndpoint("mock:test.prebatch").assertIsSatisfied();
+ getMockEndpoint("mock:test.postbatch").assertIsSatisfied();
+
+ }
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext camelContext = super.createCamelContext();
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=true");
+ SjmsComponent sjms = new SjmsComponent();
+ sjms.setConnectionFactory(connectionFactory);
+ camelContext.addComponent("sjms", sjms);
+ return camelContext;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:start")
+ .to("log:test-before?showAll=true")
+ .to("sjms:topic:batch.topic?transacted=true")
+ .to("mock:test.prebatch");
+
+ from("sjms:topic:batch.topic")
+ .to("log:test-after?showAll=true")
+ .to("mock:test.postbatch");
+ }
+ };
+ }
+}
Copied: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedInOnlyQueueConsumerTest.java (from r1378786, camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyQueueConsumerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedInOnlyQueueConsumerTest.java?p2=camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedInOnlyQueueConsumerTest.java&p1=camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyQueueConsumerTest.java&r1=1378786&r2=1378796&rev=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyQueueConsumerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedInOnlyQueueConsumerTest.java Thu Aug 30 03:42:12 2012
@@ -14,11 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.component.sjms.consumer;
+package org.apache.camel.component.sjms.tx;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
-import org.apache.camel.CamelException;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
@@ -26,7 +25,6 @@ import org.apache.camel.builder.RouteBui
import org.apache.camel.component.sjms.SjmsComponent;
import org.apache.camel.component.sjms.jms.JmsMessageHeaderType;
import org.apache.camel.test.junit4.CamelTestSupport;
-
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,8 +63,7 @@ public class TransactedInOnlyQueueConsum
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
- "vm://broker?broker.persistent=false&broker.useJmx=true");
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=true");
SjmsComponent component = new SjmsComponent();
component.setConnectionFactory(connectionFactory);
camelContext.addComponent("sjms", component);
@@ -86,7 +83,7 @@ public class TransactedInOnlyQueueConsum
logger.info("Begin processing Exchange ID: {}", exchange.getExchangeId());
if (!exchange.getIn().getHeader(JmsMessageHeaderType.JMSRedelivered.toString(), String.class).equalsIgnoreCase("true")) {
logger.info("Exchange does not have a retry message. Set the exception and allow the retry.");
- exchange.setException(new CamelException("Creating Failure"));
+ exchange.getOut().setFault(true);
} else {
logger.info("Exchange has retry header. Continue processing the message.");
}
Copied: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedInOnlyTopicConsumerTest.java (from r1378786, camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyTopicConsumerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedInOnlyTopicConsumerTest.java?p2=camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedInOnlyTopicConsumerTest.java&p1=camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyTopicConsumerTest.java&r1=1378786&r2=1378796&rev=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyTopicConsumerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedInOnlyTopicConsumerTest.java Thu Aug 30 03:42:12 2012
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.component.sjms.consumer;
+package org.apache.camel.component.sjms.tx;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
@@ -84,7 +84,7 @@ public class TransactedInOnlyTopicConsum
logger.info("Begin processing Exchange ID: {}", exchange.getExchangeId());
if (!exchange.getIn().getHeader(JmsMessageHeaderType.JMSRedelivered.toString(), String.class).equalsIgnoreCase("true")) {
logger.info("Exchange does not have a retry message. Set the exception and allow the retry.");
- exchange.setException(new RuntimeCamelException("Creating Failure"));
+ exchange.getOut().setFault(true);
} else {
logger.info("Exchange has retry header. Continue processing the message.");
}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedQueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedQueueProducerTest.java?rev=1378796&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedQueueProducerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedQueueProducerTest.java Thu Aug 30 03:42:12 2012
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.tx;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class TransactedQueueProducerTest extends CamelTestSupport {
+
+ @Produce
+ protected ProducerTemplate template;
+
+ public TransactedQueueProducerTest() {
+ }
+
+ @Override
+ protected boolean useJmx() {
+ return false;
+ }
+
+ @Test
+ public void testTransactedProducer() throws Exception {
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World 2");
+
+ template.sendBodyAndHeader("direct:start", "Hello World 1", "isfailed", true);
+ template.sendBodyAndHeader("direct:start", "Hello World 2", "isfailed", false);
+
+ mock.assertIsSatisfied();
+ }
+
+
+ /*
+ * @see org.apache.camel.test.junit4.CamelTestSupport#createCamelContext()
+ * @return
+ * @throws Exception
+ */
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=true");
+ CamelContext camelContext = super.createCamelContext();
+ SjmsComponent component = new SjmsComponent();
+ component.setConnectionFactory(connectionFactory);
+ camelContext.addComponent("sjms", component);
+ return camelContext;
+ }
+
+ /*
+ * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
+ * @return
+ * @throws Exception
+ */
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+
+ from("direct:start")
+ .to("sjms:queue:test.queue?transacted=true")
+ .process(
+ new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ if (exchange.getIn().getHeader("isfailed", Boolean.class)) {
+ log.info("We failed. Should roll back.");
+ exchange.getOut().setFault(true);
+ } else {
+ log.info("We passed. Should commit.");
+ }
+ }
+ });
+
+ from("sjms:queue:test.queue?durableSubscriptionId=bar&transacted=true")
+ .to("mock:result");
+
+
+ }
+ };
+ }
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedTopicProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedTopicProducerTest.java?rev=1378796&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedTopicProducerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedTopicProducerTest.java Thu Aug 30 03:42:12 2012
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.tx;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.component.sjms.jms.ConnectionFactoryResource;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class TransactedTopicProducerTest extends CamelTestSupport {
+
+ @Produce
+ protected ProducerTemplate template;
+ protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=true");
+
+ public TransactedTopicProducerTest() {
+ }
+
+ @Override
+ protected boolean useJmx() {
+ return false;
+ }
+
+ @Test
+ public void testTransactedProducer() throws Exception {
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World 2");
+
+ template.sendBodyAndHeader("direct:start", "Hello World 1", "isfailed", true);
+ template.sendBodyAndHeader("direct:start", "Hello World 2", "isfailed", false);
+
+ mock.assertIsSatisfied();
+ }
+
+
+ /*
+ * @see org.apache.camel.test.junit4.CamelTestSupport#createCamelContext()
+ * @return
+ * @throws Exception
+ */
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=true");
+ ConnectionFactoryResource connectionResource = new ConnectionFactoryResource();
+ connectionResource.setConnectionFactory(connectionFactory);
+ connectionResource.setClientId("test-connection-1");
+ CamelContext camelContext = super.createCamelContext();
+ SjmsComponent component = new SjmsComponent();
+ component.setConnectionResource(connectionResource);
+ component.setMaxConnections(1);
+ camelContext.addComponent("sjms", component);
+ return camelContext;
+ }
+
+ /*
+ * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
+ * @return
+ * @throws Exception
+ */
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+
+ from("direct:start")
+ .to("sjms:topic:test.topic?transacted=true")
+ .process(
+ new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ if (exchange.getIn().getHeader("isfailed", Boolean.class)) {
+ log.info("We failed. Should roll back.");
+ exchange.getOut().setFault(true);
+ } else {
+ log.info("We passed. Should commit.");
+ }
+ }
+ });
+
+ from("sjms:topic:test.topic?durableSubscriptionId=bar&transacted=true")
+ .to("mock:result");
+
+ }
+ };
+ }
+}
\ No newline at end of file
Modified: camel/trunk/components/camel-sjms/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/resources/log4j.properties?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-sjms/src/test/resources/log4j.properties Thu Aug 30 03:42:12 2012
@@ -17,13 +17,14 @@
#
# The logging properties used
#
-log4j.rootLogger=INFO, file
+log4j.rootLogger=INFO, out
# uncomment the following line to turn on Camel debugging
log4j.logger.org.apache.activemq=info
+log4j.logger.org.apache.activemq.transaction=trace
log4j.logger.org.apache.camel=info
log4j.logger.org.apache.camel.converter=info
-log4j.logger.org.apache.camel.component.sjms=debug
+log4j.logger.org.apache.camel.component.sjms=info
# CONSOLE appender not used by default
log4j.appender.out=org.apache.log4j.ConsoleAppender