You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/12 07:31:33 UTC

[camel] branch master updated: CAMEL-16366: camel-spring-rabbitmq - RabbitMQ consumer supports exchange pooling

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bc0da1  CAMEL-16366: camel-spring-rabbitmq - RabbitMQ consumer supports exchange pooling
5bc0da1 is described below

commit 5bc0da1ada6b82abe6f2957cc1bd4389bf5a731c
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Apr 12 09:15:51 2021 +0200

    CAMEL-16366: camel-spring-rabbitmq - RabbitMQ consumer supports exchange pooling
---
 .../springrabbit/EndpointMessageListener.java        | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
index e43b783..84e5d59 100644
--- a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
+++ b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.springrabbit;
 
+import java.util.Map;
+
 import com.rabbitmq.client.Channel;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
@@ -139,6 +141,8 @@ public class EndpointMessageListener implements ChannelAwareMessageListener {
             // if we failed processed the exchange from the async callback task, then grab the exception
             rce = exchange.getException(RuntimeCamelException.class);
 
+            // release back when in synchronous mode
+            consumer.releaseExchange(exchange, false);
         } catch (Exception e) {
             rce = wrapRuntimeCamelException(e);
         }
@@ -155,9 +159,18 @@ public class EndpointMessageListener implements ChannelAwareMessageListener {
     }
 
     protected Exchange createExchange(Message message, Channel channel, Object replyDestination) {
-        Exchange exchange = endpoint.createExchange(message);
+        Exchange exchange = consumer.createExchange(false);
         exchange.setProperty(SpringRabbitMQConstants.CHANNEL, channel);
 
+        Object body = endpoint.getMessageConverter().fromMessage(message);
+        exchange.getMessage().setBody(body);
+
+        Map<String, Object> headers
+                = endpoint.getMessagePropertiesConverter().fromMessageProperties(message.getMessageProperties(), exchange);
+        if (!headers.isEmpty()) {
+            exchange.getMessage().setHeaders(headers);
+        }
+
         // lets set to an InOut if we have some kind of reply-to destination
         if (replyDestination != null && !disableReplyTo) {
             // only change pattern if not already out capable
@@ -245,6 +258,11 @@ public class EndpointMessageListener implements ChannelAwareMessageListener {
                     }
                 }
             }
+
+            if (!doneSync) {
+                // release back when in asynchronous mode
+                consumer.releaseExchange(exchange, false);
+            }
         }
 
         private void sendReply(Address replyDestination, Message message, Exchange exchange, org.apache.camel.Message out) {