You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/02/20 04:48:58 UTC
[camel] branch master updated: CAMEL-6950 camel-sjms: Lacks reconnection logic in case of exception (#3581)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 5b04199 CAMEL-6950 camel-sjms: Lacks reconnection logic in case of exception (#3581)
5b04199 is described below
commit 5b041991668c7442c9204776834489aac2fa61ec
Author: Jan Filipski <gl...@users.noreply.github.com>
AuthorDate: Thu Feb 20 05:48:45 2020 +0100
CAMEL-6950 camel-sjms: Lacks reconnection logic in case of exception (#3581)
* CAMEL-6950 camel-sjms: Lacks reconnection logic in case of exception
* CAMEL-6950 camel-sjms: Lacks reconnection logic in case of exception
Attempt with adjustable reconnect delay and connection separation for restart
---
.../sjms/AggregatedExceptionListener.java | 53 +++++
.../apache/camel/component/sjms/SjmsConsumer.java | 264 +++++++++++++--------
.../apache/camel/component/sjms/SjmsEndpoint.java | 25 ++
.../apache/camel/component/sjms/SjmsProducer.java | 22 +-
.../component/sjms/producer/InOutProducer.java | 17 +-
.../component/sjms/ReconnectConsumerTest.java | 55 +++++
.../component/sjms/ReconnectInOutProducerTest.java | 115 +++++++++
.../component/sjms/ReconnectProducerTest.java | 101 ++++++++
.../component/sjms/support/JmsTestSupport.java | 25 +-
9 files changed, 572 insertions(+), 105 deletions(-)
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/AggregatedExceptionListener.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/AggregatedExceptionListener.java
new file mode 100644
index 0000000..bed3bc5
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/AggregatedExceptionListener.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AggregatedExceptionListener implements ExceptionListener {
+ private static final Logger LOG = LoggerFactory.getLogger(AggregatedExceptionListener.class);
+ private final Set<ExceptionListener> listenerSet;
+
+ public AggregatedExceptionListener(ExceptionListener... exceptionListeners) {
+ this.listenerSet = new HashSet<>(exceptionListeners.length);
+ for (ExceptionListener exceptionListener : exceptionListeners) {
+ if (exceptionListener instanceof AggregatedExceptionListener) {
+ listenerSet.addAll(((AggregatedExceptionListener) exceptionListener).listenerSet); //prevent multiwrapping
+ } else {
+ listenerSet.add(exceptionListener);
+ }
+ }
+ }
+
+ @Override
+ public void onException(JMSException exception) {
+ for (ExceptionListener listener : listenerSet) {
+ try {
+ listener.onException(exception);
+ } catch (Exception ex) {
+ LOG.error("Exception listeners shouldn't throw exceptions", ex);
+ }
+ }
+ }
+}
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
index 898377e..7bc2100 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
@@ -16,10 +16,20 @@
*/
package org.apache.camel.component.sjms;
-import java.util.concurrent.ExecutorService;
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.WeakHashMap;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
@@ -38,8 +48,7 @@ import org.apache.camel.component.sjms.tx.SessionBatchTransactionSynchronization
import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultConsumer;
-import org.apache.commons.pool.BasePoolableObjectFactory;
-import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.camel.util.backoff.BackOffTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,52 +59,10 @@ public class SjmsConsumer extends DefaultConsumer {
private static final Logger LOG = LoggerFactory.getLogger(SjmsConsumer.class);
- protected GenericObjectPool<MessageConsumerResources> consumers;
- private ExecutorService executor;
+ private final Map<Connection, List<MessageConsumerResources>> consumers = new WeakHashMap<>();
+ private ScheduledExecutorService scheduler;
private Future<?> asyncStart;
-
- /**
- * A pool of MessageConsumerResources created at the initialization of the associated consumer.
- */
- protected class MessageConsumerResourcesFactory extends BasePoolableObjectFactory<MessageConsumerResources> {
-
- /**
- * Creates a new MessageConsumerResources instance.
- *
- * @see org.apache.commons.pool.PoolableObjectFactory#makeObject()
- */
- @Override
- public MessageConsumerResources makeObject() throws Exception {
- return createConsumer();
- }
-
- /**
- * Cleans up the MessageConsumerResources.
- *
- * @see org.apache.commons.pool.PoolableObjectFactory#destroyObject(java.lang.Object)
- */
- @Override
- public void destroyObject(MessageConsumerResources model) throws Exception {
- if (model != null) {
- // First clean up our message consumer
- if (model.getMessageConsumer() != null) {
- model.getMessageConsumer().close();
- }
-
- // If the resource has a
- if (model.getSession() != null) {
- if (model.getSession().getTransacted()) {
- try {
- model.getSession().rollback();
- } catch (Exception e) {
- // Do nothing. Just make sure we are cleaned up
- }
- }
- model.getSession().close();
- }
- }
- }
- }
+ private BackOffTimer.Task rescheduleTask;
public SjmsConsumer(Endpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -110,36 +77,58 @@ public class SjmsConsumer extends DefaultConsumer {
protected void doStart() throws Exception {
super.doStart();
- this.executor = getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "SjmsConsumer");
- if (consumers == null) {
- consumers = new GenericObjectPool<>(new MessageConsumerResourcesFactory());
- consumers.setMaxActive(getConsumerCount());
- consumers.setMaxIdle(getConsumerCount());
- if (getEndpoint().isAsyncStartListener()) {
- asyncStart = getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() {
- @Override
- public void run() {
- try {
- fillConsumersPool();
- } catch (Throwable e) {
- LOG.warn("Error starting listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e);
+ this.scheduler = getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultScheduledThreadPool(this, "SjmsConsumer");
+ if (getEndpoint().isAsyncStartListener()) {
+ asyncStart = getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ fillConsumersPool();
+ } catch (Throwable e) {
+ LOG.warn("Error starting listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e);
+ if (getEndpoint().isReconnectOnError()) {
+ scheduleRefill(); //we should try to fill consumer pool on next time
}
}
+ }
- @Override
- public String toString() {
- return "AsyncStartListenerTask[" + getDestinationName() + "]";
- }
- });
- } else {
- fillConsumersPool();
- }
+ @Override
+ public String toString() {
+ return "AsyncStartListenerTask[" + getDestinationName() + "]";
+ }
+ });
+ } else {
+ fillConsumersPool();
}
}
private void fillConsumersPool() throws Exception {
- while (consumers.getNumIdle() < consumers.getMaxIdle()) {
- consumers.addObject();
+ synchronized (consumers) {
+ while (consumers.values().stream().collect(Collectors.summarizingInt(List::size)).getSum() < getConsumerCount()) {
+ addConsumer();
+ }
+ }
+ }
+
+ public void destroyObject(MessageConsumerResources model) {
+ try {
+ if (model.getMessageConsumer() != null) {
+ model.getMessageConsumer().close();
+ }
+
+ // If the resource has a session, roll back any open transaction and close it
+ if (model.getSession() != null) {
+ if (model.getSession().getTransacted()) {
+ try {
+ model.getSession().rollback();
+ } catch (Exception e) {
+ // Do nothing. Just make sure we are cleaned up
+ }
+ }
+ model.getSession().close();
+ }
+ } catch (JMSException ex) {
+ LOG.warn("Exception caught on closing consumer", ex);
}
}
@@ -149,31 +138,36 @@ public class SjmsConsumer extends DefaultConsumer {
if (asyncStart != null && !asyncStart.isDone()) {
asyncStart.cancel(true);
}
- if (consumers != null) {
- if (getEndpoint().isAsyncStopListener()) {
- getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() {
- @Override
- public void run() {
- try {
- consumers.close();
- consumers = null;
- } catch (Throwable e) {
- LOG.warn("Error stopping listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e);
+ if (rescheduleTask != null) {
+ rescheduleTask.cancel();
+ }
+ if (getEndpoint().isAsyncStopListener()) {
+ getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ synchronized (consumers) {
+ consumers.values().stream().flatMap(Collection::stream).forEach(SjmsConsumer.this::destroyObject);
+ consumers.clear();
}
+ } catch (Throwable e) {
+ LOG.warn("Error stopping listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e);
}
+ }
- @Override
- public String toString() {
- return "AsyncStopListenerTask[" + getDestinationName() + "]";
- }
- });
- } else {
- consumers.close();
- consumers = null;
+ @Override
+ public String toString() {
+ return "AsyncStopListenerTask[" + getDestinationName() + "]";
+ }
+ });
+ } else {
+ synchronized (consumers) {
+ consumers.values().stream().flatMap(Collection::stream).forEach(SjmsConsumer.this::destroyObject);
+ consumers.clear();
}
}
- if (this.executor != null) {
- getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(this.executor);
+ if (this.scheduler != null) {
+ getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(this.scheduler);
}
}
@@ -181,7 +175,7 @@ public class SjmsConsumer extends DefaultConsumer {
* Creates a {@link MessageConsumerResources} with a dedicated
* {@link Session} required for transacted and InOut consumers.
*/
- private MessageConsumerResources createConsumer() throws Exception {
+ private void addConsumer() throws Exception {
MessageConsumerResources answer;
ConnectionResource connectionResource = getOrCreateConnectionResource();
Connection conn = connectionResource.borrowConnection();
@@ -191,17 +185,32 @@ public class SjmsConsumer extends DefaultConsumer {
MessageListener handler = createMessageHandler(session);
messageConsumer.setMessageListener(handler);
+ if (getEndpoint().isReconnectOnError()) {
+ ExceptionListener exceptionListener = conn.getExceptionListener();
+ ReconnectExceptionListener reconnectExceptionListener = new ReconnectExceptionListener(conn);
+ if (exceptionListener == null) {
+ exceptionListener = reconnectExceptionListener;
+ } else {
+ exceptionListener = new AggregatedExceptionListener(exceptionListener, reconnectExceptionListener);
+ }
+ conn.setExceptionListener(exceptionListener);
+ }
answer = new MessageConsumerResources(session, messageConsumer);
+ consumers.compute(conn, (key, oldValue) -> {
+ if (oldValue == null) {
+ oldValue = new ArrayList<>();
+ }
+ oldValue.add(answer);
+ return oldValue;
+ });
} catch (Exception e) {
LOG.error("Unable to create the MessageConsumer", e);
throw e;
} finally {
connectionResource.returnConnection(conn);
}
- return answer;
}
-
/**
* Helper factory method used to create a MessageListener based on the MEP
*
@@ -230,15 +239,15 @@ public class SjmsConsumer extends DefaultConsumer {
AbstractMessageHandler messageHandler;
if (getEndpoint().getExchangePattern().equals(ExchangePattern.InOnly)) {
if (isTransacted() || isSynchronous()) {
- messageHandler = new InOnlyMessageHandler(getEndpoint(), executor, synchronization);
+ messageHandler = new InOnlyMessageHandler(getEndpoint(), scheduler, synchronization);
} else {
- messageHandler = new InOnlyMessageHandler(getEndpoint(), executor);
+ messageHandler = new InOnlyMessageHandler(getEndpoint(), scheduler);
}
} else {
if (isTransacted() || isSynchronous()) {
- messageHandler = new InOutMessageHandler(getEndpoint(), executor, synchronization);
+ messageHandler = new InOutMessageHandler(getEndpoint(), scheduler, synchronization);
} else {
- messageHandler = new InOutMessageHandler(getEndpoint(), executor);
+ messageHandler = new InOutMessageHandler(getEndpoint(), scheduler);
}
}
@@ -288,6 +297,7 @@ public class SjmsConsumer extends DefaultConsumer {
public boolean isSharedJMSSession() {
return getEndpoint().isSharedJMSSession();
}
+
/**
* Use to determine whether or not to process exchanges synchronously.
*
@@ -369,4 +379,62 @@ public class SjmsConsumer extends DefaultConsumer {
return getEndpoint().getTransactionBatchTimeout();
}
+ private boolean refillPool(BackOffTimer.Task task) {
+ try {
+ fillConsumersPool();
+ return false;
+ } catch (Exception ex) {
+ LOG.error("Cannot refill consumers pool, attempt " + task.getCurrentAttempts(), ex);
+ }
+ return true;
+ }
+
+ private void scheduleRefill() {
+ if (rescheduleTask == null || rescheduleTask.getStatus() != BackOffTimer.Task.Status.Active) {
+ rescheduleTask = new BackOffTimer(scheduler).schedule(getEndpoint().getReconnectBackOff(), this::refillPool);
+ }
+ }
+
+ private final class ReconnectExceptionListener implements ExceptionListener {
+ private final WeakReference<Connection> connection;
+
+ private ReconnectExceptionListener(Connection connection) {
+ this.connection = new WeakReference<>(connection);
+ }
+
+ @Override
+ public void onException(JMSException exception) {
+ LOG.debug("Handling JMSException for reconnecting", exception);
+ Connection currentConnection = connection.get();
+ if (currentConnection != null) {
+ synchronized (consumers) {
+ List<MessageConsumerResources> toClose = consumers.get(currentConnection);
+ if (toClose != null) {
+ toClose.forEach(SjmsConsumer.this::destroyObject);
+ }
+ consumers.remove(currentConnection);
+ }
+ scheduleRefill();
+ }
+ }
+
+ //hash and equals to prevent multiple instances for same connection
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ReconnectExceptionListener that = (ReconnectExceptionListener) o;
+ return Objects.equals(connection.get(), that.connection.get());
+ }
+
+ @Override
+ public int hashCode() {
+ final Connection currentConnection = this.connection.get();
+ return currentConnection == null ? 0 : currentConnection.hashCode();
+ }
+ }
}
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index 818b608..d2926fc 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.sjms;
+import java.time.Duration;
+
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.Message;
@@ -54,6 +56,7 @@ import org.apache.camel.spi.UriPath;
import org.apache.camel.support.DefaultEndpoint;
import org.apache.camel.support.EndpointHelper;
import org.apache.camel.support.LoggingExceptionHandler;
+import org.apache.camel.util.backoff.BackOff;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -180,6 +183,11 @@ public class SjmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Mult
@UriParam(defaultValue = "true", label = "consumer,logging",
description = "Allows to control whether stacktraces should be logged or not, by the default errorHandler.")
private boolean errorHandlerLogStackTrace = true;
+ @UriParam(label = "consumer", description = "Try to apply reconnection logic on consumer pool")
+ private boolean reconnectOnError = true;
+ @UriParam(label = "consumer", description = "Backoff policy on consumer pool reconnection",
+ defaultValueNote = "Default backoff is infinite retries with 5 seconds delay")
+ private BackOff reconnectBackOff = BackOff.builder().delay(Duration.ofSeconds(5)).build();
private volatile boolean closeConnectionResource;
@@ -754,4 +762,21 @@ public class SjmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Mult
public void setJmsObjectFactory(JmsObjectFactory jmsObjectFactory) {
this.jmsObjectFactory = jmsObjectFactory;
}
+
+
+ public boolean isReconnectOnError() {
+ return reconnectOnError;
+ }
+
+ public void setReconnectOnError(boolean reconnectOnError) {
+ this.reconnectOnError = reconnectOnError;
+ }
+
+ public BackOff getReconnectBackOff() {
+ return reconnectBackOff;
+ }
+
+ public void setReconnectBackOff(BackOff reconnectBackOff) {
+ this.reconnectBackOff = reconnectBackOff;
+ }
}
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
index a3e75d9..0c53a4b 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
@@ -20,6 +20,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.jms.Connection;
+import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
@@ -54,6 +55,17 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
}
@Override
+ public boolean validateObject(MessageProducerResources obj) {
+ try {
+ obj.getSession().getAcknowledgeMode();
+ return true;
+ } catch (JMSException ex) {
+ LOG.error("Cannot validate session", ex);
+ }
+ return false;
+ }
+
+ @Override
public void destroyObject(MessageProducerResources model) throws Exception {
if (model.getMessageProducer() != null) {
model.getMessageProducer().close();
@@ -90,10 +102,12 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
this.executor = getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "SjmsProducer");
if (getProducers() == null) {
- setProducers(new GenericObjectPool<>(new MessageProducerResourcesFactory()));
- getProducers().setMaxActive(getProducerCount());
- getProducers().setMaxIdle(getProducerCount());
- getProducers().setLifo(false);
+ GenericObjectPool<MessageProducerResources> producers = new GenericObjectPool<>(new MessageProducerResourcesFactory());
+ setProducers(producers);
+ producers.setMaxActive(getProducerCount());
+ producers.setMaxIdle(getProducerCount());
+ producers.setTestOnBorrow(getEndpoint().getComponent().isConnectionTestOnBorrow());
+ producers.setLifo(false);
if (getEndpoint().isPrefillPool()) {
if (getEndpoint().isAsyncStartListener()) {
asyncStart = getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() {
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
index d02cadb..401d920 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeoutException;
import javax.jms.Connection;
import javax.jms.Destination;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@@ -92,7 +93,7 @@ public class InOutProducer extends SjmsProducer {
}
Destination replyToDestination;
- boolean isReplyToTopic = false;
+ boolean isReplyToTopic;
if (ObjectHelper.isEmpty(getNamedReplyTo())) {
isReplyToTopic = isTopic();
replyToDestination = getEndpoint().getDestinationCreationStrategy().createTemporaryDestination(session, isReplyToTopic);
@@ -117,7 +118,7 @@ public class InOutProducer extends SjmsProducer {
// we cannot continue routing the unknown message
// log a warn and then ignore the message
LOG.warn("Reply received for unknown correlationID [{}] on reply destination [{}]. Current correlation map size: {}. The message will be ignored: {}",
- new Object[]{correlationID, replyToDestination, EXCHANGERS.size(), message});
+ correlationID, replyToDestination, EXCHANGERS.size(), message);
}
} catch (Exception e) {
LOG.warn("Unable to exchange message: {}. This exception is ignored.", message, e);
@@ -135,6 +136,17 @@ public class InOutProducer extends SjmsProducer {
}
@Override
+ public boolean validateObject(MessageConsumerResources obj) {
+ try {
+ obj.getSession().getAcknowledgeMode();
+ return true;
+ } catch (JMSException ex) {
+ LOG.error("Cannot validate session", ex);
+ }
+ return false;
+ }
+
+ @Override
public void destroyObject(MessageConsumerResources model) throws Exception {
if (model.getMessageConsumer() != null) {
model.getMessageConsumer().close();
@@ -173,6 +185,7 @@ public class InOutProducer extends SjmsProducer {
consumers = new GenericObjectPool<>(new MessageConsumerResourcesFactory());
consumers.setMaxActive(getConsumerCount());
consumers.setMaxIdle(getConsumerCount());
+ consumers.setTestOnBorrow(getEndpoint().getComponent().isConnectionTestOnBorrow());
while (consumers.getNumIdle() < consumers.getMaxIdle()) {
consumers.addObject();
}
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/ReconnectConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/ReconnectConsumerTest.java
new file mode 100644
index 0000000..e5e4276
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/ReconnectConsumerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.junit.Test;
+
+public class ReconnectConsumerTest extends JmsTestSupport {
+
+ private static final String SJMS_QUEUE_NAME = "sjms:in.only.consumer";
+ private static final String MOCK_RESULT = "mock:result";
+
+ @Test
+ public void testSynchronous() throws Exception {
+ final String expectedBody = "Hello World";
+ MockEndpoint mock = getMockEndpoint(MOCK_RESULT);
+ mock.expectedMessageCount(2);
+ mock.expectedBodiesReceived(expectedBody, expectedBody);
+
+ template.sendBody(SJMS_QUEUE_NAME, expectedBody);
+
+ reconnect();
+
+ template.sendBody(SJMS_QUEUE_NAME, expectedBody);
+
+ mock.assertIsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from(SJMS_QUEUE_NAME).to(MOCK_RESULT);
+ }
+ };
+ }
+
+}
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/ReconnectInOutProducerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/ReconnectInOutProducerTest.java
new file mode 100644
index 0000000..02bb561
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/ReconnectInOutProducerTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.junit.Test;
+
+public class ReconnectInOutProducerTest extends JmsTestSupport {
+
+ private static final String TEST_DESTINATION_NAME = "in.out.queue.producer.test";
+
+
+ @Override
+ protected boolean useJmx() {
+ return false;
+ }
+
+ @Test
+ public void testInOutQueueProducer() throws Exception {
+ MessageConsumer mc = createQueueConsumer(TEST_DESTINATION_NAME + ".request");
+ assertNotNull(mc);
+ final String requestText = "Hello World!";
+ final String responseText = "How are you";
+ mc.setMessageListener(new MyMessageListener(requestText, responseText));
+ Object responseObject = template.requestBody("direct:start", requestText);
+ assertNotNull(responseObject);
+ assertTrue(responseObject instanceof String);
+ assertEquals(responseText, responseObject);
+ mc.close();
+
+ reconnect();
+
+ mc = createQueueConsumer(TEST_DESTINATION_NAME + ".request");
+ assertNotNull(mc);
+ mc.setMessageListener(new MyMessageListener(requestText, responseText));
+ responseObject = template.requestBody("direct:start", requestText);
+ assertNotNull(responseObject);
+ assertTrue(responseObject instanceof String);
+ assertEquals(responseText, responseObject);
+ mc.close();
+
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:start")
+ .to("log:" + TEST_DESTINATION_NAME + ".in.log.1?showBody=true")
+ .inOut("sjms:queue:" + TEST_DESTINATION_NAME + ".request" + "?namedReplyTo="
+ + TEST_DESTINATION_NAME + ".response")
+ .to("log:" + TEST_DESTINATION_NAME + ".out.log.1?showBody=true");
+ }
+ };
+ }
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext camelContext = super.createCamelContext();
+ camelContext.getComponent("sjms", SjmsComponent.class).setConnectionTestOnBorrow(true);
+ return camelContext;
+ }
+
+ protected class MyMessageListener implements MessageListener {
+ private String requestText;
+ private String responseText;
+
+ public MyMessageListener(String request, String response) {
+ this.requestText = request;
+ this.responseText = response;
+ }
+
+ @Override
+ public void onMessage(Message message) {
+ try {
+ TextMessage request = (TextMessage)message;
+ assertNotNull(request);
+ String text = request.getText();
+ assertEquals(requestText, text);
+
+ TextMessage response = getSession().createTextMessage();
+ response.setText(responseText);
+ response.setJMSCorrelationID(request.getJMSCorrelationID());
+ MessageProducer mp = getSession().createProducer(message.getJMSReplyTo());
+ mp.send(response);
+ mp.close();
+ } catch (JMSException e) {
+ fail(e.getLocalizedMessage());
+ }
+ }
+ }
+}
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/ReconnectProducerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/ReconnectProducerTest.java
new file mode 100644
index 0000000..b88a73d
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/ReconnectProducerTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.junit.Test;
+
+public class ReconnectProducerTest extends JmsTestSupport {
+
+ private static final String TEST_DESTINATION_NAME = "sync.queue.producer.test";
+
+ @Override
+ protected boolean useJmx() {
+ return false;
+ }
+
+ @Test
+ public void testInOnlyQueueProducer() throws Exception {
+ MessageConsumer mc = createQueueConsumer(TEST_DESTINATION_NAME);
+ assertNotNull(mc);
+ final String expectedBody = "Hello World!";
+ MockEndpoint mock = getMockEndpoint("mock:result");
+
+ mock.expectedMessageCount(2);
+ mock.expectedBodiesReceived(expectedBody, expectedBody);
+
+ template.sendBody("direct:start", expectedBody);
+ Message message = mc.receive(5000);
+ assertNotNull(message);
+ assertTrue(message instanceof TextMessage);
+
+ TextMessage tm = (TextMessage) message;
+ String text = tm.getText();
+ assertNotNull(text);
+ template.sendBody("direct:finish", text);
+
+ reconnect(10000);
+
+ mc = createQueueConsumer(TEST_DESTINATION_NAME);
+ template.sendBody("direct:start", expectedBody);
+ message = mc.receive(5000);
+ assertNotNull(message);
+ assertTrue(message instanceof TextMessage);
+
+ tm = (TextMessage) message;
+ text = tm.getText();
+ assertNotNull(text);
+
+ template.sendBody("direct:finish", text);
+
+ mock.assertIsSatisfied();
+ mc.close();
+
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
+ */
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:start")
+ .to("sjms:queue:" + TEST_DESTINATION_NAME + "?consumerCount=10");
+
+ from("direct:finish")
+ .to("log:test.log.1?showBody=true", "mock:result");
+ }
+ };
+ }
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext camelContext = super.createCamelContext();
+ camelContext.getComponent("sjms", SjmsComponent.class).setConnectionTestOnBorrow(true);
+ return camelContext;
+ }
+}
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java
index 588e645..eb35ab4 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java
@@ -73,7 +73,7 @@ public class JmsTestSupport extends CamelTestSupport {
String host;
try (InputStream inStream = url.openStream()) {
properties.load(inStream);
- if (Boolean.valueOf(properties.getProperty("amq.external"))) {
+ if (Boolean.parseBoolean(properties.getProperty("amq.external"))) {
log.info("Using external AMQ");
port = Integer.parseInt(properties.getProperty("amq.port"));
host = properties.getProperty("amq.host");
@@ -181,4 +181,27 @@ public class JmsTestSupport extends CamelTestSupport {
public MessageConsumer createTopicConsumer(String destination, String messageSelector) throws Exception {
return new Jms11ObjectFactory().createMessageConsumer(session, destinationCreationStrategy.createDestination(session, destination, true), messageSelector, true, null, true, false);
}
+
+ public void reconnect() throws Exception {
+ reconnect(0);
+ }
+
+ public void reconnect(int waitingMillis) throws Exception {
+ log.info("Closing JMS Session");
+ getSession().close();
+ log.info("Closing JMS Connection");
+ connection.stop();
+ log.info("Stopping the ActiveMQ Broker");
+ broker.stop();
+ broker.waitUntilStopped();
+ Thread.sleep(waitingMillis);
+ broker.start(true);
+ broker.waitUntilStarted();
+
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUri);
+ setupFactoryExternal(connectionFactory);
+ connection = connectionFactory.createConnection();
+ connection.start();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
}