You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/18 15:12:43 UTC

[camel] branch exchange-factory updated (8b92a7f -> 82d4c1b)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 8b92a7f  CAMEL-16222: PooledExchangeFactory experiment
     new ca506d7  CAMEL-16222: PooledExchangeFactory experiment
     new 870df82  CAMEL-16222: PooledExchangeFactory experiment
     new 82d4c1b  CAMEL-16222: PooledExchangeFactory experiment

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../component/disruptor/DisruptorConsumer.java     | 11 ++++++
 .../component/master/EndpointUriEncodingTest.java  |  2 +-
 .../java/org/apache/camel/spi/ExchangeFactory.java | 22 +++++++++---
 .../camel/impl/engine/AbstractCamelContext.java    |  1 +
 .../camel/impl/engine/DefaultExchangeFactory.java  | 11 +++---
 .../camel/impl/engine/PooledExchangeFactory.java   | 40 +++++++++++++++-------
 .../org/apache/camel/support/DefaultConsumer.java  | 20 +++++++----
 7 files changed, 80 insertions(+), 27 deletions(-)


[camel] 02/03: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 870df82be40b0e5b13606b60e0ded2e75f6861f2
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Feb 18 16:00:20 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../camel/component/disruptor/DisruptorConsumer.java    | 11 +++++++++++
 .../camel/component/master/EndpointUriEncodingTest.java |  2 +-
 .../main/java/org/apache/camel/spi/ExchangeFactory.java | 17 +++++++++++------
 3 files changed, 23 insertions(+), 7 deletions(-)

diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
index 77b74e4..29cb921 100644
--- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
@@ -192,6 +192,17 @@ public class DisruptorConsumer extends ServiceSupport implements Consumer, Suspe
         }
     }
 
+    @Override
+    public Exchange createExchange(boolean autoRelease) {
+        // noop
+        return null;
+    }
+
+    @Override
+    public void releaseExchange(Exchange exchange) {
+        // noop
+    }
+
     /**
      * Implementation of the {@link LifecycleAwareExchangeEventHandler} interface that passes all Exchanges to the
      * {@link Processor} registered at this {@link DisruptorConsumer}.
diff --git a/components/camel-master/src/test/java/org/apache/camel/component/master/EndpointUriEncodingTest.java b/components/camel-master/src/test/java/org/apache/camel/component/master/EndpointUriEncodingTest.java
index 37c701c..c78a06c 100644
--- a/components/camel-master/src/test/java/org/apache/camel/component/master/EndpointUriEncodingTest.java
+++ b/components/camel-master/src/test/java/org/apache/camel/component/master/EndpointUriEncodingTest.java
@@ -86,7 +86,7 @@ public class EndpointUriEncodingTest extends CamelTestSupport {
                     return new DefaultConsumer(this, processor) {
                         @Override
                         public void start() {
-                            Exchange exchange = createExchange();
+                            Exchange exchange = createExchange(true);
                             exchange.getMessage().setHeader("foo", foo);
                             exchange.getMessage().setHeader("bar", bar);
                             try {
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index 8e429f2..a666bdd 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -21,17 +21,22 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 
 /**
- * Factory for creating {@link Exchange}.
- *
+ * Factory used by {@link Consumer} to create Camel {@link Exchange} holding the incoming message received by the consumer.
+ * <p/>
+ * This factory is only for {@link Consumer}s, to give control over how {@link Exchange}s are created and come into Camel.
+ * Each Camel component that provides a {@link Consumer} should use this {@link ExchangeFactory}.
+ * There may be other parts in Camel that create {@link Exchange}s, such as sub exchanges from the Splitter EIP,
+ * but they are not part of this contract as we only want to control the created {@link Exchange}s that come
+ * into Camel via {@link Consumer} or {@link org.apache.camel.PollingConsumer}.
+ * <p/>
  * The factory is pluggable which allows to use different strategies. The default factory will create a new
  * {@link Exchange} instance, and the pooled factory will pool and reuse exchanges.
  */
 public interface ExchangeFactory {
 
-    // TODO: new factory per consumer so there is no single race bottleneck
-    // TODO: only use factory on route consumer to limit its scope to most significant impact
     // TODO: release from extended exchange without onCompletion (overhead)
     // TODO: reuse unit of work (expensive to create)
+    // TODO: release via DoneUoW in less expensive way
 
     /**
      * Service factory key.
@@ -41,8 +46,8 @@ public interface ExchangeFactory {
     /**
      * Creates a new {@link ExchangeFactory} that is private for the given consumer.
      *
-     * @param consumer the consumer that will use the created {@link ExchangeFactory}
-     * @return the created factory.
+     * @param  consumer the consumer that will use the created {@link ExchangeFactory}
+     * @return          the created factory.
      */
     ExchangeFactory newExchangeFactory(Consumer consumer);
 


[camel] 01/03: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ca506d7fdfcd340562e64998969162ca3c000784
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Feb 18 15:38:33 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../java/org/apache/camel/spi/ExchangeFactory.java |  9 ++++++
 .../camel/impl/engine/AbstractCamelContext.java    |  1 +
 .../camel/impl/engine/DefaultExchangeFactory.java  | 11 ++++---
 .../camel/impl/engine/PooledExchangeFactory.java   | 36 ++++++++++++++--------
 .../org/apache/camel/support/DefaultConsumer.java  | 20 ++++++++----
 5 files changed, 55 insertions(+), 22 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index 95586c4..8e429f2 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.spi;
 
+import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 
@@ -38,6 +39,14 @@ public interface ExchangeFactory {
     String FACTORY = "exchange-factory";
 
     /**
+     * Creates a new {@link ExchangeFactory} that is private for the given consumer.
+     *
+     * @param consumer the consumer that will use the created {@link ExchangeFactory}
+     * @return the created factory.
+     */
+    ExchangeFactory newExchangeFactory(Consumer consumer);
+
+    /**
      * Gets a new {@link Exchange}
      *
      * @param autoRelease whether to auto release the exchange when routing is complete via {@link UnitOfWork}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 3e1451f..22195da 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -3624,6 +3624,7 @@ public abstract class AbstractCamelContext extends BaseService
         getDataFormatResolver();
 
         getExecutorServiceManager();
+        getExchangeFactory();
         getShutdownStrategy();
         getUuidGenerator();
 
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
index ec17772..a8db865 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
@@ -16,10 +16,7 @@
  */
 package org.apache.camel.impl.engine;
 
-import org.apache.camel.CamelContext;
-import org.apache.camel.CamelContextAware;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
+import org.apache.camel.*;
 import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.support.DefaultExchange;
 
@@ -41,6 +38,12 @@ public class DefaultExchangeFactory implements ExchangeFactory, CamelContextAwar
     }
 
     @Override
+    public ExchangeFactory newExchangeFactory(Consumer consumer) {
+        // we just use a shared factory
+        return this;
+    }
+
+    @Override
     public Exchange create(boolean autoRelease) {
         return new DefaultExchange(camelContext);
     }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index 30094cd..0541691 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -19,18 +19,12 @@ package org.apache.camel.impl.engine;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.camel.CamelContext;
-import org.apache.camel.CamelContextAware;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.Experimental;
-import org.apache.camel.ExtendedExchange;
-import org.apache.camel.NonManagedService;
-import org.apache.camel.StaticService;
+import org.apache.camel.*;
 import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +37,7 @@ public class PooledExchangeFactory extends ServiceSupport
 
     private static final Logger LOG = LoggerFactory.getLogger(PooledExchangeFactory.class);
 
+    private final Consumer consumer;
     private final ReleaseOnCompletion onCompletion = new ReleaseOnCompletion();
     private final ConcurrentLinkedQueue<Exchange> pool = new ConcurrentLinkedQueue<>();
     private final AtomicLong acquired = new AtomicLong();
@@ -53,6 +48,16 @@ public class PooledExchangeFactory extends ServiceSupport
     private CamelContext camelContext;
     private boolean statisticsEnabled = true;
 
+    public PooledExchangeFactory() {
+        this.consumer = null;
+    }
+
+    private PooledExchangeFactory(Consumer consumer, CamelContext camelContext, boolean statisticsEnabled) {
+        this.consumer = consumer;
+        this.camelContext = camelContext;
+        this.statisticsEnabled = statisticsEnabled;
+    }
+
     @Override
     public CamelContext getCamelContext() {
         return camelContext;
@@ -63,6 +68,11 @@ public class PooledExchangeFactory extends ServiceSupport
         this.camelContext = camelContext;
     }
 
+    @Override
+    public ExchangeFactory newExchangeFactory(Consumer consumer) {
+        return new PooledExchangeFactory(consumer, camelContext, statisticsEnabled);
+    }
+
     public boolean isStatisticsEnabled() {
         return statisticsEnabled;
     }
@@ -132,7 +142,6 @@ public class PooledExchangeFactory extends ServiceSupport
             if (statisticsEnabled) {
                 discarded.incrementAndGet();
             }
-            // ignore
             LOG.debug("Error resetting exchange: {}. This exchange is discarded.", exchange);
         }
     }
@@ -141,9 +150,12 @@ public class PooledExchangeFactory extends ServiceSupport
     protected void doStop() throws Exception {
         pool.clear();
 
-        if (statisticsEnabled) {
-            LOG.info("PooledExchangeFactory usage [created: {}, acquired: {}, released: {}, discarded: {}]",
-                    created.get(), acquired.get(), released.get(), discarded.get());
+        if (statisticsEnabled && consumer != null) {
+            String uri = consumer.getEndpoint().getEndpointBaseUri();
+            uri = URISupport.sanitizeUri(uri);
+
+            LOG.info("PooledExchangeFactory ({}) usage [created: {}, reused: {}, released: {}, discarded: {}]",
+                    uri, created.get(), acquired.get(), released.get(), discarded.get());
         }
 
         created.set(0);
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
index c8e594f..ee12829 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
@@ -56,7 +56,9 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
         this.processor = processor;
         this.asyncProcessor = AsyncProcessorConverterHelper.convert(processor);
         this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
-        this.exchangeFactory = endpoint.getCamelContext().adapt(ExtendedCamelContext.class).getExchangeFactory();
+        // create a per consumer exchange factory
+        this.exchangeFactory = endpoint.getCamelContext().adapt(ExtendedCamelContext.class)
+                .getExchangeFactory().newExchangeFactory(this);
     }
 
     @Override
@@ -167,19 +169,25 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
     @Override
     protected void doInit() throws Exception {
         LOG.debug("Init consumer: {}", this);
-        ServiceHelper.initService(processor);
+        ServiceHelper.initService(exchangeFactory, processor);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        LOG.debug("Starting consumer: {}", this);
+        ServiceHelper.startService(exchangeFactory, processor);
     }
 
     @Override
     protected void doStop() throws Exception {
         LOG.debug("Stopping consumer: {}", this);
-        ServiceHelper.stopService(processor);
+        ServiceHelper.stopService(exchangeFactory, processor);
     }
 
     @Override
-    protected void doStart() throws Exception {
-        LOG.debug("Starting consumer: {}", this);
-        ServiceHelper.startService(processor);
+    protected void doShutdown() throws Exception {
+        LOG.debug("Shutting down consumer: {}", this);
+        ServiceHelper.stopAndShutdownServices(exchangeFactory, processor);
     }
 
     /**


[camel] 03/03: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 82d4c1b1e4a8c7697091c06c77693695627b6bb1
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Feb 18 16:12:10 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../apache/camel/impl/engine/PooledExchangeFactory.java    | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index 0541691..9dff80aa 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -151,11 +151,15 @@ public class PooledExchangeFactory extends ServiceSupport
         pool.clear();
 
         if (statisticsEnabled && consumer != null) {
-            String uri = consumer.getEndpoint().getEndpointBaseUri();
-            uri = URISupport.sanitizeUri(uri);
-
-            LOG.info("PooledExchangeFactory ({}) usage [created: {}, reused: {}, released: {}, discarded: {}]",
-                    uri, created.get(), acquired.get(), released.get(), discarded.get());
+            // only log if there is any usage
+            boolean shouldLog = created.get() > 0 || acquired.get() > 0 || released.get() > 0 || discarded.get() > 0;
+            if (shouldLog) {
+                String uri = consumer.getEndpoint().getEndpointBaseUri();
+                uri = URISupport.sanitizeUri(uri);
+
+                LOG.info("PooledExchangeFactory ({}) usage [created: {}, reused: {}, released: {}, discarded: {}]",
+                        uri, created.get(), acquired.get(), released.get(), discarded.get());
+            }
         }
 
         created.set(0);