You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/03/10 11:24:51 UTC
[camel] 03/03: CAMEL-14685 - Fixed CS
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 6a0814561713f451328e74f5be6d7af12bc43811
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Mar 10 12:23:39 2020 +0100
CAMEL-14685 - Fixed CS
---
.../camel/component/rabbitmq/RabbitConsumer.java | 23 ++++----
.../component/rabbitmq/RabbitMQComponent.java | 36 ++++++------
.../rabbitmq/RabbitMQConnectionFactorySupport.java | 2 +-
.../component/rabbitmq/RabbitMQConstants.java | 3 +-
.../camel/component/rabbitmq/RabbitMQConsumer.java | 6 +-
.../component/rabbitmq/RabbitMQDeclareSupport.java | 10 ++--
.../camel/component/rabbitmq/RabbitMQEndpoint.java | 13 ++---
.../rabbitmq/RabbitMQHeaderFilterStrategy.java | 4 +-
.../rabbitmq/RabbitMQMessageConverter.java | 52 ++++++++---------
.../rabbitmq/RabbitMQMessagePublisher.java | 25 +++++++--
.../camel/component/rabbitmq/RabbitMQProducer.java | 52 ++++++++++-------
.../rabbitmq/pool/PoolableChannelFactory.java | 4 +-
.../rabbitmq/reply/CorrelationTimeoutMap.java | 10 ++--
.../rabbitmq/reply/MessageSentCallback.java | 4 +-
.../rabbitmq/reply/QueueReplyHandler.java | 3 +-
.../component/rabbitmq/reply/ReplyHandler.java | 6 +-
.../component/rabbitmq/reply/ReplyHolder.java | 28 ++++++----
.../component/rabbitmq/reply/ReplyManager.java | 42 +++++++-------
.../rabbitmq/reply/ReplyManagerSupport.java | 65 ++++++++++++----------
.../rabbitmq/reply/TemporaryQueueReplyHandler.java | 9 +--
.../rabbitmq/reply/TemporaryQueueReplyManager.java | 19 +++----
...essageIdAsCorrelationIdMessageSentCallback.java | 8 ++-
.../component/rabbitmq/RabbitConsumerHangTest.java | 3 +-
.../component/rabbitmq/RabbitMQComponentTest.java | 2 +-
.../component/rabbitmq/RabbitMQConsumerTest.java | 2 +-
.../component/rabbitmq/RabbitMQEndpointTest.java | 4 +-
.../component/rabbitmq/RabbitMQProducerTest.java | 2 +-
.../integration/AbstractRabbitMQIntTest.java | 8 +--
.../rabbitmq/integration/DockerTestUtils.java | 18 +++---
.../rabbitmq/integration/RabbitMQBasicIntTest.java | 12 +---
.../integration/RabbitMQConsumerIntTest.java | 14 ++---
.../RabbitMQConsumerIntTestReplyTo.java | 29 ++++------
.../RabbitMQDeadLetterRoutingKeyIntTest.java | 44 +++++----------
.../rabbitmq/integration/RabbitMQInOutIntTest.java | 33 +++++------
.../rabbitmq/integration/RabbitMQLoadIntTest.java | 25 +++------
.../integration/RabbitMQProducerIntTest.java | 18 +++---
.../integration/RabbitMQReConnectionIntTest.java | 40 ++++++-------
.../RabbitMQRequeueHandledExceptionIntTest.java | 22 ++------
.../integration/RabbitMQRequeueIntTest.java | 44 ++++++---------
.../RabbitMQRequeueUnhandledExceptionIntTest.java | 22 ++------
.../integration/spring/RabbitMQSpringIntTest.java | 5 +-
.../qpid/RabbitMQConsumerQpidTestReplyTo.java | 4 +-
.../rabbitmq/qpid/RabbitMQProducerQpidTest.java | 4 +-
.../rabbitmq/qpid/RabbitMQRequeueQpidTest.java | 4 +-
.../testbeans/TestNonSerializableObject.java | 2 +-
.../testbeans/TestPartiallySerializableObject.java | 2 +-
.../rabbitmq/testbeans/TestSerializableObject.java | 2 +-
47 files changed, 373 insertions(+), 416 deletions(-)
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index 9930e86..2d096ee 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -68,7 +68,8 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
if (!consumer.getEndpoint().isAutoAck()) {
lock.acquire();
}
- //Channel might be open because while we were waiting for the lock, stop() has been succesfully called.
+ // Channel might be open because while we were waiting for the lock,
+ // stop() has been successfully called.
if (!channel.isOpen()) {
// we could not open the channel so release the lock
if (!consumer.getEndpoint().isAutoAck()) {
@@ -181,7 +182,8 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
if (channel == null) {
throw new IOException("The RabbitMQ channel is not open");
}
- tag = channel.basicConsume(consumer.getEndpoint().getQueue(), consumer.getEndpoint().isAutoAck(), consumer.getEndpoint().getConsumerTag(), false, consumer.getEndpoint().isExclusiveConsumer(), null, this);
+ tag = channel.basicConsume(consumer.getEndpoint().getQueue(), consumer.getEndpoint().isAutoAck(), consumer.getEndpoint().getConsumerTag(), false,
+ consumer.getEndpoint().isExclusiveConsumer(), null, this);
}
@Override
@@ -230,8 +232,7 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
/**
* No-op implementation of {@link Consumer#handleCancelOk}.
*
- * @param consumerTag
- * the defined consumer tag (client- or server-generated)
+ * @param consumerTag the defined consumer tag (client- or server-generated)
*/
@Override
public void handleCancelOk(String consumerTag) {
@@ -242,8 +243,7 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
/**
* No-op implementation of {@link Consumer#handleCancel(String)}
*
- * @param consumerTag
- * the defined consumer tag (client- or server-generated)
+ * @param consumerTag the defined consumer tag (client- or server-generated)
*/
@Override
public void handleCancel(String consumerTag) throws IOException {
@@ -252,7 +252,7 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
try {
channel.basicCancel(tag);
} catch (Exception e) {
- //no-op
+ // no-op
}
this.consumer.getEndpoint().declareExchangeAndQueue(channel);
@@ -285,8 +285,7 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
LOG.debug(e.getMessage(), e);
Integer networkRecoveryInterval = consumer.getEndpoint().getNetworkRecoveryInterval();
- final long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0
- ? networkRecoveryInterval : 100L;
+ final long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? networkRecoveryInterval : 100L;
try {
Thread.sleep(connectionRetryInterval);
} catch (InterruptedException e1) {
@@ -329,8 +328,7 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
}
private boolean isAutomaticRecoveryEnabled() {
- return this.consumer.getEndpoint().getAutomaticRecoveryEnabled() != null
- && this.consumer.getEndpoint().getAutomaticRecoveryEnabled();
+ return this.consumer.getEndpoint().getAutomaticRecoveryEnabled() != null && this.consumer.getEndpoint().getAutomaticRecoveryEnabled();
}
private boolean isChannelOpen() {
@@ -346,8 +344,7 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
LOG.debug("Created channel: {}", channel);
// setup the basicQos
if (consumer.getEndpoint().isPrefetchEnabled()) {
- channel.basicQos(consumer.getEndpoint().getPrefetchSize(), consumer.getEndpoint().getPrefetchCount(),
- consumer.getEndpoint().isPrefetchGlobal());
+ channel.basicQos(consumer.getEndpoint().getPrefetchSize(), consumer.getEndpoint().getPrefetchCount(), consumer.getEndpoint().isPrefetchGlobal());
}
// This really only needs to be called on the first consumer or on
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
index 987abc1..8ac459e 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
@@ -148,9 +148,7 @@ public class RabbitMQComponent extends DefaultComponent {
}
@Override
- protected RabbitMQEndpoint createEndpoint(String uri,
- String remaining,
- Map<String, Object> params) throws Exception {
+ protected RabbitMQEndpoint createEndpoint(String uri, String remaining, Map<String, Object> params) throws Exception {
String host = getHostname();
int port = getPortNumber();
@@ -175,7 +173,8 @@ public class RabbitMQComponent extends DefaultComponent {
// ConnectionFactory reference
ConnectionFactory connectionFactory = resolveAndRemoveReferenceParameter(params, "connectionFactory", ConnectionFactory.class, getConnectionFactory());
- // try to lookup if there is a single instance in the registry of the ConnectionFactory
+ // try to lookup if there is a single instance in the registry of the
+ // ConnectionFactory
if (connectionFactory == null && isAutoDetectConnectionFactory()) {
Map<String, ConnectionFactory> map = getCamelContext().getRegistry().findByTypeWithName(ConnectionFactory.class);
if (map != null && map.size() == 1) {
@@ -249,7 +248,7 @@ public class RabbitMQComponent extends DefaultComponent {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating RabbitMQEndpoint with host {}:{} and exchangeName: {}",
- new Object[]{endpoint.getHostname(), endpoint.getPortNumber(), endpoint.getExchangeName()});
+ new Object[] {endpoint.getHostname(), endpoint.getPortNumber(), endpoint.getExchangeName()});
}
Map<String, Object> localArgs = new HashMap<>();
@@ -369,9 +368,10 @@ public class RabbitMQComponent extends DefaultComponent {
}
/**
- * Whether to auto-detect looking up RabbitMQ connection factory from the registry.
- * When enabled and a single instance of the connection factory is found then it will be used.
- * An explicit connection factory can be configured on the component or endpoint level which takes precedence.
+ * Whether to auto-detect looking up RabbitMQ connection factory from the
+ * registry. When enabled and a single instance of the connection factory is
+ * found then it will be used. An explicit connection factory can be
+ * configured on the component or endpoint level which takes precedence.
*/
public void setAutoDetectConnectionFactory(boolean autoDetectConnectionFactory) {
this.autoDetectConnectionFactory = autoDetectConnectionFactory;
@@ -570,7 +570,8 @@ public class RabbitMQComponent extends DefaultComponent {
/**
* When true, the message will be published with
- * <a href="https://www.rabbitmq.com/confirms.html">publisher acknowledgements</a> turned on
+ * <a href="https://www.rabbitmq.com/confirms.html">publisher
+ * acknowledgements</a> turned on
*/
public boolean isPublisherAcknowledgements() {
return publisherAcknowledgements;
@@ -595,9 +596,9 @@ public class RabbitMQComponent extends DefaultComponent {
/**
* When true, an exception will be thrown when the message cannot be
* delivered (basic.return) and the message is marked as mandatory.
- * PublisherAcknowledgement will also be activated in this case.
- * See also <a href=https://www.rabbitmq.com/confirms.html">publisher acknowledgements</a>
- * - When will messages be confirmed.
+ * PublisherAcknowledgement will also be activated in this case. See also <a
+ * href="https://www.rabbitmq.com/confirms.html">publisher
+ * acknowledgements</a> - When will messages be confirmed.
*/
public boolean isGuaranteedDeliveries() {
return guaranteedDeliveries;
@@ -664,7 +665,8 @@ public class RabbitMQComponent extends DefaultComponent {
}
/**
- * Connection client properties (client info used in negotiating with the server)
+ * Connection client properties (client info used in negotiating with the
+ * server)
*/
public void setClientProperties(Map<String, Object> clientProperties) {
this.clientProperties = clientProperties;
@@ -686,7 +688,8 @@ public class RabbitMQComponent extends DefaultComponent {
}
/**
- * Configure SSL trust manager, SSL should be enabled for this option to be effective
+ * Configure SSL trust manager, SSL should be enabled for this option to be
+ * effective
*/
public void setTrustManager(TrustManager trustManager) {
this.trustManager = trustManager;
@@ -743,8 +746,9 @@ public class RabbitMQComponent extends DefaultComponent {
}
/**
- * Request exclusive access to the queue (meaning only this consumer can access the queue). This is useful
- * when you want a long-lived shared queue to be temporarily accessible by just one consumer.
+ * Request exclusive access to the queue (meaning only this consumer can
+ * access the queue). This is useful when you want a long-lived shared queue
+ * to be temporarily accessible by just one consumer.
*/
public void setExclusiveConsumer(boolean exclusiveConsumer) {
this.exclusiveConsumer = exclusiveConsumer;
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConnectionFactorySupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConnectionFactorySupport.java
index 8e1e3c3..07b83b8 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConnectionFactorySupport.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConnectionFactorySupport.java
@@ -22,7 +22,7 @@ import java.security.NoSuchAlgorithmException;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQConnectionFactorySupport {
-
+
public ConnectionFactory createFactoryFor(final RabbitMQEndpoint endpoint) {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(endpoint.getUsername());
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
index 4e4aacf..1d26f9a 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
@@ -49,9 +49,8 @@ public final class RabbitMQConstants {
public static final String RABBITMQ_QUEUE_MAX_PRIORITY_KEY = "x-max-priority";
public static final String RABBITMQ_QUEUE_MESSAGE_TTL_KEY = "x-message-ttl";
public static final String RABBITMQ_QUEUE_TTL_KEY = "x-expires";
-
private RabbitMQConstants() {
- //Constants class
+ // Constants class
}
}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 6519214..89851c3 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -87,9 +87,9 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable {
}
private boolean isAutomaticRecoveryEnabled() {
- return this.endpoint.getAutomaticRecoveryEnabled() != null
- && this.endpoint.getAutomaticRecoveryEnabled();
+ return this.endpoint.getAutomaticRecoveryEnabled() != null && this.endpoint.getAutomaticRecoveryEnabled();
}
+
/**
* Create the consumers but don't start yet
*/
@@ -194,8 +194,6 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable {
}
}
-
-
/**
* Task in charge of opening connection and adding listener when consumer is
* started and broker is not available.
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java
index e2c3805..23324a2 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java
@@ -68,22 +68,22 @@ public class RabbitMQDeclareSupport {
// some arguments must be in numeric values so we need to fix this
Object queueLengthLimit = queueArgs.get(RabbitMQConstants.RABBITMQ_QUEUE_LENGTH_LIMIT_KEY);
if (queueLengthLimit instanceof String) {
- queueArgs.put(RabbitMQConstants.RABBITMQ_QUEUE_LENGTH_LIMIT_KEY, Long.parseLong((String) queueLengthLimit));
+ queueArgs.put(RabbitMQConstants.RABBITMQ_QUEUE_LENGTH_LIMIT_KEY, Long.parseLong((String)queueLengthLimit));
}
-
+
Object queueMaxPriority = queueArgs.get(RabbitMQConstants.RABBITMQ_QUEUE_MAX_PRIORITY_KEY);
if (queueMaxPriority instanceof String) {
- queueArgs.put(RabbitMQConstants.RABBITMQ_QUEUE_MAX_PRIORITY_KEY, Integer.parseInt((String) queueMaxPriority));
+ queueArgs.put(RabbitMQConstants.RABBITMQ_QUEUE_MAX_PRIORITY_KEY, Integer.parseInt((String)queueMaxPriority));
}
Object queueMessageTtl = queueArgs.get(RabbitMQConstants.RABBITMQ_QUEUE_MESSAGE_TTL_KEY);
if (queueMessageTtl instanceof String) {
- queueArgs.put(RabbitMQConstants.RABBITMQ_QUEUE_MESSAGE_TTL_KEY, Long.parseLong((String) queueMessageTtl));
+ queueArgs.put(RabbitMQConstants.RABBITMQ_QUEUE_MESSAGE_TTL_KEY, Long.parseLong((String)queueMessageTtl));
}
Object queueExpiration = queueArgs.get(RabbitMQConstants.RABBITMQ_QUEUE_TTL_KEY);
if (queueExpiration instanceof String) {
- queueArgs.put(RabbitMQConstants.RABBITMQ_QUEUE_TTL_KEY, Long.parseLong((String) queueExpiration));
+ queueArgs.put(RabbitMQConstants.RABBITMQ_QUEUE_TTL_KEY, Long.parseLong((String)queueExpiration));
}
}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 6c99eca..85202b1 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -965,16 +965,15 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
this.consumerTag = consumerTag;
}
- public boolean isAllowCustomHeaders() {
- return allowCustomHeaders;
- }
+ public boolean isAllowCustomHeaders() {
+ return allowCustomHeaders;
+ }
/**
* Allow pass custom values to header
*/
- public void setAllowCustomHeaders(boolean allowCustomHeaders) {
- this.allowCustomHeaders = allowCustomHeaders;
- }
-
+ public void setAllowCustomHeaders(boolean allowCustomHeaders) {
+ this.allowCustomHeaders = allowCustomHeaders;
+ }
}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQHeaderFilterStrategy.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQHeaderFilterStrategy.java
index c7893ab..de6d169 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQHeaderFilterStrategy.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQHeaderFilterStrategy.java
@@ -18,8 +18,8 @@ package org.apache.camel.component.rabbitmq;
import org.apache.camel.support.DefaultHeaderFilterStrategy;
-public class RabbitMQHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
-
+public class RabbitMQHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
+
public RabbitMQHeaderFilterStrategy() {
initialize();
}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
index 78fa302..3db1066 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
@@ -42,9 +42,10 @@ public class RabbitMQMessageConverter {
private boolean allowNullHeaders;
private boolean allowCustomHeaders;
private final HeaderFilterStrategy headerFilterStrategy = new RabbitMQHeaderFilterStrategy();
-
+
/**
- * Will take an {@link Exchange} and add header values back to the {@link Exchange#getIn()}
+ * Will take an {@link Exchange} and add header values back to the
+ * {@link Exchange#getIn()}
*/
public void mergeAmqpProperties(Exchange exchange, AMQP.BasicProperties properties) {
@@ -169,27 +170,27 @@ public class RabbitMQMessageConverter {
for (Map.Entry<String, Object> header : headers.entrySet()) {
// filter header values.
- Object value = getValidRabbitMQHeaderValue(header.getKey(), header.getValue());
-
- // additionaly filter out the OVERRIDE header so it does not propagate
+ Object value = getValidRabbitMQHeaderValue(header.getKey(), header.getValue());
+
+ // additionally filter out the OVERRIDE header so it does not
+ // propagate
if ((value != null || isAllowNullHeaders()) && !header.getKey().equals(RabbitMQConstants.EXCHANGE_OVERRIDE_NAME)) {
- boolean filteredHeader;
- if (!allowCustomHeaders) {
- filteredHeader = headerFilterStrategy.applyFilterToCamelHeaders(header.getKey(), header.getValue(), exchange);
- if (filteredHeader) {
- filteredHeaders.put(header.getKey(), header.getValue());
- }
- } else {
- filteredHeaders.put(header.getKey(), header.getValue());
- }
+ boolean filteredHeader;
+ if (!allowCustomHeaders) {
+ filteredHeader = headerFilterStrategy.applyFilterToCamelHeaders(header.getKey(), header.getValue(), exchange);
+ if (filteredHeader) {
+ filteredHeaders.put(header.getKey(), header.getValue());
+ }
+ } else {
+ filteredHeaders.put(header.getKey(), header.getValue());
+ }
} else if (LOG.isDebugEnabled()) {
if (header.getValue() == null) {
LOG.debug("Ignoring header: {} with null value", header.getKey());
} else if (header.getKey().equals(RabbitMQConstants.EXCHANGE_OVERRIDE_NAME)) {
LOG.debug("Preventing header propagation: {} with value {}:", header.getKey(), header.getValue());
} else {
- LOG.debug("Ignoring header: {} of class: {} with value: {}",
- header.getKey(), ObjectHelper.classCanonicalName(header.getValue()), header.getValue());
+ LOG.debug("Ignoring header: {} of class: {} with value: {}", header.getKey(), ObjectHelper.classCanonicalName(header.getValue()), header.getValue());
}
}
}
@@ -254,7 +255,7 @@ public class RabbitMQMessageConverter {
if (out) {
// use OUT message
message = camelExchange.getOut();
- } else {
+ } else {
if (camelExchange.getIn() != null) {
// Use the existing message so we keep the headers
message = camelExchange.getIn();
@@ -305,8 +306,7 @@ public class RabbitMQMessageConverter {
private void deserializeBody(final Exchange camelExchange, final Message message, final byte[] body) {
Object messageBody = null;
- try (InputStream b = new ByteArrayInputStream(body);
- ObjectInputStream o = new ObjectInputStream(b)) {
+ try (InputStream b = new ByteArrayInputStream(body); ObjectInputStream o = new ObjectInputStream(b)) {
messageBody = o.readObject();
} catch (IOException | ClassNotFoundException e) {
LOG.warn("Could not deserialize the object");
@@ -314,7 +314,7 @@ public class RabbitMQMessageConverter {
}
if (messageBody instanceof Throwable) {
LOG.debug("Reply was an Exception. Setting the Exception on the Exchange");
- camelExchange.setException((Throwable) messageBody);
+ camelExchange.setException((Throwable)messageBody);
} else {
message.setBody(messageBody);
}
@@ -340,11 +340,11 @@ public class RabbitMQMessageConverter {
this.allowNullHeaders = allowNullHeaders;
}
- public boolean isAllowCustomHeaders() {
- return allowCustomHeaders;
- }
+ public boolean isAllowCustomHeaders() {
+ return allowCustomHeaders;
+ }
- public void setAllowCustomHeaders(boolean allowCustomHeaders) {
- this.allowCustomHeaders = allowCustomHeaders;
- }
+ public void setAllowCustomHeaders(boolean allowCustomHeaders) {
+ this.allowCustomHeaders = allowCustomHeaders;
+ }
}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
index 02d08b1..1b539d1 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
@@ -70,8 +70,18 @@ public class RabbitMQMessagePublisher {
message.getHeaders().remove(RabbitMQEndpoint.SERIALIZE_HEADER);
}
if (routingKey != null && routingKey.startsWith(RabbitMQConstants.RABBITMQ_DIRECT_REPLY_ROUTING_KEY)) {
- message.setHeader(RabbitMQConstants.EXCHANGE_NAME, RabbitMQConstants.RABBITMQ_DIRECT_REPLY_EXCHANGE); // use default exchange for reply-to messages
- message.setHeader(RabbitMQConstants.EXCHANGE_OVERRIDE_NAME, RabbitMQConstants.RABBITMQ_DIRECT_REPLY_EXCHANGE); // use default exchange for reply-to messages
+ message.setHeader(RabbitMQConstants.EXCHANGE_NAME, RabbitMQConstants.RABBITMQ_DIRECT_REPLY_EXCHANGE); // use
+ // default
+ // exchange
+ // for
+ // reply-to
+ // messages
+ message.setHeader(RabbitMQConstants.EXCHANGE_OVERRIDE_NAME, RabbitMQConstants.RABBITMQ_DIRECT_REPLY_EXCHANGE); // use
+ // default
+ // exchange
+ // for
+ // reply-to
+ // messages
}
return message;
@@ -81,13 +91,15 @@ public class RabbitMQMessagePublisher {
AMQP.BasicProperties properties;
byte[] body;
try {
- // To maintain backwards compatibility try the TypeConverter (The DefaultTypeConverter seems to only work on Strings)
+ // To maintain backwards compatibility try the TypeConverter (The
+ // DefaultTypeConverter seems to only work on Strings)
body = camelExchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, camelExchange, message.getBody());
properties = endpoint.getMessageConverter().buildProperties(camelExchange).build();
} catch (NoTypeConversionAvailableException | TypeConversionException e) {
if (message.getBody() instanceof Serializable) {
- // Add the header so the reply processor knows to de-serialize it
+ // Add the header so the reply processor knows to de-serialize
+ // it
message.getHeaders().put(RabbitMQEndpoint.SERIALIZE_HEADER, true);
properties = endpoint.getMessageConverter().buildProperties(camelExchange).build();
body = serializeBodyFrom(message);
@@ -104,8 +116,9 @@ public class RabbitMQMessagePublisher {
}
private void publishToRabbitMQ(final AMQP.BasicProperties properties, final byte[] body) throws IOException {
- String exchangeName = (String) message.getHeader(RabbitMQConstants.EXCHANGE_OVERRIDE_NAME);
- // If it is BridgeEndpoint we should ignore the message header of EXCHANGE_OVERRIDE_NAME
+ String exchangeName = (String)message.getHeader(RabbitMQConstants.EXCHANGE_OVERRIDE_NAME);
+ // If it is BridgeEndpoint we should ignore the message header of
+ // EXCHANGE_OVERRIDE_NAME
if (exchangeName == null || endpoint.isBridgeEndpoint()) {
exchangeName = endpoint.getExchangeName();
} else {
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 589d511..8e8c1f5 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -60,7 +60,7 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
@Override
public RabbitMQEndpoint getEndpoint() {
- return (RabbitMQEndpoint) super.getEndpoint();
+ return (RabbitMQEndpoint)super.getEndpoint();
}
/**
@@ -71,7 +71,8 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
}
/**
- * Do something with a pooled channel (similar to Spring JDBC TransactionTemplate#execute)
+ * Do something with a pooled channel (similar to Spring JDBC
+ * TransactionTemplate#execute)
*/
private <T> T execute(ChannelCallback<T> callback) throws Exception {
Channel channel;
@@ -100,6 +101,7 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
/**
* Open connection and initialize channel pool
+ *
* @throws Exception
*/
private synchronized void openConnectionAndChannelPool() throws Exception {
@@ -108,8 +110,8 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
LOG.debug("Created connection: {}", conn);
LOG.trace("Creating channel pool...");
- channelPool = new GenericObjectPool<>(new PoolableChannelFactory(this.conn), getEndpoint().getChannelPoolMaxSize(),
- GenericObjectPool.WHEN_EXHAUSTED_BLOCK, getEndpoint().getChannelPoolMaxWait());
+ channelPool = new GenericObjectPool<>(new PoolableChannelFactory(this.conn), getEndpoint().getChannelPoolMaxSize(), GenericObjectPool.WHEN_EXHAUSTED_BLOCK,
+ getEndpoint().getChannelPoolMaxWait());
attemptDeclaration();
}
@@ -127,6 +129,7 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
/**
* This will reconnect only if the connection is closed.
+ *
* @throws Exception
*/
private synchronized void checkConnectionAndChannelPool() throws Exception {
@@ -153,6 +156,7 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
/**
* If needed, close Connection and Channel
+ *
* @throws IOException
*/
private synchronized void closeConnectionAndChannel() throws IOException {
@@ -215,7 +219,8 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
initReplyManager();
- // the request timeout can be overruled by a header otherwise the endpoint configured value is used
+ // the request timeout can be overruled by a header otherwise the
+ // endpoint configured value is used
final long timeout = exchange.getIn().getHeader(RabbitMQConstants.REQUEST_TIMEOUT, getEndpoint().getRequestTimeout(), long.class);
final String originalCorrelationId = in.getHeader(RabbitMQConstants.CORRELATIONID, String.class);
@@ -226,8 +231,9 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
in.setHeader(RabbitMQConstants.REPLY_TO, replyManager.getReplyTo());
- String exchangeName = (String) exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_OVERRIDE_NAME);
- // If it is BridgeEndpoint we should ignore the message header of EXCHANGE_OVERRIDE_NAME
+ String exchangeName = (String)exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_OVERRIDE_NAME);
+ // If it is BridgeEndpoint we should ignore the message header of
+ // EXCHANGE_OVERRIDE_NAME
if (exchangeName == null || getEndpoint().isBridgeEndpoint()) {
exchangeName = getEndpoint().getExchangeName();
} else {
@@ -235,7 +241,8 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
}
String key = in.getHeader(RabbitMQConstants.ROUTING_KEY, String.class);
- // we just need to make sure RoutingKey option take effect if it is not BridgeEndpoint
+ // we just need to make sure RoutingKey option take effect if it is not
+ // BridgeEndpoint
if (key == null || getEndpoint().isBridgeEndpoint()) {
key = getEndpoint().getRoutingKey() == null ? "" : getEndpoint().getRoutingKey();
}
@@ -252,13 +259,15 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
exchange.setException(e);
return true;
}
- // continue routing asynchronously (reply will be processed async when its received)
+ // continue routing asynchronously (reply will be processed async when
+ // its received)
return false;
}
private boolean processInOnly(Exchange exchange, AsyncCallback callback) throws Exception {
- String exchangeName = (String) exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_OVERRIDE_NAME);
- // If it is BridgeEndpoint we should ignore the message header of EXCHANGE_OVERRIDE_NAME
+ String exchangeName = (String)exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_OVERRIDE_NAME);
+ // If it is BridgeEndpoint we should ignore the message header of
+ // EXCHANGE_OVERRIDE_NAME
if (exchangeName == null || getEndpoint().isBridgeEndpoint()) {
exchangeName = getEndpoint().getExchangeName();
} else {
@@ -266,7 +275,8 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
}
String key = exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY, String.class);
- // we just need to make sure RoutingKey option take effect if it is not BridgeEndpoint
+ // we just need to make sure the RoutingKey option takes effect if it is not
+ // BridgeEndpoint
if (key == null || getEndpoint().isBridgeEndpoint()) {
key = getEndpoint().getRoutingKey() == null ? "" : getEndpoint().getRoutingKey();
}
@@ -315,8 +325,10 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
return;
}
LOG.debug("Starting reply manager");
- // must use the classloader from the application context when creating reply manager,
- // as it should inherit the classloader from app context and not the current which may be
+ // must use the classloader from the application context when
+ // creating reply manager,
+ // as it should inherit the classloader from app context and not
+ // the current which may be
// a different classloader
ClassLoader current = Thread.currentThread().getContextClassLoader();
ClassLoader ac = getEndpoint().getCamelContext().getApplicationContextClassLoader();
@@ -324,12 +336,14 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
if (ac != null) {
Thread.currentThread().setContextClassLoader(ac);
}
- // validate that replyToType and replyTo is configured accordingly
+ // validate that replyToType and replyTo are configured
+ // accordingly
if (getEndpoint().getReplyToType() != null) {
- // setting temporary with a fixed replyTo is not supported
+ // setting temporary with a fixed replyTo is not
+ // supported
if (getEndpoint().getReplyTo() != null && getEndpoint().getReplyToType().equals(ReplyToType.Temporary.name())) {
- throw new IllegalArgumentException("ReplyToType " + ReplyToType.Temporary
- + " is not supported when replyTo " + getEndpoint().getReplyTo() + " is also configured.");
+ throw new IllegalArgumentException("ReplyToType " + ReplyToType.Temporary + " is not supported when replyTo " + getEndpoint().getReplyTo()
+ + " is also configured.");
}
}
@@ -355,7 +369,7 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
if (replyManager != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping RabbitMQReplyManager: {} from processing replies from: {}", replyManager,
- getEndpoint().getReplyTo() != null ? getEndpoint().getReplyTo() : "temporary queue");
+ getEndpoint().getReplyTo() != null ? getEndpoint().getReplyTo() : "temporary queue");
}
ServiceHelper.stopService(replyManager);
}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
index b18b6dc..2f5e19a 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
@@ -33,7 +33,7 @@ public class PoolableChannelFactory implements PoolableObjectFactory<Channel> {
public PoolableChannelFactory(Connection connection) {
this.connection = connection;
}
-
+
@Override
public Channel makeObject() throws Exception {
return connection.createChannel();
@@ -44,7 +44,7 @@ public class PoolableChannelFactory implements PoolableObjectFactory<Channel> {
try {
t.close();
} catch (Exception e) {
- //no-op
+ // no-op
}
}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java
index 3bc0bef..0a3a7dd 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java
@@ -23,9 +23,9 @@ import org.apache.camel.support.DefaultTimeoutMap;
import static org.apache.camel.TimeoutMap.Listener.Type.*;
/**
- * A {@link org.apache.camel.TimeoutMap} which is used to track reply messages which
- * has been timed out, and thus should trigger the waiting {@link org.apache.camel.Exchange} to
- * timeout as well.
+ * A {@link org.apache.camel.TimeoutMap} which is used to track reply messages
+ * which have timed out, and thus should trigger the waiting
+ * {@link org.apache.camel.Exchange} to time out as well.
*/
class CorrelationTimeoutMap extends DefaultTimeoutMap<String, ReplyHandler> {
@@ -35,7 +35,9 @@ class CorrelationTimeoutMap extends DefaultTimeoutMap<String, ReplyHandler> {
}
private static long encode(long timeoutMillis) {
- return timeoutMillis > 0 ? timeoutMillis : Integer.MAX_VALUE; // TODO why not Long.MAX_VALUE!
+ return timeoutMillis > 0 ? timeoutMillis : Integer.MAX_VALUE; // TODO
+ // why not
+ // Long.MAX_VALUE!
}
private void listener(Listener.Type type, String key, ReplyHandler handler) {
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/MessageSentCallback.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/MessageSentCallback.java
index 83b11ea..aecbd64 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/MessageSentCallback.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/MessageSentCallback.java
@@ -26,8 +26,8 @@ public interface MessageSentCallback {
/**
* Callback when the message has been sent.
*
- * @param session the session
- * @param message the message
+ * @param session the session
+ * @param message the message
* @param destination the destination
*/
void sent(Connection session, byte[] message, String destination);
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/QueueReplyHandler.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/QueueReplyHandler.java
index 06d5e61..884698a 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/QueueReplyHandler.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/QueueReplyHandler.java
@@ -24,8 +24,7 @@ import org.apache.camel.Exchange;
*/
public class QueueReplyHandler extends TemporaryQueueReplyHandler {
- public QueueReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
- String originalCorrelationId, String correlationId, long timeout) {
+ public QueueReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId, long timeout) {
super(replyManager, exchange, callback, originalCorrelationId, correlationId, timeout);
}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHandler.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHandler.java
index 0ceaf74..40c6a4a 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHandler.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHandler.java
@@ -26,15 +26,15 @@ public interface ReplyHandler {
/**
* The reply message was received
*
- * @param correlationId the correlation id
- * @param reply the reply message
+ * @param correlationId the correlation id
+ * @param reply the reply message
*/
void onReply(String correlationId, AMQP.BasicProperties properties, byte[] reply);
/**
* The reply message was not received and a timeout triggered
*
- * @param correlationId the correlation id
+ * @param correlationId the correlation id
*/
void onTimeout(String correlationId);
}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java
index cb3caa7..cdc481e 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java
@@ -21,8 +21,10 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
/**
- * Holder which contains the {@link Exchange} and {@link org.apache.camel.AsyncCallback} to be used
- * when the reply arrives, so we can set the reply on the {@link Exchange} and continue routing using the callback.
+ * Holder which contains the {@link Exchange} and
+ * {@link org.apache.camel.AsyncCallback} to be used when the reply arrives, so
+ * we can set the reply on the {@link Exchange} and continue routing using the
+ * callback.
*/
public class ReplyHolder {
@@ -37,8 +39,7 @@ public class ReplyHolder {
/**
* Constructor to use when a reply message was received
*/
- public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId,
- String correlationId, AMQP.BasicProperties properties, byte[] message) {
+ public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId, AMQP.BasicProperties properties, byte[] message) {
this.exchange = exchange;
this.callback = callback;
this.originalCorrelationId = originalCorrelationId;
@@ -50,8 +51,7 @@ public class ReplyHolder {
/**
* Constructor to use when a timeout occurred
*/
- public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId,
- String correlationId, long timeout) {
+ public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId, long timeout) {
this(exchange, callback, originalCorrelationId, correlationId, null, null);
this.timeout = timeout;
}
@@ -65,10 +65,12 @@ public class ReplyHolder {
}
/**
- * Gets the original correlation id, if one was set when sending the message.
+ * Gets the original correlation id, if one was set when sending the
+ * message.
* <p/>
- * Some JMS brokers will mess with the correlation id and send back a different/empty correlation id.
- * So we need to remember it so we can restore the correlation id.
+ * Some JMS brokers will mess with the correlation id and send back a
+ * different/empty correlation id. So we need to remember it so we can
+ * restore the correlation id.
*/
public String getOriginalCorrelationId() {
return originalCorrelationId;
@@ -86,7 +88,8 @@ public class ReplyHolder {
/**
* Gets the received message
*
- * @return the received message, or <tt>null</tt> if timeout occurred and no message has been received
+ * @return the received message, or <tt>null</tt> if timeout occurred and no
+ * message has been received
* @see #isTimeout()
*/
public byte[] getMessage() {
@@ -96,8 +99,9 @@ public class ReplyHolder {
/**
* Whether timeout triggered or not.
* <p/>
- * A timeout is triggered if <tt>requestTimeout</tt> option has been configured, and a reply message has <b>not</b> been
- * received within that time frame.
+ * A timeout is triggered if <tt>requestTimeout</tt> option has been
+ * configured, and a reply message has <b>not</b> been received within that
+ * time frame.
*/
public boolean isTimeout() {
return message == null;
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java
index f30d988..f141465 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java
@@ -23,8 +23,9 @@ import org.apache.camel.Exchange;
import org.apache.camel.component.rabbitmq.RabbitMQEndpoint;
/**
- * The {@link ReplyManager} is responsible for handling <a href="http://camel.apache.org/request-reply.html">request-reply</a>
- * over RabbitMQ.
+ * The {@link ReplyManager} is responsible for handling
+ * <a href="http://camel.apache.org/request-reply.html">request-reply</a> over
+ * RabbitMQ.
*/
public interface ReplyManager {
@@ -32,7 +33,7 @@ public interface ReplyManager {
* Sets the belonging {@link RabbitMQEndpoint}
*/
void setEndpoint(RabbitMQEndpoint endpoint);
-
+
/**
* Sets the reply to queue the manager should listen for replies.
* <p/>
@@ -48,40 +49,41 @@ public interface ReplyManager {
/**
* Register a reply
*
- * @param replyManager the reply manager being used
- * @param exchange the exchange
- * @param callback the callback
- * @param originalCorrelationId an optional original correlation id
- * @param correlationId the correlation id to expect being used
- * @param requestTimeout the timeout
+ * @param replyManager the reply manager being used
+ * @param exchange the exchange
+ * @param callback the callback
+ * @param originalCorrelationId an optional original correlation id
+ * @param correlationId the correlation id to expect being used
+ * @param requestTimeout the timeout
* @return the correlation id used
*/
- String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
- String originalCorrelationId, String correlationId, long requestTimeout);
+ String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId, long requestTimeout);
/**
- * Sets the scheduled to use when checking for timeouts (no reply received within a given time period)
+ * Sets the scheduled executor service to use when checking for timeouts
+ * (no reply received within a given time period)
*/
void setScheduledExecutorService(ScheduledExecutorService executorService);
/**
* Updates the correlation id to the new correlation id.
* <p/>
- * This is only used when <tt>useMessageIDasCorrelationID</tt> option is used, which means a
- * provisional correlation id is first used, then after the message has been sent, the real
- * correlation id is known. This allows us then to update the internal mapping to expect the
- * real correlation id.
+ * This is only used when <tt>useMessageIDasCorrelationID</tt> option is
+ * used, which means a provisional correlation id is first used, then after
+ * the message has been sent, the real correlation id is known. This allows
+ * us then to update the internal mapping to expect the real correlation id.
*
- * @param correlationId the provisional correlation id
- * @param newCorrelationId the real correlation id
- * @param requestTimeout the timeout
+ * @param correlationId the provisional correlation id
+ * @param newCorrelationId the real correlation id
+ * @param requestTimeout the timeout
*/
void updateCorrelationId(String correlationId, String newCorrelationId, long requestTimeout);
/**
* Process the reply
*
- * @param holder containing needed data to process the reply and continue routing
+ * @param holder containing needed data to process the reply and continue
+ * routing
*/
void processReply(ReplyHolder holder);
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
index 02320ac..173d729 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
@@ -45,14 +45,14 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
protected final CamelContext camelContext;
protected final CountDownLatch replyToLatch = new CountDownLatch(1);
protected final long replyToTimeout = 1000;
-
+
protected ScheduledExecutorService executorService;
protected RabbitMQEndpoint endpoint;
protected String replyTo;
protected Connection listenerContainer;
protected TimeoutMap<String, ReplyHandler> correlation;
-
+
private final RabbitMQMessageConverter messageConverter = new RabbitMQMessageConverter();
public ReplyManagerSupport(CamelContext camelContext) {
@@ -83,9 +83,12 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
return replyTo;
}
try {
- // the reply to destination has to be resolved using a DestinationResolver using
- // the MessageListenerContainer which occurs asynchronously so we have to wait
- // for that to happen before we can retrieve the reply to destination to be used
+ // the reply to destination has to be resolved using a
+ // DestinationResolver using
+ // the MessageListenerContainer which occurs asynchronously so we
+ // have to wait
+ // for that to happen before we can retrieve the reply to
+ // destination to be used
LOG.trace("Waiting for replyTo to be set");
boolean done = replyToLatch.await(replyToTimeout, TimeUnit.MILLISECONDS);
if (!done) {
@@ -100,11 +103,9 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
}
@Override
- public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
- String originalCorrelationId, String correlationId, long requestTimeout) {
+ public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId, long requestTimeout) {
// add to correlation map
- QueueReplyHandler handler = new QueueReplyHandler(replyManager, exchange, callback,
- originalCorrelationId, correlationId, requestTimeout);
+ QueueReplyHandler handler = new QueueReplyHandler(replyManager, exchange, callback, originalCorrelationId, correlationId, requestTimeout);
// Just make sure we don't override the old value of the correlationId
ReplyHandler result = correlation.putIfAbsent(correlationId, handler, requestTimeout);
if (result != null) {
@@ -114,9 +115,8 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
return correlationId;
}
- protected abstract ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
- String originalCorrelationId, String correlationId, long requestTimeout);
-
+ protected abstract ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId,
+ long requestTimeout);
@Override
public void cancelCorrelationId(String correlationId) {
@@ -149,21 +149,23 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
boolean timeout = holder.isTimeout();
if (timeout) {
- // timeout occurred do a WARN log so its easier to spot in the logs
+ // timeout occurred do a WARN log so it's easier to spot in
+ // the logs
if (LOG.isWarnEnabled()) {
LOG.warn("Timeout occurred after {} millis waiting for reply message with correlationID [{}] on destination {}."
- + " Setting ExchangeTimedOutException on {} and continue routing.",
- holder.getRequestTimeout(), holder.getCorrelationId(), replyTo, ExchangeHelper.logIds(exchange));
+ + " Setting ExchangeTimedOutException on {} and continue routing.", holder.getRequestTimeout(), holder.getCorrelationId(), replyTo,
+ ExchangeHelper.logIds(exchange));
}
// no response, so lets set a timed out exception
String msg = "reply message with correlationID: " + holder.getCorrelationId() + " not received on destination: " + replyTo;
exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout(), msg));
} else {
-
+
messageConverter.populateRabbitExchange(exchange, null, holder.getProperties(), holder.getMessage(), true);
- // restore correlation id in case the remote server messed with it
+ // restore correlation id in case the remote server messed
+ // with it
if (holder.getOriginalCorrelationId() != null) {
if (exchange.hasOut()) {
exchange.getOut().setHeader(RabbitMQConstants.CORRELATIONID, holder.getOriginalCorrelationId());
@@ -185,16 +187,21 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
protected abstract Connection createListenerContainer() throws Exception;
/**
- * <b>IMPORTANT:</b> This logic is only being used due to high performance in-memory only
- * testing using InOut over JMS. Its unlikely to happen in a real life situation with communication
- * to a remote broker, which always will be slower to send back reply, before Camel had a chance
- * to update it's internal correlation map.
+ * <b>IMPORTANT:</b> This logic is only being used due to high performance
+ * in-memory only testing using InOut over JMS. It's unlikely to happen in a
+ * real life situation with communication to a remote broker, which always
+ * will be slower to send back reply, before Camel had a chance to update
+ * its internal correlation map.
*/
protected ReplyHandler waitForProvisionCorrelationToBeUpdated(String correlationID, byte[] message) {
- // race condition, when using messageID as correlationID then we store a provisional correlation id
- // at first, which gets updated with the JMSMessageID after the message has been sent. And in the unlikely
- // event that the reply comes back really really fast, and the correlation map hasn't yet been updated
- // from the provisional id to the JMSMessageID. If so we have to wait a bit and lookup again.
+ // race condition, when using messageID as correlationID then we store a
+ // provisional correlation id
+ // at first, which gets updated with the JMSMessageID after the message
+ // has been sent. And in the unlikely
+ // event that the reply comes back really really fast, and the
+ // correlation map hasn't yet been updated
+ // from the provisional id to the JMSMessageID. If so we have to wait a
+ // bit and lookup again.
if (LOG.isWarnEnabled()) {
LOG.warn("Early reply received with correlationID [{}] -> {}", correlationID, message);
}
@@ -218,8 +225,7 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
if (answer != null) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Early reply with correlationID [{}] has been matched after {} attempts and can be processed using handler: {}",
- correlationID, counter, answer);
+ LOG.trace("Early reply with correlationID [{}] has been matched after {} attempts and can be processed using handler: {}", correlationID, counter, answer);
}
}
}
@@ -233,7 +239,8 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
ObjectHelper.notNull(endpoint, "endpoint", this);
messageConverter.setAllowNullHeaders(endpoint.isAllowNullHeaders());
- // timeout map to use for purging messages which have timed out, while waiting for an expected reply
+ // timeout map to use for purging messages which have timed out, while
+ // waiting for an expected reply
// when doing request/reply over JMS
LOG.debug("Using timeout checker interval with {} millis", endpoint.getRequestTimeoutCheckerInterval());
correlation = new CorrelationTimeoutMap(executorService, endpoint.getRequestTimeoutCheckerInterval());
@@ -241,7 +248,7 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
// create listener and start it
listenerContainer = createListenerContainer();
-
+
LOG.debug("Using executor {}", executorService);
}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java
index a334718..5021a33 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java
@@ -23,7 +23,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * {@link ReplyHandler} to handle processing replies when using temporary queues.
+ * {@link ReplyHandler} to handle processing replies when using temporary
+ * queues.
*/
public class TemporaryQueueReplyHandler implements ReplyHandler {
@@ -33,13 +34,13 @@ public class TemporaryQueueReplyHandler implements ReplyHandler {
protected final ReplyManager replyManager;
protected final Exchange exchange;
protected final AsyncCallback callback;
- // remember the original correlation id, in case the server returns back a reply with a messed up correlation id
+ // remember the original correlation id, in case the server returns back a
+ // reply with a messed up correlation id
protected final String originalCorrelationId;
protected final String correlationId;
protected final long timeout;
- public TemporaryQueueReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
- String originalCorrelationId, String correlationId, long timeout) {
+ public TemporaryQueueReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId, long timeout) {
this.replyManager = replyManager;
this.exchange = exchange;
this.originalCorrelationId = originalCorrelationId;
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
index a9d959b..c97d54b 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
@@ -44,8 +44,8 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
}
@Override
- protected ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
- String originalCorrelationId, String correlationId, long requestTimeout) {
+ protected ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId,
+ long requestTimeout) {
return new TemporaryQueueReplyHandler(this, exchange, callback, originalCorrelationId, correlationId, requestTimeout);
}
@@ -69,7 +69,8 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
correlation.remove(correlationID);
handler.onReply(correlationID, properties, message);
} else {
- // we could not correlate the received reply message to a matching request and therefore
+ // we could not correlate the received reply message to a matching
+ // request and therefore
// we cannot continue routing the unknown message
// log a warn and then ignore the message
LOG.warn("Reply received for unknown correlationID [{}]. The message will be ignored: {}", correlationID, message);
@@ -86,16 +87,15 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
Channel channel = conn.createChannel();
// setup the basicQos
if (endpoint.isPrefetchEnabled()) {
- channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(),
- endpoint.isPrefetchGlobal());
+ channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(), endpoint.isPrefetchGlobal());
}
- //Let the server pick a random name for us
+ // Let the server pick a random name for us
DeclareOk result = channel.queueDeclare();
LOG.debug("Using temporary queue name: {}", result.getQueue());
setReplyTo(result.getQueue());
- //TODO check for the RabbitMQConstants.EXCHANGE_NAME header
+ // TODO check for the RabbitMQConstants.EXCHANGE_NAME header
channel.queueBind(getReplyTo(), endpoint.getExchangeName(), getReplyTo());
consumer = new RabbitConsumer(this, channel);
@@ -110,7 +110,7 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
consumer.stop();
}
- //TODO combine with class in RabbitMQConsumer
+ // TODO combine with class in RabbitMQConsumer
class RabbitConsumer extends com.rabbitmq.client.DefaultConsumer {
private final TemporaryQueueReplyManager consumer;
@@ -130,8 +130,7 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
}
@Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- AMQP.BasicProperties properties, byte[] body) throws IOException {
+ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
consumer.onMessage(properties, body);
}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java
index 68ae912..290df38 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java
@@ -21,10 +21,12 @@ import java.util.UUID;
import com.rabbitmq.client.Connection;
/**
- * Callback to be used when using the option <tt>useMessageIDAsCorrelationID</tt>.
+ * Callback to be used when using the option
+ * <tt>useMessageIDAsCorrelationID</tt>.
* <p/>
- * This callback will keep the correlation registration in {@link ReplyManager} up-to-date with
- * the <tt>JMSMessageID</tt> which was assigned and used when the message was sent.
+ * This callback will keep the correlation registration in {@link ReplyManager}
+ * up-to-date with the <tt>JMSMessageID</tt> which was assigned and used when
+ * the message was sent.
*/
public class UseMessageIdAsCorrelationIdMessageSentCallback implements MessageSentCallback {
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitConsumerHangTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitConsumerHangTest.java
index 4fc38ca..1cf2eb3 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitConsumerHangTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitConsumerHangTest.java
@@ -41,7 +41,8 @@ public class RabbitConsumerHangTest {
RabbitConsumer rabbitConsumer = new RabbitConsumer(consumer);
rabbitConsumer.handleDelivery(null, null, null, null);
- // will now fail with some NPE which is expected as we have not mocked all the inner details
+ // will now fail with some NPE which is expected as we have not mocked
+ // all the inner details
try {
rabbitConsumer.handleDelivery(null, null, null, null);
fail("Should have thrown NPE");
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
index 10183d9..044167a 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
@@ -99,7 +99,7 @@ public class RabbitMQComponentTest extends CamelTestSupport {
RabbitMQComponent comp = context.getComponent("rabbitmq", RabbitMQComponent.class);
comp.setAutoDetectConnectionFactory(false);
- return (RabbitMQEndpoint) comp.createEndpoint(uri, params);
+ return (RabbitMQEndpoint)comp.createEndpoint(uri, params);
}
@Test
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
index e2881c7..4c9d8ca 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
@@ -45,7 +45,7 @@ public class RabbitMQConsumerTest {
public void testStoppingConsumerShutdownExecutor() throws Exception {
RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
- ThreadPoolExecutor e = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
+ ThreadPoolExecutor e = (ThreadPoolExecutor)Executors.newFixedThreadPool(3);
Mockito.when(endpoint.createExecutor()).thenReturn(e);
Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1);
Mockito.when(endpoint.connect(any(ExecutorService.class))).thenReturn(conn);
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
index 2910593..3fdb624 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
@@ -120,7 +120,7 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
customHeaders.put("longStringHeader", LongStringHelper.asLongString("Some really long string"));
customHeaders.put("timestampHeader", new Timestamp(4200));
customHeaders.put("byteHeader", Byte.valueOf((byte)0));
- customHeaders.put("floatHeader", Float.valueOf((float) 42.4242));
+ customHeaders.put("floatHeader", Float.valueOf((float)42.4242));
customHeaders.put("longHeader", Long.valueOf(420000000000000000L));
Mockito.when(properties.getHeaders()).thenReturn(customHeaders);
@@ -139,7 +139,7 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
assertEquals("Some really long string", exchange.getIn().getHeader("longStringHeader"));
assertEquals(new Timestamp(4200), exchange.getIn().getHeader("timestampHeader"));
assertEquals(Byte.valueOf((byte)0), exchange.getIn().getHeader("byteHeader"));
- assertEquals(Float.valueOf((float) 42.4242), exchange.getIn().getHeader("floatHeader"));
+ assertEquals(Float.valueOf((float)42.4242), exchange.getIn().getHeader("floatHeader"));
assertEquals(Long.valueOf(420000000000000000L), exchange.getIn().getHeader("longHeader"));
assertEquals(body, exchange.getIn().getBody());
}
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
index b4677d5..f90e9be 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
@@ -197,7 +197,7 @@ public class RabbitMQProducerTest {
assertEquals(42.24, props.getHeaders().get("doubleHeader"));
assertEquals(true, props.getHeaders().get("booleanHeader"));
assertEquals(new Date(0), props.getHeaders().get("dateHeader"));
- assertArrayEquals("foo".getBytes(), (byte[]) props.getHeaders().get("byteArrayHeader"));
+ assertArrayEquals("foo".getBytes(), (byte[])props.getHeaders().get("byteArrayHeader"));
assertNull(props.getHeaders().get("invalidHeader"));
}
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/AbstractRabbitMQIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/AbstractRabbitMQIntTest.java
index d8de62c..de04164 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/AbstractRabbitMQIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/AbstractRabbitMQIntTest.java
@@ -53,8 +53,8 @@ public abstract class AbstractRabbitMQIntTest extends ContainerAwareTestSupport
}
/**
- * Helper method for creating a RabbitMQ connection to the test instance of the
- * RabbitMQ server.
+ * Helper method for creating a RabbitMQ connection to the test instance of
+ * the RabbitMQ server.
*
* @return
* @throws IOException
@@ -87,7 +87,7 @@ public abstract class AbstractRabbitMQIntTest extends ContainerAwareTestSupport
@Override
protected GenericContainer<?> createContainer() {
container = isStartDocker() ? DockerTestUtils.rabbitMQContainer() : null;
- return (GenericContainer<?>) container;
+ return (GenericContainer<?>)container;
}
@Override
@@ -102,4 +102,4 @@ public abstract class AbstractRabbitMQIntTest extends ContainerAwareTestSupport
protected long containerShutdownTimeout() {
return TimeUnit.MINUTES.toSeconds(1L);
}
-}
\ No newline at end of file
+}
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/DockerTestUtils.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/DockerTestUtils.java
index 2876d70..99586c8 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/DockerTestUtils.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/DockerTestUtils.java
@@ -31,17 +31,13 @@ public class DockerTestUtils {
public static GenericContainer rabbitMQContainer() {
// docker run -it -p 5672:5672
- // -e RABBITMQ_DEFAULT_USER=cameltest
- // -e RABBITMQ_DEFAULT_PASS=cameltest
- // --hostname my-rabbit
- // --name some-rabbit rabbitmq:3
- GenericContainer container = new GenericContainer<>(CONTAINER_IMAGE)
- .withNetworkAliases(CONTAINER_NAME)
- .withExposedPorts(EXPOSE_PORT)
- .withEnv("RABBITMQ_DEFAULT_USER", "cameltest")
- .withEnv("RABBITMQ_DEFAULT_PASS", "cameltest")
- .withCreateContainerCmdModifier(cmd -> cmd.withHostName("my-rabbit"))
- .waitingFor(Wait.forLogMessage(".*Server startup complete.*\n", 1));
+ // -e RABBITMQ_DEFAULT_USER=cameltest
+ // -e RABBITMQ_DEFAULT_PASS=cameltest
+ // --hostname my-rabbit
+ // --name some-rabbit rabbitmq:3
+ GenericContainer container = new GenericContainer<>(CONTAINER_IMAGE).withNetworkAliases(CONTAINER_NAME).withExposedPorts(EXPOSE_PORT)
+ .withEnv("RABBITMQ_DEFAULT_USER", "cameltest").withEnv("RABBITMQ_DEFAULT_PASS", "cameltest").withCreateContainerCmdModifier(cmd -> cmd.withHostName("my-rabbit"))
+ .waitingFor(Wait.forLogMessage(".*Server startup complete.*\n", 1));
container.setPortBindings(Arrays.asList(String.format("%d:%d", EXPOSE_PORT, EXPOSE_PORT)));
return container;
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQBasicIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQBasicIntTest.java
index b2052ed..949a8bb 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQBasicIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQBasicIntTest.java
@@ -40,14 +40,9 @@ public class RabbitMQBasicIntTest extends AbstractRabbitMQIntTest {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from(foo)
- .log("FOO received: ${body}")
- .to(bar);
-
- from(bar)
- .log("BAR received: ${body}")
- .to(mock)
- .transform().simple("Bye ${body}");
+ from(foo).log("FOO received: ${body}").to(bar);
+
+ from(bar).log("BAR received: ${body}").to(mock).transform().simple("Bye ${body}");
}
};
}
@@ -90,4 +85,3 @@ public class RabbitMQBasicIntTest extends AbstractRabbitMQIntTest {
}
}
-
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQConsumerIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQConsumerIntTest.java
index 73e3cbe..14957ac 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQConsumerIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQConsumerIntTest.java
@@ -54,7 +54,7 @@ public class RabbitMQConsumerIntTest extends AbstractRabbitMQIntTest {
@EndpointInject("rabbitmq:localhost:5672/" + "ex7" + "?username=cameltest&password=cameltest&exchangeType=headers&autoDelete=false&durable=true&queue=q7&arg.binding.fizz=buzz")
private Endpoint headersExchangeWithQueueDefiniedInline;
-
+
@BindToRegistry("args")
private Map<String, Object> bindingArgs = new HashMap<String, Object>() {
{
@@ -112,7 +112,6 @@ public class RabbitMQConsumerIntTest extends AbstractRabbitMQIntTest {
to.expectedMessageCount(1);
to.expectedHeaderReceived(RabbitMQConstants.TIMESTAMP, timestamp);
-
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
properties.timestamp(timestamp);
@@ -123,8 +122,9 @@ public class RabbitMQConsumerIntTest extends AbstractRabbitMQIntTest {
}
/**
- * Tests the proper rabbit binding arguments are in place when the headersExchangeWithQueue is created.
- * Should only receive messages with the header [foo=bar]
+ * Tests the proper rabbit binding arguments are in place when the
+ * headersExchangeWithQueue is created. Should only receive messages with
+ * the header [foo=bar]
*/
@Test
public void sentMessageIsReceivedWithHeadersRouting() throws InterruptedException, IOException, TimeoutException {
@@ -138,10 +138,7 @@ public class RabbitMQConsumerIntTest extends AbstractRabbitMQIntTest {
channel.basicPublish(HEADERS_EXCHANGE, "", propertiesWithHeader("foo", "bra"), MSG.getBytes());
// Only one message should be received, waiting for some other messages
- Awaitility.await()
- .during(1000, TimeUnit.MILLISECONDS)
- .atMost(2000, TimeUnit.MILLISECONDS)
- .until(() -> to.getReceivedCounter() >= 1);
+ Awaitility.await().during(1000, TimeUnit.MILLISECONDS).atMost(2000, TimeUnit.MILLISECONDS).until(() -> to.getReceivedCounter() >= 1);
to.assertIsSatisfied();
}
@@ -171,4 +168,3 @@ public class RabbitMQConsumerIntTest extends AbstractRabbitMQIntTest {
return calendar.getTime();
}
}
-
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQConsumerIntTestReplyTo.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQConsumerIntTestReplyTo.java
index 8c7b522..47602f8 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQConsumerIntTestReplyTo.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQConsumerIntTestReplyTo.java
@@ -37,25 +37,25 @@ import org.junit.Test;
*/
public class RabbitMQConsumerIntTestReplyTo extends AbstractRabbitMQIntTest {
protected static final String QUEUE = "amq.rabbitmq.reply-to";
-
+
private static final String EXCHANGE = "ex_reply";
private static final String ROUTING_KEY = "testreply";
private static final String REQUEST = "Knock! Knock!";
private static final String REPLY = "Hello world";
protected Channel channel;
-
+
@EndpointInject("rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest&routingKey=" + ROUTING_KEY)
private Endpoint from;
private Connection connection;
-
+
@Before
public void setUpRabbitMQ() throws Exception {
connection = connection();
channel = connection.createChannel();
}
-
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
context().setTracing(true);
@@ -64,10 +64,8 @@ public class RabbitMQConsumerIntTestReplyTo extends AbstractRabbitMQIntTest {
@Override
public void configure() throws Exception {
log.info("Building routes...");
-
- from(from)
- .log(body().toString())
- .setBody(simple(REPLY));
+
+ from(from).log(body().toString()).setBody(simple(REPLY));
}
};
}
@@ -75,16 +73,16 @@ public class RabbitMQConsumerIntTestReplyTo extends AbstractRabbitMQIntTest {
@Test
public void replyMessageIsReceived() throws IOException, TimeoutException, InterruptedException {
final List<String> received = new ArrayList<>();
-
+
AMQP.BasicProperties.Builder prop = new AMQP.BasicProperties.Builder();
prop.replyTo(QUEUE);
-
+
channel.basicConsume(QUEUE, true, new ArrayPopulatingConsumer(received));
channel.basicPublish(EXCHANGE, ROUTING_KEY, prop.build(), REQUEST.getBytes());
-
+
assertThatBodiesReceivedIn(received, REPLY);
}
-
+
private void assertThatBodiesReceivedIn(final List<String> received, final String... expected) throws InterruptedException {
Thread.sleep(500);
@@ -93,7 +91,7 @@ public class RabbitMQConsumerIntTestReplyTo extends AbstractRabbitMQIntTest {
assertEquals(body, received.get(0));
}
}
-
+
private class ArrayPopulatingConsumer extends DefaultConsumer {
private final List<String> received;
@@ -103,10 +101,7 @@ public class RabbitMQConsumerIntTestReplyTo extends AbstractRabbitMQIntTest {
}
@Override
- public void handleDelivery(String consumerTag,
- Envelope envelope,
- AMQP.BasicProperties properties,
- byte[] body) throws IOException {
+ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
received.add(new String(body));
}
}
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQDeadLetterRoutingKeyIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQDeadLetterRoutingKeyIntTest.java
index aa3065d..2639fd4 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQDeadLetterRoutingKeyIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQDeadLetterRoutingKeyIntTest.java
@@ -43,28 +43,14 @@ public class RabbitMQDeadLetterRoutingKeyIntTest extends AbstractRabbitMQIntTest
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQDeadLetterRoutingKeyIntTest.class);
- private static final String CONSUMER = "rabbitmq:ex9?hostname=localhost&portNumber=5672&username=cameltest&password=cameltest"
- + "&skipExchangeDeclare=false"
- + "&skipQueueDeclare=false"
- + "&autoDelete=false"
- + "&durable=true"
- + "&autoAck=false"
- + "&queue=q9"
- + "&routingKey=rk1"
- + "&deadLetterExchange=dlx"
- + "&deadLetterQueue=dlq"
- + "&deadLetterExchangeType=fanout";
-
+ private static final String CONSUMER = "rabbitmq:ex9?hostname=localhost&portNumber=5672&username=cameltest&password=cameltest" + "&skipExchangeDeclare=false"
+ + "&skipQueueDeclare=false" + "&autoDelete=false" + "&durable=true" + "&autoAck=false" + "&queue=q9" + "&routingKey=rk1"
+ + "&deadLetterExchange=dlx" + "&deadLetterQueue=dlq" + "&deadLetterExchangeType=fanout";
+
private static final String CONSUMER_WITH_DEADLETTER_ROUTING_KEY = "rabbitmq:ex10?hostname=localhost&portNumber=5672&username=cameltest&password=cameltest"
- + "&skipExchangeDeclare=false"
- + "&skipQueueDeclare=false"
- + "&autoDelete=false&durable=true"
- + "&autoAck=false&queue=q10"
- + "&routingKey=rk1"
- + "&deadLetterExchange=dlx"
- + "&deadLetterQueue=dlq"
- + "&deadLetterExchangeType=fanout"
- + "&deadLetterRoutingKey=rk2";
+ + "&skipExchangeDeclare=false" + "&skipQueueDeclare=false" + "&autoDelete=false&durable=true"
+ + "&autoAck=false&queue=q10" + "&routingKey=rk1" + "&deadLetterExchange=dlx" + "&deadLetterQueue=dlq"
+ + "&deadLetterExchangeType=fanout" + "&deadLetterRoutingKey=rk2";
private Connection connection;
private Channel channel;
@@ -107,12 +93,10 @@ public class RabbitMQDeadLetterRoutingKeyIntTest extends AbstractRabbitMQIntTest
final List<String> received = new ArrayList<>();
final StringBuilder routingKey = new StringBuilder();
- AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
- .contentType("text/plain")
- .contentEncoding(StandardCharsets.UTF_8.toString()).build();
+ AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().contentType("text/plain").contentEncoding(StandardCharsets.UTF_8.toString()).build();
- receivedEndpoint.whenAnyExchangeReceived(exchange -> {
- throw new Exception("Simulated exception");
+ receivedEndpoint.whenAnyExchangeReceived(exchange -> {
+ throw new Exception("Simulated exception");
});
channel.basicPublish("ex9", "rk1", properties, "new message".getBytes(StandardCharsets.UTF_8));
@@ -130,12 +114,10 @@ public class RabbitMQDeadLetterRoutingKeyIntTest extends AbstractRabbitMQIntTest
final List<String> received = new ArrayList<>();
StringBuilder routingKey = new StringBuilder();
- AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
- .contentType("text/plain")
- .contentEncoding(StandardCharsets.UTF_8.toString()).build();
+ AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().contentType("text/plain").contentEncoding(StandardCharsets.UTF_8.toString()).build();
- receivedEndpoint.whenAnyExchangeReceived(exchange -> {
- throw new Exception("Simulated exception");
+ receivedEndpoint.whenAnyExchangeReceived(exchange -> {
+ throw new Exception("Simulated exception");
});
channel.basicPublish("ex10", "rk1", properties, "new message".getBytes(StandardCharsets.UTF_8));
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQInOutIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQInOutIntTest.java
index 41db517..dbd7c60 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQInOutIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQInOutIntTest.java
@@ -60,15 +60,12 @@ public class RabbitMQInOutIntTest extends AbstractRabbitMQIntTest {
@Produce("direct:rabbitMQ")
protected ProducerTemplate directProducer;
- @EndpointInject("rabbitmq:localhost:5672/" + EXCHANGE + "?threadPoolSize=1&exchangeType=direct&username=cameltest&password=cameltest"
- + "&autoAck=true&queue=q4&routingKey=" + ROUTING_KEY
- + "&transferException=true&requestTimeout=" + TIMEOUT_MS)
+ @EndpointInject("rabbitmq:localhost:5672/" + EXCHANGE + "?threadPoolSize=1&exchangeType=direct&username=cameltest&password=cameltest" + "&autoAck=true&queue=q4&routingKey="
+ + ROUTING_KEY + "&transferException=true&requestTimeout=" + TIMEOUT_MS)
private Endpoint rabbitMQEndpoint;
@EndpointInject("rabbitmq:localhost:5672/" + EXCHANGE_NO_ACK + "?threadPoolSize=1&exchangeType=direct&username=cameltest&password=cameltest"
- + "&autoAck=false&autoDelete=false&durable=false&queue=q5&routingKey=" + ROUTING_KEY
- + "&transferException=true&requestTimeout=" + TIMEOUT_MS
- + "&args=#args")
+ + "&autoAck=false&autoDelete=false&durable=false&queue=q5&routingKey=" + ROUTING_KEY + "&transferException=true&requestTimeout=" + TIMEOUT_MS + "&args=#args")
private Endpoint noAutoAckEndpoint;
@EndpointInject("mock:result")
@@ -125,9 +122,7 @@ public class RabbitMQInOutIntTest extends AbstractRabbitMQIntTest {
from("direct:rabbitMQNoAutoAck").id("producingRouteNoAutoAck").setHeader("routeHeader", simple("routeHeader")).inOut(noAutoAckEndpoint);
- from(noAutoAckEndpoint).id("consumingRouteNoAutoAck")
- .to(resultEndpoint)
- .throwException(new IllegalStateException("test exception"));
+ from(noAutoAckEndpoint).id("consumingRouteNoAutoAck").to(resultEndpoint).throwException(new IllegalStateException("test exception"));
}
};
}
@@ -154,11 +149,14 @@ public class RabbitMQInOutIntTest extends AbstractRabbitMQIntTest {
headers.put("String", "String");
headers.put("Boolean", Boolean.valueOf(false));
- // This will blow up the connection if not removed before sending the message
+ // This will blow up the connection if not removed before sending the
+ // message
headers.put("TestObject1", testObject);
- // This will blow up the connection if not removed before sending the message
+ // This will blow up the connection if not removed before sending the
+ // message
headers.put("class", testObject.getClass());
- // This will mess up de-serialization if not removed before sending the message
+ // This will mess up de-serialization if not removed before sending the
+ // message
headers.put("CamelSerialize", true);
// populate a map and an arrayList
@@ -169,9 +167,11 @@ public class RabbitMQInOutIntTest extends AbstractRabbitMQIntTest {
tmpList.add(name);
tmpMap.put(name, name);
}
- // This will blow up the connection if not removed before sending the message
+ // This will blow up the connection if not removed before sending the
+ // message
headers.put("arrayList", tmpList);
- // This will blow up the connection if not removed before sending the message
+ // This will blow up the connection if not removed before sending the
+ // message
headers.put("map", tmpMap);
String reply = template.requestBodyAndHeaders("direct:rabbitMQ", "header", headers, String.class);
@@ -216,7 +216,7 @@ public class RabbitMQInOutIntTest extends AbstractRabbitMQIntTest {
TestSerializableObject newFoo = null;
try (InputStream b = new ByteArrayInputStream(body); ObjectInputStream o = new ObjectInputStream(b);) {
- newFoo = (TestSerializableObject) o.readObject();
+ newFoo = (TestSerializableObject)o.readObject();
} catch (IOException | ClassNotFoundException e) {
}
assertEquals(foo.getName(), newFoo.getName());
@@ -273,7 +273,8 @@ public class RabbitMQInOutIntTest extends AbstractRabbitMQIntTest {
resultEndpoint.expectedMessageCount(0);
- context.stop(); //On restarting the camel context, if the message was not acknowledged the message would be reprocessed
+ context.stop(); // On restarting the camel context, if the message was
+ // not acknowledged the message would be reprocessed
context.start();
resultEndpoint.assertIsSatisfied();
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQLoadIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQLoadIntTest.java
index 437535f..86d6870 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQLoadIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQLoadIntTest.java
@@ -33,21 +33,20 @@ import org.apache.camel.component.rabbitmq.RabbitMQConstants;
import org.junit.Test;
/**
- * Integration test to check that RabbitMQ Endpoint is able handle heavy load using multiple producers and
- * consumers
+ * Integration test to check that RabbitMQ Endpoint is able to handle heavy load
+ * using multiple producers and consumers
*/
public class RabbitMQLoadIntTest extends AbstractRabbitMQIntTest {
public static final String ROUTING_KEY = "rk4";
private static final int PRODUCER_COUNT = 10;
private static final int CONSUMER_COUNT = 10;
private static final int MESSAGE_COUNT = 100;
-
+
@Produce("direct:rabbitMQ")
protected ProducerTemplate directProducer;
- @EndpointInject("rabbitmq:localhost:5672/ex4?username=cameltest&password=cameltest"
- + "&queue=q4&routingKey=" + ROUTING_KEY + "&threadPoolSize=" + (CONSUMER_COUNT + 5)
- + "&concurrentConsumers=" + CONSUMER_COUNT)
+ @EndpointInject("rabbitmq:localhost:5672/ex4?username=cameltest&password=cameltest" + "&queue=q4&routingKey=" + ROUTING_KEY + "&threadPoolSize=" + (CONSUMER_COUNT + 5)
+ + "&concurrentConsumers=" + CONSUMER_COUNT)
private Endpoint rabbitMQEndpoint;
@EndpointInject("mock:producing")
@@ -62,15 +61,8 @@ public class RabbitMQLoadIntTest extends AbstractRabbitMQIntTest {
@Override
public void configure() throws Exception {
- from("direct:rabbitMQ")
- .id("producingRoute")
- .log("Sending message")
- .inOnly(rabbitMQEndpoint)
- .to(producingMockEndpoint);
- from(rabbitMQEndpoint)
- .id("consumingRoute")
- .log("Receiving message")
- .to(consumingMockEndpoint);
+ from("direct:rabbitMQ").id("producingRoute").log("Sending message").inOnly(rabbitMQEndpoint).to(producingMockEndpoint);
+ from(rabbitMQEndpoint).id("consumingRoute").log("Receiving message").to(consumingMockEndpoint);
}
};
}
@@ -85,8 +77,7 @@ public class RabbitMQLoadIntTest extends AbstractRabbitMQIntTest {
@Override
public void run() {
for (int i = 0; i < MESSAGE_COUNT; i++) {
- directProducer.sendBodyAndHeader("Message #" + i, RabbitMQConstants.ROUTING_KEY,
- ROUTING_KEY);
+ directProducer.sendBodyAndHeader("Message #" + i, RabbitMQConstants.ROUTING_KEY, ROUTING_KEY);
}
}
}));
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQProducerIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQProducerIntTest.java
index cca5f4b..627b547 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQProducerIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQProducerIntTest.java
@@ -61,10 +61,10 @@ public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
@Produce("direct:start-allow-null-headers")
protected ProducerTemplate templateAllowNullHeaders;
-
+
@Produce("direct:start-not-allow-custom-headers")
protected ProducerTemplate templateNotAllowCustomHeaders;
-
+
@Produce("direct:start-allow-custom-headers")
protected ProducerTemplate templateAllowCustomHeaders;
@@ -160,7 +160,7 @@ public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
assertThatBodiesAndHeadersReceivedIn(receivedHeaders, headers, received, "new message");
}
-
+
@Test
public void producedMessageNotAllowCustomHeaders() throws InterruptedException, IOException, TimeoutException {
final List<String> received = new ArrayList<>();
@@ -179,7 +179,7 @@ public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
assertTrue(receivedHeaders.containsKey(RabbitMQConstants.EXCHANGE_NAME));
assertFalse(receivedHeaders.containsKey(CUSTOM_HEADER));
}
-
+
@Test
public void producedMessageAllowCustomHeaders() throws InterruptedException, IOException, TimeoutException {
final List<String> received = new ArrayList<>();
@@ -208,8 +208,9 @@ public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
}
}
- private void assertThatBodiesAndHeadersReceivedIn(Map<String, Object> receivedHeaders, Map<String, Object> expectedHeaders,
- final List<String> received, final String... expected) throws InterruptedException {
+ private void assertThatBodiesAndHeadersReceivedIn(Map<String, Object> receivedHeaders, Map<String, Object> expectedHeaders, final List<String> received,
+ final String... expected)
+ throws InterruptedException {
Thread.sleep(500);
assertListSize(received, expected.length);
@@ -288,10 +289,7 @@ public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
}
@Override
- public void handleDelivery(String consumerTag,
- Envelope envelope,
- AMQP.BasicProperties properties,
- byte[] body) throws IOException {
+ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
LOGGER.info("AMQP.BasicProperties: {}", properties);
receivedHeaders.putAll(properties.getHeaders());
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQReConnectionIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQReConnectionIntTest.java
index bb59454..91f4405 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQReConnectionIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQReConnectionIntTest.java
@@ -31,15 +31,20 @@ import org.apache.camel.component.rabbitmq.RabbitMQConstants;
import org.junit.Test;
/**
- * Integration test to check that RabbitMQ Endpoint is able to reconnect to broker when broker
- * is not available.
+ * Integration test to check that RabbitMQ Endpoint is able to reconnect to
+ * broker when broker is not available.
* <ul>
* <li>Stop the broker</li>
- * <li>Run the test: the producer complains it can not send messages, the consumer is silent</li>
- * <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
- * <li>Stop the broker: the producer complains it can not send messages, the consumer is silent</li>
- * <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
- * <li>Kill all connections from the broker: the producer sends messages, and the consumer receives messages</li>
+ * <li>Run the test: the producer complains it can not send messages, the
+ * consumer is silent</li>
+ * <li>Start the broker: the producer sends messages, and the consumer receives
+ * messages</li>
+ * <li>Stop the broker: the producer complains it can not send messages, the
+ * consumer is silent</li>
+ * <li>Start the broker: the producer sends messages, and the consumer receives
+ * messages</li>
+ * <li>Kill all connections from the broker: the producer sends messages, and
+ * the consumer receives messages</li>
* </ul>
*/
public class RabbitMQReConnectionIntTest extends AbstractRabbitMQIntTest {
@@ -48,9 +53,8 @@ public class RabbitMQReConnectionIntTest extends AbstractRabbitMQIntTest {
@Produce("direct:rabbitMQ")
protected ProducerTemplate directProducer;
- @EndpointInject("rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest"
- + "&queue=q3&routingKey=rk3" + "&automaticRecoveryEnabled=true"
- + "&requestedHeartbeat=1000" + "&connectionTimeout=5000")
+ @EndpointInject("rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest" + "&queue=q3&routingKey=rk3" + "&automaticRecoveryEnabled=true"
+ + "&requestedHeartbeat=1000" + "&connectionTimeout=5000")
private Endpoint rabbitMQEndpoint;
@EndpointInject("mock:producing")
@@ -66,19 +70,9 @@ public class RabbitMQReConnectionIntTest extends AbstractRabbitMQIntTest {
@Override
@SuppressWarnings("unchecked")
public void configure() throws Exception {
- from("direct:rabbitMQ")
- .id("producingRoute")
- .onException(AlreadyClosedException.class, ConnectException.class)
- .maximumRedeliveries(10)
- .redeliveryDelay(500L)
- .end()
- .log("Sending message")
- .inOnly(rabbitMQEndpoint)
- .to(producingMockEndpoint);
- from(rabbitMQEndpoint)
- .id("consumingRoute")
- .log("Receiving message")
- .to(consumingMockEndpoint);
+ from("direct:rabbitMQ").id("producingRoute").onException(AlreadyClosedException.class, ConnectException.class).maximumRedeliveries(10).redeliveryDelay(500L).end()
+ .log("Sending message").inOnly(rabbitMQEndpoint).to(producingMockEndpoint);
+ from(rabbitMQEndpoint).id("consumingRoute").log("Receiving message").to(consumingMockEndpoint);
}
};
}
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQRequeueHandledExceptionIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQRequeueHandledExceptionIntTest.java
index a64de97..d29e090 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQRequeueHandledExceptionIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQRequeueHandledExceptionIntTest.java
@@ -26,7 +26,8 @@ import org.apache.camel.component.rabbitmq.RabbitMQConstants;
import org.junit.Test;
/**
- * Integration test to confirm REQUEUE header causes message not to be re-queued when an handled exception occurs.
+ * Integration test to confirm REQUEUE header causes message not to be re-queued
+ * when a handled exception occurs.
*/
public class RabbitMQRequeueHandledExceptionIntTest extends AbstractRabbitMQIntTest {
public static final String ROUTING_KEY = "rk4";
@@ -34,8 +35,7 @@ public class RabbitMQRequeueHandledExceptionIntTest extends AbstractRabbitMQIntT
@Produce("direct:rabbitMQ")
protected ProducerTemplate directProducer;
- @EndpointInject("rabbitmq:localhost:5672/ex4?username=cameltest&password=cameltest"
- + "&autoAck=false&queue=q4&routingKey=" + ROUTING_KEY)
+ @EndpointInject("rabbitmq:localhost:5672/ex4?username=cameltest&password=cameltest" + "&autoAck=false&queue=q4&routingKey=" + ROUTING_KEY)
private Endpoint rabbitMQEndpoint;
@EndpointInject("mock:producing")
@@ -50,20 +50,10 @@ public class RabbitMQRequeueHandledExceptionIntTest extends AbstractRabbitMQIntT
@Override
public void configure() throws Exception {
- from("direct:rabbitMQ")
- .id("producingRoute")
- .log("Sending message")
- .inOnly(rabbitMQEndpoint)
- .to(producingMockEndpoint);
+ from("direct:rabbitMQ").id("producingRoute").log("Sending message").inOnly(rabbitMQEndpoint).to(producingMockEndpoint);
- from(rabbitMQEndpoint)
- .onException(Exception.class)
- .handled(true)
- .end()
- .id("consumingRoute")
- .log("Receiving message")
- .inOnly(consumingMockEndpoint)
- .throwException(new Exception("Simulated handled exception"));
+ from(rabbitMQEndpoint).onException(Exception.class).handled(true).end().id("consumingRoute").log("Receiving message").inOnly(consumingMockEndpoint)
+ .throwException(new Exception("Simulated handled exception"));
}
};
}
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQRequeueIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQRequeueIntTest.java
index ad70396..3917840 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQRequeueIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQRequeueIntTest.java
@@ -30,7 +30,8 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
/**
- * Integration test to confirm REQUEUE header causes message to be re-queued instead of sent to DLQ.
+ * Integration test to confirm REQUEUE header causes message to be re-queued
+ * instead of sent to DLQ.
*/
public class RabbitMQRequeueIntTest extends AbstractRabbitMQIntTest {
public static final String ROUTING_KEY = "rk4";
@@ -40,8 +41,8 @@ public class RabbitMQRequeueIntTest extends AbstractRabbitMQIntTest {
protected ProducerTemplate directProducer;
@EndpointInject("rabbitmq:localhost:5672/ex4?username=cameltest&password=cameltest"
- + "&autoAck=false&autoDelete=false&durable=true&queue=q4&deadLetterExchange=dlx&deadLetterExchangeType=fanout"
- + "&deadLetterQueue=" + DEAD_LETTER_QUEUE_NAME + "&routingKey=" + ROUTING_KEY)
+ + "&autoAck=false&autoDelete=false&durable=true&queue=q4&deadLetterExchange=dlx&deadLetterExchangeType=fanout" + "&deadLetterQueue=" + DEAD_LETTER_QUEUE_NAME
+ + "&routingKey=" + ROUTING_KEY)
private Endpoint rabbitMQEndpoint;
@EndpointInject("mock:producing")
@@ -71,24 +72,15 @@ public class RabbitMQRequeueIntTest extends AbstractRabbitMQIntTest {
connection.abort();
}
-
@Override
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
@Override
public void configure() {
- from("direct:rabbitMQ")
- .id("producingRoute")
- .log("Sending message")
- .inOnly(rabbitMQEndpoint)
- .to(producingMockEndpoint);
-
- from(rabbitMQEndpoint)
- .id("consumingRoute")
- .log("Receiving message")
- .inOnly(consumingMockEndpoint)
- .throwException(new Exception("Simulated exception"));
+ from("direct:rabbitMQ").id("producingRoute").log("Sending message").inOnly(rabbitMQEndpoint).to(producingMockEndpoint);
+
+ from(rabbitMQEndpoint).id("consumingRoute").log("Receiving message").inOnly(consumingMockEndpoint).throwException(new Exception("Simulated exception"));
}
};
}
@@ -103,7 +95,8 @@ public class RabbitMQRequeueIntTest extends AbstractRabbitMQIntTest {
directProducer.sendBody("Hello, World!");
deadLetterChannel.basicConsume(DEAD_LETTER_QUEUE_NAME, true, new DeadLetterConsumer(received));
- // If message was rejected and not requeued, it will be published in dead letter queue
+ // If message was rejected and not requeued, it will be published in
+ // dead letter queue
await().atMost(5, SECONDS).until(() -> received.size() == 1);
producingMockEndpoint.assertIsSatisfied();
@@ -120,7 +113,8 @@ public class RabbitMQRequeueIntTest extends AbstractRabbitMQIntTest {
directProducer.sendBodyAndHeader("Hello, World!", RabbitMQConstants.REQUEUE, 4L);
deadLetterChannel.basicConsume(DEAD_LETTER_QUEUE_NAME, true, new DeadLetterConsumer(received));
- // If message was rejected and not requeued, it will be published in dead letter queue
+ // If message was rejected and not requeued, it will be published in
+ // dead letter queue
await().atMost(5, SECONDS).until(() -> received.size() == 1);
producingMockEndpoint.assertIsSatisfied();
@@ -137,7 +131,8 @@ public class RabbitMQRequeueIntTest extends AbstractRabbitMQIntTest {
directProducer.sendBodyAndHeader("Hello, World!", RabbitMQConstants.REQUEUE, false);
deadLetterChannel.basicConsume(DEAD_LETTER_QUEUE_NAME, true, new DeadLetterConsumer(received));
- // If message was rejected and not requeued, it will be published in dead letter queue
+ // If message was rejected and not requeued, it will be published in
+ // dead letter queue
await().atMost(5, SECONDS).until(() -> received.size() == 1);
producingMockEndpoint.assertIsSatisfied();
@@ -154,12 +149,10 @@ public class RabbitMQRequeueIntTest extends AbstractRabbitMQIntTest {
directProducer.sendBodyAndHeader("Hello, World!", RabbitMQConstants.REQUEUE, true);
deadLetterChannel.basicConsume(DEAD_LETTER_QUEUE_NAME, true, new DeadLetterConsumer(received));
- Awaitility.await()
- .during(1, SECONDS)
- .atMost(2, SECONDS)
- .until(() -> received.size() >= 0);
+ Awaitility.await().during(1, SECONDS).atMost(2, SECONDS).until(() -> received.size() >= 0);
- // If message was rejected and requeued it will not be published in dead letter queue
+ // If message was rejected and requeued it will not be published in dead
+ // letter queue
assertEquals(0, received.size());
producingMockEndpoint.assertIsSatisfied();
consumingMockEndpoint.assertIsSatisfied();
@@ -174,10 +167,7 @@ public class RabbitMQRequeueIntTest extends AbstractRabbitMQIntTest {
}
@Override
- public void handleDelivery(String consumerTag,
- com.rabbitmq.client.Envelope envelope,
- com.rabbitmq.client.AMQP.BasicProperties properties,
- byte[] body) {
+ public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) {
received.add(new String(body));
}
}
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQRequeueUnhandledExceptionIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQRequeueUnhandledExceptionIntTest.java
index 667dfd8..6c2644e 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQRequeueUnhandledExceptionIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQRequeueUnhandledExceptionIntTest.java
@@ -26,7 +26,8 @@ import org.apache.camel.component.rabbitmq.RabbitMQConstants;
import org.junit.Test;
/**
- * Integration test to confirm REQUEUE header causes message to be re-queued when an unhandled exception occurs.
+ * Integration test to confirm REQUEUE header causes message to be re-queued
+ * when an unhandled exception occurs.
*/
public class RabbitMQRequeueUnhandledExceptionIntTest extends AbstractRabbitMQIntTest {
public static final String ROUTING_KEY = "rk4";
@@ -34,8 +35,7 @@ public class RabbitMQRequeueUnhandledExceptionIntTest extends AbstractRabbitMQIn
@Produce("direct:rabbitMQ")
protected ProducerTemplate directProducer;
- @EndpointInject("rabbitmq:localhost:5672/ex4?username=cameltest&password=cameltest"
- + "&autoAck=false&queue=q4&routingKey=" + ROUTING_KEY)
+ @EndpointInject("rabbitmq:localhost:5672/ex4?username=cameltest&password=cameltest" + "&autoAck=false&queue=q4&routingKey=" + ROUTING_KEY)
private Endpoint rabbitMQEndpoint;
@EndpointInject("mock:producing")
@@ -50,20 +50,10 @@ public class RabbitMQRequeueUnhandledExceptionIntTest extends AbstractRabbitMQIn
@Override
public void configure() throws Exception {
- from("direct:rabbitMQ")
- .id("producingRoute")
- .log("Sending message")
- .inOnly(rabbitMQEndpoint)
- .to(producingMockEndpoint);
+ from("direct:rabbitMQ").id("producingRoute").log("Sending message").inOnly(rabbitMQEndpoint).to(producingMockEndpoint);
- from(rabbitMQEndpoint)
- .onException(Exception.class)
- .handled(false)
- .end()
- .id("consumingRoute")
- .log("Receiving message")
- .inOnly(consumingMockEndpoint)
- .throwException(new Exception("Simulated unhandled exception"));
+ from(rabbitMQEndpoint).onException(Exception.class).handled(false).end().id("consumingRoute").log("Receiving message").inOnly(consumingMockEndpoint)
+ .throwException(new Exception("Simulated unhandled exception"));
}
};
}
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/spring/RabbitMQSpringIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/spring/RabbitMQSpringIntTest.java
index 3116dba..eb650e1 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/spring/RabbitMQSpringIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/spring/RabbitMQSpringIntTest.java
@@ -58,7 +58,7 @@ public class RabbitMQSpringIntTest extends AbstractRabbitMQSpringIntTest {
protected AbstractApplicationContext createApplicationContext() {
AbstractApplicationContext applicationContext = super.createApplicationContext();
- connectionFactory = (ConnectionFactory) applicationContext.getBean("customConnectionFactory");
+ connectionFactory = (ConnectionFactory)applicationContext.getBean("customConnectionFactory");
return applicationContext;
}
@@ -109,7 +109,6 @@ public class RabbitMQSpringIntTest extends AbstractRabbitMQSpringIntTest {
}
}
-
private static final class LastDeliveryConsumer extends DefaultConsumer {
private byte[] lastBody;
@@ -147,4 +146,4 @@ public class RabbitMQSpringIntTest extends AbstractRabbitMQSpringIntTest {
}
assertEquals(body, consumer.getLastBodyAsString());
}
-}
\ No newline at end of file
+}
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/qpid/RabbitMQConsumerQpidTestReplyTo.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/qpid/RabbitMQConsumerQpidTestReplyTo.java
index 19caa71..5ce1187 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/qpid/RabbitMQConsumerQpidTestReplyTo.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/qpid/RabbitMQConsumerQpidTestReplyTo.java
@@ -36,12 +36,12 @@ public class RabbitMQConsumerQpidTestReplyTo extends RabbitMQConsumerIntTestRepl
public static void stopBroker() {
systemLauncher.shutdown();
}
-
+
@Override
@Before
public void setUpRabbitMQ() throws Exception {
super.setUpRabbitMQ();
-
+
channel.queueDeclare(QUEUE, false, false, true, null);
}
}
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/qpid/RabbitMQProducerQpidTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/qpid/RabbitMQProducerQpidTest.java
index 0cd0b2c..5daa149 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/qpid/RabbitMQProducerQpidTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/qpid/RabbitMQProducerQpidTest.java
@@ -39,12 +39,12 @@ public class RabbitMQProducerQpidTest extends RabbitMQProducerIntTest {
public static void stopBroker() {
systemLauncher.shutdown();
}
-
+
@Ignore
@Override
public void producedMessageIsReceivedWhenPublisherAcknowledgementsAreEnabledAndBadRoutingKeyIsUsed() throws InterruptedException, IOException, TimeoutException {
}
-
+
@Ignore
@Override
public void shouldSuccessfullyProduceMessageWhenGuaranteedDeliveryIsActivatedOnABadRouteButMessageIsNotMandatory() throws InterruptedException, IOException, TimeoutException {
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/qpid/RabbitMQRequeueQpidTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/qpid/RabbitMQRequeueQpidTest.java
index 199a088..1451971 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/qpid/RabbitMQRequeueQpidTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/qpid/RabbitMQRequeueQpidTest.java
@@ -37,13 +37,13 @@ public class RabbitMQRequeueQpidTest extends RabbitMQRequeueIntTest {
public static void stopBroker() {
systemLauncher.shutdown();
}
-
+
@Ignore
@Override
@Test
public void testNoRequeueHeaderCausesReject() throws Exception {
}
-
+
@Ignore
@Override
@Test
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestNonSerializableObject.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestNonSerializableObject.java
index 266579c..87d9660 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestNonSerializableObject.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestNonSerializableObject.java
@@ -36,4 +36,4 @@ public class TestNonSerializableObject {
public void setName(String name) {
this.name = name;
}
-}
\ No newline at end of file
+}
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestPartiallySerializableObject.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestPartiallySerializableObject.java
index e1fbc09..84940c1 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestPartiallySerializableObject.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestPartiallySerializableObject.java
@@ -49,4 +49,4 @@ public class TestPartiallySerializableObject implements Serializable {
this.nonSerializableObject = nonSerializableObject;
}
-}
\ No newline at end of file
+}
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestSerializableObject.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestSerializableObject.java
index 11137bf..193baf2 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestSerializableObject.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestSerializableObject.java
@@ -39,4 +39,4 @@ public class TestSerializableObject implements Serializable {
public void setName(String name) {
this.name = name;
}
-}
\ No newline at end of file
+}