You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cs...@apache.org on 2010/10/05 09:28:06 UTC
svn commit: r1004549 - in /camel/trunk/components/camel-jms/src:
main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
test/java/org/apache/camel/component/jms/temp/JmsReconnectTest.java
Author: cschneider
Date: Tue Oct 5 07:28:05 2010
New Revision: 1004549
URL: http://svn.apache.org/viewvc?rev=1004549&view=rev
Log:
CAMEL-3193 Fix for reconnect problem with temp queues
Added:
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/temp/JmsReconnectTest.java
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1004549&r1=1004548&r2=1004549&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java Tue Oct 5 07:28:05 2010
@@ -27,7 +27,7 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
-import org.springframework.jms.listener.SimpleMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.support.destination.DestinationResolver;
/**
@@ -82,7 +82,8 @@ public class TemporaryQueueReplyManager
@Override
protected AbstractMessageListenerContainer createListenerContainer() throws Exception {
- SimpleMessageListenerContainer answer = new SimpleMessageListenerContainer();
+ // Use DefaultMessageListenerContainer as it supports reconnects (see CAMEL-3193)
+ DefaultMessageListenerContainer answer = new DefaultMessageListenerContainer();
answer.setDestinationName("temporary");
answer.setDestinationResolver(new DestinationResolver() {
Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/temp/JmsReconnectTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/temp/JmsReconnectTest.java?rev=1004549&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/temp/JmsReconnectTest.java (added)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/temp/JmsReconnectTest.java Tue Oct 5 07:28:05 2010
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.temp;
+
+import junit.framework.Assert;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.camel.Produce;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.jms.JmsComponent;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.spring.CamelBeanPostProcessor;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Test for issue CAMEL-3193. Camel should support reconnects in case of
+ * connection loss to jms server. After reconnect Temporary destinations have to
+ * be recreated as they may become invalid
+ */
+public class JmsReconnectTest {
+ public interface MyService {
+ String echo(String st);
+ }
+
+ private final class EchoServiceImpl implements MyService {
+ public String echo(String st) {
+ return st;
+ }
+ }
+
+ @Produce(uri = "direct:test")
+ MyService proxy;
+
+ /**
+ * This test is disabled as the problem can currently not be reproduced using ActiveMQ.
+ * TODO Find a way to recreate the problem with ActiveMQ and fully automate the test
+ * @throws Exception
+ */
+ @Ignore
+ @Test
+ public void testRequestReply() throws Exception {
+ BrokerService broker = new BrokerService();
+ broker.addConnector("tcp://localhost:61616");
+ broker.setPersistent(false);
+ broker.setTimeBeforePurgeTempDestinations(1000);
+ broker.start();
+
+ DefaultCamelContext context = new DefaultCamelContext();
+ JmsComponent jmsComponent = new JmsComponent();
+
+ /**
+ *
+ */
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
+ connectionFactory.setBrokerURL("failover://(tcp://localhost:61616)?maxReconnectAttempts=1");
+
+ /**
+ * When using Tibco EMS the problem can be recreated. As the broker is
+ * external it has to be stopped and started by hand.
+ */
+ // TibjmsConnectionFactory connectionFactory = new
+ // TibjmsConnectionFactory();
+ // connectionFactory.setReconnAttemptCount(1);
+
+ jmsComponent.setConnectionFactory(connectionFactory);
+ jmsComponent.setRequestTimeout(1000);
+ jmsComponent.setReceiveTimeout(1000);
+ context.addComponent("jms", jmsComponent);
+ context.addRoutes(new RouteBuilder() {
+
+ @Override
+ public void configure() throws Exception {
+ from("jms:testqueue").bean(new EchoServiceImpl());
+ from("direct:test").to("jms:testqueue");
+ }
+ });
+ CamelBeanPostProcessor processor = new CamelBeanPostProcessor();
+ processor.setCamelContext(context);
+ processor.postProcessBeforeInitialization(this, "this");
+ context.start();
+
+ String ret = proxy.echo("test");
+ Assert.assertEquals("test", ret);
+
+ broker.stop();
+ /**
+ * Wait long enough for the jms client to do a full reconnect. In the
+ * Tibco EMS case this means that a Temporary Destination created before
+ * is invalid now
+ */
+ Thread.sleep(5000);
+
+ System.in.read();
+ broker.start(true);
+
+ /**
+ * Before the fix to this issue this call will throw a spring UncategorizedJmsException
+ * which contains an InvalidJmsDestination
+ */
+ String ret2 = proxy.echo("test");
+ Assert.assertEquals("test", ret2);
+
+ }
+}