You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/21 09:07:30 UTC

[camel] branch exchange-factory updated (8b894c2 -> ebf620a)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 8b894c2  CAMEL-16222: PooledExchangeFactory experiment
     new 0405d19  CAMEL-16222: PooledExchangeFactory experiment
     new ebf620a  CAMEL-16222: PooledExchangeFactory experiment

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/camel/component/ahc/ws/WsConsumer.java  |  2 +-
 .../apache/camel/component/apns/ApnsConsumer.java  |  2 +-
 .../apache/camel/component/as2/AS2Consumer.java    |  9 +++---
 .../camel/component/asterisk/AsteriskConsumer.java |  9 +++---
 .../consumer/AtmosScheduledPollConsumer.java       |  4 ---
 .../consumer/AtmosScheduledPollGetConsumer.java    | 14 ++++-----
 .../atmosphere/websocket/WebsocketConsumer.java    |  6 ++--
 .../atomix/client/map/AtomixMapConsumer.java       |  2 +-
 .../client/messaging/AtomixMessagingConsumer.java  |  2 +-
 .../atomix/client/queue/AtomixQueueConsumer.java   |  2 +-
 .../atomix/client/set/AtomixSetConsumer.java       |  2 +-
 .../atomix/client/value/AtomixValueConsumer.java   |  2 +-
 .../apache/camel/component/avro/AvroEndpoint.java  | 11 -------
 .../apache/camel/component/avro/AvroListener.java  | 17 ++++++++++-
 .../azure/eventhubs/EventHubsConsumer.java         | 34 ++++++++++++++++++++--
 .../azure/eventhubs/EventHubsEndpoint.java         | 32 --------------------
 .../component/azure/storage/blob/BlobConsumer.java |  2 +-
 .../azure/storage/datalake/DataLakeConsumer.java   |  2 +-
 .../azure/storage/queue/QueueConsumer.java         | 13 ++++++++-
 .../azure/storage/queue/QueueEndpoint.java         | 10 -------
 .../component/azure/blob/BlobServiceConsumer.java  |  4 +--
 .../azure/queue/QueueServiceConsumer.java          |  5 ++--
 22 files changed, 93 insertions(+), 93 deletions(-)


[camel] 01/02: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0405d1977236f5571f62723efc3c2802a6b0e2ac
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Feb 21 09:24:58 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../java/org/apache/camel/component/ahc/ws/WsConsumer.java |  2 +-
 .../java/org/apache/camel/component/apns/ApnsConsumer.java |  2 +-
 .../java/org/apache/camel/component/as2/AS2Consumer.java   |  9 +++++----
 .../apache/camel/component/asterisk/AsteriskConsumer.java  |  9 +++++----
 .../integration/consumer/AtmosScheduledPollConsumer.java   |  4 ----
 .../consumer/AtmosScheduledPollGetConsumer.java            | 14 +++++++-------
 .../component/atmosphere/websocket/WebsocketConsumer.java  |  6 +++---
 7 files changed, 22 insertions(+), 24 deletions(-)

diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java
index 1341bb4..8bf9923 100644
--- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java
+++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java
@@ -68,7 +68,7 @@ public class WsConsumer extends DefaultConsumer {
     }
 
     private void sendMessageInternal(Object message) {
-        final Exchange exchange = getEndpoint().createExchange();
+        final Exchange exchange = createExchange(true);
 
         //TODO may set some headers with some meta info (e.g., socket info, unique-id for correlation purpose, etc0 
         // set the body
diff --git a/components/camel-apns/src/main/java/org/apache/camel/component/apns/ApnsConsumer.java b/components/camel-apns/src/main/java/org/apache/camel/component/apns/ApnsConsumer.java
index d0782a6..5d45e70 100644
--- a/components/camel-apns/src/main/java/org/apache/camel/component/apns/ApnsConsumer.java
+++ b/components/camel-apns/src/main/java/org/apache/camel/component/apns/ApnsConsumer.java
@@ -51,7 +51,7 @@ public class ApnsConsumer extends ScheduledPollConsumer {
         while (it.hasNext()) {
             InactiveDevice inactiveDevice = it.next();
 
-            Exchange e = getEndpoint().createExchange();
+            Exchange e = createExchange(true);
             e.getIn().setBody(inactiveDevice);
             getProcessor().process(e);
         }
diff --git a/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/AS2Consumer.java b/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/AS2Consumer.java
index 7e767eb..08b68d2 100644
--- a/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/AS2Consumer.java
+++ b/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/AS2Consumer.java
@@ -124,17 +124,18 @@ public class AS2Consumer extends AbstractApiConsumer<AS2ApiName, AS2Configuratio
                     = HttpMessageUtils.extractEdiPayload(request, as2ServerConnection.getDecryptingPrivateKey());
 
             // Set AS2 Interchange property and EDI message into body of input message.
-            Exchange exchange = getEndpoint().createExchange();
-            HttpCoreContext coreContext = HttpCoreContext.adapt(context);
-            exchange.setProperty(AS2Constants.AS2_INTERCHANGE, coreContext);
-            exchange.getIn().setBody(ediEntity.getEdiMessage());
+            Exchange exchange = createExchange(false);
 
             try {
+                HttpCoreContext coreContext = HttpCoreContext.adapt(context);
+                exchange.setProperty(AS2Constants.AS2_INTERCHANGE, coreContext);
+                exchange.getIn().setBody(ediEntity.getEdiMessage());
                 // send message to next processor in the route
                 getProcessor().process(exchange);
             } finally {
                 // check if an exception occurred and was not handled
                 exception = exchange.getException();
+                releaseExchange(exchange, false);
             }
         } catch (Exception e) {
             LOG.warn("Failed to process AS2 message", e);
diff --git a/components/camel-asterisk/src/main/java/org/apache/camel/component/asterisk/AsteriskConsumer.java b/components/camel-asterisk/src/main/java/org/apache/camel/component/asterisk/AsteriskConsumer.java
index 24c8368..e650178 100644
--- a/components/camel-asterisk/src/main/java/org/apache/camel/component/asterisk/AsteriskConsumer.java
+++ b/components/camel-asterisk/src/main/java/org/apache/camel/component/asterisk/AsteriskConsumer.java
@@ -62,14 +62,15 @@ public class AsteriskConsumer extends DefaultConsumer {
     private final class EventListener implements ManagerEventListener {
         @Override
         public void onManagerEvent(ManagerEvent event) {
-            Exchange exchange = endpoint.createExchange();
-            exchange.getIn().setHeader(AsteriskConstants.EVENT_NAME, event.getClass().getSimpleName());
-            exchange.getIn().setBody(event);
-
+            Exchange exchange = createExchange(false);
             try {
+                exchange.getIn().setHeader(AsteriskConstants.EVENT_NAME, event.getClass().getSimpleName());
+                exchange.getIn().setBody(event);
                 getProcessor().process(exchange);
             } catch (Exception e) {
                 getExceptionHandler().handleException("Error processing exchange.", exchange, e);
+            } finally {
+                releaseExchange(exchange, false);
             }
         }
     }
diff --git a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollConsumer.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollConsumer.java
index 0c1bb56..950415f 100644
--- a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollConsumer.java
+++ b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollConsumer.java
@@ -38,8 +38,6 @@ public abstract class AtmosScheduledPollConsumer extends ScheduledPollConsumer {
     /**
      * Lifecycle method invoked when the consumer has created. Internally create or reuse a connection to the low level
      * atmos client
-     * 
-     * @throws Exception
      */
     @Override
     protected void doStart() throws Exception {
@@ -53,8 +51,6 @@ public abstract class AtmosScheduledPollConsumer extends ScheduledPollConsumer {
 
     /**
      * Lifecycle method invoked when the consumer has destroyed. Erase the reference to the atmos low level client
-     * 
-     * @throws Exception
      */
     @Override
     protected void doStop() throws Exception {
diff --git a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollGetConsumer.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollGetConsumer.java
index e2d7f43..1f2fc02 100644
--- a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollGetConsumer.java
+++ b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollGetConsumer.java
@@ -32,17 +32,16 @@ public class AtmosScheduledPollGetConsumer extends AtmosScheduledPollConsumer {
     /**
      * Poll from an atmos remote path and put the result in the message exchange
      * 
-     * @return           number of messages polled
-     * @throws Exception
+     * @return number of messages polled
      */
     @Override
     protected int poll() throws Exception {
-        Exchange exchange = endpoint.createExchange();
-        AtmosResult result = AtmosAPIFacade.getInstance(configuration.getClient())
-                .get(configuration.getRemotePath());
-        result.populateExchange(exchange);
-
+        Exchange exchange = createExchange(false);
         try {
+            AtmosResult result = AtmosAPIFacade.getInstance(configuration.getClient())
+                    .get(configuration.getRemotePath());
+            result.populateExchange(exchange);
+
             // send message to next processor in the route
             getProcessor().process(exchange);
             return 1; // number of messages polled
@@ -51,6 +50,7 @@ public class AtmosScheduledPollGetConsumer extends AtmosScheduledPollConsumer {
             if (exchange.getException() != null) {
                 getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
             }
+            releaseExchange(exchange, false);
         }
     }
 }
diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
index 8af4199..8957b6a 100644
--- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
+++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
@@ -84,7 +84,7 @@ public class WebsocketConsumer extends ServletConsumer {
     }
 
     public void sendMessage(final String connectionKey, Object message) {
-        final Exchange exchange = getEndpoint().createExchange();
+        final Exchange exchange = createExchange(true);
 
         // set header and body
         exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY, connectionKey);
@@ -101,7 +101,7 @@ public class WebsocketConsumer extends ServletConsumer {
     }
 
     public void sendEventNotification(String connectionKey, int eventType) {
-        final Exchange exchange = getEndpoint().createExchange();
+        final Exchange exchange = createExchange(true);
 
         // set header
         exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY, connectionKey);
@@ -122,7 +122,7 @@ public class WebsocketConsumer extends ServletConsumer {
     }
 
     public void sendNotDeliveredMessage(List<String> failedConnectionKeys, Object message) {
-        final Exchange exchange = getEndpoint().createExchange();
+        final Exchange exchange = createExchange(true);
 
         // set header and body
         exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY_LIST, failedConnectionKeys);


[camel] 02/02: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ebf620a5ab2d222fad10916f5ca4d6be106dae47
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Feb 21 10:06:50 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../atomix/client/map/AtomixMapConsumer.java       |  2 +-
 .../client/messaging/AtomixMessagingConsumer.java  |  2 +-
 .../atomix/client/queue/AtomixQueueConsumer.java   |  2 +-
 .../atomix/client/set/AtomixSetConsumer.java       |  2 +-
 .../atomix/client/value/AtomixValueConsumer.java   |  2 +-
 .../apache/camel/component/avro/AvroEndpoint.java  | 11 -------
 .../apache/camel/component/avro/AvroListener.java  | 17 ++++++++++-
 .../azure/eventhubs/EventHubsConsumer.java         | 34 ++++++++++++++++++++--
 .../azure/eventhubs/EventHubsEndpoint.java         | 32 --------------------
 .../component/azure/storage/blob/BlobConsumer.java |  2 +-
 .../azure/storage/datalake/DataLakeConsumer.java   |  2 +-
 .../azure/storage/queue/QueueConsumer.java         | 13 ++++++++-
 .../azure/storage/queue/QueueEndpoint.java         | 10 -------
 .../component/azure/blob/BlobServiceConsumer.java  |  4 +--
 .../azure/queue/QueueServiceConsumer.java          |  5 ++--
 15 files changed, 71 insertions(+), 69 deletions(-)

diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapConsumer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapConsumer.java
index d3ad3b7..854b411 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapConsumer.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapConsumer.java
@@ -83,7 +83,7 @@ public final class AtomixMapConsumer extends AbstractAtomixClientConsumer<Atomix
     // ********************************************
 
     private void onEvent(DistributedMap.EntryEvent<Object, Object> event) {
-        Exchange exchange = getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         exchange.getIn().setHeader(AtomixClientConstants.EVENT_TYPE, event.type());
         exchange.getIn().setHeader(AtomixClientConstants.RESOURCE_KEY, event.entry().getKey());
 
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingConsumer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingConsumer.java
index 4371216..fdcccae 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingConsumer.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingConsumer.java
@@ -101,7 +101,7 @@ public final class AtomixMessagingConsumer extends AbstractAtomixClientConsumer<
     // ********************************************
 
     private void onMessage(Message<Object> message) {
-        Exchange exchange = getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         exchange.getIn().setHeader(AtomixClientConstants.MESSAGE_ID, message.id());
 
         if (resultHeader == null) {
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConsumer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConsumer.java
index d90b38d..139d8a4 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConsumer.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConsumer.java
@@ -74,7 +74,7 @@ public final class AtomixQueueConsumer extends AbstractAtomixClientConsumer<Atom
     // ********************************************
 
     private void onEvent(DistributedQueue.ValueEvent<Object> event) {
-        Exchange exchange = getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         exchange.getIn().setHeader(AtomixClientConstants.EVENT_TYPE, event.type());
 
         if (resultHeader == null) {
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConsumer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConsumer.java
index e35ad9d..10343b6 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConsumer.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConsumer.java
@@ -74,7 +74,7 @@ public final class AtomixSetConsumer extends AbstractAtomixClientConsumer<Atomix
     // ********************************************
 
     private void onEvent(DistributedSet.ValueEvent<Object> event) {
-        Exchange exchange = getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         exchange.getIn().setHeader(AtomixClientConstants.EVENT_TYPE, event.type());
 
         if (resultHeader == null) {
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConsumer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConsumer.java
index 8fea9e6..1ec5efd 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConsumer.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConsumer.java
@@ -73,7 +73,7 @@ public final class AtomixValueConsumer extends AbstractAtomixClientConsumer<Atom
     // ********************************************
 
     private void onEvent(DistributedValue.ChangeEvent<Object> event) {
-        Exchange exchange = getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         exchange.getIn().setHeader(AtomixClientConstants.EVENT_TYPE, event.type());
         exchange.getIn().setHeader(AtomixClientConstants.RESOURCE_OLD_VALUE, event.oldValue());
 
diff --git a/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java b/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
index d85e4dc..7ef0f54 100644
--- a/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
+++ b/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
@@ -54,17 +54,6 @@ public abstract class AvroEndpoint extends DefaultEndpoint implements AsyncEndpo
         return false;
     }
 
-    public Exchange createExchange(Protocol.Message message, Object request) {
-        ExchangePattern pattern = ExchangePattern.InOut;
-        if (message.getResponse().getType().equals(Schema.Type.NULL)) {
-            pattern = ExchangePattern.InOnly;
-        }
-        Exchange exchange = createExchange(pattern);
-        exchange.getIn().setBody(request);
-        exchange.getIn().setHeader(AvroConstants.AVRO_MESSAGE_NAME, message.getName());
-        return exchange;
-    }
-
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
         AvroConsumer consumer = new AvroConsumer(this, processor);
diff --git a/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroListener.java b/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroListener.java
index 72d330b..d09b92b 100644
--- a/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroListener.java
+++ b/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroListener.java
@@ -28,6 +28,7 @@ import org.apache.avro.ipc.netty.NettyServer;
 import org.apache.avro.ipc.specific.SpecificResponder;
 import org.apache.avro.specific.SpecificData;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.commons.lang3.StringUtils;
 import org.eclipse.jetty.util.log.Log;
@@ -181,7 +182,7 @@ public class AvroListener {
      */
     private static Object processExchange(AvroConsumer consumer, Protocol.Message message, Object params) throws Exception {
         Object response;
-        Exchange exchange = consumer.getEndpoint().createExchange(message, params);
+        Exchange exchange = createExchange(consumer, message, params);
 
         try {
             consumer.getProcessor().process(exchange);
@@ -206,4 +207,18 @@ public class AvroListener {
         }
         return response;
     }
+
+    protected static Exchange createExchange(AvroConsumer consumer, Protocol.Message message, Object request) {
+        ExchangePattern pattern = ExchangePattern.InOut;
+        if (message.getResponse().getType().equals(Schema.Type.NULL)) {
+            pattern = ExchangePattern.InOnly;
+        }
+        Exchange exchange = consumer.createExchange(true);
+        exchange.setPattern(pattern);
+        exchange.getIn().setBody(request);
+        exchange.getIn().setHeader(AvroConstants.AVRO_MESSAGE_NAME, message.getName());
+        return exchange;
+    }
+
+
 }
diff --git a/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java b/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
index 0d27c8e..56ccee6 100644
--- a/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
+++ b/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
@@ -21,6 +21,7 @@ import com.azure.messaging.eventhubs.models.ErrorContext;
 import com.azure.messaging.eventhubs.models.EventContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.component.azure.eventhubs.client.EventHubsClientFactory;
 import org.apache.camel.spi.Synchronization;
@@ -71,8 +72,37 @@ public class EventHubsConsumer extends DefaultConsumer {
         return (EventHubsEndpoint) super.getEndpoint();
     }
 
+    private Exchange createAzureEventHubExchange(final EventContext eventContext) {
+        final Exchange exchange = createExchange(true);
+        final Message message = exchange.getIn();
+
+        // set body as byte[] and let camel typeConverters do the job to convert
+        message.setBody(eventContext.getEventData().getBody());
+        // set headers
+        message.setHeader(EventHubsConstants.PARTITION_ID, eventContext.getPartitionContext().getPartitionId());
+        message.setHeader(EventHubsConstants.PARTITION_KEY, eventContext.getEventData().getPartitionKey());
+        message.setHeader(EventHubsConstants.OFFSET, eventContext.getEventData().getOffset());
+        message.setHeader(EventHubsConstants.ENQUEUED_TIME, eventContext.getEventData().getEnqueuedTime());
+        message.setHeader(EventHubsConstants.SEQUENCE_NUMBER, eventContext.getEventData().getSequenceNumber());
+
+        return exchange;
+    }
+
+    private Exchange createAzureEventHubExchange(final ErrorContext errorContext) {
+        final Exchange exchange = createExchange(true);
+        final Message message = exchange.getIn();
+
+        // set headers
+        message.setHeader(EventHubsConstants.PARTITION_ID, errorContext.getPartitionContext().getPartitionId());
+
+        // set exception
+        exchange.setException(errorContext.getThrowable());
+
+        return exchange;
+    }
+
     private void onEventListener(final EventContext eventContext) {
-        final Exchange exchange = getEndpoint().createAzureEventHubExchange(eventContext);
+        final Exchange exchange = createAzureEventHubExchange(eventContext);
 
         // add exchange callback
         exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() {
@@ -93,7 +123,7 @@ public class EventHubsConsumer extends DefaultConsumer {
     }
 
     private void onErrorListener(final ErrorContext errorContext) {
-        final Exchange exchange = getEndpoint().createAzureEventHubExchange(errorContext);
+        final Exchange exchange = createAzureEventHubExchange(errorContext);
 
         // log exception if an exception occurred and was not handled
         if (exchange.getException() != null) {
diff --git a/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsEndpoint.java b/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsEndpoint.java
index 061a277..b9010e8 100644
--- a/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsEndpoint.java
+++ b/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsEndpoint.java
@@ -16,13 +16,9 @@
  */
 package org.apache.camel.component.azure.eventhubs;
 
-import com.azure.messaging.eventhubs.models.ErrorContext;
-import com.azure.messaging.eventhubs.models.EventContext;
 import org.apache.camel.Category;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.spi.UriEndpoint;
@@ -71,32 +67,4 @@ public class EventHubsEndpoint extends DefaultEndpoint {
         this.configuration = configuration;
     }
 
-    public Exchange createAzureEventHubExchange(final EventContext eventContext) {
-        final Exchange exchange = createExchange();
-        final Message message = exchange.getIn();
-
-        // set body as byte[] and let camel typeConverters do the job to convert
-        message.setBody(eventContext.getEventData().getBody());
-        // set headers
-        message.setHeader(EventHubsConstants.PARTITION_ID, eventContext.getPartitionContext().getPartitionId());
-        message.setHeader(EventHubsConstants.PARTITION_KEY, eventContext.getEventData().getPartitionKey());
-        message.setHeader(EventHubsConstants.OFFSET, eventContext.getEventData().getOffset());
-        message.setHeader(EventHubsConstants.ENQUEUED_TIME, eventContext.getEventData().getEnqueuedTime());
-        message.setHeader(EventHubsConstants.SEQUENCE_NUMBER, eventContext.getEventData().getSequenceNumber());
-
-        return exchange;
-    }
-
-    public Exchange createAzureEventHubExchange(final ErrorContext errorContext) {
-        final Exchange exchange = createExchange();
-        final Message message = exchange.getIn();
-
-        // set headers
-        message.setHeader(EventHubsConstants.PARTITION_ID, errorContext.getPartitionContext().getPartitionId());
-
-        // set exception
-        exchange.setException(errorContext.getThrowable());
-
-        return exchange;
-    }
 }
diff --git a/components/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java b/components/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java
index e8bd3e2..00e9490 100644
--- a/components/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java
+++ b/components/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java
@@ -81,7 +81,7 @@ public class BlobConsumer extends ScheduledBatchPollingConsumer {
         final BlobClientWrapper clientWrapper
                 = new BlobClientWrapper(blobContainerClient.getBlobClient(blobName));
         final BlobOperations operations = new BlobOperations(getEndpoint().getConfiguration(), clientWrapper);
-        final Exchange exchange = getEndpoint().createExchange();
+        final Exchange exchange = createExchange(true);
 
         BlobOperationResponse response;
         if (!ObjectHelper.isEmpty(getEndpoint().getConfiguration().getFileDir())) {
diff --git a/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java b/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java
index 3785ed5..915f7ee 100644
--- a/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java
+++ b/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java
@@ -102,7 +102,7 @@ class DataLakeConsumer extends ScheduledBatchPollingConsumer {
         final DataLakeFileClientWrapper clientWrapper
                 = new DataLakeFileClientWrapper(dataLakeFileSystemClient.getFileClient(fileName));
         final DataLakeFileOperations operations = new DataLakeFileOperations(getEndpoint().getConfiguration(), clientWrapper);
-        final Exchange exchange = getEndpoint().createExchange();
+        final Exchange exchange = createExchange(true);
 
         DataLakeOperationResponse response;
 
diff --git a/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java b/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java
index 28d2428..38d057f 100644
--- a/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java
+++ b/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java
@@ -27,6 +27,7 @@ import com.azure.storage.queue.models.QueueMessageItem;
 import com.azure.storage.queue.models.QueueStorageException;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.component.azure.storage.queue.client.QueueClientWrapper;
 import org.apache.camel.component.azure.storage.queue.operations.QueueOperations;
@@ -79,7 +80,7 @@ public class QueueConsumer extends ScheduledBatchPollingConsumer {
     private Queue<Exchange> createExchanges(final List<QueueMessageItem> messageItems) {
         return messageItems
                 .stream()
-                .map(queueMessageItem -> getEndpoint().createExchange(queueMessageItem))
+                .map(this::createExchange)
                 .collect(Collectors.toCollection(LinkedList::new));
     }
 
@@ -142,6 +143,16 @@ public class QueueConsumer extends ScheduledBatchPollingConsumer {
         return total;
     }
 
+    private Exchange createExchange(final QueueMessageItem messageItem) { // builds an Exchange populated from a single queue message item
+        final Exchange exchange = createExchange(true); // NOTE(review): meaning of the boolean flag is not visible in this diff - confirm against the consumer base class
+        final Message message = exchange.getIn();
+
+        message.setBody(messageItem.getMessageText()); // body is the text of the queue message
+        message.setHeaders(QueueExchangeHeaders.createQueueExchangeHeadersFromQueueMessageItem(messageItem).toMap()); // headers come from the map derived from the queue message item
+
+        return exchange;
+    }
+
     /**
      * Strategy to delete the message after being processed.
      *
diff --git a/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueEndpoint.java b/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueEndpoint.java
index 525ae03..4b92bf1 100644
--- a/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueEndpoint.java
+++ b/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueEndpoint.java
@@ -70,16 +70,6 @@ public class QueueEndpoint extends DefaultEndpoint {
                 ? configuration.getServiceClient() : QueueClientFactory.createQueueServiceClient(configuration);
     }
 
-    public Exchange createExchange(final QueueMessageItem messageItem) {
-        final Exchange exchange = createExchange();
-        final Message message = exchange.getIn();
-
-        message.setBody(messageItem.getMessageText());
-        message.setHeaders(QueueExchangeHeaders.createQueueExchangeHeadersFromQueueMessageItem(messageItem).toMap());
-
-        return exchange;
-    }
-
     /**
      * The component configurations
      */
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConsumer.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConsumer.java
index f3fc7b7..74a3638 100644
--- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConsumer.java
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConsumer.java
@@ -35,10 +35,10 @@ public class BlobServiceConsumer extends ScheduledPollConsumer {
 
     @Override
     protected int poll() throws Exception {
-        Exchange exchange = super.getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         try {
             getBlob(exchange);
-            super.getAsyncProcessor().process(exchange);
+            getProcessor().process(exchange);
             return 1;
         } catch (StorageException ex) {
             if (404 == ex.getHttpStatusCode()) {
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java
index 5d665b8..4154b93 100644
--- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java
@@ -33,10 +33,10 @@ public class QueueServiceConsumer extends ScheduledPollConsumer {
 
     @Override
     protected int poll() throws Exception {
-        Exchange exchange = super.getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         try {
             retrieveMessage(exchange);
-            super.getAsyncProcessor().process(exchange);
+            getProcessor().process(exchange);
             return 1;
         } catch (StorageException ex) {
             if (404 == ex.getHttpStatusCode()) {
@@ -51,7 +51,6 @@ public class QueueServiceConsumer extends ScheduledPollConsumer {
         //TODO: Support the batch processing if needed, given that it is possible
         // to retrieve more than 1 message in one go, similarly to camel-aws/s3 consumer. 
         QueueServiceUtil.retrieveMessage(exchange, getConfiguration());
-
     }
 
     protected QueueServiceConfiguration getConfiguration() {