You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/04/25 10:27:01 UTC
git commit: CAMEL-7384 Allow connection factory tuning with thanks to Gérald
Repository: camel
Updated Branches:
refs/heads/master 913d589b6 -> a0b500a4f
CAMEL-7384 Allow connection factory tuning with thanks to Gérald
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a0b500a4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a0b500a4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a0b500a4
Branch: refs/heads/master
Commit: a0b500a4f2e138f1a66e8b408df0f1b27433fa68
Parents: 913d589
Author: Willem Jiang <wi...@gmail.com>
Authored: Fri Apr 25 16:26:12 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Fri Apr 25 16:26:12 2014 +0800
----------------------------------------------------------------------
.../component/rabbitmq/RabbitMQComponent.java | 19 +-
.../component/rabbitmq/RabbitMQEndpoint.java | 174 +++++++++++++++++--
.../rabbitmq/RabbitMQComponentTest.java | 37 ++++
.../rabbitmq/RabbitMQEndpointTest.java | 56 ++++++
4 files changed, 268 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/a0b500a4/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
index 56db13d..585a07f 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
@@ -19,6 +19,10 @@ package org.apache.camel.component.rabbitmq;
import java.net.URI;
import java.util.Map;
+import javax.net.ssl.TrustManager;
+
+import com.rabbitmq.client.ConnectionFactory;
+
import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultComponent;
import org.slf4j.Logger;
@@ -44,11 +48,22 @@ public class RabbitMQComponent extends DefaultComponent {
int portNumber = host.getPort();
String exchangeName = host.getPath().substring(1);
- RabbitMQEndpoint endpoint = new RabbitMQEndpoint(uri, this);
+ // ConnectionFactory reference
+ ConnectionFactory connectionFactory = resolveAndRemoveReferenceParameter(params, "connectionFactory", ConnectionFactory.class);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> clientProperties = resolveAndRemoveReferenceParameter(params, "clientProperties", Map.class);
+ TrustManager trustManager = resolveAndRemoveReferenceParameter(params, "trustManager", TrustManager.class);
+ RabbitMQEndpoint endpoint;
+ if (connectionFactory == null) {
+ endpoint = new RabbitMQEndpoint(uri, this);
+ } else {
+ endpoint = new RabbitMQEndpoint(uri, this, connectionFactory);
+ }
endpoint.setHostname(hostname);
endpoint.setPortNumber(portNumber);
endpoint.setExchangeName(exchangeName);
-
+ endpoint.setClientProperties(clientProperties);
+ endpoint.setTrustManager(trustManager);
setProperties(endpoint, params);
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/camel/blob/a0b500a4/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 338c1c5..646a633 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -18,11 +18,15 @@ package org.apache.camel.component.rabbitmq;
import java.io.IOException;
import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import javax.net.ssl.TrustManager;
+
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Connection;
@@ -41,9 +45,9 @@ import org.apache.camel.impl.DefaultMessage;
public class RabbitMQEndpoint extends DefaultEndpoint {
- private String username;
- private String password;
- private String vhost;
+ private String username = ConnectionFactory.DEFAULT_USER;
+ private String password = ConnectionFactory.DEFAULT_PASS;
+ private String vhost = ConnectionFactory.DEFAULT_VHOST;
private String hostname;
private int threadPoolSize = 10;
private int portNumber;
@@ -56,7 +60,18 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
private String exchangeType = "direct";
private String routingKey;
private Address[] addresses;
-
+ private int connectionTimeout = ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT;
+ private int requestedChannelMax = ConnectionFactory.DEFAULT_CHANNEL_MAX;
+ private int requestedFrameMax = ConnectionFactory.DEFAULT_FRAME_MAX;
+ private int requestedHeartbeat = ConnectionFactory.DEFAULT_HEARTBEAT;
+ private String sslProtocol;
+ private TrustManager trustManager;
+ private Map<String, Object> clientProperties;
+ private ConnectionFactory connectionFactory;
+ private Boolean automaticRecoveryEnabled;
+ private Integer networkRecoveryInterval;
+ private Boolean topologyRecoveryEnabled;
+
public RabbitMQEndpoint() {
}
@@ -64,6 +79,11 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
super(endpointUri, component);
}
+ public RabbitMQEndpoint(String endpointUri, RabbitMQComponent component, ConnectionFactory connectionFactory) throws URISyntaxException {
+ super(endpointUri, component);
+ this.connectionFactory = connectionFactory;
+ }
+
public Exchange createRabbitExchange(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
Exchange exchange = new DefaultExchange(getCamelContext(), getExchangePattern());
@@ -99,21 +119,55 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
}
public Connection connect(ExecutorService executor) throws IOException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setUsername(getUsername());
- factory.setPassword(getPassword());
- if (getVhost() == null) {
- factory.setVirtualHost("/");
- } else {
- factory.setVirtualHost(getVhost());
- }
- factory.setHost(getHostname());
- factory.setPort(getPortNumber());
if (getAddresses() == null) {
- return factory.newConnection(executor);
+ return getOrCreateConnectionFactory().newConnection(executor);
} else {
- return factory.newConnection(executor, getAddresses());
+ return getOrCreateConnectionFactory().newConnection(executor, getAddresses());
+ }
+ }
+
+ private ConnectionFactory getOrCreateConnectionFactory() {
+ if (connectionFactory == null) {
+ ConnectionFactory factory = new ConnectionFactory();
+ factory.setUsername(getUsername());
+ factory.setPassword(getPassword());
+ factory.setVirtualHost(getVhost());
+ factory.setHost(getHostname());
+ factory.setPort(getPortNumber());
+ if (getClientProperties() != null) {
+ factory.setClientProperties(getClientProperties());
+ }
+ factory.setConnectionTimeout(getConnectionTimeout());
+ factory.setRequestedChannelMax(getRequestedChannelMax());
+ factory.setRequestedFrameMax(getRequestedFrameMax());
+ factory.setRequestedHeartbeat(getRequestedHeartbeat());
+ if (getSslProtocol() != null) {
+ try {
+ if (getSslProtocol().equals("true")) {
+ factory.useSslProtocol();
+ } else if (getTrustManager() == null) {
+ factory.useSslProtocol(getSslProtocol());
+ } else {
+ factory.useSslProtocol(getSslProtocol(), getTrustManager());
+ }
+ } catch (NoSuchAlgorithmException e) {
+ throw new IllegalArgumentException("Invalid sslProtocol " + sslProtocol, e);
+ } catch (KeyManagementException e) {
+ throw new IllegalArgumentException("Invalid sslProtocol " + sslProtocol, e);
+ }
+ }
+ if (getAutomaticRecoveryEnabled() != null) {
+ factory.setAutomaticRecoveryEnabled(getAutomaticRecoveryEnabled());
+ }
+ if (getNetworkRecoveryInterval() != null) {
+ factory.setNetworkRecoveryInterval(getNetworkRecoveryInterval());
+ }
+ if (getTopologyRecoveryEnabled() != null) {
+ factory.setTopologyRecoveryEnabled(getTopologyRecoveryEnabled());
+ }
+ connectionFactory = factory;
}
+ return connectionFactory;
}
@Override
@@ -256,4 +310,92 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
public Address[] getAddresses() {
return addresses;
}
+
+ public int getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ public void setConnectionTimeout(int connectionTimeout) {
+ this.connectionTimeout = connectionTimeout;
+ }
+
+ public int getRequestedChannelMax() {
+ return requestedChannelMax;
+ }
+
+ public void setRequestedChannelMax(int requestedChannelMax) {
+ this.requestedChannelMax = requestedChannelMax;
+ }
+
+ public int getRequestedFrameMax() {
+ return requestedFrameMax;
+ }
+
+ public void setRequestedFrameMax(int requestedFrameMax) {
+ this.requestedFrameMax = requestedFrameMax;
+ }
+
+ public int getRequestedHeartbeat() {
+ return requestedHeartbeat;
+ }
+
+ public void setRequestedHeartbeat(int requestedHeartbeat) {
+ this.requestedHeartbeat = requestedHeartbeat;
+ }
+
+ public String getSslProtocol() {
+ return sslProtocol;
+ }
+
+ public void setSslProtocol(String sslProtocol) {
+ this.sslProtocol = sslProtocol;
+ }
+
+ public ConnectionFactory getConnectionFactory() {
+ return connectionFactory;
+ }
+
+ public void setConnectionFactory(ConnectionFactory connectionFactory) {
+ this.connectionFactory = connectionFactory;
+ }
+
+ public TrustManager getTrustManager() {
+ return trustManager;
+ }
+
+ public void setTrustManager(TrustManager trustManager) {
+ this.trustManager = trustManager;
+ }
+
+ public Map<String, Object> getClientProperties() {
+ return clientProperties;
+ }
+
+ public void setClientProperties(Map<String, Object> clientProperties) {
+ this.clientProperties = clientProperties;
+ }
+
+ public Boolean getAutomaticRecoveryEnabled() {
+ return automaticRecoveryEnabled;
+ }
+
+ public void setAutomaticRecoveryEnabled(Boolean automaticRecoveryEnabled) {
+ this.automaticRecoveryEnabled = automaticRecoveryEnabled;
+ }
+
+ public Integer getNetworkRecoveryInterval() {
+ return networkRecoveryInterval;
+ }
+
+ public void setNetworkRecoveryInterval(Integer networkRecoveryInterval) {
+ this.networkRecoveryInterval = networkRecoveryInterval;
+ }
+
+ public Boolean getTopologyRecoveryEnabled() {
+ return topologyRecoveryEnabled;
+ }
+
+ public void setTopologyRecoveryEnabled(Boolean topologyRecoveryEnabled) {
+ this.topologyRecoveryEnabled = topologyRecoveryEnabled;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a0b500a4/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
index 454a26d..5506aab 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
@@ -19,11 +19,17 @@ package org.apache.camel.component.rabbitmq;
import java.util.HashMap;
import java.util.Map;
+import com.rabbitmq.client.ConnectionFactory;
+
import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
public class RabbitMQComponentTest {
@@ -39,6 +45,11 @@ public class RabbitMQComponentTest {
assertEquals(true, endpoint.isAutoDelete());
assertEquals(true, endpoint.isDurable());
assertEquals("direct", endpoint.getExchangeType());
+ assertEquals(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT, endpoint.getConnectionTimeout());
+ assertEquals(ConnectionFactory.DEFAULT_CHANNEL_MAX, endpoint.getRequestedChannelMax());
+ assertEquals(ConnectionFactory.DEFAULT_FRAME_MAX, endpoint.getRequestedFrameMax());
+ assertEquals(ConnectionFactory.DEFAULT_HEARTBEAT, endpoint.getRequestedHeartbeat());
+ assertNull(endpoint.getConnectionFactory());
}
@Test
@@ -53,6 +64,10 @@ public class RabbitMQComponentTest {
params.put("hostname", "special.host");
params.put("queue", "queuey");
params.put("exchangeType", "topic");
+ params.put("connectionTimeout", 123);
+ params.put("requestedChannelMax", 456);
+ params.put("requestedFrameMax", 789);
+ params.put("requestedHeartbeat", 321);
RabbitMQEndpoint endpoint = createEndpoint(params);
@@ -67,6 +82,10 @@ public class RabbitMQComponentTest {
assertEquals(true, endpoint.isAutoDelete());
assertEquals(true, endpoint.isDurable());
assertEquals("topic", endpoint.getExchangeType());
+ assertEquals(123, endpoint.getConnectionTimeout());
+ assertEquals(456, endpoint.getRequestedChannelMax());
+ assertEquals(789, endpoint.getRequestedFrameMax());
+ assertEquals(321, endpoint.getRequestedHeartbeat());
}
private RabbitMQEndpoint createEndpoint(Map<String, Object> params) throws Exception {
@@ -75,4 +94,22 @@ public class RabbitMQComponentTest {
return new RabbitMQComponent(context).createEndpoint(uri, remaining, params);
}
+
+ @Test
+ public void testConnectionFactoryRef() throws Exception {
+ SimpleRegistry registry = new SimpleRegistry();
+ ConnectionFactory connectionFactoryMock = Mockito.mock(ConnectionFactory.class);
+ registry.put("connectionFactoryMock", connectionFactoryMock);
+
+ CamelContext defaultContext = new DefaultCamelContext(registry);
+
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put("connectionFactory", "#connectionFactoryMock");
+
+ RabbitMQEndpoint endpoint = new RabbitMQComponent(defaultContext).createEndpoint("rabbitmq:localhost/exchange", "localhost/exchange", params);
+
+ assertSame(connectionFactoryMock, endpoint.getConnectionFactory());
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a0b500a4/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
index 21c97f2..11b3675 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
@@ -21,12 +21,16 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.impl.LongStringHelper;
+
import org.apache.camel.Exchange;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
@@ -126,4 +130,56 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
assertEquals("Get a wrong endpoint address.", new Address("server1", 12345), endpoint.getAddresses()[0]);
assertEquals("Get a wrong endpoint address.", new Address("server2", 12345), endpoint.getAddresses()[1]);
}
+
+ private ConnectionFactory createConnectionFactory(String uri) {
+ RabbitMQEndpoint endpoint = context.getEndpoint(uri, RabbitMQEndpoint.class);
+ return endpoint.getConnectionFactory();
+ }
+
+ @Test
+ public void testCreateConnectionFactoryDefault() throws Exception {
+ ConnectionFactory connectionFactory = createConnectionFactory("rabbitmq:localhost:1234/exchange");
+
+ assertEquals("localhost", connectionFactory.getHost());
+ assertEquals(1234, connectionFactory.getPort());
+ assertEquals(ConnectionFactory.DEFAULT_VHOST, connectionFactory.getVirtualHost());
+ assertEquals(ConnectionFactory.DEFAULT_USER, connectionFactory.getUsername());
+ assertEquals(ConnectionFactory.DEFAULT_PASS, connectionFactory.getPassword());
+ assertEquals(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT, connectionFactory.getConnectionTimeout());
+ assertEquals(ConnectionFactory.DEFAULT_CHANNEL_MAX, connectionFactory.getRequestedChannelMax());
+ assertEquals(ConnectionFactory.DEFAULT_FRAME_MAX, connectionFactory.getRequestedFrameMax());
+ assertEquals(ConnectionFactory.DEFAULT_HEARTBEAT, connectionFactory.getRequestedHeartbeat());
+ assertFalse(connectionFactory.isSSL());
+ assertFalse(connectionFactory.isAutomaticRecoveryEnabled());
+ assertEquals(5000, connectionFactory.getNetworkRecoveryInterval());
+ assertTrue(connectionFactory.isTopologyRecoveryEnabled());
+ }
+
+ @Test
+ public void testCreateConnectionFactoryCustom() throws Exception {
+ ConnectionFactory connectionFactory = createConnectionFactory("rabbitmq:localhost:1234/exchange"
+ + "?username=userxxx"
+ + "&password=passxxx"
+ + "&connectionTimeout=123"
+ + "&requestedChannelMax=456"
+ + "&requestedFrameMax=789"
+ + "&requestedHeartbeat=987"
+ + "&sslProtocol=true"
+ + "&automaticRecoveryEnabled=true"
+ + "&networkRecoveryInterval=654"
+ + "&topologyRecoveryEnabled=false");
+
+ assertEquals("localhost", connectionFactory.getHost());
+ assertEquals(1234, connectionFactory.getPort());
+ assertEquals("userxxx", connectionFactory.getUsername());
+ assertEquals("passxxx", connectionFactory.getPassword());
+ assertEquals(123, connectionFactory.getConnectionTimeout());
+ assertEquals(456, connectionFactory.getRequestedChannelMax());
+ assertEquals(789, connectionFactory.getRequestedFrameMax());
+ assertEquals(987, connectionFactory.getRequestedHeartbeat());
+ assertTrue(connectionFactory.isSSL());
+ assertTrue(connectionFactory.isAutomaticRecoveryEnabled());
+ assertEquals(654, connectionFactory.getNetworkRecoveryInterval());
+ assertFalse(connectionFactory.isTopologyRecoveryEnabled());
+ }
}