You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/04/25 10:27:01 UTC

git commit: CAMEL-7384 Allow connection factory tuning with thanks to Gérald

Repository: camel
Updated Branches:
  refs/heads/master 913d589b6 -> a0b500a4f


CAMEL-7384 Allow connection factory tuning with thanks to Gérald


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a0b500a4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a0b500a4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a0b500a4

Branch: refs/heads/master
Commit: a0b500a4f2e138f1a66e8b408df0f1b27433fa68
Parents: 913d589
Author: Willem Jiang <wi...@gmail.com>
Authored: Fri Apr 25 16:26:12 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Fri Apr 25 16:26:12 2014 +0800

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQComponent.java   |  19 +-
 .../component/rabbitmq/RabbitMQEndpoint.java    | 174 +++++++++++++++++--
 .../rabbitmq/RabbitMQComponentTest.java         |  37 ++++
 .../rabbitmq/RabbitMQEndpointTest.java          |  56 ++++++
 4 files changed, 268 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a0b500a4/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
index 56db13d..585a07f 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
@@ -19,6 +19,10 @@ package org.apache.camel.component.rabbitmq;
 import java.net.URI;
 import java.util.Map;
 
+import javax.net.ssl.TrustManager;
+
+import com.rabbitmq.client.ConnectionFactory;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.impl.DefaultComponent;
 import org.slf4j.Logger;
@@ -44,11 +48,22 @@ public class RabbitMQComponent extends DefaultComponent {
         int portNumber = host.getPort();
         String exchangeName = host.getPath().substring(1);
 
-        RabbitMQEndpoint endpoint = new RabbitMQEndpoint(uri, this);
+        // ConnectionFactory reference
+        ConnectionFactory connectionFactory = resolveAndRemoveReferenceParameter(params, "connectionFactory", ConnectionFactory.class);
+        @SuppressWarnings("unchecked")
+        Map<String, Object> clientProperties = resolveAndRemoveReferenceParameter(params, "clientProperties", Map.class);
+        TrustManager trustManager = resolveAndRemoveReferenceParameter(params, "trustManager", TrustManager.class);
+        RabbitMQEndpoint endpoint;
+        if (connectionFactory == null) {
+            endpoint = new RabbitMQEndpoint(uri, this);
+        } else {
+            endpoint = new RabbitMQEndpoint(uri, this, connectionFactory);
+        }
         endpoint.setHostname(hostname);
         endpoint.setPortNumber(portNumber);
         endpoint.setExchangeName(exchangeName);
-
+        endpoint.setClientProperties(clientProperties);
+        endpoint.setTrustManager(trustManager);
         setProperties(endpoint, params);
 
         if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/camel/blob/a0b500a4/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 338c1c5..646a633 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -18,11 +18,15 @@ package org.apache.camel.component.rabbitmq;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import javax.net.ssl.TrustManager;
+
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Address;
 import com.rabbitmq.client.Connection;
@@ -41,9 +45,9 @@ import org.apache.camel.impl.DefaultMessage;
 
 public class RabbitMQEndpoint extends DefaultEndpoint {
 
-    private String username;
-    private String password;
-    private String vhost;
+    private String username = ConnectionFactory.DEFAULT_USER;
+    private String password = ConnectionFactory.DEFAULT_PASS;
+    private String vhost = ConnectionFactory.DEFAULT_VHOST;
     private String hostname;
     private int threadPoolSize = 10;
     private int portNumber;
@@ -56,7 +60,18 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     private String exchangeType = "direct";
     private String routingKey;
     private Address[] addresses;
-    
+    private int connectionTimeout = ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT;
+    private int requestedChannelMax = ConnectionFactory.DEFAULT_CHANNEL_MAX;
+    private int requestedFrameMax = ConnectionFactory.DEFAULT_FRAME_MAX;
+    private int requestedHeartbeat = ConnectionFactory.DEFAULT_HEARTBEAT;
+    private String sslProtocol;
+    private TrustManager trustManager;
+    private Map<String, Object> clientProperties;
+    private ConnectionFactory connectionFactory;
+    private Boolean automaticRecoveryEnabled;
+    private Integer networkRecoveryInterval;
+    private Boolean topologyRecoveryEnabled;
+
     public RabbitMQEndpoint() {
     }
 
@@ -64,6 +79,11 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
         super(endpointUri, component);
     }
 
+    public RabbitMQEndpoint(String endpointUri, RabbitMQComponent component, ConnectionFactory connectionFactory) throws URISyntaxException {
+        super(endpointUri, component);
+        this.connectionFactory = connectionFactory;
+    }
+
     public Exchange createRabbitExchange(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
         Exchange exchange = new DefaultExchange(getCamelContext(), getExchangePattern());
 
@@ -99,21 +119,55 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     }
 
     public Connection connect(ExecutorService executor) throws IOException {
-        ConnectionFactory factory = new ConnectionFactory();
-        factory.setUsername(getUsername());
-        factory.setPassword(getPassword());
-        if (getVhost() == null) {
-            factory.setVirtualHost("/");
-        } else {
-            factory.setVirtualHost(getVhost());
-        }
-        factory.setHost(getHostname());
-        factory.setPort(getPortNumber());
         if (getAddresses() == null) {
-            return factory.newConnection(executor);
+            return getOrCreateConnectionFactory().newConnection(executor);
         } else {
-            return factory.newConnection(executor, getAddresses());
+            return getOrCreateConnectionFactory().newConnection(executor, getAddresses());
+        }
+    }
+
+    private ConnectionFactory getOrCreateConnectionFactory() {
+        if (connectionFactory == null) {
+            ConnectionFactory factory = new ConnectionFactory();
+            factory.setUsername(getUsername());
+            factory.setPassword(getPassword());
+            factory.setVirtualHost(getVhost());
+            factory.setHost(getHostname());
+            factory.setPort(getPortNumber());
+            if (getClientProperties() != null) {
+                factory.setClientProperties(getClientProperties());
+            }
+            factory.setConnectionTimeout(getConnectionTimeout());
+            factory.setRequestedChannelMax(getRequestedChannelMax());
+            factory.setRequestedFrameMax(getRequestedFrameMax());
+            factory.setRequestedHeartbeat(getRequestedHeartbeat());
+            if (getSslProtocol() != null) {
+                try {
+                    if (getSslProtocol().equals("true")) {
+                        factory.useSslProtocol();
+                    } else if (getTrustManager() == null) {
+                        factory.useSslProtocol(getSslProtocol());
+                    } else {
+                        factory.useSslProtocol(getSslProtocol(), getTrustManager());
+                    }
+                } catch (NoSuchAlgorithmException e) {
+                    throw new IllegalArgumentException("Invalid sslProtocol " + sslProtocol, e);
+                } catch (KeyManagementException e) {
+                    throw new IllegalArgumentException("Invalid sslProtocol " + sslProtocol, e);
+                }
+            }
+            if (getAutomaticRecoveryEnabled() != null) {
+                factory.setAutomaticRecoveryEnabled(getAutomaticRecoveryEnabled());
+            }
+            if (getNetworkRecoveryInterval() != null) {
+                factory.setNetworkRecoveryInterval(getNetworkRecoveryInterval());
+            }
+            if (getTopologyRecoveryEnabled() != null) {
+                factory.setTopologyRecoveryEnabled(getTopologyRecoveryEnabled());
+            }
+            connectionFactory = factory;
         }
+        return connectionFactory;
     }
 
     @Override
@@ -256,4 +310,92 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     public Address[] getAddresses() {
         return addresses;
     }
+
+    public int getConnectionTimeout() {
+        return connectionTimeout;
+    }
+
+    public void setConnectionTimeout(int connectionTimeout) {
+        this.connectionTimeout = connectionTimeout;
+    }
+
+    public int getRequestedChannelMax() {
+        return requestedChannelMax;
+    }
+
+    public void setRequestedChannelMax(int requestedChannelMax) {
+        this.requestedChannelMax = requestedChannelMax;
+    }
+
+    public int getRequestedFrameMax() {
+        return requestedFrameMax;
+    }
+
+    public void setRequestedFrameMax(int requestedFrameMax) {
+        this.requestedFrameMax = requestedFrameMax;
+    }
+
+    public int getRequestedHeartbeat() {
+        return requestedHeartbeat;
+    }
+
+    public void setRequestedHeartbeat(int requestedHeartbeat) {
+        this.requestedHeartbeat = requestedHeartbeat;
+    }
+
+    public String getSslProtocol() {
+        return sslProtocol;
+    }
+
+    public void setSslProtocol(String sslProtocol) {
+        this.sslProtocol = sslProtocol;
+    }
+
+    public ConnectionFactory getConnectionFactory() {
+        return connectionFactory;
+    }
+
+    public void setConnectionFactory(ConnectionFactory connectionFactory) {
+        this.connectionFactory = connectionFactory;
+    }
+
+    public TrustManager getTrustManager() {
+        return trustManager;
+    }
+
+    public void setTrustManager(TrustManager trustManager) {
+        this.trustManager = trustManager;
+    }
+
+    public Map<String, Object> getClientProperties() {
+        return clientProperties;
+    }
+
+    public void setClientProperties(Map<String, Object> clientProperties) {
+        this.clientProperties = clientProperties;
+    }
+
+    public Boolean getAutomaticRecoveryEnabled() {
+        return automaticRecoveryEnabled;
+    }
+
+    public void setAutomaticRecoveryEnabled(Boolean automaticRecoveryEnabled) {
+        this.automaticRecoveryEnabled = automaticRecoveryEnabled;
+    }
+
+    public Integer getNetworkRecoveryInterval() {
+        return networkRecoveryInterval;
+    }
+
+    public void setNetworkRecoveryInterval(Integer networkRecoveryInterval) {
+        this.networkRecoveryInterval = networkRecoveryInterval;
+    }
+
+    public Boolean getTopologyRecoveryEnabled() {
+        return topologyRecoveryEnabled;
+    }
+
+    public void setTopologyRecoveryEnabled(Boolean topologyRecoveryEnabled) {
+        this.topologyRecoveryEnabled = topologyRecoveryEnabled;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a0b500a4/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
index 454a26d..5506aab 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
@@ -19,11 +19,17 @@ package org.apache.camel.component.rabbitmq;
 import java.util.HashMap;
 import java.util.Map;
 
+import com.rabbitmq.client.ConnectionFactory;
+
 import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 
 public class RabbitMQComponentTest {
 
@@ -39,6 +45,11 @@ public class RabbitMQComponentTest {
         assertEquals(true, endpoint.isAutoDelete());
         assertEquals(true, endpoint.isDurable());
         assertEquals("direct", endpoint.getExchangeType());
+        assertEquals(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT, endpoint.getConnectionTimeout());
+        assertEquals(ConnectionFactory.DEFAULT_CHANNEL_MAX, endpoint.getRequestedChannelMax());
+        assertEquals(ConnectionFactory.DEFAULT_FRAME_MAX, endpoint.getRequestedFrameMax());
+        assertEquals(ConnectionFactory.DEFAULT_HEARTBEAT, endpoint.getRequestedHeartbeat());
+        assertNull(endpoint.getConnectionFactory());
     }
 
     @Test
@@ -53,6 +64,10 @@ public class RabbitMQComponentTest {
         params.put("hostname", "special.host");
         params.put("queue", "queuey");
         params.put("exchangeType", "topic");
+        params.put("connectionTimeout", 123);
+        params.put("requestedChannelMax", 456);
+        params.put("requestedFrameMax", 789);
+        params.put("requestedHeartbeat", 321);
 
         RabbitMQEndpoint endpoint = createEndpoint(params);
 
@@ -67,6 +82,10 @@ public class RabbitMQComponentTest {
         assertEquals(true, endpoint.isAutoDelete());
         assertEquals(true, endpoint.isDurable());
         assertEquals("topic", endpoint.getExchangeType());
+        assertEquals(123, endpoint.getConnectionTimeout());
+        assertEquals(456, endpoint.getRequestedChannelMax());
+        assertEquals(789, endpoint.getRequestedFrameMax());
+        assertEquals(321, endpoint.getRequestedHeartbeat());
     }
 
     private RabbitMQEndpoint createEndpoint(Map<String, Object> params) throws Exception {
@@ -75,4 +94,22 @@ public class RabbitMQComponentTest {
 
         return new RabbitMQComponent(context).createEndpoint(uri, remaining, params);
     }
+
+    @Test
+    public void testConnectionFactoryRef() throws Exception {
+        SimpleRegistry registry = new SimpleRegistry();
+        ConnectionFactory connectionFactoryMock = Mockito.mock(ConnectionFactory.class);
+        registry.put("connectionFactoryMock", connectionFactoryMock);
+
+        CamelContext defaultContext = new DefaultCamelContext(registry);
+
+        Map<String, Object> params = new HashMap<String, Object>();
+        params.put("connectionFactory", "#connectionFactoryMock");
+
+        RabbitMQEndpoint endpoint = new RabbitMQComponent(defaultContext).createEndpoint("rabbitmq:localhost/exchange", "localhost/exchange", params);
+
+        assertSame(connectionFactoryMock, endpoint.getConnectionFactory());
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a0b500a4/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
index 21c97f2..11b3675 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
@@ -21,12 +21,16 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Address;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.impl.LongStringHelper;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
@@ -126,4 +130,56 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
         assertEquals("Get a wrong endpoint address.", new Address("server1", 12345), endpoint.getAddresses()[0]);
         assertEquals("Get a wrong endpoint address.", new Address("server2", 12345), endpoint.getAddresses()[1]);
     }
+
+    private ConnectionFactory createConnectionFactory(String uri) {
+        RabbitMQEndpoint endpoint = context.getEndpoint(uri, RabbitMQEndpoint.class); 
+        return endpoint.getConnectionFactory();
+    }
+
+    @Test
+    public void testCreateConnectionFactoryDefault() throws Exception {
+        ConnectionFactory connectionFactory = createConnectionFactory("rabbitmq:localhost:1234/exchange");
+
+        assertEquals("localhost", connectionFactory.getHost());
+        assertEquals(1234, connectionFactory.getPort());
+        assertEquals(ConnectionFactory.DEFAULT_VHOST, connectionFactory.getVirtualHost());
+        assertEquals(ConnectionFactory.DEFAULT_USER, connectionFactory.getUsername());
+        assertEquals(ConnectionFactory.DEFAULT_PASS, connectionFactory.getPassword());
+        assertEquals(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT, connectionFactory.getConnectionTimeout());
+        assertEquals(ConnectionFactory.DEFAULT_CHANNEL_MAX, connectionFactory.getRequestedChannelMax());
+        assertEquals(ConnectionFactory.DEFAULT_FRAME_MAX, connectionFactory.getRequestedFrameMax());
+        assertEquals(ConnectionFactory.DEFAULT_HEARTBEAT, connectionFactory.getRequestedHeartbeat());
+        assertFalse(connectionFactory.isSSL());
+        assertFalse(connectionFactory.isAutomaticRecoveryEnabled());
+        assertEquals(5000, connectionFactory.getNetworkRecoveryInterval());
+        assertTrue(connectionFactory.isTopologyRecoveryEnabled());
+    }
+
+    @Test
+    public void testCreateConnectionFactoryCustom() throws Exception {
+        ConnectionFactory connectionFactory  = createConnectionFactory("rabbitmq:localhost:1234/exchange"
+                                                                      + "?username=userxxx"
+                                                                      + "&password=passxxx"
+                                                                      + "&connectionTimeout=123"
+                                                                      + "&requestedChannelMax=456"
+                                                                      + "&requestedFrameMax=789"
+                                                                      + "&requestedHeartbeat=987"
+                                                                      + "&sslProtocol=true"
+                                                                      + "&automaticRecoveryEnabled=true"
+                                                                      + "&networkRecoveryInterval=654"
+                                                                      + "&topologyRecoveryEnabled=false");
+
+        assertEquals("localhost", connectionFactory.getHost());
+        assertEquals(1234, connectionFactory.getPort());
+        assertEquals("userxxx", connectionFactory.getUsername());
+        assertEquals("passxxx", connectionFactory.getPassword());
+        assertEquals(123, connectionFactory.getConnectionTimeout());
+        assertEquals(456, connectionFactory.getRequestedChannelMax());
+        assertEquals(789, connectionFactory.getRequestedFrameMax());
+        assertEquals(987, connectionFactory.getRequestedHeartbeat());
+        assertTrue(connectionFactory.isSSL());
+        assertTrue(connectionFactory.isAutomaticRecoveryEnabled());
+        assertEquals(654, connectionFactory.getNetworkRecoveryInterval());
+        assertFalse(connectionFactory.isTopologyRecoveryEnabled());
+    }
 }