You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/12 07:31:33 UTC
[camel] branch master updated: CAMEL-16366: camel-spring-rabbitmq -
RabbitMQ consumer supports exchange pooling
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 5bc0da1 CAMEL-16366: camel-spring-rabbitmq - RabbitMQ consumer supports exchange pooling
5bc0da1 is described below
commit 5bc0da1ada6b82abe6f2957cc1bd4389bf5a731c
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Apr 12 09:15:51 2021 +0200
CAMEL-16366: camel-spring-rabbitmq - RabbitMQ consumer supports exchange pooling
---
.../springrabbit/EndpointMessageListener.java | 20 +++++++++++++++++++-
1 file changed, 19 insertions(+), 1 deletion(-)
diff --git a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
index e43b783..84e5d59 100644
--- a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
+++ b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.springrabbit;
+import java.util.Map;
+
import com.rabbitmq.client.Channel;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
@@ -139,6 +141,8 @@ public class EndpointMessageListener implements ChannelAwareMessageListener {
// if we failed to process the exchange from the async callback task, then grab the exception
rce = exchange.getException(RuntimeCamelException.class);
+ // release back when in synchronous mode
+ consumer.releaseExchange(exchange, false);
} catch (Exception e) {
rce = wrapRuntimeCamelException(e);
}
@@ -155,9 +159,18 @@ public class EndpointMessageListener implements ChannelAwareMessageListener {
}
protected Exchange createExchange(Message message, Channel channel, Object replyDestination) {
- Exchange exchange = endpoint.createExchange(message);
+ Exchange exchange = consumer.createExchange(false);
exchange.setProperty(SpringRabbitMQConstants.CHANNEL, channel);
+ Object body = endpoint.getMessageConverter().fromMessage(message);
+ exchange.getMessage().setBody(body);
+
+ Map<String, Object> headers
+ = endpoint.getMessagePropertiesConverter().fromMessageProperties(message.getMessageProperties(), exchange);
+ if (!headers.isEmpty()) {
+ exchange.getMessage().setHeaders(headers);
+ }
+
// lets set to an InOut if we have some kind of reply-to destination
if (replyDestination != null && !disableReplyTo) {
// only change pattern if not already out capable
@@ -245,6 +258,11 @@ public class EndpointMessageListener implements ChannelAwareMessageListener {
}
}
}
+
+ if (!doneSync) {
+ // release back when in asynchronous mode
+ consumer.releaseExchange(exchange, false);
+ }
}
private void sendReply(Address replyDestination, Message message, Exchange exchange, org.apache.camel.Message out) {