You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/06/10 07:38:57 UTC
camel git commit: Fix code formatting issues in camel-rabbitmq component.
Repository: camel
Updated Branches:
refs/heads/master a6c7373d4 -> d62e0ac7c
Fix code formatting issues in camel-rabbitmq component.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d62e0ac7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d62e0ac7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d62e0ac7
Branch: refs/heads/master
Commit: d62e0ac7c7f6b4851d6a756dfc4337981c51441e
Parents: a6c7373
Author: Brad Reitmeyer <br...@cisco.com>
Authored: Tue Jun 9 18:20:46 2015 -0500
Committer: Brad Reitmeyer <gi...@bradreitmeyer.com>
Committed: Tue Jun 9 18:22:06 2015 -0500
----------------------------------------------------------------------
.../component/rabbitmq/RabbitMQComponent.java | 4 +-
.../component/rabbitmq/RabbitMQConsumer.java | 6 +-
.../component/rabbitmq/RabbitMQEndpoint.java | 5 +-
.../rabbitmq/RabbitMQMessageConverter.java | 10 +--
.../component/rabbitmq/RabbitMQProducer.java | 2 +-
.../rabbitmq/reply/CorrelationTimeoutMap.java | 21 +++++-
.../component/rabbitmq/reply/ReplyHolder.java | 14 ++--
.../component/rabbitmq/reply/ReplyManager.java | 28 ++++++--
.../rabbitmq/reply/ReplyManagerSupport.java | 68 +++++++++++---------
.../reply/TemporaryQueueReplyHandler.java | 12 ++--
.../reply/TemporaryQueueReplyManager.java | 30 ++++-----
.../rabbitmq/RabbitMQEndpointTest.java | 20 +++---
.../rabbitmq/RabbitMQInOutIntTest.java | 10 +--
.../testbeans/TestSerializableObject.java | 16 +++++
14 files changed, 149 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
index c125421..3fe86c5 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
@@ -21,13 +21,13 @@ import java.util.Map;
import javax.net.ssl.TrustManager;
+import com.rabbitmq.client.ConnectionFactory;
+
import org.apache.camel.CamelContext;
import org.apache.camel.impl.UriEndpointComponent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.rabbitmq.client.ConnectionFactory;
-
public class RabbitMQComponent extends UriEndpointComponent {
private static final Logger LOG = LoggerFactory.getLogger(RabbitMQComponent.class);
http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index bf142ce..48ec60f 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -216,14 +216,12 @@ public class RabbitMQConsumer extends DefaultConsumer {
log.trace("Acknowledging receipt [delivery_tag={}]", deliveryTag);
channel.basicAck(deliveryTag, false);
}
- }
- else if (endpoint.isTransferException() && exchange.getPattern().isOutCapable()) {
+ } else if (endpoint.isTransferException() && exchange.getPattern().isOutCapable()) {
// the inOut exchange failed so put the exception in the body and send back
msg.setBody(exchange.getException());
exchange.setOut(msg);
endpoint.publishExchangeToChannel(exchange, channel, properties.getReplyTo());
- }
- else {
+ } else {
boolean isRequeueHeaderSet = msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class);
// processing failed, then reject and handle the exception
if (deliveryTag != 0 && !consumer.endpoint.isAutoAck()) {
http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 6cd0bca..979efb3 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -224,7 +224,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
if (hasSerializeHeader(properties)) {
Object messageBody = null;
- try (InputStream b = new ByteArrayInputStream(body); ObjectInputStream o = new ObjectInputStream(b)) {
+ try (InputStream b = new ByteArrayInputStream(body);
+ ObjectInputStream o = new ObjectInputStream(b);) {
messageBody = o.readObject();
} catch (IOException | ClassNotFoundException e) {
LOG.warn("Could not deserialize the object");
@@ -287,7 +288,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
properties = getMessageConverter().buildProperties(camelExchange).build();
- try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b)) {
+ try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b);) {
o.writeObject(msg.getBody());
body = b.toByteArray();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
index 95abe81..4d8f35c 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
@@ -22,15 +22,15 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Map;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.LongString;
+
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.LongString;
-
public class RabbitMQMessageConverter {
protected static final Logger LOG = LoggerFactory.getLogger(RabbitMQMessageConverter.class);
@@ -167,7 +167,9 @@ public class RabbitMQMessageConverter {
LOG.debug("Ignoring header: {} with null value", header.getKey());
} else {
LOG.debug("Ignoring header: {} of class: {} with value: {}",
- new Object[] { header.getKey(), ObjectHelper.classCanonicalName(header.getValue()), header.getValue() });
+ new Object[] {
+ header.getKey(), ObjectHelper.classCanonicalName(header.getValue()), header.getValue()
+ });
}
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index b8c8ba2..a96d6fd 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -40,6 +40,7 @@ import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool;
public class RabbitMQProducer extends DefaultAsyncProducer {
+ private static final String GENERATED_CORRELATION_ID_PREFIX = "Camel-";
private Connection conn;
private ObjectPool<Channel> channelPool;
@@ -47,7 +48,6 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
private int closeTimeout = 30 * 1000;
private final AtomicBoolean started = new AtomicBoolean(false);
- private static final String GENERATED_CORRELATION_ID_PREFIX = "Camel-";
private ReplyManager replyManager;
public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException {
http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java
index fad4fc0..8f052a0 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.camel.component.rabbitmq.reply;
import java.util.concurrent.ScheduledExecutorService;
@@ -76,8 +92,8 @@ public class CorrelationTimeoutMap extends DefaultTimeoutMap<String, ReplyHandle
@Override
public ReplyHandler putIfAbsent(String key, ReplyHandler value, long timeoutMillis) {
- log.info("in putIfAbsent with key {}", key);
-
+ log.info("in putIfAbsent with key {}", key);
+
try {
if (listener != null) {
listener.onPut(key);
@@ -117,4 +133,3 @@ public class CorrelationTimeoutMap extends DefaultTimeoutMap<String, ReplyHandle
}
}
-
http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java
index d2890d0..b6fe68d 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java
@@ -16,11 +16,11 @@
*/
package org.apache.camel.component.rabbitmq.reply;
+import com.rabbitmq.client.AMQP;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
-import com.rabbitmq.client.AMQP;
-
/**
* Holder which contains the {@link Exchange} and {@link org.apache.camel.AsyncCallback} to be used
* when the reply arrives, so we can set the reply on the {@link Exchange} and continue routing using the callback.
@@ -41,7 +41,7 @@ public class ReplyHolder {
* Constructor to use when a reply message was received
*/
public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId,
- String correlationId,AMQP.BasicProperties properties, byte[] message) {
+ String correlationId, AMQP.BasicProperties properties, byte[] message) {
this.exchange = exchange;
this.callback = callback;
this.originalCorrelationId = originalCorrelationId;
@@ -54,7 +54,7 @@ public class ReplyHolder {
* Constructor to use when a timeout occurred
*/
public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId,
- String correlationId, long timeout) {
+ String correlationId, long timeout) {
this(exchange, callback, originalCorrelationId, correlationId, null, null);
this.timeout = timeout;
}
@@ -112,12 +112,12 @@ public class ReplyHolder {
public long getRequestTimeout() {
return timeout;
}
-
+
/**
* The message properties
* @return
*/
- public AMQP.BasicProperties getProperties(){
- return properties;
+ public AMQP.BasicProperties getProperties() {
+ return properties;
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java
index 7c1c015..f6eb64a 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.camel.component.rabbitmq.reply;
import java.util.concurrent.ScheduledExecutorService;
@@ -15,7 +31,7 @@ import org.apache.camel.component.rabbitmq.RabbitMQEndpoint;
*/
public interface ReplyManager {
- /**
+ /**
* Sets the belonging {@link org.apache.camel.component.jms.JmsEndpoint}.
*/
void setEndpoint(RabbitMQEndpoint endpoint);
@@ -26,13 +42,13 @@ public interface ReplyManager {
* The queue is either a temporary or a persistent queue.
*/
void setReplyTo(String replyTo);
-
+
/**
* Gets the reply to queue being used
*/
String getReplyTo();
-
- /**
+
+ /**
* Register a reply
*
* @param replyManager the reply manager being used
@@ -45,13 +61,12 @@ public interface ReplyManager {
*/
String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
String originalCorrelationId, String correlationId, long requestTimeout);
-
+
/**
* Sets the scheduled to use when checking for timeouts (no reply received within a given time period)
*/
void setScheduledExecutorService(ScheduledExecutorService executorService);
-
/**
* Updates the correlation id to the new correlation id.
* <p/>
@@ -65,7 +80,6 @@ public interface ReplyManager {
* @param requestTimeout the timeout
*/
void updateCorrelationId(String correlationId, String newCorrelationId, long requestTimeout);
-
/**
* Process the reply
http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
index 6e41f99..f4f4711 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
@@ -1,9 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.camel.component.rabbitmq.reply;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Connection;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
@@ -17,24 +36,21 @@ import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Connection;
-
-
-public abstract class ReplyManagerSupport extends ServiceSupport implements ReplyManager{
-
- protected final Logger log = LoggerFactory.getLogger(ReplyManagerSupport.class);
+public abstract class ReplyManagerSupport extends ServiceSupport implements ReplyManager {
+ protected final Logger log = LoggerFactory.getLogger(ReplyManagerSupport.class);
protected final CamelContext camelContext;
-
+ protected final CountDownLatch replyToLatch = new CountDownLatch(1);
+ protected final long replyToTimeout = 1000;
+
protected ScheduledExecutorService executorService;
protected RabbitMQEndpoint endpoint;
protected String replyTo;
protected Connection listenerContainer;
- private int closeTimeout = 30 * 1000;
- protected final CountDownLatch replyToLatch = new CountDownLatch(1);
- protected final long replyToTimeout = 1000;
+
protected CorrelationTimeoutMap correlation;
+ private int closeTimeout = 30 * 1000;
+
public ReplyManagerSupport(CamelContext camelContext) {
this.camelContext = camelContext;
}
@@ -74,10 +90,9 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
}
return replyTo;
}
-
+
public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
String originalCorrelationId, String correlationId, long requestTimeout) {
- log.debug("in registerReply");
// add to correlation map
QueueReplyHandler handler = new QueueReplyHandler(replyManager, exchange, callback,
originalCorrelationId, correlationId, requestTimeout);
@@ -89,8 +104,7 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
}
return correlationId;
}
-
-
+
protected abstract ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
String originalCorrelationId, String correlationId, long requestTimeout);
@@ -109,7 +123,7 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
}
public void processReply(ReplyHolder holder) {
- log.info("in processReply");
+ log.info("in processReply");
if (holder != null && isRunAllowed()) {
try {
Exchange exchange = holder.getExchange();
@@ -127,19 +141,17 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
String msg = "reply message with correlationID: " + holder.getCorrelationId() + " not received on destination: " + replyTo;
exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout(), msg));
} else {
-
- endpoint.setRabbitExchange(exchange, null, holder.getProperties(), holder.getMessage());
-
- // restore correlation id in case the remote server messed with it
+
+ endpoint.setRabbitExchange(exchange, null, holder.getProperties(), holder.getMessage());
+
+ // restore correlation id in case the remote server messed with it
if (holder.getOriginalCorrelationId() != null) {
- if(exchange.getOut() != null){
+ if (exchange.getOut() != null) {
exchange.getOut().setHeader(RabbitMQConstants.CORRELATIONID, holder.getOriginalCorrelationId());
- }
- else{
+ } else {
exchange.getIn().setHeader(RabbitMQConstants.CORRELATIONID, holder.getOriginalCorrelationId());
- }
+ }
}
-
}
} finally {
// notify callback
@@ -148,8 +160,6 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
}
}
}
-
-
protected abstract void handleReplyMessage(String correlationID, AMQP.BasicProperties properties, byte[] message);
@@ -216,7 +226,6 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
log.debug("Using executor {}", executorService);
}
-
@Override
protected void doStop() throws Exception {
@@ -227,12 +236,11 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
listenerContainer.close(closeTimeout);
listenerContainer = null;
}
-
+
// must also stop executor service
if (executorService != null) {
camelContext.getExecutorServiceManager().shutdownGraceful(executorService);
executorService = null;
}
}
-
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java
index c7fcf41..bb0a102 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java
@@ -16,13 +16,13 @@
*/
package org.apache.camel.component.rabbitmq.reply;
+import com.rabbitmq.client.AMQP;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
-import org.apache.camel.component.rabbitmq.RabbitMQConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.rabbitmq.client.AMQP;
/**
* {@link ReplyHandler} to handle processing replies when using temporary queues.
@@ -31,8 +31,8 @@ import com.rabbitmq.client.AMQP;
*/
public class TemporaryQueueReplyHandler implements ReplyHandler {
- protected final Logger log = LoggerFactory.getLogger(TemporaryQueueReplyHandler.class);
-
+ protected final Logger log = LoggerFactory.getLogger(TemporaryQueueReplyHandler.class);
+
// task queue to add the holder so we can process the reply
protected final ReplyManager replyManager;
protected final Exchange exchange;
@@ -54,7 +54,7 @@ public class TemporaryQueueReplyHandler implements ReplyHandler {
public void onReply(String correlationId, AMQP.BasicProperties properties, byte[] reply) {
// create holder object with the the reply
- log.info("in onReply with correlationId= {}", correlationId);
+ log.info("in onReply with correlationId= {}", correlationId);
ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, properties, reply);
// process the reply
replyManager.processReply(holder);
@@ -62,7 +62,7 @@ public class TemporaryQueueReplyHandler implements ReplyHandler {
public void onTimeout(String correlationId) {
// create holder object without the reply which means a timeout occurred
- log.info("in onTimeout with correlationId= {}", correlationId);
+ log.info("in onTimeout with correlationId= {}", correlationId);
ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, timeout);
// process timeout
replyManager.processReply(holder);
http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
index 6cae778..4bd1242 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
@@ -17,11 +17,6 @@
package org.apache.camel.component.rabbitmq.reply;
import java.io.IOException;
-import java.util.Map.Entry;
-
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.Queue.DeclareOk;
@@ -29,6 +24,9 @@ import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Envelope;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
/**
* A {@link ReplyManager} when using temporary queues.
@@ -36,13 +34,13 @@ import com.rabbitmq.client.Envelope;
* @version
*/
public class TemporaryQueueReplyManager extends ReplyManagerSupport {
-
+
private RabbitConsumer consumer;
public TemporaryQueueReplyManager(CamelContext camelContext) {
super(camelContext);
}
-
+
protected ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
String originalCorrelationId, String correlationId, long requestTimeout) {
return new TemporaryQueueReplyHandler(this, exchange, callback, originalCorrelationId, correlationId, requestTimeout);
@@ -85,7 +83,7 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
// setup the basicQos
if (endpoint.isPrefetchEnabled()) {
channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(),
- endpoint.isPrefetchGlobal());
+ endpoint.isPrefetchGlobal());
}
//Let the server pick a random name for us
@@ -99,16 +97,16 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
consumer = new RabbitConsumer(this, channel);
consumer.start();
- return conn;
+ return conn;
}
-
+
@Override
protected void doStop() throws Exception {
super.doStop();
consumer.stop();
}
- //TODO combine with class in RabbitMQConsumer
+ //TODO combine with class in RabbitMQConsumer
class RabbitConsumer extends com.rabbitmq.client.DefaultConsumer {
private final TemporaryQueueReplyManager consumer;
@@ -131,9 +129,9 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
- consumer.onMessage(properties, body);
+ consumer.onMessage(properties, body);
}
-
+
/**
* Bind consumer to channel
*/
@@ -147,9 +145,9 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
private void stop() throws IOException {
if (channel.isOpen()) {
if (tag != null) {
- channel.basicCancel(tag);
- }
- channel.close();
+ channel.basicCancel(tag);
+ }
+ channel.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
index 19c580f..1fa4d17 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
@@ -26,23 +26,23 @@ import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
-import org.apache.camel.Exchange;
-import org.apache.camel.impl.JndiRegistry;
-import org.apache.camel.test.junit4.CamelTestSupport;
-import org.junit.Test;
-import org.mockito.Mockito;
-
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.impl.LongStringHelper;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.mockito.Mockito;
+
public class RabbitMQEndpointTest extends CamelTestSupport {
private Envelope envelope = Mockito.mock(Envelope.class);
private AMQP.BasicProperties properties = Mockito.mock(AMQP.BasicProperties.class);
-
+
protected JndiRegistry createRegistry() throws Exception {
JndiRegistry registry = super.createRegistry();
registry.bind("argsConfigurer", new ArgsConfigurer() {
@@ -75,7 +75,7 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
assertEquals(tag, exchange.getIn().getHeader(RabbitMQConstants.DELIVERY_TAG));
assertEquals(body, exchange.getIn().getBody());
}
-
+
@Test
public void testExchangeNameIsOptional() throws Exception {
RabbitMQEndpoint endpoint1 = context.getEndpoint("rabbitmq:localhost/", RabbitMQEndpoint.class);
@@ -83,7 +83,7 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
RabbitMQEndpoint endpoint2 = context.getEndpoint("rabbitmq:localhost?autoAck=false", RabbitMQEndpoint.class);
assertEquals("Get a wrong exchange name", "", endpoint2.getExchangeName());
-
+
RabbitMQEndpoint endpoint3 = context.getEndpoint("rabbitmq:localhost/exchange", RabbitMQEndpoint.class);
assertEquals("Get a wrong exchange name", "exchange", endpoint3.getExchangeName());
}
@@ -156,7 +156,7 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
assertTrue(endpoint.isSingleton());
}
-
+
@Test
public void testArgConfigurer() throws Exception {
RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?queueArgsConfigurer=#argsConfigurer", RabbitMQEndpoint.class);
http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java
index 5c1223e..9e121f8 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java
@@ -108,7 +108,7 @@ public class RabbitMQInOutIntTest extends CamelTestSupport {
@Test
public void headerTest() throws InterruptedException, IOException {
- Map<String, Object> headers = new HashMap<>();
+ Map<String, Object> headers = new HashMap<String, Object>();
TestSerializableObject testObject = new TestSerializableObject();
testObject.setName("header");
@@ -124,8 +124,8 @@ public class RabbitMQInOutIntTest extends CamelTestSupport {
headers.put("CamelSerialize", true);
// populate a map and an arrayList
- Map<Object, Object> tmpMap = new HashMap<>();
- List<String> tmpList = new ArrayList<>();
+ Map<Object, Object> tmpMap = new HashMap<Object, Object>();
+ List<String> tmpList = new ArrayList<String>();
for (int i = 0; i < 3; i++) {
String name = "header" + i;
tmpList.add(name);
@@ -156,13 +156,13 @@ public class RabbitMQInOutIntTest extends CamelTestSupport {
foo.setName("foobar");
byte[] body = null;
- try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b)) {
+ try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b);) {
o.writeObject(foo);
body = b.toByteArray();
}
TestSerializableObject newFoo = null;
- try (InputStream b = new ByteArrayInputStream(body); ObjectInputStream o = new ObjectInputStream(b)) {
+ try (InputStream b = new ByteArrayInputStream(body); ObjectInputStream o = new ObjectInputStream(b);) {
newFoo = (TestSerializableObject) o.readObject();
} catch (IOException | ClassNotFoundException e) {
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestSerializableObject.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestSerializableObject.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestSerializableObject.java
index 1fdffd0..0b0e5cc 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestSerializableObject.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestSerializableObject.java
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.camel.component.rabbitmq.testbeans;
import java.io.Serializable;