You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/12/30 09:42:29 UTC

[1/2] camel git commit: RabbitMQEndpoint refactoring, extracted a few helper classes to avoid RabbitMQEndpoint becoming a god class

Repository: camel
Updated Branches:
  refs/heads/master bd226cbe8 -> c34db42c8


RabbitMQEndpoint refactoring, extracted a few helper classes to avoid RabbitMQEndpoint becoming a god class


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/48a28a2c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/48a28a2c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/48a28a2c

Branch: refs/heads/master
Commit: 48a28a2c301c684e5481d27975cb1572c7201df6
Parents: bd226cb
Author: Miloš Milivojević <mm...@deployinc.com>
Authored: Thu Dec 24 11:01:24 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 30 09:40:00 2015 +0100

----------------------------------------------------------------------
 .../RabbitMQConnectionFactorySupport.java       |  64 +++++
 .../component/rabbitmq/RabbitMQConsumer.java    |   2 +-
 .../rabbitmq/RabbitMQDeclareSupport.java        | 103 ++++++++
 .../component/rabbitmq/RabbitMQEndpoint.java    | 245 +------------------
 .../rabbitmq/RabbitMQMessageConverter.java      | 115 +++++++--
 .../rabbitmq/RabbitMQMessagePublisher.java      | 123 ++++++++++
 .../rabbitmq/reply/ReplyManagerSupport.java     |  22 +-
 .../rabbitmq/RabbitMQProducerIntTest.java       |   1 -
 8 files changed, 411 insertions(+), 264 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConnectionFactorySupport.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConnectionFactorySupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConnectionFactorySupport.java
new file mode 100644
index 0000000..de6cc24
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConnectionFactorySupport.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+
+import com.rabbitmq.client.ConnectionFactory;
+
+public class RabbitMQConnectionFactorySupport {
+    
+    public ConnectionFactory createFactoryFor(final RabbitMQEndpoint endpoint) {
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setUsername(endpoint.getUsername());
+        factory.setPassword(endpoint.getPassword());
+        factory.setVirtualHost(endpoint.getVhost());
+        factory.setHost(endpoint.getHostname());
+        factory.setPort(endpoint.getPortNumber());
+        if (endpoint.getClientProperties() != null) {
+            factory.setClientProperties(endpoint.getClientProperties());
+        }
+        factory.setConnectionTimeout(endpoint.getConnectionTimeout());
+        factory.setRequestedChannelMax(endpoint.getRequestedChannelMax());
+        factory.setRequestedFrameMax(endpoint.getRequestedFrameMax());
+        factory.setRequestedHeartbeat(endpoint.getRequestedHeartbeat());
+        if (endpoint.getSslProtocol() != null) {
+            try {
+                if (endpoint.getSslProtocol().equals("true")) {
+                    factory.useSslProtocol();
+                } else if (endpoint.getTrustManager() == null) {
+                    factory.useSslProtocol(endpoint.getSslProtocol());
+                } else {
+                    factory.useSslProtocol(endpoint.getSslProtocol(), endpoint.getTrustManager());
+                }
+            } catch (NoSuchAlgorithmException | KeyManagementException e) {
+                throw new IllegalArgumentException("Invalid sslProtocol " + endpoint.getSslProtocol(), e);
+            }
+        }
+        if (endpoint.getAutomaticRecoveryEnabled() != null) {
+            factory.setAutomaticRecoveryEnabled(endpoint.getAutomaticRecoveryEnabled());
+        }
+        if (endpoint.getNetworkRecoveryInterval() != null) {
+            factory.setNetworkRecoveryInterval(endpoint.getNetworkRecoveryInterval());
+        }
+        if (endpoint.getTopologyRecoveryEnabled() != null) {
+            factory.setTopologyRecoveryEnabled(endpoint.getTopologyRecoveryEnabled());
+        }
+        return factory;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index cdb23f4..a71769e 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -94,7 +94,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
         // First channel used to declare Exchange and Queue
         Channel channel = openChannel();
         if (getEndpoint().isDeclare()) {
-            endpoint.declareExchangeAndQueue(channel);
+            getEndpoint().declareExchangeAndQueue(channel);
         }
         startConsumer(channel);
         // Other channels

http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java
new file mode 100644
index 0000000..aa4df2f
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.rabbitmq.client.Channel;
+
+public class RabbitMQDeclareSupport {
+
+    private final RabbitMQEndpoint endpoint;
+
+    RabbitMQDeclareSupport(final RabbitMQEndpoint endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    public void declareAndBindExchangesAndQueuesUsing(final Channel channel) throws IOException {
+        declareAndBindDeadLetterExchangeWithQueue(channel);
+        declareAndBindExchangeWithQueue(channel);
+    }
+
+    private void declareAndBindDeadLetterExchangeWithQueue(final Channel channel) throws IOException {
+        if (endpoint.getDeadLetterExchange() != null) {
+            // TODO Do we need to setup the args for the DeadLetter?
+            declareExchange(channel, endpoint.getDeadLetterExchange(), endpoint.getDeadLetterExchangeType(), Collections.<String, Object>emptyMap());
+            declareAndBindQueue(channel, endpoint.getDeadLetterQueue(), endpoint.getDeadLetterExchange(), endpoint.getDeadLetterRoutingKey(), null);
+        }
+    }
+
+    private void declareAndBindExchangeWithQueue(final Channel channel) throws IOException {
+        declareExchange(channel, endpoint.getExchangeName(), endpoint.getExchangeType(), resolvedExchangeArguments());
+
+        if (shouldDeclareQueue()) {
+            // need to make sure the queueDeclare is same with the exchange declare
+            declareAndBindQueue(channel, endpoint.getQueue(), endpoint.getExchangeName(), endpoint.getRoutingKey(), resolvedQueueArguments());
+        }
+    }
+
+    private Map<String, Object> resolvedQueueArguments() {
+        Map<String, Object> queueArgs = new HashMap<>();
+        populateQueueArgumentsFromDeadLetterExchange(queueArgs);
+        populateQueueArgumentsFromConfigurer(queueArgs);
+        return queueArgs;
+    }
+
+    private Map<String, Object> populateQueueArgumentsFromDeadLetterExchange(final Map<String, Object> queueArgs) {
+        if (endpoint.getDeadLetterExchange() != null) {
+            queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_EXCHANGE, endpoint.getDeadLetterExchange());
+            queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_ROUTING_KEY, endpoint.getDeadLetterRoutingKey());
+        }
+
+        return queueArgs;
+    }
+
+    private Map<String, Object> resolvedExchangeArguments() {
+        Map<String, Object> exchangeArgs = new HashMap<>();
+        if (endpoint.getExchangeArgsConfigurer() != null) {
+            endpoint.getExchangeArgsConfigurer().configurArgs(exchangeArgs);
+        }
+        return exchangeArgs;
+    }
+
+    private boolean shouldDeclareQueue() {
+        return !endpoint.isSkipQueueDeclare() && endpoint.getQueue() != null;
+    }
+
+    private void populateQueueArgumentsFromConfigurer(final Map<String, Object> queueArgs) {
+        if (endpoint.getQueueArgsConfigurer() != null) {
+            endpoint.getQueueArgsConfigurer().configurArgs(queueArgs);
+        }
+    }
+
+    private void declareExchange(final Channel channel, final String exchange, final String exchangeType, final Map<String, Object> exchangeArgs) throws IOException {
+        channel.exchangeDeclare(exchange, exchangeType, endpoint.isDurable(), endpoint.isAutoDelete(), exchangeArgs);
+    }
+
+    private void declareAndBindQueue(final Channel channel, final String queue, final String exchange, final String routingKey, final Map<String, Object> arguments)
+            throws IOException {
+        channel.queueDeclare(queue, endpoint.isDurable(), false, endpoint.isAutoDelete(), arguments);
+        channel.queueBind(queue, exchange, emptyIfNull(routingKey));
+    }
+
+    private String emptyIfNull(final String routingKey) {
+        return routingKey == null ? "" : routingKey;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 7a6e48c..41eab3f 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -16,18 +16,8 @@
  */
 package org.apache.camel.component.rabbitmq;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.NotSerializableException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
 import java.net.URISyntaxException;
-import java.security.KeyManagementException;
-import java.security.NoSuchAlgorithmException;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
@@ -41,17 +31,12 @@ import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.LongString;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
-import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.TypeConversionException;
 import org.apache.camel.impl.DefaultEndpoint;
-import org.apache.camel.impl.DefaultMessage;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
@@ -59,14 +44,11 @@ import org.apache.camel.spi.UriPath;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * The rabbitmq component allows AMQP messages to be sent to (or consumed from) a RabbitMQ broker.
- */
 @UriEndpoint(scheme = "rabbitmq", title = "RabbitMQ", syntax = "rabbitmq:hostname:portNumber/exchangeName", consumerClass = RabbitMQConsumer.class, label = "messaging")
 public class RabbitMQEndpoint extends DefaultEndpoint {
-    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQEndpoint.class);
     // header to indicate that the message body needs to be de-serialized
-    private static final String SERIALIZE_HEADER = "CamelSerialize";
+    public static final String SERIALIZE_HEADER = "CamelSerialize";
+    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQEndpoint.class);
 
     @UriPath @Metadata(required = "true")
     private String hostname;
@@ -172,7 +154,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     private String replyTo;
 
     private final RabbitMQMessageConverter messageConverter = new RabbitMQMessageConverter();
-    
+    private final RabbitMQConnectionFactorySupport factoryCreator = new RabbitMQConnectionFactorySupport();
+    private final RabbitMQDeclareSupport declareSupport = new RabbitMQDeclareSupport(this);
 
     public RabbitMQEndpoint() {
     }
@@ -188,7 +171,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
 
     public Exchange createRabbitExchange(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
         Exchange exchange = super.createExchange();
-        setRabbitExchange(exchange, envelope, properties, body, false);
+        messageConverter.populateRabbitExchange(exchange, envelope, properties, body, false);
         return exchange;
     }
 
@@ -199,132 +182,11 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
         return messageConverter;
     }
 
-    public void setRabbitExchange(Exchange camelExchange, Envelope envelope, AMQP.BasicProperties properties, byte[] body, boolean out) {
-        Message message;
-        if (out) {
-            // use OUT message
-            message = camelExchange.getOut();
-        }  else {
-            if (camelExchange.getIn() != null) {
-                // Use the existing message so we keep the headers
-                message = camelExchange.getIn();
-            } else {
-                message = new DefaultMessage();
-                camelExchange.setIn(message);
-            }
-        }
-
-        if (envelope != null) {
-            message.setHeader(RabbitMQConstants.ROUTING_KEY, envelope.getRoutingKey());
-            message.setHeader(RabbitMQConstants.EXCHANGE_NAME, envelope.getExchange());
-            message.setHeader(RabbitMQConstants.DELIVERY_TAG, envelope.getDeliveryTag());
-        }
-
-        Map<String, Object> headers = properties.getHeaders();
-        if (headers != null) {
-            for (Map.Entry<String, Object> entry : headers.entrySet()) {
-                // Convert LongStrings to String.
-                if (entry.getValue() instanceof LongString) {
-                    message.setHeader(entry.getKey(), entry.getValue().toString());
-                } else {
-                    message.setHeader(entry.getKey(), entry.getValue());
-                }
-            }
-        }
-
-        if (hasSerializeHeader(properties)) {
-            Object messageBody = null;
-            try (InputStream b = new ByteArrayInputStream(body);
-                            ObjectInputStream o = new ObjectInputStream(b);) {
-                messageBody = o.readObject();
-            } catch (IOException | ClassNotFoundException e) {
-                LOG.warn("Could not deserialize the object");
-            }
-            if (messageBody instanceof Throwable) {
-                LOG.debug("Reply was an Exception. Setting the Exception on the Exchange");
-                camelExchange.setException((Throwable) messageBody);
-            } else {
-                message.setBody(messageBody);
-            }
-        } else {
-            // Set the body as a byte[] and let the type converter deal with it
-            message.setBody(body);
-        }
-
-    }
-
-    private boolean hasSerializeHeader(AMQP.BasicProperties properties) {
-        if (properties == null || properties.getHeaders() == null) {
-            return false;
-        }
-        if (properties.getHeaders().containsKey(SERIALIZE_HEADER) && properties.getHeaders().get(SERIALIZE_HEADER).equals(true)) {
-            return true;
-        }
-        return false;
-    }
-
     /**
      * Sends the body that is on the exchange
      */
     public void publishExchangeToChannel(Exchange camelExchange, Channel channel, String routingKey) throws IOException {
-        Message msg;
-        if (camelExchange.hasOut()) {
-            msg = camelExchange.getOut();
-        } else {
-            msg = camelExchange.getIn();
-        }
-
-        // Remove the SERIALIZE_HEADER in case it was previously set
-        if (msg.getHeaders() != null && msg.getHeaders().containsKey(SERIALIZE_HEADER)) {
-            LOG.debug("Removing the {} header", SERIALIZE_HEADER);
-            msg.getHeaders().remove(SERIALIZE_HEADER);
-        }
-
-        AMQP.BasicProperties properties;
-        byte[] body;
-        try {
-            // To maintain backwards compatibility try the TypeConverter (The DefaultTypeConverter seems to only work on Strings)
-            body = camelExchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, camelExchange, msg.getBody());
-
-            properties = getMessageConverter().buildProperties(camelExchange).build();
-        } catch (NoTypeConversionAvailableException | TypeConversionException e) {
-            if (msg.getBody() instanceof Serializable) {
-                // Add the header so the reply processor knows to de-serialize it
-                msg.getHeaders().put(SERIALIZE_HEADER, true);
-
-                properties = getMessageConverter().buildProperties(camelExchange).build();
-
-                try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b);) {
-                    o.writeObject(msg.getBody());
-                    body = b.toByteArray();
-                } catch (NotSerializableException nse) {
-                    LOG.warn("Can not send object " + msg.getBody().getClass() + " via RabbitMQ because it contains non-serializable objects.");
-                    throw new RuntimeCamelException(e);
-                }
-            } else if (msg.getBody() == null) {
-                properties = getMessageConverter().buildProperties(camelExchange).build();
-                body = null;
-            } else {
-                LOG.warn("Could not convert {} to byte[]", msg.getBody());
-                throw new RuntimeCamelException(e);
-            }
-        }
-        String rabbitExchange = getExchangeName(msg);
-
-        Boolean mandatory = camelExchange.getIn().getHeader(RabbitMQConstants.MANDATORY, isMandatory(), Boolean.class);
-        Boolean immediate = camelExchange.getIn().getHeader(RabbitMQConstants.IMMEDIATE, isImmediate(), Boolean.class);
-
-        LOG.debug("Sending message to exchange: {} with CorrelationId = {}", rabbitExchange, properties.getCorrelationId());
-
-        if (isPublisherAcknowledgements()) {
-            channel.confirmSelect();
-        }
-        
-        channel.basicPublish(rabbitExchange, routingKey, mandatory, immediate, properties, body);
-
-        if (isPublisherAcknowledgements()) {
-            waitForConfirmationFor(channel, camelExchange);
-        }
+        new RabbitMQMessagePublisher(camelExchange, channel, routingKey, this).publish();
     }
 
     /**
@@ -337,16 +199,6 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
             exchangeName = getExchangeName();
         }
         return exchangeName;
-    }      
-    
-    private void waitForConfirmationFor(final Channel channel, final Exchange camelExchange) throws IOException {
-        try {
-            LOG.debug("Waiting for publisher acknowledgements for {}ms", getPublisherAcknowledgementsTimeout());
-            channel.waitForConfirmsOrDie(getPublisherAcknowledgementsTimeout());
-        } catch (InterruptedException | TimeoutException e) {
-            LOG.warn("Acknowledgement error for {}", camelExchange);
-            throw new RuntimeCamelException(e);
-        }
     }
 
     @Override
@@ -368,88 +220,12 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
      * If needed, declare Exchange, declare Queue and bind them with Routing Key
      */
     public void declareExchangeAndQueue(Channel channel) throws IOException {
-        Map<String, Object> queueArgs = new HashMap<String, Object>();
-        Map<String, Object> exchangeArgs = new HashMap<String, Object>();
-        
-        if (deadLetterExchange != null) {
-            queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_EXCHANGE, getDeadLetterExchange());
-            queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_ROUTING_KEY, getDeadLetterRoutingKey());
-            // TODO Do we need to setup the args for the DeadLetter?
-            channel.exchangeDeclare(getDeadLetterExchange(),
-                    getDeadLetterExchangeType(),
-                    isDurable(),
-                    isAutoDelete(),
-                    new HashMap<String, Object>());
-            channel.queueDeclare(getDeadLetterQueue(), isDurable(), false,
-                    isAutoDelete(), null);
-            channel.queueBind(
-                    getDeadLetterQueue(),
-                    getDeadLetterExchange(),
-                    getDeadLetterRoutingKey() == null ? "" : getDeadLetterRoutingKey());
-        }
-        
-        if (getQueueArgsConfigurer() != null) {
-            getQueueArgsConfigurer().configurArgs(queueArgs);
-        }
-        if (getExchangeArgsConfigurer() != null) {
-            getExchangeArgsConfigurer().configurArgs(exchangeArgs);
-        }
-        
-        channel.exchangeDeclare(getExchangeName(),
-                getExchangeType(),
-                isDurable(),
-                isAutoDelete(), exchangeArgs);
-        if (!isSkipQueueDeclare() && getQueue() != null) {
-            // need to make sure the queueDeclare is same with the exchange declare
-            channel.queueDeclare(getQueue(), isDurable(), false,
-                    isAutoDelete(), queueArgs);
-            channel.queueBind(
-                    getQueue(),
-                    getExchangeName(),
-                    getRoutingKey() == null ? "" : getRoutingKey());
-        }
+        declareSupport.declareAndBindExchangesAndQueuesUsing(channel);
     }
 
     private ConnectionFactory getOrCreateConnectionFactory() {
         if (connectionFactory == null) {
-            ConnectionFactory factory = new ConnectionFactory();
-            factory.setUsername(getUsername());
-            factory.setPassword(getPassword());
-            factory.setVirtualHost(getVhost());
-            factory.setHost(getHostname());
-            factory.setPort(getPortNumber());
-            if (getClientProperties() != null) {
-                factory.setClientProperties(getClientProperties());
-            }
-            factory.setConnectionTimeout(getConnectionTimeout());
-            factory.setRequestedChannelMax(getRequestedChannelMax());
-            factory.setRequestedFrameMax(getRequestedFrameMax());
-            factory.setRequestedHeartbeat(getRequestedHeartbeat());
-            if (getSslProtocol() != null) {
-                try {
-                    if (getSslProtocol().equals("true")) {
-                        factory.useSslProtocol();
-                    } else if (getTrustManager() == null) {
-                        factory.useSslProtocol(getSslProtocol());
-                    } else {
-                        factory.useSslProtocol(getSslProtocol(), getTrustManager());
-                    }
-                } catch (NoSuchAlgorithmException e) {
-                    throw new IllegalArgumentException("Invalid sslProtocol " + sslProtocol, e);
-                } catch (KeyManagementException e) {
-                    throw new IllegalArgumentException("Invalid sslProtocol " + sslProtocol, e);
-                }
-            }
-            if (getAutomaticRecoveryEnabled() != null) {
-                factory.setAutomaticRecoveryEnabled(getAutomaticRecoveryEnabled());
-            }
-            if (getNetworkRecoveryInterval() != null) {
-                factory.setNetworkRecoveryInterval(getNetworkRecoveryInterval());
-            }
-            if (getTopologyRecoveryEnabled() != null) {
-                factory.setTopologyRecoveryEnabled(getTopologyRecoveryEnabled());
-            }
-            connectionFactory = factory;
+            connectionFactory = factoryCreator.createFactoryFor(this);
         }
         return connectionFactory;
     }
@@ -847,7 +623,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     public void setDeclare(boolean declare) {
         this.declare = declare;
     }
-    
+
     public String getDeadLetterExchange() {
         return deadLetterExchange;
     }
@@ -958,7 +734,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     public ArgsConfigurer getExchangeArgsConfigurer() {
         return exchangeArgsConfigurer;
     }
-    
+
     /**
      * Set the configurer for setting the exchange args in Channel.exchangeDeclare
      */
@@ -1041,4 +817,5 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     public String getReplyTo() {
         return replyTo;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
index 66299f6..1873674 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
@@ -16,17 +16,20 @@
  */
 package org.apache.camel.component.rabbitmq;
 
-import java.math.BigDecimal;
-import java.sql.Timestamp;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
 import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.LongString;
-
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.impl.DefaultMessage;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -153,7 +156,7 @@ public class RabbitMQMessageConverter {
         }
 
         final Map<String, Object> headers = msg.getHeaders();
-        Map<String, Object> filteredHeaders = new HashMap<String, Object>();
+        Map<String, Object> filteredHeaders = new HashMap<>();
 
         // TODO: Add support for a HeaderFilterStrategy. See: org.apache.camel.component.jms.JmsBinding#shouldOutputHeader
         for (Map.Entry<String, Object> header : headers.entrySet()) {
@@ -166,9 +169,7 @@ public class RabbitMQMessageConverter {
                     LOG.debug("Ignoring header: {} with null value", header.getKey());
                 } else {
                     LOG.debug("Ignoring header: {} of class: {} with value: {}",
-                                    new Object[] {
-                                                    header.getKey(), ObjectHelper.classCanonicalName(header.getValue()), header.getValue()
-                                    });
+                              header.getKey(), ObjectHelper.classCanonicalName(header.getValue()), header.getValue());
                 }
             }
         }
@@ -191,8 +192,6 @@ public class RabbitMQMessageConverter {
     private Object getValidRabbitMQHeaderValue(Object headerValue) {
         if (headerValue instanceof String) {
             return headerValue;
-        } else if (headerValue instanceof BigDecimal) {
-            return headerValue;
         } else if (headerValue instanceof Number) {
             return headerValue;
         } else if (headerValue instanceof Boolean) {
@@ -203,18 +202,96 @@ public class RabbitMQMessageConverter {
             return headerValue;
         } else if (headerValue instanceof LongString) {
             return headerValue;
-        } else if (headerValue instanceof Timestamp) {
-            return headerValue;
-        } else if (headerValue instanceof Byte) {
-            return headerValue;
-        } else if (headerValue instanceof Double) {
-            return headerValue;
-        } else if (headerValue instanceof Float) {
-            return headerValue;
-        } else if (headerValue instanceof Long) {
-            return headerValue;
         }
 
         return null;
     }
+
+    public void populateRabbitExchange(Exchange camelExchange, Envelope envelope, AMQP.BasicProperties properties, byte[] body, final boolean out) {
+        Message message = resolveMessageFrom(camelExchange, out);
+        populateMessageHeaders(message, envelope, properties);
+        populateMessageBody(message, camelExchange, properties, body);
+    }
+
+    private Message resolveMessageFrom(final Exchange camelExchange, final boolean out) {
+        Message message;
+        if (out) {
+            // use OUT message
+            message = camelExchange.getOut();
+        }  else {
+            if (camelExchange.getIn() != null) {
+                // Use the existing message so we keep the headers
+                message = camelExchange.getIn();
+            } else {
+                message = new DefaultMessage();
+                camelExchange.setIn(message);
+            }
+        }
+        return message;
+    }
+
+    private void populateMessageHeaders(final Message message, final Envelope envelope, final AMQP.BasicProperties properties) {
+        populateRoutingInfoHeaders(message, envelope);
+        populateMessageHeadersFromRabbitMQHeaders(message, properties);
+    }
+
+    private void populateRoutingInfoHeaders(final Message message, final Envelope envelope) {
+        if (envelope != null) {
+            message.setHeader(RabbitMQConstants.ROUTING_KEY, envelope.getRoutingKey());
+            message.setHeader(RabbitMQConstants.EXCHANGE_NAME, envelope.getExchange());
+            message.setHeader(RabbitMQConstants.DELIVERY_TAG, envelope.getDeliveryTag());
+        }
+    }
+
+    private void populateMessageHeadersFromRabbitMQHeaders(final Message message, final AMQP.BasicProperties properties) {
+        Map<String, Object> headers = properties.getHeaders();
+        if (headers != null) {
+            for (Map.Entry<String, Object> entry : headers.entrySet()) {
+                // Convert LongStrings to String.
+                if (entry.getValue() instanceof LongString) {
+                    message.setHeader(entry.getKey(), entry.getValue().toString());
+                } else {
+                    message.setHeader(entry.getKey(), entry.getValue());
+                }
+            }
+        }
+    }
+
+    private void populateMessageBody(final Message message, final Exchange camelExchange, final AMQP.BasicProperties properties, final byte[] body) {
+        if (hasSerializeHeader(properties)) {
+            deserializeBody(camelExchange, message, body);
+        } else {
+            // Set the body as a byte[] and let the type converter deal with it
+            message.setBody(body);
+        }
+    }
+
+    private void deserializeBody(final Exchange camelExchange, final Message message, final byte[] body) {
+        Object messageBody = null;
+        try (InputStream b = new ByteArrayInputStream(body);
+             ObjectInputStream o = new ObjectInputStream(b)) {
+            messageBody = o.readObject();
+        } catch (IOException | ClassNotFoundException e) {
+            LOG.warn("Could not deserialize the object");
+            camelExchange.setException(e);
+        }
+        if (messageBody instanceof Throwable) {
+            LOG.debug("Reply was an Exception. Setting the Exception on the Exchange");
+            camelExchange.setException((Throwable) messageBody);
+        } else {
+            message.setBody(messageBody);
+        }
+    }
+
+    private boolean hasSerializeHeader(AMQP.BasicProperties properties) {
+        return hasHeaders(properties) && Boolean.TRUE.equals(isSerializeHeaderEnabled(properties));
+    }
+
+    private boolean hasHeaders(final AMQP.BasicProperties properties) {
+        return properties != null && properties.getHeaders() != null;
+    }
+
+    private Object isSerializeHeaderEnabled(final AMQP.BasicProperties properties) {
+        return properties.getHeaders().get(RabbitMQEndpoint.SERIALIZE_HEADER);
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
new file mode 100644
index 0000000..6f50ec4
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.io.*;
+import java.util.concurrent.TimeoutException;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import org.apache.camel.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A method object for publishing to RabbitMQ
+ */
+public class RabbitMQMessagePublisher {
+    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQMessagePublisher.class);
+    private final Exchange camelExchange;
+    private final Channel channel;
+    private final String routingKey;
+    private final RabbitMQEndpoint endpoint;
+    private final Message message;
+
+    public RabbitMQMessagePublisher(final Exchange camelExchange, final Channel channel, final String routingKey, final RabbitMQEndpoint endpoint) {
+        this.camelExchange = camelExchange;
+        this.channel = channel;
+        this.routingKey = routingKey;
+        this.endpoint = endpoint;
+        this.message = resolveMessageFrom(camelExchange);
+    }
+
+    private Message resolveMessageFrom(final Exchange camelExchange) {
+        Message message = camelExchange.hasOut() ? camelExchange.getOut() : camelExchange.getIn();
+
+        // Remove the SERIALIZE_HEADER in case it was previously set
+        if (message.getHeaders() != null && message.getHeaders().containsKey(RabbitMQEndpoint.SERIALIZE_HEADER)) {
+            LOG.debug("Removing the {} header", RabbitMQEndpoint.SERIALIZE_HEADER);
+            message.getHeaders().remove(RabbitMQEndpoint.SERIALIZE_HEADER);
+        }
+        
+        return message;
+    }
+
+    public void publish() throws IOException {
+        AMQP.BasicProperties properties;
+        byte[] body;
+        try {
+            // To maintain backwards compatibility try the TypeConverter (The DefaultTypeConverter seems to only work on Strings)
+            body = camelExchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, camelExchange, message.getBody());
+
+            properties = endpoint.getMessageConverter().buildProperties(camelExchange).build();
+        } catch (NoTypeConversionAvailableException | TypeConversionException e) {
+            if (message.getBody() instanceof Serializable) {
+                // Add the header so the reply processor knows to de-serialize it
+                message.getHeaders().put(RabbitMQEndpoint.SERIALIZE_HEADER, true);
+                properties = endpoint.getMessageConverter().buildProperties(camelExchange).build();
+                body = serializeBodyFrom(message);
+            } else if (message.getBody() == null) {
+                properties = endpoint.getMessageConverter().buildProperties(camelExchange).build();
+                body = null;
+            } else {
+                LOG.warn("Could not convert {} to byte[]", message.getBody());
+                throw new RuntimeCamelException(e);
+            }
+        }
+        
+        publishToRabbitMQ(properties, body);
+    }
+
+    private void publishToRabbitMQ(final AMQP.BasicProperties properties, final byte[] body) throws IOException {
+        String rabbitExchange = endpoint.getExchangeName(message);
+
+        Boolean mandatory = camelExchange.getIn().getHeader(RabbitMQConstants.MANDATORY, endpoint.isMandatory(), Boolean.class);
+        Boolean immediate = camelExchange.getIn().getHeader(RabbitMQConstants.IMMEDIATE, endpoint.isImmediate(), Boolean.class);
+
+        LOG.debug("Sending message to exchange: {} with CorrelationId = {}", rabbitExchange, properties.getCorrelationId());
+
+        if (endpoint.isPublisherAcknowledgements()) {
+            channel.confirmSelect();
+        }
+
+        channel.basicPublish(rabbitExchange, routingKey, mandatory, immediate, properties, body);
+
+        if (endpoint.isPublisherAcknowledgements()) {
+            waitForConfirmation();
+        }
+    }
+
+    private void waitForConfirmation() throws IOException {
+        try {
+            LOG.debug("Waiting for publisher acknowledgements for {}ms", endpoint.getPublisherAcknowledgementsTimeout());
+            channel.waitForConfirmsOrDie(endpoint.getPublisherAcknowledgementsTimeout());
+        } catch (InterruptedException | TimeoutException e) {
+            LOG.warn("Acknowledgement error for {}", camelExchange);
+            throw new RuntimeCamelException(e);
+        }
+    }
+
+    private byte[] serializeBodyFrom(final Message msg) throws IOException {
+        try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b)) {
+            o.writeObject(msg.getBody());
+            return b.toByteArray();
+        } catch (NotSerializableException nse) {
+            LOG.warn("Can not send object " + msg.getBody().getClass() + " via RabbitMQ because it contains non-serializable objects.");
+            throw new RuntimeCamelException(nse);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
index 52ccc90..b6dacfa 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
@@ -29,6 +29,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.component.rabbitmq.RabbitMQConstants;
 import org.apache.camel.component.rabbitmq.RabbitMQEndpoint;
+import org.apache.camel.component.rabbitmq.RabbitMQMessageConverter;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -37,19 +38,21 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public abstract class ReplyManagerSupport extends ServiceSupport implements ReplyManager {
+    private static final int CLOSE_TIMEOUT = 30 * 1000;
+    
     protected final Logger log = LoggerFactory.getLogger(ReplyManagerSupport.class);
     protected final CamelContext camelContext;
     protected final CountDownLatch replyToLatch = new CountDownLatch(1);
     protected final long replyToTimeout = 1000;
-
+    
     protected ScheduledExecutorService executorService;
     protected RabbitMQEndpoint endpoint;
     protected String replyTo;
-    protected Connection listenerContainer;
 
+    protected Connection listenerContainer;
     protected CorrelationTimeoutMap correlation;
-
-    private int closeTimeout = 30 * 1000;
+    
+    private final RabbitMQMessageConverter messageConverter = new RabbitMQMessageConverter();
 
     public ReplyManagerSupport(CamelContext camelContext) {
         this.camelContext = camelContext;
@@ -133,14 +136,15 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
                     if (log.isWarnEnabled()) {
                         log.warn("Timeout occurred after {} millis waiting for reply message with correlationID [{}] on destination {}."
                                 + " Setting ExchangeTimedOutException on {} and continue routing.",
-                                new Object[]{holder.getRequestTimeout(), holder.getCorrelationId(), replyTo, ExchangeHelper.logIds(exchange)});
+                                 holder.getRequestTimeout(), holder.getCorrelationId(), replyTo, ExchangeHelper.logIds(exchange));
                     }
 
                     // no response, so lets set a timed out exception
                     String msg = "reply message with correlationID: " + holder.getCorrelationId() + " not received on destination: " + replyTo;
                     exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout(), msg));
                 } else {
-                    endpoint.setRabbitExchange(exchange, null, holder.getProperties(), holder.getMessage(), true);
+                    
+                    messageConverter.populateRabbitExchange(exchange, null, holder.getProperties(), holder.getMessage(), true);
 
                     // restore correlation id in case the remote server messed with it
                     if (holder.getOriginalCorrelationId() != null) {
@@ -198,7 +202,7 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
             if (answer != null) {
                 if (log.isTraceEnabled()) {
                     log.trace("Early reply with correlationID [{}] has been matched after {} attempts and can be processed using handler: {}",
-                            new Object[]{correlationID, counter, answer});
+                              correlationID, counter, answer);
                 }
             }
         }
@@ -229,8 +233,8 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
         ServiceHelper.stopService(correlation);
 
         if (listenerContainer != null) {
-            log.debug("Closing connection: {} with timeout: {} ms.", listenerContainer, closeTimeout);
-            listenerContainer.close(closeTimeout);
+            log.debug("Closing connection: {} with timeout: {} ms.", listenerContainer, CLOSE_TIMEOUT);
+            listenerContainer.close(CLOSE_TIMEOUT);
             listenerContainer = null;
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
index 239fb36..5f859ad 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
@@ -59,7 +59,6 @@ public class RabbitMQProducerIntTest extends CamelTestSupport {
     protected RouteBuilder createRouteBuilder() throws Exception {
         context().setTracing(true);
         return new RouteBuilder() {
-
             @Override
             public void configure() throws Exception {
                 from("direct:start").to(BASIC_URI);


[2/2] camel git commit: Fixed CS. Fixes #744.

Posted by da...@apache.org.
Fixed CS (checkstyle): replaced wildcard imports with explicit imports. Fixes #744.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c34db42c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c34db42c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c34db42c

Branch: refs/heads/master
Commit: c34db42c8f51f57e70ff7767756bdfe3421e9f7b
Parents: 48a28a2
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Dec 30 09:42:24 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 30 09:42:24 2015 +0100

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQMessagePublisher.java    | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c34db42c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
index 6f50ec4..bc78665 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
@@ -16,12 +16,20 @@
  */
 package org.apache.camel.component.rabbitmq;
 
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
 import java.util.concurrent.TimeoutException;
 
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
-import org.apache.camel.*;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.TypeConversionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;