You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/22 07:05:15 UTC

[camel] branch exchange-factory updated (01c91e2 -> 2c612eb)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 01c91e2  CAMEL-16222: PooledExchangeFactory experiment
     new a7f91ad  CAMEL-16222: PooledExchangeFactory experiment
     new 2c612eb  CAMEL-16222: PooledExchangeFactory experiment

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../config_maps/KubernetesConfigMapsConsumer.java  |  4 +-
 .../KubernetesCustomResourcesConsumer.java         |  4 +-
 .../deployments/KubernetesDeploymentsConsumer.java |  4 +-
 .../kubernetes/hpa/KubernetesHPAConsumer.java      |  4 +-
 .../namespaces/KubernetesNamespacesConsumer.java   |  4 +-
 .../kubernetes/nodes/KubernetesNodesConsumer.java  |  4 +-
 .../kubernetes/pods/KubernetesPodsConsumer.java    |  4 +-
 .../KubernetesReplicationControllersConsumer.java  |  5 +-
 .../services/KubernetesServicesConsumer.java       |  5 +-
 .../apache/camel/component/mail/MailConsumer.java  | 11 ++-
 .../apache/camel/component/mail/MailEndpoint.java  |  7 --
 .../component/milo/client/MiloClientConsumer.java  |  5 +-
 .../component/milo/server/MiloServerConsumer.java  |  4 +-
 .../apache/camel/component/mina/MinaConsumer.java  | 99 +++++++++++++---------
 .../apache/camel/component/mina/MinaEndpoint.java  | 11 ---
 .../mina/MinaTransferExchangeOptionTest.java       |  1 -
 .../camel/component/minio/MinioConsumer.java       | 36 +++++++-
 .../camel/component/minio/MinioEndpoint.java       | 40 +--------
 .../apache/camel/component/mllp/MllpEndpoint.java  |  2 +-
 .../component/mllp/MllpTcpServerConsumer.java      |  8 +-
 .../component/mongodb/gridfs/GridFsConsumer.java   |  2 +-
 .../mongodb/MongoDbChangeStreamsConsumer.java      |  4 +
 .../mongodb/MongoDbChangeStreamsThread.java        | 14 ++-
 .../camel/component/mongodb/MongoDbEndpoint.java   | 12 ---
 .../component/mongodb/MongoDbTailingThread.java    | 14 ++-
 .../camel/component/mybatis/MyBatisConsumer.java   | 21 +++--
 .../mybatis/MyBatisConsumerIsolatedTest.java       | 53 ------------
 27 files changed, 184 insertions(+), 198 deletions(-)
 delete mode 100644 components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisConsumerIsolatedTest.java


[camel] 02/02: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2c612eb53bc40fff40fc71faa679236cf4304c07
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Feb 22 08:04:44 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../apache/camel/component/mail/MailConsumer.java  | 11 ++-
 .../apache/camel/component/mail/MailEndpoint.java  |  7 --
 .../component/milo/client/MiloClientConsumer.java  |  5 +-
 .../component/milo/server/MiloServerConsumer.java  |  4 +-
 .../apache/camel/component/mina/MinaConsumer.java  | 99 +++++++++++++---------
 .../apache/camel/component/mina/MinaEndpoint.java  | 11 ---
 .../mina/MinaTransferExchangeOptionTest.java       |  1 -
 .../camel/component/minio/MinioConsumer.java       | 36 +++++++-
 .../camel/component/minio/MinioEndpoint.java       | 40 +--------
 .../apache/camel/component/mllp/MllpEndpoint.java  |  2 +-
 .../component/mllp/MllpTcpServerConsumer.java      |  8 +-
 .../component/mongodb/gridfs/GridFsConsumer.java   |  2 +-
 .../mongodb/MongoDbChangeStreamsConsumer.java      |  4 +
 .../mongodb/MongoDbChangeStreamsThread.java        | 14 ++-
 .../camel/component/mongodb/MongoDbEndpoint.java   | 12 ---
 .../component/mongodb/MongoDbTailingThread.java    | 14 ++-
 .../camel/component/mybatis/MyBatisConsumer.java   | 21 +++--
 .../mybatis/MyBatisConsumerIsolatedTest.java       | 53 ------------
 18 files changed, 157 insertions(+), 187 deletions(-)

diff --git a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
index dd541b2..6fe7825 100644
--- a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
+++ b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
@@ -385,7 +385,7 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
                 }
 
                 if (!message.getFlags().contains(Flags.Flag.DELETED)) {
-                    Exchange exchange = getEndpoint().createExchange(message);
+                    Exchange exchange = createExchange(message);
                     if (getEndpoint().getConfiguration().isMapMailMessage()) {
                         // ensure the mail message is mapped, which can be ensured by touching the body/header/attachment
                         LOG.trace("Mapping #{} from javax.mail.Message to Camel MailMessage", i);
@@ -399,6 +399,8 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
                                 exchange.getIn(AttachmentMessage.class).setAttachmentObjects(att);
                             }
                         } catch (MessagingException | IOException e) {
+                            // must release exchange before throwing exception
+                            releaseExchange(exchange, true);
                             throw new RuntimeCamelException("Error accessing attachments due to: " + e.getMessage(), e);
                         }
                     }
@@ -511,6 +513,13 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
         }
     }
 
+    private Exchange createExchange(Message message) {
+        Exchange exchange = createExchange(true);
+        exchange.setProperty(Exchange.BINDING, getEndpoint().getBinding());
+        exchange.setIn(new MailMessage(exchange, message, getEndpoint().getConfiguration().isMapMailMessage()));
+        return exchange;
+    }
+
     private void copyOrMoveMessageIfRequired(
             MailConfiguration config, Message mail, String destinationFolder, boolean moveMessage)
             throws MessagingException {
diff --git a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java
index c7723cd..1ce1be9 100644
--- a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java
+++ b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java
@@ -126,13 +126,6 @@ public class MailEndpoint extends ScheduledPollEndpoint implements HeaderFilterS
         return answer;
     }
 
-    public Exchange createExchange(Message message) {
-        Exchange exchange = super.createExchange();
-        exchange.setProperty(Exchange.BINDING, getBinding());
-        exchange.setIn(new MailMessage(exchange, message, getConfiguration().isMapMailMessage()));
-        return exchange;
-    }
-
     // Properties
     // -------------------------------------------------------------------------
 
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConsumer.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConsumer.java
index 408af8c..6068910 100644
--- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConsumer.java
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConsumer.java
@@ -69,10 +69,9 @@ public class MiloClientConsumer extends DefaultConsumer {
     private void handleValueUpdate(final DataValue value) {
         LOG.debug("Handle item update - {} = {}", node, value);
 
-        final Exchange exchange = getEndpoint().createExchange();
-        mapToMessage(value, exchange.getMessage());
-
+        final Exchange exchange = createExchange(true);
         try {
+            mapToMessage(value, exchange.getMessage());
             getProcessor().process(exchange);
         } catch (final Exception e) {
             getExceptionHandler().handleException("Error processing exchange", e);
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/MiloServerConsumer.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/MiloServerConsumer.java
index f958974..7677e17 100644
--- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/MiloServerConsumer.java
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/MiloServerConsumer.java
@@ -54,10 +54,10 @@ public class MiloServerConsumer extends DefaultConsumer {
     }
 
     protected void performWrite(final DataValue value) {
-        Exchange exchange = getEndpoint().createExchange();
-        mapToMessage(value, exchange.getMessage());
+        Exchange exchange = createExchange(true);
 
         try {
+            mapToMessage(value, exchange.getMessage());
             getProcessor().process(exchange);
         } catch (Exception e) {
             getExceptionHandler().handleException("Error processing exchange", e);
diff --git a/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java b/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
index 1152f8f..76917e3 100644
--- a/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
+++ b/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
@@ -373,6 +373,21 @@ public class MinaConsumer extends DefaultConsumer {
         this.acceptor = acceptor;
     }
 
+    private Exchange createExchange(IoSession session, Object payload) {
+        Exchange exchange;
+        if (configuration.isTransferExchange()) {
+            // do not release
+            exchange = getEndpoint().createExchange();
+        } else {
+            exchange = createExchange(false);
+        }
+        exchange.getIn().setHeader(MinaConstants.MINA_IOSESSION, session);
+        exchange.getIn().setHeader(MinaConstants.MINA_LOCAL_ADDRESS, session.getLocalAddress());
+        exchange.getIn().setHeader(MinaConstants.MINA_REMOTE_ADDRESS, session.getRemoteAddress());
+        MinaPayloadHelper.setIn(exchange, payload);
+        return exchange;
+    }
+
     /**
      * Handles consuming messages and replying if the exchange is out capable.
      */
@@ -406,7 +421,7 @@ public class MinaConsumer extends DefaultConsumer {
                 LOG.debug("Received body: {}", in);
             }
 
-            Exchange exchange = getEndpoint().createExchange(session, object);
+            Exchange exchange = createExchange(session, object);
             //Set the exchange charset property for converting
             if (getEndpoint().getConfiguration().getCharsetName() != null) {
                 exchange.setProperty(Exchange.CHARSET_NAME,
@@ -419,50 +434,54 @@ public class MinaConsumer extends DefaultConsumer {
                 getExceptionHandler().handleException(e);
             }
 
-            //
-            // If there's a response to send, send it.
-            //
-            boolean disconnect = getEndpoint().getConfiguration().isDisconnect();
-            Object response = null;
-            if (exchange.hasOut()) {
-                response = MinaPayloadHelper.getOut(getEndpoint(), exchange);
-            } else {
-                response = MinaPayloadHelper.getIn(getEndpoint(), exchange);
-            }
-
-            boolean failed = exchange.isFailed();
-            if (failed && !getEndpoint().getConfiguration().isTransferExchange()) {
-                if (exchange.getException() != null) {
-                    response = exchange.getException();
+            try {
+                //
+                // If there's a response to send, send it.
+                //
+                boolean disconnect = getEndpoint().getConfiguration().isDisconnect();
+                Object response;
+                if (exchange.hasOut()) {
+                    response = MinaPayloadHelper.getOut(getEndpoint(), exchange);
                 } else {
-                    // failed and no exception, must be a fault
-                    response = exchange.getOut().getBody();
+                    response = MinaPayloadHelper.getIn(getEndpoint(), exchange);
                 }
-            }
 
-            if (response != null) {
-                LOG.debug("Writing body: {}", response);
-                MinaHelper.writeBody(session, response, exchange, configuration.getWriteTimeout());
-            } else {
-                LOG.debug("Writing no response");
-                disconnect = Boolean.TRUE;
-            }
+                boolean failed = exchange.isFailed();
+                if (failed && !getEndpoint().getConfiguration().isTransferExchange()) {
+                    if (exchange.getException() != null) {
+                        response = exchange.getException();
+                    } else {
+                        // failed and no exception, must be a fault
+                        response = exchange.getOut().getBody();
+                    }
+                }
 
-            // should session be closed after complete?
-            Boolean close;
-            if (ExchangeHelper.isOutCapable(exchange)) {
-                close = exchange.getOut().getHeader(MinaConstants.MINA_CLOSE_SESSION_WHEN_COMPLETE, Boolean.class);
-            } else {
-                close = exchange.getIn().getHeader(MinaConstants.MINA_CLOSE_SESSION_WHEN_COMPLETE, Boolean.class);
-            }
+                if (response != null) {
+                    LOG.debug("Writing body: {}", response);
+                    MinaHelper.writeBody(session, response, exchange, configuration.getWriteTimeout());
+                } else {
+                    LOG.debug("Writing no response");
+                    disconnect = Boolean.TRUE;
+                }
 
-            // should we disconnect, the header can override the configuration
-            if (close != null) {
-                disconnect = close;
-            }
-            if (disconnect) {
-                LOG.debug("Closing session when complete at address: {}", address);
-                session.closeNow();
+                // should session be closed after complete?
+                Boolean close;
+                if (ExchangeHelper.isOutCapable(exchange)) {
+                    close = exchange.getOut().getHeader(MinaConstants.MINA_CLOSE_SESSION_WHEN_COMPLETE, Boolean.class);
+                } else {
+                    close = exchange.getIn().getHeader(MinaConstants.MINA_CLOSE_SESSION_WHEN_COMPLETE, Boolean.class);
+                }
+
+                // should we disconnect, the header can override the configuration
+                if (close != null) {
+                    disconnect = close;
+                }
+                if (disconnect) {
+                    LOG.debug("Closing session when complete at address: {}", address);
+                    session.closeNow();
+                }
+            } finally {
+                releaseExchange(exchange, false);
             }
         }
     }
diff --git a/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java b/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
index 2d3784a..416579a 100644
--- a/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
+++ b/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.mina;
 import org.apache.camel.Category;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
 import org.apache.camel.MultipleConsumersSupport;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -27,7 +26,6 @@ import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.support.DefaultEndpoint;
 import org.apache.camel.util.ObjectHelper;
-import org.apache.mina.core.session.IoSession;
 
 /**
  * Socket level networking using TCP or UDP with Apache Mina 2.x.
@@ -68,15 +66,6 @@ public class MinaEndpoint extends DefaultEndpoint implements MultipleConsumersSu
         return answer;
     }
 
-    public Exchange createExchange(IoSession session, Object payload) {
-        Exchange exchange = createExchange();
-        exchange.getIn().setHeader(MinaConstants.MINA_IOSESSION, session);
-        exchange.getIn().setHeader(MinaConstants.MINA_LOCAL_ADDRESS, session.getLocalAddress());
-        exchange.getIn().setHeader(MinaConstants.MINA_REMOTE_ADDRESS, session.getRemoteAddress());
-        MinaPayloadHelper.setIn(exchange, payload);
-        return exchange;
-    }
-
     @Override
     public boolean isMultipleConsumersSupported() {
         // only datagram should allow multiple consumers
diff --git a/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java b/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java
index c0354ae..140617b 100644
--- a/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java
+++ b/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java
@@ -58,7 +58,6 @@ public class MinaTransferExchangeOptionTest extends BaseMinaTest {
                 String.format("mina:tcp://localhost:%1$s?sync=true&encoding=UTF-8&transferExchange=true", getPort()));
         Producer producer = endpoint.createProducer();
         Exchange exchange = endpoint.createExchange();
-        //Exchange exchange = endpoint.createExchange();
 
         Message message = exchange.getIn();
         message.setBody("Hello!");
diff --git a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
index df0a7d9..e98b295 100644
--- a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
+++ b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
@@ -39,9 +39,11 @@ import io.minio.errors.MinioException;
 import io.minio.messages.Item;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
+import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.URISupport;
@@ -182,7 +184,7 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
 
     protected Queue<Exchange> createExchanges(InputStream objectStream, String objectName) throws Exception {
         Queue<Exchange> answer = new LinkedList<>();
-        Exchange exchange = getEndpoint().createExchange(objectStream, objectName);
+        Exchange exchange = createExchange(objectStream, objectName);
         answer.add(exchange);
         IOHelper.close(objectStream);
         return answer;
@@ -200,7 +202,7 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
                     Item minioObjectSummary = minioObjectSummaries.next().get();
                     InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
                     minioObjects.add(minioObject);
-                    Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                    Exchange exchange = createExchange(minioObject, minioObjectSummary.objectName());
                     answer.add(exchange);
                     continuationToken = minioObjectSummary.objectName();
                 } while (minioObjectSummaries.hasNext());
@@ -212,7 +214,7 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
                     if (!minioObjectSummary.isDir()) {
                         InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
                         minioObjects.add(minioObject);
-                        Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                        Exchange exchange = createExchange(minioObject, minioObjectSummary.objectName());
                         answer.add(exchange);
                         continuationToken = minioObjectSummary.objectName();
                     }
@@ -401,6 +403,34 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
         return (MinioEndpoint) super.getEndpoint();
     }
 
+    private Exchange createExchange(InputStream minioObject, String objectName) throws Exception {
+        LOG.trace("Getting object with objectName {} from bucket {}...", objectName, getConfiguration().getBucketName());
+
+        Exchange exchange = createExchange(true);
+        exchange.setPattern(getEndpoint().getExchangePattern());
+        Message message = exchange.getIn();
+        LOG.trace("Got object!");
+
+        getEndpoint().getObjectStat(objectName, message);
+
+        if (getConfiguration().isIncludeBody()) {
+            message.setBody(getEndpoint().readInputStream(minioObject));
+            if (getConfiguration().isAutoCloseBody()) {
+                exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() {
+                    @Override
+                    public void onDone(Exchange exchange) {
+                        IOHelper.close(minioObject);
+                    }
+                });
+            }
+        } else {
+            message.setBody(null);
+            IOHelper.close(minioObject);
+        }
+
+        return exchange;
+    }
+
     @Override
     public String toString() {
         if (isEmpty(minioConsumerToString)) {
diff --git a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioEndpoint.java b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioEndpoint.java
index e85cf8f..bb67186 100644
--- a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioEndpoint.java
+++ b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioEndpoint.java
@@ -32,17 +32,12 @@ import io.minio.StatObjectResponse;
 import org.apache.camel.Category;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.support.ScheduledPollEndpoint;
-import org.apache.camel.support.SynchronizationAdapter;
-import org.apache.camel.util.IOHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,37 +118,6 @@ public class MinioEndpoint extends ScheduledPollEndpoint {
         super.doStop();
     }
 
-    public Exchange createExchange(InputStream minioObject, String objectName) throws Exception {
-        return createExchange(getExchangePattern(), minioObject, objectName);
-    }
-
-    public Exchange createExchange(ExchangePattern pattern, InputStream minioObject, String objectName) throws Exception {
-        LOG.trace("Getting object with objectName {} from bucket {}...", objectName, getConfiguration().getBucketName());
-
-        Exchange exchange = super.createExchange(pattern);
-        Message message = exchange.getIn();
-        LOG.trace("Got object!");
-
-        getObjectStat(objectName, message);
-
-        if (getConfiguration().isIncludeBody()) {
-            message.setBody(readInputStream(minioObject));
-            if (getConfiguration().isAutoCloseBody()) {
-                exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() {
-                    @Override
-                    public void onDone(Exchange exchange) {
-                        IOHelper.close(minioObject);
-                    }
-                });
-            }
-        } else {
-            message.setBody(null);
-            IOHelper.close(minioObject);
-        }
-
-        return exchange;
-    }
-
     public MinioConfiguration getConfiguration() {
         return configuration;
     }
@@ -196,7 +160,7 @@ public class MinioEndpoint extends ScheduledPollEndpoint {
         }
     }
 
-    private String readInputStream(InputStream minioObject) throws IOException {
+    String readInputStream(InputStream minioObject) throws IOException {
         StringBuilder textBuilder = new StringBuilder();
         try (Reader reader = new BufferedReader(new InputStreamReader(minioObject, StandardCharsets.UTF_8))) {
             int c;
@@ -227,7 +191,7 @@ public class MinioEndpoint extends ScheduledPollEndpoint {
         LOG.trace("Bucket policy updated");
     }
 
-    private void getObjectStat(String objectName, Message message) throws Exception {
+    void getObjectStat(String objectName, Message message) throws Exception {
 
         String bucketName = getConfiguration().getBucketName();
         StatObjectArgs.Builder statObjectRequest = StatObjectArgs.builder().bucket(bucketName).object(objectName);
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
index d1e3a4a..35719f0 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
@@ -109,7 +109,7 @@ public class MllpEndpoint extends DefaultEndpoint {
         super.setBridgeErrorHandler(configuration.isBridgeErrorHandler());
     }
 
-    private void setExchangeProperties(Exchange mllpExchange) {
+    void setExchangeProperties(Exchange mllpExchange) {
         if (configuration.hasCharsetName()) {
             mllpExchange.setProperty(Exchange.CHARSET_NAME, configuration.getCharsetName());
         }
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
index d43c356..2ca75bb 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
@@ -228,7 +228,8 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
         // Send the message on to Camel for processing and wait for the response
         log.debug("processMessage(hl7MessageBytes[{}], {}) - populating the exchange with received payload",
                 hl7MessageBytes == null ? -1 : hl7MessageBytes.length, consumerRunnable.getSocket());
-        Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOut);
+        Exchange exchange = createExchange(false);
+        exchange.setPattern(ExchangePattern.InOut);
         if (getConfiguration().hasCharsetName()) {
             exchange.setProperty(Exchange.CHARSET_NAME, getConfiguration().getCharsetName());
         }
@@ -283,9 +284,8 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
                     "processMessage(byte[], TcpSocketConsumerRunnable) - Unexpected exception creating Unit of Work", exchange,
                     uowEx);
         } finally {
-            if (exchange != null) {
-                doneUoW(exchange);
-            }
+            doneUoW(exchange);
+            releaseExchange(exchange, false);
         }
     }
 
diff --git a/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java b/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java
index 60eff1d..74fe66a 100644
--- a/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java
+++ b/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java
@@ -142,7 +142,7 @@ public class GridFsConsumer extends DefaultConsumer implements Runnable {
                         forig = endpoint.getFilesCollection().findOneAndUpdate(filter, update, options);
                     }
                     if (forig != null) {
-                        Exchange exchange = endpoint.createExchange();
+                        Exchange exchange = createExchange(true);
                         GridFSDownloadStream downloadStream = endpoint.getGridFsBucket().openDownloadStream(file.getFilename());
                         file = downloadStream.getGridFSFile();
 
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java
index f625205..4fd3b23 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java
@@ -19,10 +19,13 @@ package org.apache.camel.component.mongodb;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.util.ObjectHelper;
 import org.bson.BsonDocument;
+import org.bson.Document;
 
 import static java.util.Collections.singletonList;
 
@@ -69,4 +72,5 @@ public class MongoDbChangeStreamsConsumer extends DefaultConsumer {
         changeStreamsThread.init();
         executor.execute(changeStreamsThread);
     }
+
 }
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
index 116b072..8757793 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
@@ -24,6 +24,7 @@ import com.mongodb.client.MongoCursor;
 import com.mongodb.client.model.changestream.ChangeStreamDocument;
 import com.mongodb.client.model.changestream.OperationType;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.bson.BsonDocument;
 import org.bson.Document;
 import org.bson.types.ObjectId;
@@ -70,7 +71,7 @@ class MongoDbChangeStreamsThread extends MongoAbstractConsumerThread {
         try {
             while (cursor.hasNext() && keepRunning) {
                 ChangeStreamDocument<Document> dbObj = (ChangeStreamDocument<Document>) cursor.next();
-                Exchange exchange = endpoint.createMongoDbExchange(dbObj.getFullDocument());
+                Exchange exchange = createMongoDbExchange(dbObj.getFullDocument());
 
                 ObjectId documentId = dbObj.getDocumentKey().getObjectId(MONGO_ID).getValue();
                 OperationType operationType = dbObj.getOperationType();
@@ -101,4 +102,15 @@ class MongoDbChangeStreamsThread extends MongoAbstractConsumerThread {
             }
         }
     }
+
+    private Exchange createMongoDbExchange(Document dbObj) {
+        Exchange exchange = consumer.createExchange(true);
+        Message message = exchange.getIn();
+        message.setHeader(MongoDbConstants.DATABASE, endpoint.getDatabase());
+        message.setHeader(MongoDbConstants.COLLECTION, endpoint.getCollection());
+        message.setHeader(MongoDbConstants.FROM_TAILABLE, true);
+        message.setBody(dbObj);
+        return exchange;
+    }
+
 }
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
index 76ddc9a..a118fea 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
@@ -31,8 +31,6 @@ import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.spi.Metadata;
@@ -318,16 +316,6 @@ public class MongoDbEndpoint extends DefaultEndpoint {
         }
     }
 
-    public Exchange createMongoDbExchange(Document dbObj) {
-        Exchange exchange = super.createExchange();
-        Message message = exchange.getIn();
-        message.setHeader(MongoDbConstants.DATABASE, database);
-        message.setHeader(MongoDbConstants.COLLECTION, collection);
-        message.setHeader(MongoDbConstants.FROM_TAILABLE, true);
-        message.setBody(dbObj);
-        return exchange;
-    }
-
     @Override
     protected void doStart() throws Exception {
         if (mongoConnection == null) {
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingThread.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingThread.java
index 5f870d1..b2c7747 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingThread.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingThread.java
@@ -20,6 +20,7 @@ import com.mongodb.CursorType;
 import com.mongodb.MongoCursorNotFoundException;
 import com.mongodb.client.MongoCursor;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.bson.Document;
 
 import static com.mongodb.client.model.Filters.gt;
@@ -110,7 +111,7 @@ class MongoDbTailingThread extends MongoAbstractConsumerThread {
         try {
             while (cursor.hasNext() && keepRunning) {
                 Document dbObj = (Document) cursor.next();
-                Exchange exchange = endpoint.createMongoDbExchange(dbObj);
+                Exchange exchange = createMongoDbExchange(dbObj);
                 try {
                     if (log.isTraceEnabled()) {
                         log.trace("Sending exchange: {}, ObjectId: {}", exchange, dbObj.get(MONGO_ID));
@@ -145,4 +146,15 @@ class MongoDbTailingThread extends MongoAbstractConsumerThread {
             tailTracking.persistToStore();
         }
     }
+
+    Exchange createMongoDbExchange(Document dbObj) {
+        Exchange exchange = consumer.createExchange(true);
+        Message message = exchange.getIn();
+        message.setHeader(MongoDbConstants.DATABASE, endpoint.getDatabase());
+        message.setHeader(MongoDbConstants.COLLECTION, endpoint.getCollection());
+        message.setHeader(MongoDbConstants.FROM_TAILABLE, true);
+        message.setBody(dbObj);
+        return exchange;
+    }
+
 }
diff --git a/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java b/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
index 45bfca1..bfc721d 100644
--- a/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
+++ b/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
@@ -130,9 +130,10 @@ public class MyBatisConsumer extends ScheduledBatchPollingConsumer {
 
             // process the current exchange
             LOG.debug("Processing exchange: {} with properties: {}", exchange, exchange.getProperties());
-            getProcessor().process(exchange);
-
+            Exception cause = null;
             try {
+                getProcessor().process(exchange);
+
                 if (onConsume != null) {
                     endpoint.getProcessingStrategy().commit(endpoint, exchange, data, onConsume);
                 }
@@ -142,13 +143,16 @@ public class MyBatisConsumer extends ScheduledBatchPollingConsumer {
 
             if (getEndpoint().isTransacted() && exchange.isFailed()) {
                 // break out as we are transacted and should rollback
-                Exception cause = exchange.getException();
-                if (cause != null) {
-                    throw cause;
-                } else {
-                    throw new RollbackExchangeException("Rollback transaction due error processing exchange", exchange);
+                cause = exchange.getException();
+                if (cause == null) {
+                    cause = new RollbackExchangeException("Rollback transaction due error processing exchange", null);
                 }
             }
+            releaseExchange(exchange, false);
+
+            if (cause != null) {
+                throw cause;
+            }
         }
 
         return total;
@@ -156,7 +160,8 @@ public class MyBatisConsumer extends ScheduledBatchPollingConsumer {
 
     private Exchange createExchange(Object data) {
         final MyBatisEndpoint endpoint = getEndpoint();
-        final Exchange exchange = endpoint.createExchange(ExchangePattern.InOnly);
+        final Exchange exchange = createExchange(false);
+        exchange.setPattern(ExchangePattern.InOnly);
         final String outputHeader = getEndpoint().getOutputHeader();
 
         Message msg = exchange.getIn();
diff --git a/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisConsumerIsolatedTest.java b/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisConsumerIsolatedTest.java
deleted file mode 100644
index 27ad1db..0000000
--- a/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisConsumerIsolatedTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.mybatis;
-
-import java.util.ArrayDeque;
-import java.util.Queue;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Processor;
-import org.apache.camel.support.DefaultExchange;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.mock;
-
-public class MyBatisConsumerIsolatedTest {
-
-    @Test
-    public void shouldRespectBatchSize() throws Exception {
-        // Given
-        int batchSize = 5;
-        MyBatisConsumer consumer = new MyBatisConsumer(mock(MyBatisEndpoint.class), mock(Processor.class));
-        consumer.setMaxMessagesPerPoll(batchSize);
-
-        Queue<Object> emptyMessageQueue = new ArrayDeque<>();
-        for (int i = 0; i < 10; i++) {
-            MyBatisConsumer.DataHolder dataHolder = new MyBatisConsumer.DataHolder();
-            dataHolder.exchange = new DefaultExchange(mock(CamelContext.class));
-            emptyMessageQueue.add(dataHolder);
-        }
-
-        // When
-        int processedMessages = consumer.processBatch(emptyMessageQueue);
-
-        // Then
-        assertEquals(batchSize, processedMessages);
-    }
-
-}


[camel] 01/02: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a7f91ad0e171410e723ad3b88e1465aa9ef612bd
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Feb 22 07:30:03 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../kubernetes/config_maps/KubernetesConfigMapsConsumer.java         | 4 +++-
 .../customresources/KubernetesCustomResourcesConsumer.java           | 4 +++-
 .../kubernetes/deployments/KubernetesDeploymentsConsumer.java        | 4 +++-
 .../apache/camel/component/kubernetes/hpa/KubernetesHPAConsumer.java | 4 +++-
 .../kubernetes/namespaces/KubernetesNamespacesConsumer.java          | 4 +++-
 .../camel/component/kubernetes/nodes/KubernetesNodesConsumer.java    | 4 +++-
 .../camel/component/kubernetes/pods/KubernetesPodsConsumer.java      | 4 +++-
 .../KubernetesReplicationControllersConsumer.java                    | 5 +++--
 .../component/kubernetes/services/KubernetesServicesConsumer.java    | 5 +++--
 9 files changed, 27 insertions(+), 11 deletions(-)

diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsConsumer.java
index b33b737..5df45d0 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsConsumer.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsConsumer.java
@@ -110,7 +110,7 @@ public class KubernetesConfigMapsConsumer extends DefaultConsumer {
                 @Override
                 public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, ConfigMap resource) {
                     ConfigMapEvent de = new ConfigMapEvent(action, resource);
-                    Exchange exchange = getEndpoint().createExchange();
+                    Exchange exchange = createExchange(false);
                     exchange.getIn().setBody(de.getConfigMap());
                     exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, de.getAction());
                     exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
@@ -118,6 +118,8 @@ public class KubernetesConfigMapsConsumer extends DefaultConsumer {
                         processor.process(exchange);
                     } catch (Exception e) {
                         getExceptionHandler().handleException("Error during processing", exchange, e);
+                    } finally {
+                        releaseExchange(exchange, false);
                     }
                 }
 
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/customresources/KubernetesCustomResourcesConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/customresources/KubernetesCustomResourcesConsumer.java
index 8ce8010..0e22f9b 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/customresources/KubernetesCustomResourcesConsumer.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/customresources/KubernetesCustomResourcesConsumer.java
@@ -97,7 +97,7 @@ public class KubernetesCustomResourcesConsumer extends DefaultConsumer {
 
                     @Override
                     public void eventReceived(Action action, String resource) {
-                        Exchange exchange = getEndpoint().createExchange();
+                        Exchange exchange = createExchange(false);
                         exchange.getIn().setBody(resource);
                         exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_CRD_EVENT_ACTION, action);
                         exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_CRD_EVENT_TIMESTAMP,
@@ -106,6 +106,8 @@ public class KubernetesCustomResourcesConsumer extends DefaultConsumer {
                             processor.process(exchange);
                         } catch (Exception e) {
                             getExceptionHandler().handleException("Error during processing", exchange, e);
+                        } finally {
+                            releaseExchange(exchange, false);
                         }
                     }
 
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/deployments/KubernetesDeploymentsConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/deployments/KubernetesDeploymentsConsumer.java
index 58bbc45..5eed4b8 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/deployments/KubernetesDeploymentsConsumer.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/deployments/KubernetesDeploymentsConsumer.java
@@ -105,7 +105,7 @@ public class KubernetesDeploymentsConsumer extends DefaultConsumer {
                 @Override
                 public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Deployment resource) {
                     DeploymentEvent de = new DeploymentEvent(action, resource);
-                    Exchange exchange = getEndpoint().createExchange();
+                    Exchange exchange = createExchange(false);
                     exchange.getIn().setBody(de.getDeployment());
                     exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, de.getAction());
                     exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
@@ -113,6 +113,8 @@ public class KubernetesDeploymentsConsumer extends DefaultConsumer {
                         processor.process(exchange);
                     } catch (Exception e) {
                         getExceptionHandler().handleException("Error during processing", exchange, e);
+                    } finally {
+                        releaseExchange(exchange, false);
                     }
                 }
 
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/hpa/KubernetesHPAConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/hpa/KubernetesHPAConsumer.java
index 99d13e5..abf7d09 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/hpa/KubernetesHPAConsumer.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/hpa/KubernetesHPAConsumer.java
@@ -109,7 +109,7 @@ public class KubernetesHPAConsumer extends DefaultConsumer {
                 public void eventReceived(
                         io.fabric8.kubernetes.client.Watcher.Action action, HorizontalPodAutoscaler resource) {
                     HPAEvent hpae = new HPAEvent(action, resource);
-                    Exchange exchange = getEndpoint().createExchange();
+                    Exchange exchange = createExchange(false);
                     exchange.getIn().setBody(hpae.getHpa());
                     exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, hpae.getAction());
                     exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
@@ -117,6 +117,8 @@ public class KubernetesHPAConsumer extends DefaultConsumer {
                         processor.process(exchange);
                     } catch (Exception e) {
                         getExceptionHandler().handleException("Error during processing", exchange, e);
+                    } finally {
+                        releaseExchange(exchange, false);
                     }
                 }
 
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/namespaces/KubernetesNamespacesConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/namespaces/KubernetesNamespacesConsumer.java
index 862fe83..aa026f7 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/namespaces/KubernetesNamespacesConsumer.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/namespaces/KubernetesNamespacesConsumer.java
@@ -99,7 +99,7 @@ public class KubernetesNamespacesConsumer extends DefaultConsumer {
                 @Override
                 public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Namespace resource) {
                     NamespaceEvent ne = new NamespaceEvent(action, resource);
-                    Exchange exchange = getEndpoint().createExchange();
+                    Exchange exchange = createExchange(false);
                     exchange.getIn().setBody(ne.getNamespace());
                     exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, ne.getAction());
                     exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
@@ -107,6 +107,8 @@ public class KubernetesNamespacesConsumer extends DefaultConsumer {
                         processor.process(exchange);
                     } catch (Exception e) {
                         getExceptionHandler().handleException("Error during processing", exchange, e);
+                    } finally {
+                        releaseExchange(exchange, false);
                     }
                 }
 
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesConsumer.java
index 0a4f745..651eeee 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesConsumer.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesConsumer.java
@@ -103,7 +103,7 @@ public class KubernetesNodesConsumer extends DefaultConsumer {
                 @Override
                 public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Node resource) {
                     NodeEvent ne = new NodeEvent(action, resource);
-                    Exchange exchange = getEndpoint().createExchange();
+                    Exchange exchange = createExchange(false);
                     exchange.getIn().setBody(ne.getNode());
                     exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, ne.getAction());
                     exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
@@ -111,6 +111,8 @@ public class KubernetesNodesConsumer extends DefaultConsumer {
                         processor.process(exchange);
                     } catch (Exception e) {
                         getExceptionHandler().handleException("Error during processing", exchange, e);
+                    } finally {
+                        releaseExchange(exchange, false);
                     }
                 }
 
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsConsumer.java
index d336aef..5f7b071 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsConsumer.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsConsumer.java
@@ -106,7 +106,7 @@ public class KubernetesPodsConsumer extends DefaultConsumer {
                 @Override
                 public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Pod resource) {
                     PodEvent pe = new PodEvent(action, resource);
-                    Exchange exchange = getEndpoint().createExchange();
+                    Exchange exchange = createExchange(false);
                     exchange.getIn().setBody(pe.getPod());
                     exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, pe.getAction());
                     exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
@@ -114,6 +114,8 @@ public class KubernetesPodsConsumer extends DefaultConsumer {
                         processor.process(exchange);
                     } catch (Exception e) {
                         getExceptionHandler().handleException("Error during processing", exchange, e);
+                    } finally {
+                        releaseExchange(exchange, false);
                     }
                 }
 
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/replication_controllers/KubernetesReplicationControllersConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/replication_controllers/KubernetesReplicationControllersConsumer.java
index a2a0d8d..8aec7d8 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/replication_controllers/KubernetesReplicationControllersConsumer.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/replication_controllers/KubernetesReplicationControllersConsumer.java
@@ -108,7 +108,7 @@ public class KubernetesReplicationControllersConsumer extends DefaultConsumer {
                 @Override
                 public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, ReplicationController resource) {
                     ReplicationControllerEvent rce = new ReplicationControllerEvent(action, resource);
-                    Exchange exchange = getEndpoint().createExchange();
+                    Exchange exchange = createExchange(false);
                     exchange.getIn().setBody(rce.getReplicationController());
                     exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, rce.getAction());
                     exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
@@ -116,8 +116,9 @@ public class KubernetesReplicationControllersConsumer extends DefaultConsumer {
                         processor.process(exchange);
                     } catch (Exception e) {
                         getExceptionHandler().handleException("Error during processing", exchange, e);
+                    } finally {
+                        releaseExchange(exchange, false);
                     }
-
                 }
 
                 @Override
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/services/KubernetesServicesConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/services/KubernetesServicesConsumer.java
index 98b3cb0..3b78fe8 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/services/KubernetesServicesConsumer.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/services/KubernetesServicesConsumer.java
@@ -106,7 +106,7 @@ public class KubernetesServicesConsumer extends DefaultConsumer {
                 @Override
                 public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Service resource) {
                     ServiceEvent se = new ServiceEvent(action, resource);
-                    Exchange exchange = getEndpoint().createExchange();
+                    Exchange exchange = createExchange(false);
                     exchange.getIn().setBody(se.getService());
                     exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, se.getAction());
                     exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
@@ -114,8 +114,9 @@ public class KubernetesServicesConsumer extends DefaultConsumer {
                         processor.process(exchange);
                     } catch (Exception e) {
                         getExceptionHandler().handleException("Error during processing", exchange, e);
+                    } finally {
+                        releaseExchange(exchange, false);
                     }
-
                 }
 
                 @Override