You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/19 13:01:53 UTC

[camel] branch exchange-factory updated (b18bbcf -> 7db81b5)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from b18bbcf  CAMEL-16222: PooledExchangeFactory experiment
     new e9f5a12  CAMEL-16222: PooledExchangeFactory experiment
     new 65bf69b  camel-http - Small optimization
     new 7db81b5  CAMEL-16222: PooledExchangeFactory experiment

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/camel/component/http/HttpProducer.java  | 15 +---
 .../java/org/apache/camel/ExtendedExchange.java    | 14 +++-
 .../java/org/apache/camel/spi/ExchangeFactory.java | 10 ++-
 .../main/java/org/apache/camel/spi/UnitOfWork.java | 11 ++-
 .../camel/impl/engine/CamelInternalProcessor.java  | 20 +++--
 .../camel/impl/engine/DefaultUnitOfWork.java       | 59 ++++++++------
 .../apache/camel/impl/engine/MDCUnitOfWork.java    | 13 ++-
 .../camel/impl/engine/PooledExchangeFactory.java   | 49 +++++++-----
 .../camel/impl/engine/SimpleCamelContext.java      |  2 +-
 .../org/apache/camel/support/DefaultConsumer.java  |  1 -
 .../org/apache/camel/support/DefaultExchange.java  | 92 +++++++++++++---------
 .../org/apache/camel/support/UnitOfWorkHelper.java | 12 +--
 12 files changed, 170 insertions(+), 128 deletions(-)


[camel] 03/03: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 7db81b5e9914338b16d6a696161bf39f8e3d3527
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Feb 19 13:59:17 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../java/org/apache/camel/ExtendedExchange.java    |  14 ++-
 .../java/org/apache/camel/spi/ExchangeFactory.java |  10 +-
 .../main/java/org/apache/camel/spi/UnitOfWork.java |   8 +-
 .../camel/impl/engine/CamelInternalProcessor.java  |  10 +-
 .../camel/impl/engine/DefaultUnitOfWork.java       | 104 +++++++++++----------
 .../apache/camel/impl/engine/MDCUnitOfWork.java    |   7 --
 .../camel/impl/engine/PooledExchangeFactory.java   |  49 ++++++----
 .../camel/impl/engine/SimpleCamelContext.java      |   2 +-
 .../org/apache/camel/support/DefaultConsumer.java  |   1 -
 .../org/apache/camel/support/DefaultExchange.java  |  94 +++++++++++--------
 .../org/apache/camel/support/UnitOfWorkHelper.java |  12 +--
 11 files changed, 168 insertions(+), 143 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
index 9232ceb..36fb8b9 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
@@ -18,6 +18,7 @@ package org.apache.camel;
 
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.spi.UnitOfWork;
@@ -29,16 +30,21 @@ import org.apache.camel.spi.UnitOfWork;
 public interface ExtendedExchange extends Exchange {
 
     /**
-     * Clears the exchange from user data so it may be reused.
+     * Registers a task to run when this exchange is done.
+     */
+    void onDone(Function<Exchange, Boolean> task);
+
+    /**
+     * When the exchange is done being used.
      * <p/>
      * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself.
      */
-    void reset();
+    void done();
 
     /**
-     * Sets the created timestamp
+     * Resets the exchange for reuse with the given created timestamp;
      */
-    void setCreated(long created);
+    void reset(long created);
 
     /**
      * Sets the endpoint which originated this message exchange. This method should typically only be called by
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index 592f431..34ef0cf 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -67,7 +67,13 @@ public interface ExchangeFactory {
      */
     Exchange create(Endpoint fromEndpoint, boolean autoRelease);
 
-    default void release(Exchange exchange) {
-        // noop
+    /**
+     * Releases the exchange back into the pool
+     *
+     * @param  exchange the exchange
+     * @return          true if released into the pool, or false if something went wrong and the exchange was discarded
+     */
+    default boolean release(Exchange exchange) {
+        return true;
     }
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java
index e94840a..a6f118e 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java
@@ -23,13 +23,12 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
-import org.apache.camel.Service;
 
 /**
  * An object representing the unit of work processing an {@link Exchange} which allows the use of
  * {@link Synchronization} hooks. This object might map one-to-one with a transaction in JPA or Spring; or might not.
  */
-public interface UnitOfWork extends Service {
+public interface UnitOfWork {
 
     String MDC_BREADCRUMB_ID = "camel.breadcrumbId";
     String MDC_EXCHANGE_ID = "camel.exchangeId";
@@ -50,9 +49,10 @@ public interface UnitOfWork extends Service {
     /**
      * Prepares this unit of work with the given input {@link Exchange}
      *
-     * @param exchange the exchange
+     * @param  exchange the exchange
+     * @return          true if the unit of work was created and prepared, false if already prepared
      */
-    void onExchange(Exchange exchange);
+    boolean onPrepare(Exchange exchange);
 
     /**
      * Adds a synchronization hook
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 255b894..9696d1b 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -658,11 +658,17 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
                 created = createUnitOfWork(exchange);
                 ExtendedExchange ee = (ExtendedExchange) exchange;
                 ee.setUnitOfWork(created);
-                created.start();
                 uow = created;
             } else {
                 // reuse existing exchange
-                uow.onExchange(exchange);
+                if (uow.onPrepare(exchange)) {
+                    // need to re-attach uow
+                    ExtendedExchange ee = (ExtendedExchange) exchange;
+                    ee.setUnitOfWork(uow);
+                    // we are prepared for reuse and can regard it as-if we created the unit of work
+                    // so the after method knows that this is the outer bounds and should done the unit of work
+                    created = uow;
+                }
             }
 
             // for any exchange we should push/pop route context so we can keep track of which route we are routing
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index 7e51696..d501526 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -32,7 +32,6 @@ import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
-import org.apache.camel.Service;
 import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.spi.SynchronizationVetoable;
@@ -47,7 +46,7 @@ import org.slf4j.LoggerFactory;
 /**
  * The default implementation of {@link org.apache.camel.spi.UnitOfWork}
  */
-public class DefaultUnitOfWork implements UnitOfWork, Service {
+public class DefaultUnitOfWork implements UnitOfWork {
     private static final Logger LOG = LoggerFactory.getLogger(DefaultUnitOfWork.class);
 
     // instances used by MDCUnitOfWork
@@ -81,7 +80,7 @@ public class DefaultUnitOfWork implements UnitOfWork, Service {
         this.useBreadcrumb = useBreadcrumb;
         this.context = (ExtendedCamelContext) exchange.getContext();
         this.inflightRepository = inflightRepository;
-        onExchange(exchange);
+        doOnPrepare(exchange);
     }
 
     UnitOfWork newInstance(Exchange exchange) {
@@ -89,50 +88,57 @@ public class DefaultUnitOfWork implements UnitOfWork, Service {
     }
 
     @Override
-    public void onExchange(Exchange exchange) {
+    public boolean onPrepare(Exchange exchange) {
         if (this.exchange == null) {
-            // unit of work is reused, so setup for this exchange
-            this.exchange = exchange;
-
-            if (allowUseOriginalMessage) {
-                // special for JmsMessage as it can cause it to loose headers later.
-                if (exchange.getIn().getClass().getName().equals("org.apache.camel.component.jms.JmsMessage")) {
-                    this.originalInMessage = new DefaultMessage(context);
-                    this.originalInMessage.setBody(exchange.getIn().getBody());
-                    this.originalInMessage.getHeaders().putAll(exchange.getIn().getHeaders());
-                } else {
-                    this.originalInMessage = exchange.getIn().copy();
-                }
-                // must preserve exchange on the original in message
-                if (this.originalInMessage instanceof MessageSupport) {
-                    ((MessageSupport) this.originalInMessage).setExchange(exchange);
-                }
-            }
+            doOnPrepare(exchange);
+            return true;
+        } else {
+            return false;
+        }
+    }
 
-            // inject breadcrumb header if enabled
-            if (useBreadcrumb) {
-                // create or use existing breadcrumb
-                String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
-                if (breadcrumbId == null) {
-                    // no existing breadcrumb, so create a new one based on the exchange id
-                    breadcrumbId = exchange.getExchangeId();
-                    exchange.getIn().setHeader(Exchange.BREADCRUMB_ID, breadcrumbId);
-                }
+    private void doOnPrepare(Exchange exchange) {
+        // unit of work is reused, so setup for this exchange
+        this.exchange = exchange;
+
+        if (allowUseOriginalMessage) {
+            // special for JmsMessage as it can cause it to loose headers later.
+            if (exchange.getIn().getClass().getName().equals("org.apache.camel.component.jms.JmsMessage")) {
+                this.originalInMessage = new DefaultMessage(context);
+                this.originalInMessage.setBody(exchange.getIn().getBody());
+                this.originalInMessage.getHeaders().putAll(exchange.getIn().getHeaders());
+            } else {
+                this.originalInMessage = exchange.getIn().copy();
+            }
+            // must preserve exchange on the original in message
+            if (this.originalInMessage instanceof MessageSupport) {
+                ((MessageSupport) this.originalInMessage).setExchange(exchange);
             }
+        }
 
-            // fire event
-            if (context.isEventNotificationApplicable()) {
-                try {
-                    EventHelper.notifyExchangeCreated(context, exchange);
-                } catch (Throwable e) {
-                    // must catch exceptions to ensure the exchange is not failing due to notification event failed
-                    log.warn("Exception occurred during event notification. This exception will be ignored.", e);
-                }
+        // inject breadcrumb header if enabled
+        if (useBreadcrumb) {
+            // create or use existing breadcrumb
+            String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
+            if (breadcrumbId == null) {
+                // no existing breadcrumb, so create a new one based on the exchange id
+                breadcrumbId = exchange.getExchangeId();
+                exchange.getIn().setHeader(Exchange.BREADCRUMB_ID, breadcrumbId);
             }
+        }
 
-            // register to inflight registry
-            inflightRepository.add(exchange);
+        // fire event
+        if (context.isEventNotificationApplicable()) {
+            try {
+                EventHelper.notifyExchangeCreated(context, exchange);
+            } catch (Throwable e) {
+                // must catch exceptions to ensure the exchange is not failing due to notification event failed
+                log.warn("Exception occurred during event notification. This exception will be ignored.", e);
+            }
         }
+
+        // register to inflight registry
+        inflightRepository.add(exchange);
     }
 
     @Override
@@ -161,16 +167,6 @@ public class DefaultUnitOfWork implements UnitOfWork, Service {
     }
 
     @Override
-    public void start() {
-        // noop
-    }
-
-    @Override
-    public void stop() {
-        // noop
-    }
-
-    @Override
     public synchronized void addSynchronization(Synchronization synchronization) {
         if (synchronizations == null) {
             synchronizations = new ArrayList<>(8);
@@ -250,6 +246,14 @@ public class DefaultUnitOfWork implements UnitOfWork, Service {
                 log.warn("Exception occurred during event notification. This exception will be ignored.", e);
             }
         }
+
+        // the exchange is now done
+        try {
+            exchange.adapt(ExtendedExchange.class).done();
+        } catch (Throwable e) {
+            // must catch exceptions to ensure synchronizations is also invoked
+            log.warn("Exception occurred during exchange done. This exception will be ignored.", e);
+        }
     }
 
     @Override
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
index d111302..64d34f6 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
@@ -87,13 +87,6 @@ public class MDCUnitOfWork extends DefaultUnitOfWork {
     }
 
     @Override
-    public void stop() {
-        super.stop();
-        // and remove when stopping
-        clear();
-    }
-
-    @Override
     public void pushRoute(Route route) {
         super.pushRoute(route);
         if (route != null) {
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index e1d45fa..6f44fb1 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -19,7 +19,15 @@ package org.apache.camel.impl.engine;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.camel.*;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Experimental;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.NonManagedService;
+import org.apache.camel.StaticService;
 import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.support.SynchronizationAdapter;
@@ -89,18 +97,19 @@ public class PooledExchangeFactory extends ServiceSupport
                 created.incrementAndGet();
             }
             // create a new exchange as there was no free from the pool
-            exchange = new DefaultExchange(camelContext);
+            ExtendedExchange answer = new DefaultExchange(camelContext);
+            if (autoRelease) {
+                // the consumer will either always be in auto release mode or not, so its safe to initialize the task only once when the exchange is created
+                answer.onDone(this::release);
+            }
+            return answer;
         } else {
             if (statisticsEnabled) {
                 acquired.incrementAndGet();
             }
-            // the exchange is reused but update the created to now
+            // reset exchange for reuse
             ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
-            ee.setCreated(System.currentTimeMillis());
-        }
-        if (autoRelease) {
-            // add on completion which will return the exchange when done
-            exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
+            ee.reset(System.currentTimeMillis());
         }
         return exchange;
     }
@@ -113,41 +122,41 @@ public class PooledExchangeFactory extends ServiceSupport
                 created.incrementAndGet();
             }
             // create a new exchange as there was no free from the pool
-            exchange = new DefaultExchange(fromEndpoint);
+            ExtendedExchange answer = new DefaultExchange(fromEndpoint);
+            if (autoRelease) {
+                // the consumer will either always be in auto release mode or not, so its safe to initialize the task only once when the exchange is created
+                answer.onDone(this::release);
+            }
+            return answer;
         } else {
             if (statisticsEnabled) {
                 acquired.incrementAndGet();
             }
-            // the exchange is reused but update the created to now
+            // reset exchange for reuse
             ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
-            ee.setCreated(System.currentTimeMillis());
-            // need to mark this exchange from the given endpoint
-            ee.setFromEndpoint(fromEndpoint);
-        }
-        if (autoRelease) {
-            // add on completion which will return the exchange when done
-            exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
+            ee.reset(System.currentTimeMillis());
         }
         return exchange;
     }
 
     @Override
-    public void release(Exchange exchange) {
+    public boolean release(Exchange exchange) {
         // reset exchange before returning to pool
         try {
             ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
-            ee.reset();
+            ee.done();
 
             // only release back in pool if reset was success
             if (statisticsEnabled) {
                 released.incrementAndGet();
             }
-            pool.offer(exchange);
+            return pool.offer(exchange);
         } catch (Exception e) {
             if (statisticsEnabled) {
                 discarded.incrementAndGet();
             }
             LOG.debug("Error resetting exchange: {}. This exchange is discarded.", exchange);
+            return false;
         }
     }
 
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
index e373e43..a9d1e87 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
@@ -552,7 +552,7 @@ public class SimpleCamelContext extends AbstractCamelContext {
                 ExchangeFactory.class);
 
         // TODO: experiment
-        //        return result.orElseGet(DefaultExchangeFactory::new);
+        //                return result.orElseGet(DefaultExchangeFactory::new);
         return result.orElseGet(PooledExchangeFactory::new);
     }
 
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
index ee12829..83c568c 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
@@ -109,7 +109,6 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
         UnitOfWork uow = endpoint.getCamelContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory()
                 .createUnitOfWork(exchange);
         exchange.adapt(ExtendedExchange.class).setUnitOfWork(uow);
-        uow.start();
         return uow;
     }
 
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index 808bfb6..1ba720a 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Function;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelExecutionException;
@@ -45,6 +46,7 @@ import org.apache.camel.util.ObjectHelper;
 public final class DefaultExchange implements ExtendedExchange {
 
     private final CamelContext context;
+    private Function<Exchange, Boolean> onDone;
     private long created;
     // optimize to create properties always and with a reasonable small size
     private final Map<String, Object> properties = new ConcurrentHashMap<>(8);
@@ -121,49 +123,62 @@ public final class DefaultExchange implements ExtendedExchange {
         }
     }
 
-    public void reset() {
-        this.properties.clear();
-        this.exchangeId = null;
-        this.created = 0;
-        // TODO: optimize in/out to keep as default message (if original message is this kind)
-        this.in = null;
-        this.out = null;
-        this.exception = null;
-        // reset uow
-        if (this.unitOfWork != null) {
-            this.unitOfWork.reset();
-        }
-        // reset pattern to original
-        this.pattern = originalPattern;
-        if (this.onCompletions != null) {
-            this.onCompletions.clear();
-        }
-        // do not reset endpoint/fromRouteId as it would be the same consumer/endpoint again
-        this.externalRedelivered = null;
-        this.historyNodeId = null;
-        this.historyNodeLabel = null;
-        this.transacted = false;
-        this.routeStop = false;
-        this.rollbackOnly = false;
-        this.rollbackOnlyLast = false;
-        this.notifyEvent = false;
-        this.interrupted = false;
-        this.interruptable = true;
-        this.redeliveryExhausted = false;
-        this.errorHandlerHandled = null;
+    @Override
+    public void onDone(Function<Exchange, Boolean> task) {
+        this.onDone = task;
     }
 
-    @Override
-    public long getCreated() {
-        return created;
+    public void done() {
+        // only need to do this if there is an onDone task
+        // and use created flag to avoid doing done more than once
+        if (created > 0) {
+            this.created = 0; // by setting to 0 we also flag that this exchange is done and needs to be reset to use again
+            this.properties.clear();
+            this.exchangeId = null;
+            // TODO: optimize in/out to keep as default message (if original message is this kind)
+            this.in = null;
+            this.out = null;
+            this.exception = null;
+            // reset uow
+            if (this.unitOfWork != null) {
+                this.unitOfWork.reset();
+            }
+            // reset pattern to original
+            this.pattern = originalPattern;
+            if (this.onCompletions != null) {
+                this.onCompletions.clear();
+            }
+            // do not reset endpoint/fromRouteId as it would be the same consumer/endpoint again
+            this.externalRedelivered = null;
+            this.historyNodeId = null;
+            this.historyNodeLabel = null;
+            this.transacted = false;
+            this.routeStop = false;
+            this.rollbackOnly = false;
+            this.rollbackOnlyLast = false;
+            this.notifyEvent = false;
+            this.interrupted = false;
+            this.interruptable = true;
+            this.redeliveryExhausted = false;
+            this.errorHandlerHandled = null;
+
+            if (onDone != null) {
+                onDone.apply(this);
+            }
+        }
     }
 
     @Override
-    public void setCreated(long created) {
+    public void reset(long created) {
         this.created = created;
     }
 
     @Override
+    public long getCreated() {
+        return created;
+    }
+
+    @Override
     public Exchange copy() {
         DefaultExchange exchange = new DefaultExchange(this);
 
@@ -603,7 +618,7 @@ public final class DefaultExchange implements ExtendedExchange {
     @Override
     public void setUnitOfWork(UnitOfWork unitOfWork) {
         this.unitOfWork = unitOfWork;
-        if (unitOfWork != null && onCompletions != null) {
+        if (unitOfWork != null && onCompletions != null && !onCompletions.isEmpty()) {
             // now an unit of work has been assigned so add the on completions
             // we might have registered already
             for (Synchronization onCompletion : onCompletions) {
@@ -612,7 +627,6 @@ public final class DefaultExchange implements ExtendedExchange {
             // cleanup the temporary on completion list as they now have been registered
             // on the unit of work
             onCompletions.clear();
-            onCompletions = null;
         }
     }
 
@@ -626,7 +640,7 @@ public final class DefaultExchange implements ExtendedExchange {
             }
             onCompletions.add(onCompletion);
         } else {
-            getUnitOfWork().addSynchronization(onCompletion);
+            unitOfWork.addSynchronization(onCompletion);
         }
     }
 
@@ -643,13 +657,12 @@ public final class DefaultExchange implements ExtendedExchange {
 
     @Override
     public void handoverCompletions(Exchange target) {
-        if (onCompletions != null) {
+        if (onCompletions != null && !onCompletions.isEmpty()) {
             for (Synchronization onCompletion : onCompletions) {
                 target.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
             }
             // cleanup the temporary on completion list as they have been handed over
             onCompletions.clear();
-            onCompletions = null;
         } else if (unitOfWork != null) {
             // let unit of work handover
             unitOfWork.handoverSynchronization(target);
@@ -659,10 +672,9 @@ public final class DefaultExchange implements ExtendedExchange {
     @Override
     public List<Synchronization> handoverCompletions() {
         List<Synchronization> answer = null;
-        if (onCompletions != null) {
+        if (onCompletions != null && !onCompletions.isEmpty()) {
             answer = new ArrayList<>(onCompletions);
             onCompletions.clear();
-            onCompletions = null;
         }
         return answer;
     }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java
index fd2194e..196c6ae 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java
@@ -21,7 +21,6 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Route;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.spi.SynchronizationRouteAware;
@@ -56,20 +55,11 @@ public final class UnitOfWorkHelper {
             LOG.warn("Exception occurred during done UnitOfWork for Exchange: {}. This exception will be ignored.",
                     exchange, e);
         }
-        // stop
-        try {
-            uow.stop();
-        } catch (Throwable e) {
-            LOG.warn("Exception occurred during stopping UnitOfWork for Exchange: {}. This exception will be ignored.",
-                    exchange, e);
-        }
-        // MUST clear and set uow to null on exchange after done
-        ExtendedExchange ee = (ExtendedExchange) exchange;
-        ee.setUnitOfWork(null);
     }
 
     public static void doneSynchronizations(Exchange exchange, List<Synchronization> synchronizations, Logger log) {
         if (synchronizations != null && !synchronizations.isEmpty()) {
+            // TODO: only copy/sort if there is > 1 (if 1 then use directly (no for loop)
             // work on a copy of the list to avoid any modification which may cause ConcurrentModificationException
             List<Synchronization> copy = new ArrayList<>(synchronizations);
 


[camel] 02/03: camel-http - Small optimization

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 65bf69bed6220934ceb1fcb94bff6f68352b8a35
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Feb 19 10:13:22 2021 +0100

    camel-http - Small optimization
---
 .../org/apache/camel/component/http/HttpProducer.java     | 15 ++++-----------
 1 file changed, 4 insertions(+), 11 deletions(-)

diff --git a/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java b/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
index af521da..b79e354 100644
--- a/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
+++ b/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
@@ -173,17 +173,10 @@ public class HttpProducer extends DefaultProducer {
                 Object headerValue = entry.getValue();
 
                 if (headerValue != null) {
-                    if (headerValue instanceof String) {
-                        // optimise for string values
-                        String value = (String) headerValue;
-                        if (!strategy.applyFilterToCamelHeaders(key, value, exchange)) {
-                            httpRequest.addHeader(key, value);
-                        }
-                        continue;
-                    } else if (headerValue instanceof Long || headerValue instanceof Integer
-                            || headerValue instanceof Boolean) {
-                        // optimise for other common types
-                        String value = tc.convertTo(String.class, exchange, headerValue);
+                    if (headerValue instanceof String || headerValue instanceof Integer || headerValue instanceof Long
+                            || headerValue instanceof Boolean || headerValue instanceof Date) {
+                        // optimise for common types
+                        String value = headerValue.toString();
                         if (!strategy.applyFilterToCamelHeaders(key, value, exchange)) {
                             httpRequest.addHeader(key, value);
                         }


[camel] 01/03: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit e9f5a12071e4318e284189f156b0be8aa4b2bc71
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Feb 19 09:56:15 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../main/java/org/apache/camel/spi/UnitOfWork.java |  7 ++
 .../camel/impl/engine/CamelInternalProcessor.java  | 12 +--
 .../camel/impl/engine/DefaultUnitOfWork.java       | 95 ++++++++++++----------
 .../apache/camel/impl/engine/MDCUnitOfWork.java    |  6 ++
 .../org/apache/camel/support/DefaultExchange.java  | 10 ++-
 5 files changed, 77 insertions(+), 53 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java
index 20284fa..e94840a 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java
@@ -48,6 +48,13 @@ public interface UnitOfWork extends Service {
     void reset();
 
     /**
+     * Prepares this unit of work with the given input {@link Exchange}
+     *
+     * @param exchange the exchange
+     */
+    void onExchange(Exchange exchange);
+
+    /**
      * Adds a synchronization hook
      *
      * @param synchronization the hook
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 8fba8fe..255b894 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -650,22 +650,24 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
 
             // only return UnitOfWork if we created a new as then its us that handle the lifecycle to done the created UoW
             UnitOfWork created = null;
+            UnitOfWork uow = exchange.getUnitOfWork();
 
-            if (exchange.getUnitOfWork() == null) {
+            if (uow == null) {
                 // If there is no existing UoW, then we should start one and
                 // terminate it once processing is completed for the exchange.
                 created = createUnitOfWork(exchange);
                 ExtendedExchange ee = (ExtendedExchange) exchange;
                 ee.setUnitOfWork(created);
                 created.start();
+                uow = created;
+            } else {
+                // reuse existing exchange
+                uow.onExchange(exchange);
             }
 
             // for any exchange we should push/pop route context so we can keep track of which route we are routing
             if (route != null) {
-                UnitOfWork existing = exchange.getUnitOfWork();
-                if (existing != null) {
-                    existing.pushRoute(route);
-                }
+                uow.pushRoute(route);
             }
 
             return created;
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index 975b8a0..7e51696 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -49,20 +49,16 @@ import org.slf4j.LoggerFactory;
  */
 public class DefaultUnitOfWork implements UnitOfWork, Service {
     private static final Logger LOG = LoggerFactory.getLogger(DefaultUnitOfWork.class);
+
+    // instances used by MDCUnitOfWork
     final InflightRepository inflightRepository;
     final boolean allowUseOriginalMessage;
     final boolean useBreadcrumb;
 
-    // TODO: This implementation seems to have transformed itself into a to broad concern
-    //   where unit of work is doing a bit more work than the transactional aspect that ties
-    //   to its name. Maybe this implementation should be named ExchangeContext and we can
-    //   introduce a simpler UnitOfWork concept. This would also allow us to refactor the
-    //   SubUnitOfWork into a general parent/child unit of work concept. However this
-    //   requires API changes and thus is best kept for future Camel work
-    private final Deque<Route> routes = new ArrayDeque<>(8);
-    private final Exchange exchange;
     private final ExtendedCamelContext context;
+    private final Deque<Route> routes = new ArrayDeque<>(8);
     private Logger log;
+    private Exchange exchange;
     private List<Synchronization> synchronizations;
     private Message originalInMessage;
     private Set<Object> transactedBy;
@@ -80,59 +76,68 @@ public class DefaultUnitOfWork implements UnitOfWork, Service {
 
     public DefaultUnitOfWork(Exchange exchange, InflightRepository inflightRepository, boolean allowUseOriginalMessage,
                              boolean useBreadcrumb) {
-        this.exchange = exchange;
         this.log = LOG;
         this.allowUseOriginalMessage = allowUseOriginalMessage;
         this.useBreadcrumb = useBreadcrumb;
         this.context = (ExtendedCamelContext) exchange.getContext();
         this.inflightRepository = inflightRepository;
+        onExchange(exchange);
+    }
 
-        if (allowUseOriginalMessage) {
-            // special for JmsMessage as it can cause it to loose headers later.
-            if (exchange.getIn().getClass().getName().equals("org.apache.camel.component.jms.JmsMessage")) {
-                this.originalInMessage = new DefaultMessage(context);
-                this.originalInMessage.setBody(exchange.getIn().getBody());
-                this.originalInMessage.getHeaders().putAll(exchange.getIn().getHeaders());
-            } else {
-                this.originalInMessage = exchange.getIn().copy();
-            }
-            // must preserve exchange on the original in message
-            if (this.originalInMessage instanceof MessageSupport) {
-                ((MessageSupport) this.originalInMessage).setExchange(exchange);
-            }
-        }
+    UnitOfWork newInstance(Exchange exchange) {
+        return new DefaultUnitOfWork(exchange, inflightRepository, allowUseOriginalMessage, useBreadcrumb);
+    }
 
-        // inject breadcrumb header if enabled
-        if (useBreadcrumb) {
-            // create or use existing breadcrumb
-            String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
-            if (breadcrumbId == null) {
-                // no existing breadcrumb, so create a new one based on the exchange id
-                breadcrumbId = exchange.getExchangeId();
-                exchange.getIn().setHeader(Exchange.BREADCRUMB_ID, breadcrumbId);
+    @Override
+    public void onExchange(Exchange exchange) {
+        if (this.exchange == null) {
+            // unit of work is reused, so setup for this exchange
+            this.exchange = exchange;
+
+            if (allowUseOriginalMessage) {
+                // special for JmsMessage as it can cause it to loose headers later.
+                if (exchange.getIn().getClass().getName().equals("org.apache.camel.component.jms.JmsMessage")) {
+                    this.originalInMessage = new DefaultMessage(context);
+                    this.originalInMessage.setBody(exchange.getIn().getBody());
+                    this.originalInMessage.getHeaders().putAll(exchange.getIn().getHeaders());
+                } else {
+                    this.originalInMessage = exchange.getIn().copy();
+                }
+                // must preserve exchange on the original in message
+                if (this.originalInMessage instanceof MessageSupport) {
+                    ((MessageSupport) this.originalInMessage).setExchange(exchange);
+                }
             }
-        }
 
-        // fire event
-        if (context.isEventNotificationApplicable()) {
-            try {
-                EventHelper.notifyExchangeCreated(context, exchange);
-            } catch (Throwable e) {
-                // must catch exceptions to ensure the exchange is not failing due to notification event failed
-                log.warn("Exception occurred during event notification. This exception will be ignored.", e);
+            // inject breadcrumb header if enabled
+            if (useBreadcrumb) {
+                // create or use existing breadcrumb
+                String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
+                if (breadcrumbId == null) {
+                    // no existing breadcrumb, so create a new one based on the exchange id
+                    breadcrumbId = exchange.getExchangeId();
+                    exchange.getIn().setHeader(Exchange.BREADCRUMB_ID, breadcrumbId);
+                }
             }
-        }
 
-        // register to inflight registry
-        inflightRepository.add(exchange);
-    }
+            // fire event
+            if (context.isEventNotificationApplicable()) {
+                try {
+                    EventHelper.notifyExchangeCreated(context, exchange);
+                } catch (Throwable e) {
+                    // must catch exceptions to ensure the exchange is not failing due to notification event failed
+                    log.warn("Exception occurred during event notification. This exception will be ignored.", e);
+                }
+            }
 
-    UnitOfWork newInstance(Exchange exchange) {
-        return new DefaultUnitOfWork(exchange, inflightRepository, allowUseOriginalMessage, useBreadcrumb);
+            // register to inflight registry
+            inflightRepository.add(exchange);
+        }
     }
 
     @Override
     public void reset() {
+        this.exchange = null;
         routes.clear();
         if (synchronizations != null) {
             synchronizations.clear();
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
index f6229a2..d111302 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
@@ -202,6 +202,12 @@ public class MDCUnitOfWork extends DefaultUnitOfWork {
     }
 
     @Override
+    public void reset() {
+        super.reset();
+        clear();
+    }
+
+    @Override
     public String toString() {
         return "MDCUnitOfWork";
     }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index ad238b2..808bfb6 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -53,7 +53,7 @@ public final class DefaultExchange implements ExtendedExchange {
     private Exception exception;
     private String exchangeId;
     private UnitOfWork unitOfWork;
-    private ExchangePattern originalPattern;
+    private final ExchangePattern originalPattern;
     private ExchangePattern pattern;
     private Endpoint fromEndpoint;
     private String fromRouteId;
@@ -125,16 +125,20 @@ public final class DefaultExchange implements ExtendedExchange {
         this.properties.clear();
         this.exchangeId = null;
         this.created = 0;
+        // TODO: optimize in/out to keep as default message (if original message is this kind)
         this.in = null;
         this.out = null;
         this.exception = null;
-        this.unitOfWork = null;
+        // reset uow
+        if (this.unitOfWork != null) {
+            this.unitOfWork.reset();
+        }
         // reset pattern to original
         this.pattern = originalPattern;
-        // do not reset endpoint/fromRouteId as it would be the same consumer/endpoint again
         if (this.onCompletions != null) {
             this.onCompletions.clear();
         }
+        // do not reset endpoint/fromRouteId as it would be the same consumer/endpoint again
         this.externalRedelivered = null;
         this.historyNodeId = null;
         this.historyNodeLabel = null;