You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/19 15:32:12 UTC

[camel] branch exchange-factory updated (a842a54 -> 230dbe7)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from a842a54  CAMEL-16222: PooledExchangeFactory experiment
     new 497e4c5  CAMEL-16222: PooledExchangeFactory experiment
     new 230dbe7  CAMEL-16222: PooledExchangeFactory experiment

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/camel/ExtendedExchange.java    | 20 +++++++++++++-
 .../src/main/java/org/apache/camel/Message.java    |  7 +++++
 .../camel/impl/engine/DefaultUnitOfWork.java       |  2 +-
 .../camel/impl/engine/PooledExchangeFactory.java   |  7 +++--
 .../org/apache/camel/support/DefaultConsumer.java  |  6 ++--
 .../org/apache/camel/support/DefaultExchange.java  | 32 +++++++++++++++++-----
 .../org/apache/camel/support/DefaultMessage.java   |  8 ++++++
 .../org/apache/camel/support/MessageSupport.java   |  7 +++++
 .../ROOT/pages/camel-3x-upgrade-guide-3_9.adoc     | 17 ++++++++++++
 9 files changed, 91 insertions(+), 15 deletions(-)


[camel] 01/02: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 497e4c53ae4f0144ff60eae8532f95c3f4593bbf
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Feb 19 16:09:29 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../main/java/org/apache/camel/ExtendedExchange.java | 20 +++++++++++++++++++-
 .../apache/camel/impl/engine/DefaultUnitOfWork.java  |  2 +-
 .../camel/impl/engine/PooledExchangeFactory.java     |  7 +++++--
 .../org/apache/camel/support/DefaultConsumer.java    |  6 ++----
 .../org/apache/camel/support/DefaultExchange.java    | 19 ++++++++++++++-----
 .../ROOT/pages/camel-3x-upgrade-guide-3_9.adoc       | 17 +++++++++++++++++
 6 files changed, 58 insertions(+), 13 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
index 36fb8b9..dec68a9 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
@@ -31,6 +31,8 @@ public interface ExtendedExchange extends Exchange {
 
     /**
      * Registers a task to run when this exchange is done.
+     * <p/>
+     * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself.
      */
     void onDone(Function<Exchange, Boolean> task);
 
@@ -39,14 +41,30 @@ public interface ExtendedExchange extends Exchange {
      * <p/>
      * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself.
      */
-    void done();
+    void done(boolean forced);
 
     /**
      * Resets the exchange for reuse with the given created timestamp;
+     * <p/>
+     * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself.
      */
     void reset(long created);
 
     /**
+     * Whether this exchange was created to auto release when its unit of work is done
+     * <p/>
+     * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself.
+     */
+    void setAutoRelease(boolean autoRelease);
+
+    /**
+     * Whether this exchange was created to auto release when its unit of work is done
+     * <p/>
+     * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself.
+     */
+    boolean isAutoRelease();
+
+    /**
      * Sets the endpoint which originated this message exchange. This method should typically only be called by
      * {@link Endpoint} implementations
      */
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index d501526..372072a 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -249,7 +249,7 @@ public class DefaultUnitOfWork implements UnitOfWork {
 
         // the exchange is now done
         try {
-            exchange.adapt(ExtendedExchange.class).done();
+            exchange.adapt(ExtendedExchange.class).done(false);
         } catch (Throwable e) {
             // must catch exceptions to ensure synchronizations is also invoked
             log.warn("Exception occurred during exchange done. This exception will be ignored.", e);
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index e4fe5e0..83e619b 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -100,6 +100,7 @@ public class PooledExchangeFactory extends ServiceSupport
             }
             // create a new exchange as there was no free from the pool
             ExtendedExchange answer = new DefaultExchange(camelContext);
+            answer.setAutoRelease(autoRelease);
             if (autoRelease) {
                 // the consumer will either always be in auto release mode or not, so its safe to initialize the task only once when the exchange is created
                 answer.onDone(this::release);
@@ -125,6 +126,7 @@ public class PooledExchangeFactory extends ServiceSupport
             }
             // create a new exchange as there was no free from the pool
             ExtendedExchange answer = new DefaultExchange(fromEndpoint);
+            answer.setAutoRelease(autoRelease);
             if (autoRelease) {
                 // the consumer will either always be in auto release mode or not, so its safe to initialize the task only once when the exchange is created
                 answer.onDone(this::release);
@@ -143,10 +145,11 @@ public class PooledExchangeFactory extends ServiceSupport
 
     @Override
     public boolean release(Exchange exchange) {
-        // reset exchange before returning to pool
         try {
+            // done exchange before returning back to pool
             ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
-            ee.done();
+            boolean force = !ee.isAutoRelease();
+            ee.done(force);
             ee.onDone(null);
 
             // only release back in pool if reset was success
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
index 378f015..2c00dc1 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
@@ -136,10 +136,8 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
     public void releaseExchange(Exchange exchange, boolean autoRelease) {
         if (exchange != null) {
             if (!autoRelease) {
-                // we must manually done the exchange
-                // TODO: hack
-                exchange.adapt(ExtendedExchange.class).onDone(e -> true);
-                exchange.adapt(ExtendedExchange.class).done();
+                // if not auto release we must manually force done
+                exchange.adapt(ExtendedExchange.class).done(true);
             }
             exchangeFactory.release(exchange);
         }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index ae08318..4ddeb65 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -72,6 +72,7 @@ public final class DefaultExchange implements ExtendedExchange {
     private boolean interruptable = true;
     private boolean redeliveryExhausted;
     private Boolean errorHandlerHandled;
+    private boolean autoRelease;
 
     public DefaultExchange(CamelContext context) {
         this.context = context;
@@ -123,15 +124,21 @@ public final class DefaultExchange implements ExtendedExchange {
         }
     }
 
+    public boolean isAutoRelease() {
+        return autoRelease;
+    }
+
+    public void setAutoRelease(boolean autoRelease) {
+        this.autoRelease = autoRelease;
+    }
+
     @Override
     public void onDone(Function<Exchange, Boolean> task) {
         this.onDone = task;
     }
 
-    public void done() {
-        // only need to do this if there is an onDone task
-        // and use created flag to avoid doing done more than once
-        if (created > 0 && onDone != null) {
+    public void done(boolean forced) {
+        if (created > 0 && (forced || autoRelease)) {
             this.created = 0; // by setting to 0 we also flag that this exchange is done and needs to be reset to use again
             this.properties.clear();
             this.exchangeId = null;
@@ -162,7 +169,9 @@ public final class DefaultExchange implements ExtendedExchange {
             this.redeliveryExhausted = false;
             this.errorHandlerHandled = null;
 
-            onDone.apply(this);
+            if (onDone != null) {
+                onDone.apply(this);
+            }
         }
     }
 
diff --git a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_9.adoc b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_9.adoc
index e78e559..3bcfd00 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_9.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_9.adoc
@@ -6,6 +6,23 @@ from both 3.0 to 3.1 and 3.1 to 3.2.
 
 == Upgrading Camel 3.8 to 3.9
 
+=== API changes
+
+The `Consumer` API in `camel-api` has been enhanced to help support Camel reducing the footprint during routing.
+One aspect is that we allow to recycle `Exchange` instances created by the consumers. This avoids creating new `Exchange`
+instances in the memory for each incoming message consumers process. By recycling `Exchange`s we reduce the overhead
+on the JVM garbage collector. However this requires Camel to know whether or not the `Exchange` should be recycle or not,
+and some API changes took place.
+
+The `Consumer` API has two new methods which a consumer must use to create an `Exchange` with `createExchange`.
+By default the exchange is auto released when its complete in use, but some consumers needs custom control,
+and can turn off auto release, which then requires the consumer to manually release the exchange by calling `releaseExchange`
+when the consumer is done with the exchange.
+
+The default implementations in `DefaultConsumer` has adapted this API and 3rd party components can continue as is, by using
+the older APIs. However for these 3rd party components to support recycling exchanges, then they must be updated to use this new API.
+
+
 === Modularized camel-spring
 
 The `camel-spring` component has been modularized into:


[camel] 02/02: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 230dbe724d9dd45f6bfce5ee04264268d6fa80fb
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Feb 19 16:31:28 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 core/camel-api/src/main/java/org/apache/camel/Message.java  |  7 +++++++
 .../main/java/org/apache/camel/support/DefaultExchange.java | 13 +++++++++++--
 .../main/java/org/apache/camel/support/DefaultMessage.java  |  8 ++++++++
 .../main/java/org/apache/camel/support/MessageSupport.java  |  7 +++++++
 4 files changed, 33 insertions(+), 2 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/Message.java b/core/camel-api/src/main/java/org/apache/camel/Message.java
index 3f14c01..df8dcbc 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Message.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Message.java
@@ -33,6 +33,13 @@ import org.apache.camel.spi.HeadersMapFactory;
 public interface Message {
 
     /**
+     * Clears the message from user data so it may be reused.
+     * <p/>
+     * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself.
+     */
+    void reset();
+
+    /**
      * Returns the id of the message.
      * <p/>
      * By default the message uses the same id as {@link Exchange#getExchangeId()} as messages are associated with the
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index 4ddeb65..371b991 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -50,6 +50,7 @@ public final class DefaultExchange implements ExtendedExchange {
     private long created;
     // optimize to create properties always and with a reasonable small size
     private final Map<String, Object> properties = new ConcurrentHashMap<>(8);
+    private Class originalInClassType;
     private Message in;
     private Message out;
     private Exception exception;
@@ -142,8 +143,12 @@ public final class DefaultExchange implements ExtendedExchange {
             this.created = 0; // by setting to 0 we also flag that this exchange is done and needs to be reset to use again
             this.properties.clear();
             this.exchangeId = null;
-            // TODO: optimize in/out to keep as default message (if original message is this kind)
-            this.in = null;
+            if (in != null && in.getClass() == originalInClassType) {
+                // okay we can reuse in
+                in.reset();
+            } else {
+                this.in = null;
+            }
             this.out = null;
             this.exception = null;
             // reset uow
@@ -396,6 +401,7 @@ public final class DefaultExchange implements ExtendedExchange {
     public Message getIn() {
         if (in == null) {
             in = new DefaultMessage(getContext());
+            originalInClassType = in.getClass();
             configureMessage(in);
         }
         return in;
@@ -419,6 +425,9 @@ public final class DefaultExchange implements ExtendedExchange {
     public void setIn(Message in) {
         this.in = in;
         configureMessage(in);
+        if (in != null) {
+            this.originalInClassType = in.getClass();
+        }
     }
 
     @Override
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java
index 7e30d6f..f89ac01 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java
@@ -52,6 +52,14 @@ public class DefaultMessage extends MessageSupport {
     }
 
     @Override
+    public void reset() {
+        super.reset();
+        if (headers != null) {
+            headers.clear();
+        }
+    }
+
+    @Override
     public Object getHeader(String name) {
         if (headers == null) {
             // force creating headers
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java b/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java
index 6c1a2b1..48fcd71 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java
@@ -42,6 +42,13 @@ public abstract class MessageSupport implements Message, CamelContextAware, Data
     private DataType dataType;
 
     @Override
+    public void reset() {
+        body = null;
+        messageId = null;
+        dataType = null;
+    }
+
+    @Override
     public String toString() {
         // do not output information about the message as it may contain sensitive information
         if (messageId != null) {