You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/12/04 07:43:57 UTC
[2/3] camel git commit: CAMEL-7421: Fixed CS
CAMEL-7421: Fixed CS (checkstyle violations)
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/00e93fc3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/00e93fc3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/00e93fc3
Branch: refs/heads/master
Commit: 00e93fc3b5664b3b830a163819c285a0150532f5
Parents: 3ad5018
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Dec 4 07:34:40 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 4 07:34:40 2014 +0100
----------------------------------------------------------------------
.../component/rabbitmq/RabbitMQProducer.java | 53 ++++++++++++--------
.../rabbitmq/pool/PoolableChannelFactory.java | 17 ++++---
.../rabbitmq/RabbitMQSpringIntTest.java | 33 ++++++------
3 files changed, 59 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/00e93fc3/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 755fa93..54562d5 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -16,20 +16,20 @@
*/
package org.apache.camel.component.rabbitmq;
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultProducer;
-import org.apache.camel.util.ObjectHelper;
-
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import org.apache.camel.Exchange;
import org.apache.camel.component.rabbitmq.pool.PoolableChannelFactory;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ObjectHelper;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool;
@@ -42,11 +42,12 @@ public class RabbitMQProducer extends DefaultProducer {
*/
private int channelPoolMaxSize = 10;
/**
- * Maximum time (in milliseconds) waiting for channel
- */
+ * Maximum time (in milliseconds) waiting for channel
+ */
private long channelPoolMaxWait = 1000;
private ObjectPool<Channel> channelPool;
private ExecutorService executorService;
+
public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException {
super(endpoint);
}
@@ -55,12 +56,14 @@ public class RabbitMQProducer extends DefaultProducer {
public RabbitMQEndpoint getEndpoint() {
return (RabbitMQEndpoint) super.getEndpoint();
}
+
/**
* Channel callback (similar to Spring JDBC ConnectionCallback)
*/
private static interface ChannelCallback<T> {
- public T doWithChannel(Channel channel) throws Exception;
+ T doWithChannel(Channel channel) throws Exception;
}
+
/**
* Do something with a pooled channel (similar to Spring JDBC TransactionTemplate#execute)
*/
@@ -72,6 +75,7 @@ public class RabbitMQProducer extends DefaultProducer {
channelPool.returnObject(channel);
}
}
+
/**
* Open connection and initialize channel pool
*/
@@ -81,7 +85,7 @@ public class RabbitMQProducer extends DefaultProducer {
log.debug("Created connection: {}", conn);
log.trace("Creating channel pool...");
- channelPool = new GenericObjectPool<>(new PoolableChannelFactory(this.conn), getChannelPoolMaxSize(), GenericObjectPool.WHEN_EXHAUSTED_BLOCK, getChannelPoolMaxWait());
+ channelPool = new GenericObjectPool<Channel>(new PoolableChannelFactory(this.conn), getChannelPoolMaxSize(), GenericObjectPool.WHEN_EXHAUSTED_BLOCK, getChannelPoolMaxWait());
if (getEndpoint().isDeclare()) {
execute(new ChannelCallback<Void>() {
@Override
@@ -147,14 +151,15 @@ public class RabbitMQProducer extends DefaultProducer {
}
/**
- * Send a message borrowing a channel from the pool
- * @param exchange Target exchange
+ * Send a message borrowing a channel from the pool.
+ *
+ * @param exchange Target exchange
* @param routingKey Routing key
* @param properties Header properties
- * @param body Body content
+ * @param body Body content
*/
private void basicPublish(final String exchange, final String routingKey, final AMQP.BasicProperties properties, final byte[] body) throws Exception {
- if (channelPool==null) {
+ if (channelPool == null) {
// Open connection and channel lazily
openConnectionAndChannelPool();
}
@@ -174,7 +179,7 @@ public class RabbitMQProducer extends DefaultProducer {
if (contentType != null) {
properties.contentType(contentType.toString());
}
-
+
final Object priority = exchange.getIn().getHeader(RabbitMQConstants.PRIORITY);
if (priority != null) {
properties.priority(Integer.parseInt(priority.toString()));
@@ -263,8 +268,8 @@ public class RabbitMQProducer extends DefaultProducer {
/**
* Strategy to test if the given header is valid
*
- * @param headerValue the header value
- * @return the value to use, <tt>null</tt> to ignore this header
+ * @param headerValue the header value
+ * @return the value to use, <tt>null</tt> to ignore this header
* @see com.rabbitmq.client.impl.Frame#fieldValueSize
*/
private Object getValidRabbitMQHeaderValue(Object headerValue) {
@@ -294,33 +299,37 @@ public class RabbitMQProducer extends DefaultProducer {
/**
* Get maximum number of opened channel in pool
+ *
* @return Maximum number of opened channel in pool
*/
public int getChannelPoolMaxSize() {
- return channelPoolMaxSize;
+ return channelPoolMaxSize;
}
/**
* Set maximum number of opened channel in pool
+ *
* @param channelPoolMaxSize Maximum number of opened channel in pool
*/
public void setChannelPoolMaxSize(int channelPoolMaxSize) {
- this.channelPoolMaxSize = channelPoolMaxSize;
+ this.channelPoolMaxSize = channelPoolMaxSize;
}
/**
* Get the maximum number of milliseconds to wait for a channel from the pool
+ *
* @return Maximum number of milliseconds waiting for a channel
*/
public long getChannelPoolMaxWait() {
- return channelPoolMaxWait;
+ return channelPoolMaxWait;
}
/**
* Set the maximum number of milliseconds to wait for a channel from the pool
+ *
* @param channelPoolMaxWait Maximum number of milliseconds waiting for a channel
*/
public void setChannelPoolMaxWait(long channelPoolMaxWait) {
- this.channelPoolMaxWait = channelPoolMaxWait;
+ this.channelPoolMaxWait = channelPoolMaxWait;
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/00e93fc3/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
index b9bed13..b10201f 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
@@ -1,9 +1,10 @@
-/*
- * Copyright 2014 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -13,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.camel.component.rabbitmq.pool;
import com.rabbitmq.client.Channel;
@@ -21,9 +21,10 @@ import com.rabbitmq.client.Connection;
import org.apache.commons.pool.PoolableObjectFactory;
/**
- * Channel lifecyle manager: create, check and close channel
+ * Channel lifecycle manager: create, check and close channel
*/
public class PoolableChannelFactory implements PoolableObjectFactory<Channel> {
+
/**
* Parent connection
*/
http://git-wip-us.apache.org/repos/asf/camel/blob/00e93fc3/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
index f65b909..6a3a3a5 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
@@ -16,39 +16,48 @@
*/
package org.apache.camel.component.rabbitmq;
-import com.rabbitmq.client.*;
+import java.io.IOException;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-import java.io.IOException;
-import java.util.HashMap;
-
import static org.junit.Assert.assertEquals;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
/**
* Test RabbitMQ component with Spring DSL
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
public class RabbitMQSpringIntTest {
+
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQSpringIntTest.class);
+
@Produce(uri = "direct:rabbitMQ")
protected ProducerTemplate template;
@Autowired
private ConnectionFactory connectionFactory;
private Connection connection;
private Channel channel;
+
private boolean isConnectionOpened() {
- return connection!=null && connection.isOpen();
+ return connection != null && connection.isOpen();
}
+
private Connection openConnection() throws IOException {
if (!isConnectionOpened()) {
LOGGER.info("Open connection");
@@ -56,9 +65,11 @@ public class RabbitMQSpringIntTest {
}
return connection;
}
+
private boolean isChannelOpened() {
return channel != null && channel.isOpen();
}
+
private Channel openChannel() throws IOException {
if (!isChannelOpened()) {
LOGGER.info("Open channel");
@@ -70,12 +81,6 @@ public class RabbitMQSpringIntTest {
@Before
public void bindQueueExchange() throws IOException {
openChannel();
- /*
- LOGGER.info("Declare exchange queue");
- channel.exchangeDeclare("ex2", "direct", true, false, new HashMap<String, Object>());
- channel.queueDeclare("q2", true, false, false, null);
- channel.queueBind("q2", "ex2", "rk2");
- */
}
@After
@@ -118,7 +123,7 @@ public class RabbitMQSpringIntTest {
}
@Test
- public void testSendCsutomConnectionFactory() throws Exception {
+ public void testSendCustomConnectionFactory() throws Exception {
String body = "Hello Rabbit";
template.sendBodyAndHeader(body, RabbitMQConstants.ROUTING_KEY, "rk2");