You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2012/08/02 12:56:44 UTC
svn commit: r1368413 [5/5] - in /camel/trunk: components/
components/camel-sjms/ components/camel-sjms/src/
components/camel-sjms/src/main/ components/camel-sjms/src/main/java/
components/camel-sjms/src/main/java/org/
components/camel-sjms/src/main/jav...
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/TransactedQueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/TransactedQueueProducerTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/TransactedQueueProducerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/TransactedQueueProducerTest.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import java.util.Enumeration;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.test.junit4.CamelTestSupport;
+
+import org.junit.Test;
+
+public class TransactedQueueProducerTest extends CamelTestSupport {
+
+ private static final String TEST_DESTINATION_NAME = "transacted.test.queue";
+
+ @Produce
+ protected ProducerTemplate template;
+ protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.useJmx=false");
+ private Connection connection;
+ private Session session;
+
+ public TransactedQueueProducerTest() {
+ }
+
+ @Override
+ protected boolean useJmx() {
+ return false;
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=false");
+ connection = connectionFactory.createConnection();
+ connection.start();
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ if (session != null) {
+ session.close();
+ }
+ if (connection != null) {
+ connection.stop();
+ }
+ super.tearDown();
+ }
+
+ @Test
+ public void testTransactedQueueProducer() throws Exception {
+ MessageConsumer mc = JmsObjectFactory.createQueueConsumer(session, TEST_DESTINATION_NAME);
+ assertNotNull(mc);
+ final String expectedBody = "Hello World!";
+ MockEndpoint mock = getMockEndpoint("mock:result");
+
+ mock.expectedMessageCount(1);
+ mock.expectedBodiesReceived(expectedBody);
+
+ template.sendBody("direct:start.transacted", expectedBody);
+ mc.receive(5000);
+ session.rollback();
+ Message message = mc.receive(5000);
+ session.commit();
+ assertNotNull(message);
+ assertTrue(message instanceof TextMessage);
+
+ TextMessage tm = (TextMessage) message;
+ String text = tm.getText();
+ assertNotNull(text);
+
+ template.sendBody("direct:finish", text);
+
+ mock.assertIsSatisfied();
+ mc.close();
+
+ }
+
+ @Test
+ public void testTransactedQueueProducerAsynchronousOverride() throws Exception {
+ MessageConsumer mc = JmsObjectFactory.createQueueConsumer(session, TEST_DESTINATION_NAME);
+ assertNotNull(mc);
+ final String expectedBody = "Hello World!";
+ MockEndpoint mock = getMockEndpoint("mock:result");
+
+ mock.expectedMessageCount(1);
+ mock.expectedBodiesReceived(expectedBody);
+
+ template.sendBody("direct:start.transacted.async.override", expectedBody);
+ mc.receive(5000);
+ session.rollback();
+ Message message = mc.receive(5000);
+ session.commit();
+ assertNotNull(message);
+ assertTrue(message instanceof TextMessage);
+
+ TextMessage tm = (TextMessage) message;
+ String text = tm.getText();
+ assertNotNull(text);
+
+ template.sendBody("direct:finish", text);
+
+ mock.assertIsSatisfied();
+ mc.close();
+
+ }
+
+ @Test
+ public void testTransactedQueueProducerFailed() throws Exception {
+ MessageConsumer mc = JmsObjectFactory.createQueueConsumer(session, TEST_DESTINATION_NAME);
+ assertNotNull(mc);
+ final String expectedBody = "Transaction Failed";
+ MockEndpoint mock = getMockEndpoint("mock:result");
+
+ mock.expectedMessageCount(1);
+ mock.expectedBodiesReceived(expectedBody);
+
+ template.sendBody("direct:start.transacted", expectedBody);
+ mc.receive(5000);
+ session.rollback();
+ Enumeration<?> enumeration = session.createBrowser(session.createQueue(TEST_DESTINATION_NAME)).getEnumeration();
+ while (enumeration.hasMoreElements()) {
+ TextMessage tm = (TextMessage) enumeration.nextElement();
+ String text = tm.getText();
+ log.info("Element from Enumeration: {}", text);
+ assertNotNull(text);
+
+ template.sendBody("direct:finish", text);
+ }
+
+ mock.assertIsSatisfied();
+ mc.close();
+ }
+
+ /*
+ * @see org.apache.camel.test.junit4.CamelTestSupport#createCamelContext()
+ *
+ * @return
+ * @throws Exception
+ */
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext camelContext = super.createCamelContext();
+ SjmsComponent component = new SjmsComponent();
+ component.setMaxConnections(1);
+ component.setConnectionFactory(connectionFactory);
+ camelContext.addComponent("sjms", component);
+ return camelContext;
+ }
+
+ /*
+ * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
+ *
+ * @return
+ * @throws Exception
+ */
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+
+ from("direct:start.transacted")
+ .to("sjms:queue:" + TEST_DESTINATION_NAME + "?transacted=true");
+
+ from("direct:start.transacted.async.override")
+ .to("sjms:queue:" + TEST_DESTINATION_NAME + "?transacted=true&synchronous=false");
+
+ from("direct:finish")
+ .to("log:test.log.1?showBody=true", "mock:result");
+ }
+ };
+ }
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.support;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.junit4.CamelTestSupport;
+
+/**
+ * TODO Add Class documentation for JmsTestSupport
+ *
+ */
+public class JmsTestSupport extends CamelTestSupport {
+ private static final String BROKER_URI = "tcp://localhost:33333";
+ @Produce
+ protected ProducerTemplate template;
+ private BrokerService broker;
+ private Connection connection;
+ private Session session;
+
+ @Override
+ protected void doPreSetup() throws Exception {
+ super.doPreSetup();
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ broker = new BrokerService();
+ broker.setUseJmx(true);
+ broker.setPersistent(false);
+ broker.deleteAllMessages();
+ broker.addConnector(BROKER_URI);
+ broker.start();
+ super.setUp();
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ DefaultCamelContext dcc = (DefaultCamelContext)context;
+ while (!dcc.isStopped()) {
+ log.info("Waiting on the Camel Context to stop");
+ }
+ log.info("Closing JMS Session");
+ if (getSession() != null) {
+ getSession().close();
+ setSession(null);
+ }
+ log.info("Closing JMS Connection");
+ if (connection != null) {
+ connection.stop();
+ connection = null;
+ }
+ log.info("Stopping the ActiveMQ Broker");
+ if (broker != null) {
+ broker.stop();
+ broker = null;
+ }
+ }
+
+ /*
+ * @see org.apache.camel.test.junit4.CamelTestSupport#createCamelContext()
+ *
+ * @return
+ * @throws Exception
+ */
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext camelContext = super.createCamelContext();
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URI);
+ connection = connectionFactory.createConnection();
+ connection.start();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ SjmsComponent component = new SjmsComponent();
+ component.setMaxConnections(1);
+ component.setConnectionFactory(connectionFactory);
+ camelContext.addComponent("sjms", component);
+ return camelContext;
+ }
+
+ public void setSession(Session session) {
+ this.session = session;
+ }
+
+ public Session getSession() {
+ return session;
+ }
+
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncComponent.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncComponent.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncComponent.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.support;
+
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+
+/**
+ *
+ */
+public class MyAsyncComponent extends DefaultComponent {
+
+ @Override
+ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+ MyAsyncEndpoint answer = new MyAsyncEndpoint(uri, this);
+ answer.setReply(prepareReply(remaining));
+ setProperties(answer, parameters);
+ return answer;
+ }
+
+ private String prepareReply(String value) {
+ // to make URIs valid we make the conventions of using ':' for ' ' and
+ // capitalize words
+ String[] words = value.split(":");
+ String result = "";
+ for (String word : words) {
+ result += result.isEmpty() ? "" : " ";
+ result += word.substring(0, 1).toUpperCase(Locale.ENGLISH) + word.substring(1);
+ }
+ return result;
+ }
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncEndpoint.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncEndpoint.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncEndpoint.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.support;
+
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.SynchronousDelegateProducer;
+
+/**
+ *
+ */
+public class MyAsyncEndpoint extends DefaultEndpoint {
+
+ private String reply;
+ private long delay = 25;
+ private int failFirstAttempts;
+
+ public MyAsyncEndpoint(String endpointUri, Component component) {
+ super(endpointUri, component);
+ }
+
+ public Producer createProducer() throws Exception {
+ Producer answer = new MyAsyncProducer(this);
+ if (isSynchronous()) {
+ // force it to be synchronously
+ return new SynchronousDelegateProducer(answer);
+ } else {
+ return answer;
+ }
+ }
+
+ public Consumer createConsumer(Processor processor) throws Exception {
+ throw new UnsupportedOperationException("Consumer not supported");
+ }
+
+ public boolean isSingleton() {
+ return false;
+ }
+
+ public String getReply() {
+ return reply;
+ }
+
+ public void setReply(String reply) {
+ this.reply = reply;
+ }
+
+ public long getDelay() {
+ return delay;
+ }
+
+ public void setDelay(long delay) {
+ this.delay = delay;
+ }
+
+ public int getFailFirstAttempts() {
+ return failFirstAttempts;
+ }
+
+ public void setFailFirstAttempts(int failFirstAttempts) {
+ this.failFirstAttempts = failFirstAttempts;
+ }
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncProducer.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncProducer.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncProducer.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.support;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultAsyncProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class MyAsyncProducer extends DefaultAsyncProducer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MyAsyncProducer.class);
+ private final ExecutorService executor;
+ private final AtomicInteger counter = new AtomicInteger();
+
+ public MyAsyncProducer(MyAsyncEndpoint endpoint) {
+ super(endpoint);
+ this.executor = endpoint.getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MyProducer");
+ }
+
+ public MyAsyncEndpoint getEndpoint() {
+ return (MyAsyncEndpoint) super.getEndpoint();
+ }
+
+ public boolean process(final Exchange exchange, final AsyncCallback callback) {
+ executor.submit(new Callable<Object>() {
+ public Object call() throws Exception {
+
+ LOG.info("Simulating a task which takes " + getEndpoint().getDelay() + " millis to reply");
+ Thread.sleep(getEndpoint().getDelay());
+
+ int count = counter.incrementAndGet();
+ if (getEndpoint().getFailFirstAttempts() >= count) {
+ LOG.info("Simulating a failure at attempt " + count);
+ exchange.setException(new CamelExchangeException("Simulated error at attempt " + count, exchange));
+ } else {
+ String reply = getEndpoint().getReply();
+ exchange.getOut().setBody(reply);
+ // propagate headers
+ exchange.getOut().setHeaders(exchange.getIn().getHeaders());
+ LOG.info("Setting reply " + reply);
+ }
+
+ LOG.info("Callback done(false)");
+ callback.done(false);
+ return null;
+ }
+ });
+
+ // indicate from this point forward its being routed asynchronously
+ LOG.info("Task submitted, now tell Camel routing engine to that this Exchange is being continued asynchronously");
+ return false;
+ }
+
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyInOutTestConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyInOutTestConsumer.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyInOutTestConsumer.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyInOutTestConsumer.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.support;
+
+import java.util.Random;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+/**
+ *
+ * @author sully6768
+ */
+public class MyInOutTestConsumer implements MessageListener {
+ private static int ackMode;
+ private static String clientQueueName;
+
+ private boolean transacted;
+ private MessageProducer producer;
+
+ static {
+ clientQueueName = "client.messages";
+ ackMode = Session.AUTO_ACKNOWLEDGE;
+ }
+ public MyInOutTestConsumer(ConnectionFactory connectionFactory) {
+ Connection connection;
+ try {
+ connection = connectionFactory.createConnection();
+ connection.start();
+ Session session = connection.createSession(transacted, ackMode);
+ Destination adminQueue = session.createQueue(clientQueueName);
+
+ //Setup a message producer to send message to the queue the server is consuming from
+ this.producer = session.createProducer(adminQueue);
+ this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ //Create a temporary queue that this client will listen for responses on then create a consumer
+ //that consumes message from this temporary queue...for a real application a client should reuse
+ //the same temp queue for each message to the server...one temp queue per client
+ Destination tempDest = session.createTemporaryQueue();
+ MessageConsumer responseConsumer = session.createConsumer(tempDest);
+
+ //This class will handle the messages to the temp queue as well
+ responseConsumer.setMessageListener(this);
+
+ //Now create the actual message you want to send
+ TextMessage txtMessage = session.createTextMessage();
+ txtMessage.setText("MyProtocolMessage");
+
+ //Set the reply to field to the temp queue you created above, this is the queue the server
+ //will respond to
+ txtMessage.setJMSReplyTo(tempDest);
+
+ //Set a correlation ID so when you get a response you know which sent message the response is for
+ //If there is never more than one outstanding message to the server then the
+ //same correlation ID can be used for all the messages...if there is more than one outstanding
+ //message to the server you would presumably want to associate the correlation ID with this
+ //message somehow...a Map works good
+ String correlationId = this.createRandomString();
+ txtMessage.setJMSCorrelationID(correlationId);
+ this.producer.send(txtMessage);
+ } catch (JMSException e) {
+ //Handle the exception appropriately
+ }
+ }
+
+ private String createRandomString() {
+ Random random = new Random(System.currentTimeMillis());
+ long randomLong = random.nextLong();
+ return Long.toHexString(randomLong);
+ }
+
+ public void onMessage(Message message) {
+ String messageText = null;
+ try {
+ if (message instanceof TextMessage) {
+ TextMessage textMessage = (TextMessage) message;
+ messageText = textMessage.getText();
+ System.out.println("messageText = " + messageText);
+ }
+ } catch (JMSException e) {
+ //Handle the exception appropriately
+ }
+ }
+
+}
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/SjmsConnectionTestSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/SjmsConnectionTestSupport.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/SjmsConnectionTestSupport.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/SjmsConnectionTestSupport.java Thu Aug 2 10:56:40 2012
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.support;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.camel.util.ObjectHelper;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO Add Class documentation for SjmsConnectionTestSupport
+ *
+ */
+public abstract class SjmsConnectionTestSupport {
+
+ static {
+ System.setProperty("org.apache.activemq.default.directory.prefix", "target/activemq/");
+ }
+
+ public static final String VM_BROKER_CONNECT_STRING = "vm://broker";
+ public static final String TCP_BROKER_CONNECT_STRING = "tcp://localhost:61616";
+ protected Logger logger = LoggerFactory.getLogger(getClass());
+ private ActiveMQConnectionFactory vmTestConnectionFactory;
+ private ActiveMQConnectionFactory testConnectionFactory;
+ private BrokerService brokerService;
+ private boolean persistenceEnabled;
+
+ public abstract String getConnectionUri();
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ }
+
+ @Before
+ public void setup() throws Exception {
+ if (ObjectHelper.isEmpty(getConnectionUri())
+ || getConnectionUri().startsWith("vm")) {
+ vmTestConnectionFactory = new ActiveMQConnectionFactory(
+ VM_BROKER_CONNECT_STRING);
+ } else {
+ createBroker();
+ }
+ }
+
+ @After
+ public void teardown() throws Exception {
+
+ if (vmTestConnectionFactory != null) {
+ vmTestConnectionFactory = null;
+ }
+ if (testConnectionFactory != null) {
+ testConnectionFactory = null;
+ }
+ if (brokerService != null) {
+ destroyBroker();
+ }
+ }
+
+ /**
+ * Gets the ActiveMQConnectionFactory value of testConnectionFactory for
+ * this instance of SjmsConnectionTestSupport.
+ *
+ * @return the testConnectionFactory
+ */
+ public ActiveMQConnectionFactory createTestConnectionFactory(String uri) {
+ ActiveMQConnectionFactory cf = null;
+ if (ObjectHelper.isEmpty(uri)) {
+ cf = new ActiveMQConnectionFactory(VM_BROKER_CONNECT_STRING);
+ } else {
+ cf = new ActiveMQConnectionFactory(uri);
+ }
+ return cf;
+ }
+
+ protected void createBroker() throws Exception {
+ String connectString = getConnectionUri();
+ if (ObjectHelper.isEmpty(connectString)) {
+ connectString = TCP_BROKER_CONNECT_STRING;
+ }
+ brokerService = new BrokerService();
+ brokerService.setPersistent(isPersistenceEnabled());
+ brokerService.addConnector(connectString);
+ brokerService.start();
+ brokerService.waitUntilStarted();
+ }
+
+ protected void destroyBroker() throws Exception {
+ if (brokerService != null) {
+ brokerService.stop();
+ brokerService.waitUntilStopped();
+ }
+ }
+
+ /**
+ * Sets the ActiveMQConnectionFactory value of testConnectionFactory for
+ * this instance of SjmsConnectionTestSupport.
+ *
+ * @param testConnectionFactory
+ * Sets ActiveMQConnectionFactory, default is TODO add default
+ */
+ public void setTestConnectionFactory(
+ ActiveMQConnectionFactory testConnectionFactory) {
+ this.testConnectionFactory = testConnectionFactory;
+ }
+
+ /**
+ * Gets the ActiveMQConnectionFactory value of testConnectionFactory for
+ * this instance of SjmsConnectionTestSupport.
+ *
+ * @return the testConnectionFactory
+ */
+ public ActiveMQConnectionFactory getTestConnectionFactory() {
+ return testConnectionFactory;
+ }
+
+ /**
+ * Sets the boolean value of persistenceEnabled for this instance of
+ * SjmsConnectionTestSupport.
+ *
+ * @param persistenceEnabled
+ * Sets boolean, default is false
+ */
+ public void setPersistenceEnabled(boolean persistenceEnabled) {
+ this.persistenceEnabled = persistenceEnabled;
+ }
+
+ /**
+ * Gets the boolean value of persistenceEnabled for this instance of
+ * SjmsConnectionTestSupport.
+ *
+ * @return the persistenceEnabled
+ */
+ public boolean isPersistenceEnabled() {
+ return persistenceEnabled;
+ }
+}
Added: camel/trunk/components/camel-sjms/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/resources/log4j.properties?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/resources/log4j.properties (added)
+++ camel/trunk/components/camel-sjms/src/test/resources/log4j.properties Thu Aug 2 10:56:40 2012
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+# The logging properties used
+#
+log4j.rootLogger=INFO, file
+
+# uncomment the following line to turn on Camel debugging
+log4j.logger.org.apache.activemq=info
+log4j.logger.org.apache.camel=info
+log4j.logger.org.apache.camel.converter=info
+log4j.logger.org.apache.camel.component.sjms=debug
+
+# CONSOLE appender not used by default
+log4j.appender.out=org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
+#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+log4j.throwableRenderer=org.apache.log4j.EnhancedThrowableRenderer
+
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.file.file=target/camel-sjms-test.log
+log4j.appender.file.append=true
\ No newline at end of file
Modified: camel/trunk/components/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/pom.xml?rev=1368413&r1=1368412&r2=1368413&view=diff
==============================================================================
--- camel/trunk/components/pom.xml (original)
+++ camel/trunk/components/pom.xml Thu Aug 2 10:56:40 2012
@@ -82,7 +82,7 @@
<module>camel-freemarker</module>
<module>camel-ftp</module>
<module>camel-gae</module>
- <module>camel-gson</module>
+ <module>camel-gson</module>
<module>camel-guava-eventbus</module>
<module>camel-guice</module>
<module>camel-hawtdb</module>
@@ -141,6 +141,7 @@
<module>camel-sip</module>
<module>camel-smpp</module>
<module>camel-snmp</module>
+ <module>camel-sjms</module>
<module>camel-soap</module>
<module>camel-solr</module>
<module>camel-spring-batch</module>
Modified: camel/trunk/platforms/karaf/features/src/main/resources/features.xml
URL: http://svn.apache.org/viewvc/camel/trunk/platforms/karaf/features/src/main/resources/features.xml?rev=1368413&r1=1368412&r2=1368413&view=diff
==============================================================================
--- camel/trunk/platforms/karaf/features/src/main/resources/features.xml (original)
+++ camel/trunk/platforms/karaf/features/src/main/resources/features.xml Thu Aug 2 10:56:40 2012
@@ -731,6 +731,14 @@
<bundle dependency='true'>mvn:org.apache.shiro/shiro-core/${shiro-version}</bundle>
<bundle>mvn:org.apache.camel/camel-shiro/${project.version}</bundle>
</feature>
+ <feature name='camel-sjms' version='${project.version}' resolver='(obr)' start-level='50'>
+ <bundle dependency='true'>mvn:org.apache.geronimo.specs/geronimo-annotation_1.0_spec/${gernimo-annotation-spec-version}</bundle>
+ <!-- JTA is not currently supported by SJMS but is a required dependency of the Geronimo JMS Bundle -->
+ <bundle dependency='true'>mvn:org.apache.geronimo.specs/geronimo-jta_1.1_spec/${geronimo-jta-spec-version}</bundle>
+ <bundle dependency='true'>mvn:org.apache.geronimo.specs/geronimo-jms_1.1_spec/${geronimo-jms-spec-version}</bundle>
+ <feature version='${project.version}'>camel-core</feature>
+ <bundle>mvn:org.apache.camel/camel-sjms/${project.version}</bundle>
+ </feature>
<feature name='camel-smpp' version='${project.version}' resolver='(obr)' start-level='50'>
<feature version='${project.version}'>camel-core</feature>
<bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.jsmpp/${jsmpp-bundle-version}</bundle>