You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/12/30 09:42:29 UTC
[1/2] camel git commit: RabbitMQEndpoint refactoring,
extracted a few helper classes to avoid RabbitMQEndpoint becoming a
god class
Repository: camel
Updated Branches:
refs/heads/master bd226cbe8 -> c34db42c8
RabbitMQEndpoint refactoring, extracted a few helper classes to avoid RabbitMQEndpoint becoming a god class
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/48a28a2c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/48a28a2c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/48a28a2c
Branch: refs/heads/master
Commit: 48a28a2c301c684e5481d27975cb1572c7201df6
Parents: bd226cb
Author: Miloš Milivojević <mm...@deployinc.com>
Authored: Thu Dec 24 11:01:24 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 30 09:40:00 2015 +0100
----------------------------------------------------------------------
.../RabbitMQConnectionFactorySupport.java | 64 +++++
.../component/rabbitmq/RabbitMQConsumer.java | 2 +-
.../rabbitmq/RabbitMQDeclareSupport.java | 103 ++++++++
.../component/rabbitmq/RabbitMQEndpoint.java | 245 +------------------
.../rabbitmq/RabbitMQMessageConverter.java | 115 +++++++--
.../rabbitmq/RabbitMQMessagePublisher.java | 123 ++++++++++
.../rabbitmq/reply/ReplyManagerSupport.java | 22 +-
.../rabbitmq/RabbitMQProducerIntTest.java | 1 -
8 files changed, 411 insertions(+), 264 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConnectionFactorySupport.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConnectionFactorySupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConnectionFactorySupport.java
new file mode 100644
index 0000000..de6cc24
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConnectionFactorySupport.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+
+import com.rabbitmq.client.ConnectionFactory;
+
+public class RabbitMQConnectionFactorySupport {
+
+ public ConnectionFactory createFactoryFor(final RabbitMQEndpoint endpoint) {
+ ConnectionFactory factory = new ConnectionFactory();
+ factory.setUsername(endpoint.getUsername());
+ factory.setPassword(endpoint.getPassword());
+ factory.setVirtualHost(endpoint.getVhost());
+ factory.setHost(endpoint.getHostname());
+ factory.setPort(endpoint.getPortNumber());
+ if (endpoint.getClientProperties() != null) {
+ factory.setClientProperties(endpoint.getClientProperties());
+ }
+ factory.setConnectionTimeout(endpoint.getConnectionTimeout());
+ factory.setRequestedChannelMax(endpoint.getRequestedChannelMax());
+ factory.setRequestedFrameMax(endpoint.getRequestedFrameMax());
+ factory.setRequestedHeartbeat(endpoint.getRequestedHeartbeat());
+ if (endpoint.getSslProtocol() != null) {
+ try {
+ if (endpoint.getSslProtocol().equals("true")) {
+ factory.useSslProtocol();
+ } else if (endpoint.getTrustManager() == null) {
+ factory.useSslProtocol(endpoint.getSslProtocol());
+ } else {
+ factory.useSslProtocol(endpoint.getSslProtocol(), endpoint.getTrustManager());
+ }
+ } catch (NoSuchAlgorithmException | KeyManagementException e) {
+ throw new IllegalArgumentException("Invalid sslProtocol " + endpoint.getSslProtocol(), e);
+ }
+ }
+ if (endpoint.getAutomaticRecoveryEnabled() != null) {
+ factory.setAutomaticRecoveryEnabled(endpoint.getAutomaticRecoveryEnabled());
+ }
+ if (endpoint.getNetworkRecoveryInterval() != null) {
+ factory.setNetworkRecoveryInterval(endpoint.getNetworkRecoveryInterval());
+ }
+ if (endpoint.getTopologyRecoveryEnabled() != null) {
+ factory.setTopologyRecoveryEnabled(endpoint.getTopologyRecoveryEnabled());
+ }
+ return factory;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index cdb23f4..a71769e 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -94,7 +94,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
// First channel used to declare Exchange and Queue
Channel channel = openChannel();
if (getEndpoint().isDeclare()) {
- endpoint.declareExchangeAndQueue(channel);
+ getEndpoint().declareExchangeAndQueue(channel);
}
startConsumer(channel);
// Other channels
http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java
new file mode 100644
index 0000000..aa4df2f
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.rabbitmq.client.Channel;
+
+public class RabbitMQDeclareSupport {
+
+ private final RabbitMQEndpoint endpoint;
+
+ RabbitMQDeclareSupport(final RabbitMQEndpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ public void declareAndBindExchangesAndQueuesUsing(final Channel channel) throws IOException {
+ declareAndBindDeadLetterExchangeWithQueue(channel);
+ declareAndBindExchangeWithQueue(channel);
+ }
+
+ private void declareAndBindDeadLetterExchangeWithQueue(final Channel channel) throws IOException {
+ if (endpoint.getDeadLetterExchange() != null) {
+ // TODO Do we need to setup the args for the DeadLetter?
+ declareExchange(channel, endpoint.getDeadLetterExchange(), endpoint.getDeadLetterExchangeType(), Collections.<String, Object>emptyMap());
+ declareAndBindQueue(channel, endpoint.getDeadLetterQueue(), endpoint.getDeadLetterExchange(), endpoint.getDeadLetterRoutingKey(), null);
+ }
+ }
+
+ private void declareAndBindExchangeWithQueue(final Channel channel) throws IOException {
+ declareExchange(channel, endpoint.getExchangeName(), endpoint.getExchangeType(), resolvedExchangeArguments());
+
+ if (shouldDeclareQueue()) {
+ // need to make sure the queueDeclare is same with the exchange declare
+ declareAndBindQueue(channel, endpoint.getQueue(), endpoint.getExchangeName(), endpoint.getRoutingKey(), resolvedQueueArguments());
+ }
+ }
+
+ private Map<String, Object> resolvedQueueArguments() {
+ Map<String, Object> queueArgs = new HashMap<>();
+ populateQueueArgumentsFromDeadLetterExchange(queueArgs);
+ populateQueueArgumentsFromConfigurer(queueArgs);
+ return queueArgs;
+ }
+
+ private Map<String, Object> populateQueueArgumentsFromDeadLetterExchange(final Map<String, Object> queueArgs) {
+ if (endpoint.getDeadLetterExchange() != null) {
+ queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_EXCHANGE, endpoint.getDeadLetterExchange());
+ queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_ROUTING_KEY, endpoint.getDeadLetterRoutingKey());
+ }
+
+ return queueArgs;
+ }
+
+ private Map<String, Object> resolvedExchangeArguments() {
+ Map<String, Object> exchangeArgs = new HashMap<>();
+ if (endpoint.getExchangeArgsConfigurer() != null) {
+ endpoint.getExchangeArgsConfigurer().configurArgs(exchangeArgs);
+ }
+ return exchangeArgs;
+ }
+
+ private boolean shouldDeclareQueue() {
+ return !endpoint.isSkipQueueDeclare() && endpoint.getQueue() != null;
+ }
+
+ private void populateQueueArgumentsFromConfigurer(final Map<String, Object> queueArgs) {
+ if (endpoint.getQueueArgsConfigurer() != null) {
+ endpoint.getQueueArgsConfigurer().configurArgs(queueArgs);
+ }
+ }
+
+ private void declareExchange(final Channel channel, final String exchange, final String exchangeType, final Map<String, Object> exchangeArgs) throws IOException {
+ channel.exchangeDeclare(exchange, exchangeType, endpoint.isDurable(), endpoint.isAutoDelete(), exchangeArgs);
+ }
+
+ private void declareAndBindQueue(final Channel channel, final String queue, final String exchange, final String routingKey, final Map<String, Object> arguments)
+ throws IOException {
+ channel.queueDeclare(queue, endpoint.isDurable(), false, endpoint.isAutoDelete(), arguments);
+ channel.queueBind(queue, exchange, emptyIfNull(routingKey));
+ }
+
+ private String emptyIfNull(final String routingKey) {
+ return routingKey == null ? "" : routingKey;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 7a6e48c..41eab3f 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -16,18 +16,8 @@
*/
package org.apache.camel.component.rabbitmq;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.NotSerializableException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
import java.net.URISyntaxException;
-import java.security.KeyManagementException;
-import java.security.NoSuchAlgorithmException;
-import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
@@ -41,17 +31,12 @@ import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.LongString;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
-import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.TypeConversionException;
import org.apache.camel.impl.DefaultEndpoint;
-import org.apache.camel.impl.DefaultMessage;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
@@ -59,14 +44,11 @@ import org.apache.camel.spi.UriPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * The rabbitmq component allows AMQP messages to be sent to (or consumed from) a RabbitMQ broker.
- */
@UriEndpoint(scheme = "rabbitmq", title = "RabbitMQ", syntax = "rabbitmq:hostname:portNumber/exchangeName", consumerClass = RabbitMQConsumer.class, label = "messaging")
public class RabbitMQEndpoint extends DefaultEndpoint {
- private static final Logger LOG = LoggerFactory.getLogger(RabbitMQEndpoint.class);
// header to indicate that the message body needs to be de-serialized
- private static final String SERIALIZE_HEADER = "CamelSerialize";
+ public static final String SERIALIZE_HEADER = "CamelSerialize";
+ private static final Logger LOG = LoggerFactory.getLogger(RabbitMQEndpoint.class);
@UriPath @Metadata(required = "true")
private String hostname;
@@ -172,7 +154,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
private String replyTo;
private final RabbitMQMessageConverter messageConverter = new RabbitMQMessageConverter();
-
+ private final RabbitMQConnectionFactorySupport factoryCreator = new RabbitMQConnectionFactorySupport();
+ private final RabbitMQDeclareSupport declareSupport = new RabbitMQDeclareSupport(this);
public RabbitMQEndpoint() {
}
@@ -188,7 +171,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
public Exchange createRabbitExchange(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
Exchange exchange = super.createExchange();
- setRabbitExchange(exchange, envelope, properties, body, false);
+ messageConverter.populateRabbitExchange(exchange, envelope, properties, body, false);
return exchange;
}
@@ -199,132 +182,11 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
return messageConverter;
}
- public void setRabbitExchange(Exchange camelExchange, Envelope envelope, AMQP.BasicProperties properties, byte[] body, boolean out) {
- Message message;
- if (out) {
- // use OUT message
- message = camelExchange.getOut();
- } else {
- if (camelExchange.getIn() != null) {
- // Use the existing message so we keep the headers
- message = camelExchange.getIn();
- } else {
- message = new DefaultMessage();
- camelExchange.setIn(message);
- }
- }
-
- if (envelope != null) {
- message.setHeader(RabbitMQConstants.ROUTING_KEY, envelope.getRoutingKey());
- message.setHeader(RabbitMQConstants.EXCHANGE_NAME, envelope.getExchange());
- message.setHeader(RabbitMQConstants.DELIVERY_TAG, envelope.getDeliveryTag());
- }
-
- Map<String, Object> headers = properties.getHeaders();
- if (headers != null) {
- for (Map.Entry<String, Object> entry : headers.entrySet()) {
- // Convert LongStrings to String.
- if (entry.getValue() instanceof LongString) {
- message.setHeader(entry.getKey(), entry.getValue().toString());
- } else {
- message.setHeader(entry.getKey(), entry.getValue());
- }
- }
- }
-
- if (hasSerializeHeader(properties)) {
- Object messageBody = null;
- try (InputStream b = new ByteArrayInputStream(body);
- ObjectInputStream o = new ObjectInputStream(b);) {
- messageBody = o.readObject();
- } catch (IOException | ClassNotFoundException e) {
- LOG.warn("Could not deserialize the object");
- }
- if (messageBody instanceof Throwable) {
- LOG.debug("Reply was an Exception. Setting the Exception on the Exchange");
- camelExchange.setException((Throwable) messageBody);
- } else {
- message.setBody(messageBody);
- }
- } else {
- // Set the body as a byte[] and let the type converter deal with it
- message.setBody(body);
- }
-
- }
-
- private boolean hasSerializeHeader(AMQP.BasicProperties properties) {
- if (properties == null || properties.getHeaders() == null) {
- return false;
- }
- if (properties.getHeaders().containsKey(SERIALIZE_HEADER) && properties.getHeaders().get(SERIALIZE_HEADER).equals(true)) {
- return true;
- }
- return false;
- }
-
/**
* Sends the body that is on the exchange
*/
public void publishExchangeToChannel(Exchange camelExchange, Channel channel, String routingKey) throws IOException {
- Message msg;
- if (camelExchange.hasOut()) {
- msg = camelExchange.getOut();
- } else {
- msg = camelExchange.getIn();
- }
-
- // Remove the SERIALIZE_HEADER in case it was previously set
- if (msg.getHeaders() != null && msg.getHeaders().containsKey(SERIALIZE_HEADER)) {
- LOG.debug("Removing the {} header", SERIALIZE_HEADER);
- msg.getHeaders().remove(SERIALIZE_HEADER);
- }
-
- AMQP.BasicProperties properties;
- byte[] body;
- try {
- // To maintain backwards compatibility try the TypeConverter (The DefaultTypeConverter seems to only work on Strings)
- body = camelExchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, camelExchange, msg.getBody());
-
- properties = getMessageConverter().buildProperties(camelExchange).build();
- } catch (NoTypeConversionAvailableException | TypeConversionException e) {
- if (msg.getBody() instanceof Serializable) {
- // Add the header so the reply processor knows to de-serialize it
- msg.getHeaders().put(SERIALIZE_HEADER, true);
-
- properties = getMessageConverter().buildProperties(camelExchange).build();
-
- try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b);) {
- o.writeObject(msg.getBody());
- body = b.toByteArray();
- } catch (NotSerializableException nse) {
- LOG.warn("Can not send object " + msg.getBody().getClass() + " via RabbitMQ because it contains non-serializable objects.");
- throw new RuntimeCamelException(e);
- }
- } else if (msg.getBody() == null) {
- properties = getMessageConverter().buildProperties(camelExchange).build();
- body = null;
- } else {
- LOG.warn("Could not convert {} to byte[]", msg.getBody());
- throw new RuntimeCamelException(e);
- }
- }
- String rabbitExchange = getExchangeName(msg);
-
- Boolean mandatory = camelExchange.getIn().getHeader(RabbitMQConstants.MANDATORY, isMandatory(), Boolean.class);
- Boolean immediate = camelExchange.getIn().getHeader(RabbitMQConstants.IMMEDIATE, isImmediate(), Boolean.class);
-
- LOG.debug("Sending message to exchange: {} with CorrelationId = {}", rabbitExchange, properties.getCorrelationId());
-
- if (isPublisherAcknowledgements()) {
- channel.confirmSelect();
- }
-
- channel.basicPublish(rabbitExchange, routingKey, mandatory, immediate, properties, body);
-
- if (isPublisherAcknowledgements()) {
- waitForConfirmationFor(channel, camelExchange);
- }
+ new RabbitMQMessagePublisher(camelExchange, channel, routingKey, this).publish();
}
/**
@@ -337,16 +199,6 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
exchangeName = getExchangeName();
}
return exchangeName;
- }
-
- private void waitForConfirmationFor(final Channel channel, final Exchange camelExchange) throws IOException {
- try {
- LOG.debug("Waiting for publisher acknowledgements for {}ms", getPublisherAcknowledgementsTimeout());
- channel.waitForConfirmsOrDie(getPublisherAcknowledgementsTimeout());
- } catch (InterruptedException | TimeoutException e) {
- LOG.warn("Acknowledgement error for {}", camelExchange);
- throw new RuntimeCamelException(e);
- }
}
@Override
@@ -368,88 +220,12 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
* If needed, declare Exchange, declare Queue and bind them with Routing Key
*/
public void declareExchangeAndQueue(Channel channel) throws IOException {
- Map<String, Object> queueArgs = new HashMap<String, Object>();
- Map<String, Object> exchangeArgs = new HashMap<String, Object>();
-
- if (deadLetterExchange != null) {
- queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_EXCHANGE, getDeadLetterExchange());
- queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_ROUTING_KEY, getDeadLetterRoutingKey());
- // TODO Do we need to setup the args for the DeadLetter?
- channel.exchangeDeclare(getDeadLetterExchange(),
- getDeadLetterExchangeType(),
- isDurable(),
- isAutoDelete(),
- new HashMap<String, Object>());
- channel.queueDeclare(getDeadLetterQueue(), isDurable(), false,
- isAutoDelete(), null);
- channel.queueBind(
- getDeadLetterQueue(),
- getDeadLetterExchange(),
- getDeadLetterRoutingKey() == null ? "" : getDeadLetterRoutingKey());
- }
-
- if (getQueueArgsConfigurer() != null) {
- getQueueArgsConfigurer().configurArgs(queueArgs);
- }
- if (getExchangeArgsConfigurer() != null) {
- getExchangeArgsConfigurer().configurArgs(exchangeArgs);
- }
-
- channel.exchangeDeclare(getExchangeName(),
- getExchangeType(),
- isDurable(),
- isAutoDelete(), exchangeArgs);
- if (!isSkipQueueDeclare() && getQueue() != null) {
- // need to make sure the queueDeclare is same with the exchange declare
- channel.queueDeclare(getQueue(), isDurable(), false,
- isAutoDelete(), queueArgs);
- channel.queueBind(
- getQueue(),
- getExchangeName(),
- getRoutingKey() == null ? "" : getRoutingKey());
- }
+ declareSupport.declareAndBindExchangesAndQueuesUsing(channel);
}
private ConnectionFactory getOrCreateConnectionFactory() {
if (connectionFactory == null) {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setUsername(getUsername());
- factory.setPassword(getPassword());
- factory.setVirtualHost(getVhost());
- factory.setHost(getHostname());
- factory.setPort(getPortNumber());
- if (getClientProperties() != null) {
- factory.setClientProperties(getClientProperties());
- }
- factory.setConnectionTimeout(getConnectionTimeout());
- factory.setRequestedChannelMax(getRequestedChannelMax());
- factory.setRequestedFrameMax(getRequestedFrameMax());
- factory.setRequestedHeartbeat(getRequestedHeartbeat());
- if (getSslProtocol() != null) {
- try {
- if (getSslProtocol().equals("true")) {
- factory.useSslProtocol();
- } else if (getTrustManager() == null) {
- factory.useSslProtocol(getSslProtocol());
- } else {
- factory.useSslProtocol(getSslProtocol(), getTrustManager());
- }
- } catch (NoSuchAlgorithmException e) {
- throw new IllegalArgumentException("Invalid sslProtocol " + sslProtocol, e);
- } catch (KeyManagementException e) {
- throw new IllegalArgumentException("Invalid sslProtocol " + sslProtocol, e);
- }
- }
- if (getAutomaticRecoveryEnabled() != null) {
- factory.setAutomaticRecoveryEnabled(getAutomaticRecoveryEnabled());
- }
- if (getNetworkRecoveryInterval() != null) {
- factory.setNetworkRecoveryInterval(getNetworkRecoveryInterval());
- }
- if (getTopologyRecoveryEnabled() != null) {
- factory.setTopologyRecoveryEnabled(getTopologyRecoveryEnabled());
- }
- connectionFactory = factory;
+ connectionFactory = factoryCreator.createFactoryFor(this);
}
return connectionFactory;
}
@@ -847,7 +623,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
public void setDeclare(boolean declare) {
this.declare = declare;
}
-
+
public String getDeadLetterExchange() {
return deadLetterExchange;
}
@@ -958,7 +734,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
public ArgsConfigurer getExchangeArgsConfigurer() {
return exchangeArgsConfigurer;
}
-
+
/**
* Set the configurer for setting the exchange args in Channel.exchangeDeclare
*/
@@ -1041,4 +817,5 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
public String getReplyTo() {
return replyTo;
}
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
index 66299f6..1873674 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
@@ -16,17 +16,20 @@
*/
package org.apache.camel.component.rabbitmq;
-import java.math.BigDecimal;
-import java.sql.Timestamp;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.LongString;
-
import org.apache.camel.Exchange;
import org.apache.camel.Message;
+import org.apache.camel.impl.DefaultMessage;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -153,7 +156,7 @@ public class RabbitMQMessageConverter {
}
final Map<String, Object> headers = msg.getHeaders();
- Map<String, Object> filteredHeaders = new HashMap<String, Object>();
+ Map<String, Object> filteredHeaders = new HashMap<>();
// TODO: Add support for a HeaderFilterStrategy. See: org.apache.camel.component.jms.JmsBinding#shouldOutputHeader
for (Map.Entry<String, Object> header : headers.entrySet()) {
@@ -166,9 +169,7 @@ public class RabbitMQMessageConverter {
LOG.debug("Ignoring header: {} with null value", header.getKey());
} else {
LOG.debug("Ignoring header: {} of class: {} with value: {}",
- new Object[] {
- header.getKey(), ObjectHelper.classCanonicalName(header.getValue()), header.getValue()
- });
+ header.getKey(), ObjectHelper.classCanonicalName(header.getValue()), header.getValue());
}
}
}
@@ -191,8 +192,6 @@ public class RabbitMQMessageConverter {
private Object getValidRabbitMQHeaderValue(Object headerValue) {
if (headerValue instanceof String) {
return headerValue;
- } else if (headerValue instanceof BigDecimal) {
- return headerValue;
} else if (headerValue instanceof Number) {
return headerValue;
} else if (headerValue instanceof Boolean) {
@@ -203,18 +202,96 @@ public class RabbitMQMessageConverter {
return headerValue;
} else if (headerValue instanceof LongString) {
return headerValue;
- } else if (headerValue instanceof Timestamp) {
- return headerValue;
- } else if (headerValue instanceof Byte) {
- return headerValue;
- } else if (headerValue instanceof Double) {
- return headerValue;
- } else if (headerValue instanceof Float) {
- return headerValue;
- } else if (headerValue instanceof Long) {
- return headerValue;
}
return null;
}
+
+ public void populateRabbitExchange(Exchange camelExchange, Envelope envelope, AMQP.BasicProperties properties, byte[] body, final boolean out) {
+ Message message = resolveMessageFrom(camelExchange, out);
+ populateMessageHeaders(message, envelope, properties);
+ populateMessageBody(message, camelExchange, properties, body);
+ }
+
+ private Message resolveMessageFrom(final Exchange camelExchange, final boolean out) {
+ Message message;
+ if (out) {
+ // use OUT message
+ message = camelExchange.getOut();
+ } else {
+ if (camelExchange.getIn() != null) {
+ // Use the existing message so we keep the headers
+ message = camelExchange.getIn();
+ } else {
+ message = new DefaultMessage();
+ camelExchange.setIn(message);
+ }
+ }
+ return message;
+ }
+
+ private void populateMessageHeaders(final Message message, final Envelope envelope, final AMQP.BasicProperties properties) {
+ populateRoutingInfoHeaders(message, envelope);
+ populateMessageHeadersFromRabbitMQHeaders(message, properties);
+ }
+
+ private void populateRoutingInfoHeaders(final Message message, final Envelope envelope) {
+ if (envelope != null) {
+ message.setHeader(RabbitMQConstants.ROUTING_KEY, envelope.getRoutingKey());
+ message.setHeader(RabbitMQConstants.EXCHANGE_NAME, envelope.getExchange());
+ message.setHeader(RabbitMQConstants.DELIVERY_TAG, envelope.getDeliveryTag());
+ }
+ }
+
+ private void populateMessageHeadersFromRabbitMQHeaders(final Message message, final AMQP.BasicProperties properties) {
+ Map<String, Object> headers = properties.getHeaders();
+ if (headers != null) {
+ for (Map.Entry<String, Object> entry : headers.entrySet()) {
+ // Convert LongStrings to String.
+ if (entry.getValue() instanceof LongString) {
+ message.setHeader(entry.getKey(), entry.getValue().toString());
+ } else {
+ message.setHeader(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ }
+
+ private void populateMessageBody(final Message message, final Exchange camelExchange, final AMQP.BasicProperties properties, final byte[] body) {
+ if (hasSerializeHeader(properties)) {
+ deserializeBody(camelExchange, message, body);
+ } else {
+ // Set the body as a byte[] and let the type converter deal with it
+ message.setBody(body);
+ }
+ }
+
+ private void deserializeBody(final Exchange camelExchange, final Message message, final byte[] body) {
+ Object messageBody = null;
+ try (InputStream b = new ByteArrayInputStream(body);
+ ObjectInputStream o = new ObjectInputStream(b)) {
+ messageBody = o.readObject();
+ } catch (IOException | ClassNotFoundException e) {
+ LOG.warn("Could not deserialize the object");
+ camelExchange.setException(e);
+ }
+ if (messageBody instanceof Throwable) {
+ LOG.debug("Reply was an Exception. Setting the Exception on the Exchange");
+ camelExchange.setException((Throwable) messageBody);
+ } else {
+ message.setBody(messageBody);
+ }
+ }
+
+ private boolean hasSerializeHeader(AMQP.BasicProperties properties) {
+ return hasHeaders(properties) && Boolean.TRUE.equals(isSerializeHeaderEnabled(properties));
+ }
+
+ private boolean hasHeaders(final AMQP.BasicProperties properties) {
+ return properties != null && properties.getHeaders() != null;
+ }
+
+ private Object isSerializeHeaderEnabled(final AMQP.BasicProperties properties) {
+ return properties.getHeaders().get(RabbitMQEndpoint.SERIALIZE_HEADER);
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
new file mode 100644
index 0000000..6f50ec4
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.io.*;
+import java.util.concurrent.TimeoutException;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import org.apache.camel.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A method object for publishing to RabbitMQ
+ */
+public class RabbitMQMessagePublisher {
+ private static final Logger LOG = LoggerFactory.getLogger(RabbitMQMessagePublisher.class);
+ private final Exchange camelExchange;
+ private final Channel channel;
+ private final String routingKey;
+ private final RabbitMQEndpoint endpoint;
+ private final Message message;
+
+ public RabbitMQMessagePublisher(final Exchange camelExchange, final Channel channel, final String routingKey, final RabbitMQEndpoint endpoint) {
+ this.camelExchange = camelExchange;
+ this.channel = channel;
+ this.routingKey = routingKey;
+ this.endpoint = endpoint;
+ this.message = resolveMessageFrom(camelExchange);
+ }
+
+ private Message resolveMessageFrom(final Exchange camelExchange) {
+ Message message = camelExchange.hasOut() ? camelExchange.getOut() : camelExchange.getIn();
+
+ // Remove the SERIALIZE_HEADER in case it was previously set
+ if (message.getHeaders() != null && message.getHeaders().containsKey(RabbitMQEndpoint.SERIALIZE_HEADER)) {
+ LOG.debug("Removing the {} header", RabbitMQEndpoint.SERIALIZE_HEADER);
+ message.getHeaders().remove(RabbitMQEndpoint.SERIALIZE_HEADER);
+ }
+
+ return message;
+ }
+
+ public void publish() throws IOException {
+ AMQP.BasicProperties properties;
+ byte[] body;
+ try {
+ // To maintain backwards compatibility try the TypeConverter (The DefaultTypeConverter seems to only work on Strings)
+ body = camelExchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, camelExchange, message.getBody());
+
+ properties = endpoint.getMessageConverter().buildProperties(camelExchange).build();
+ } catch (NoTypeConversionAvailableException | TypeConversionException e) {
+ if (message.getBody() instanceof Serializable) {
+ // Add the header so the reply processor knows to de-serialize it
+ message.getHeaders().put(RabbitMQEndpoint.SERIALIZE_HEADER, true);
+ properties = endpoint.getMessageConverter().buildProperties(camelExchange).build();
+ body = serializeBodyFrom(message);
+ } else if (message.getBody() == null) {
+ properties = endpoint.getMessageConverter().buildProperties(camelExchange).build();
+ body = null;
+ } else {
+ LOG.warn("Could not convert {} to byte[]", message.getBody());
+ throw new RuntimeCamelException(e);
+ }
+ }
+
+ publishToRabbitMQ(properties, body);
+ }
+
+ private void publishToRabbitMQ(final AMQP.BasicProperties properties, final byte[] body) throws IOException {
+ String rabbitExchange = endpoint.getExchangeName(message);
+
+ Boolean mandatory = camelExchange.getIn().getHeader(RabbitMQConstants.MANDATORY, endpoint.isMandatory(), Boolean.class);
+ Boolean immediate = camelExchange.getIn().getHeader(RabbitMQConstants.IMMEDIATE, endpoint.isImmediate(), Boolean.class);
+
+ LOG.debug("Sending message to exchange: {} with CorrelationId = {}", rabbitExchange, properties.getCorrelationId());
+
+ if (endpoint.isPublisherAcknowledgements()) {
+ channel.confirmSelect();
+ }
+
+ channel.basicPublish(rabbitExchange, routingKey, mandatory, immediate, properties, body);
+
+ if (endpoint.isPublisherAcknowledgements()) {
+ waitForConfirmation();
+ }
+ }
+
+ private void waitForConfirmation() throws IOException {
+ try {
+ LOG.debug("Waiting for publisher acknowledgements for {}ms", endpoint.getPublisherAcknowledgementsTimeout());
+ channel.waitForConfirmsOrDie(endpoint.getPublisherAcknowledgementsTimeout());
+ } catch (InterruptedException | TimeoutException e) {
+ LOG.warn("Acknowledgement error for {}", camelExchange);
+ throw new RuntimeCamelException(e);
+ }
+ }
+
+ private byte[] serializeBodyFrom(final Message msg) throws IOException {
+ try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b)) {
+ o.writeObject(msg.getBody());
+ return b.toByteArray();
+ } catch (NotSerializableException nse) {
+ LOG.warn("Can not send object " + msg.getBody().getClass() + " via RabbitMQ because it contains non-serializable objects.");
+ throw new RuntimeCamelException(nse);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
index 52ccc90..b6dacfa 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
@@ -29,6 +29,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.component.rabbitmq.RabbitMQConstants;
import org.apache.camel.component.rabbitmq.RabbitMQEndpoint;
+import org.apache.camel.component.rabbitmq.RabbitMQMessageConverter;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
@@ -37,19 +38,21 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class ReplyManagerSupport extends ServiceSupport implements ReplyManager {
+ private static final int CLOSE_TIMEOUT = 30 * 1000;
+
protected final Logger log = LoggerFactory.getLogger(ReplyManagerSupport.class);
protected final CamelContext camelContext;
protected final CountDownLatch replyToLatch = new CountDownLatch(1);
protected final long replyToTimeout = 1000;
-
+
protected ScheduledExecutorService executorService;
protected RabbitMQEndpoint endpoint;
protected String replyTo;
- protected Connection listenerContainer;
+ protected Connection listenerContainer;
protected CorrelationTimeoutMap correlation;
-
- private int closeTimeout = 30 * 1000;
+
+ private final RabbitMQMessageConverter messageConverter = new RabbitMQMessageConverter();
public ReplyManagerSupport(CamelContext camelContext) {
this.camelContext = camelContext;
@@ -133,14 +136,15 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
if (log.isWarnEnabled()) {
log.warn("Timeout occurred after {} millis waiting for reply message with correlationID [{}] on destination {}."
+ " Setting ExchangeTimedOutException on {} and continue routing.",
- new Object[]{holder.getRequestTimeout(), holder.getCorrelationId(), replyTo, ExchangeHelper.logIds(exchange)});
+ holder.getRequestTimeout(), holder.getCorrelationId(), replyTo, ExchangeHelper.logIds(exchange));
}
// no response, so lets set a timed out exception
String msg = "reply message with correlationID: " + holder.getCorrelationId() + " not received on destination: " + replyTo;
exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout(), msg));
} else {
- endpoint.setRabbitExchange(exchange, null, holder.getProperties(), holder.getMessage(), true);
+
+ messageConverter.populateRabbitExchange(exchange, null, holder.getProperties(), holder.getMessage(), true);
// restore correlation id in case the remote server messed with it
if (holder.getOriginalCorrelationId() != null) {
@@ -198,7 +202,7 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
if (answer != null) {
if (log.isTraceEnabled()) {
log.trace("Early reply with correlationID [{}] has been matched after {} attempts and can be processed using handler: {}",
- new Object[]{correlationID, counter, answer});
+ correlationID, counter, answer);
}
}
}
@@ -229,8 +233,8 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
ServiceHelper.stopService(correlation);
if (listenerContainer != null) {
- log.debug("Closing connection: {} with timeout: {} ms.", listenerContainer, closeTimeout);
- listenerContainer.close(closeTimeout);
+ log.debug("Closing connection: {} with timeout: {} ms.", listenerContainer, CLOSE_TIMEOUT);
+ listenerContainer.close(CLOSE_TIMEOUT);
listenerContainer = null;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
index 239fb36..5f859ad 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
@@ -59,7 +59,6 @@ public class RabbitMQProducerIntTest extends CamelTestSupport {
protected RouteBuilder createRouteBuilder() throws Exception {
context().setTracing(true);
return new RouteBuilder() {
-
@Override
public void configure() throws Exception {
from("direct:start").to(BASIC_URI);
[2/2] camel git commit: Fixed CS. Fixes #744.
Posted by da...@apache.org.
Fixed CS. Fixes #744.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c34db42c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c34db42c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c34db42c
Branch: refs/heads/master
Commit: c34db42c8f51f57e70ff7767756bdfe3421e9f7b
Parents: 48a28a2
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Dec 30 09:42:24 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 30 09:42:24 2015 +0100
----------------------------------------------------------------------
.../component/rabbitmq/RabbitMQMessagePublisher.java | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/c34db42c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
index 6f50ec4..bc78665 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
@@ -16,12 +16,20 @@
*/
package org.apache.camel.component.rabbitmq;
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
-import org.apache.camel.*;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.TypeConversionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;