You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/18 05:38:05 UTC

[camel] branch exchange-factory updated (76617ba -> 5c336a9)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 76617ba  CAMEL-16222: PooledExchangeFactory experiment
     new b4019b2  camel-http - Optimize to avoid type conversion that would do deep checking.
     new 5c336a9  CAMEL-16222: PooledExchangeFactory experiment

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/camel/component/http/HttpProducer.java  | 19 +++++-
 .../src/main/java/org/apache/camel/Ordered.java    |  2 +-
 .../camel/impl/engine/PooledExchangeFactory.java   | 70 +++++++++++++++++++++-
 .../camel/impl/engine/SimpleCamelContext.java      |  4 +-
 4 files changed, 89 insertions(+), 6 deletions(-)


[camel] 01/02: camel-http - Optimize to avoid type conversion that would do deep checking.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b4019b2564b8814f49fe6d40235807b5d30c080d
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Feb 17 16:25:45 2021 +0100

    camel-http - Optimize to avoid type conversion that would do deep checking.
---
 .../org/apache/camel/component/http/HttpProducer.java | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java b/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
index 154debb..41d5205 100644
--- a/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
+++ b/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
@@ -616,12 +616,25 @@ public class HttpProducer extends DefaultProducer {
      * @throws CamelExchangeException is thrown if error creating RequestEntity
      */
     protected HttpEntity createRequestEntity(Exchange exchange) throws CamelExchangeException {
+        HttpEntity answer = null;
+
         Message in = exchange.getIn();
-        if (in.getBody() == null) {
-            return null;
+        Object body = in.getBody();
+        try {
+            if (body == null) {
+                return null;
+            // special optimized for using these 3 type converters for common message payload types
+            } else if (body instanceof byte[]) {
+                answer = HttpEntityConverter.toHttpEntity((byte[]) body, exchange);
+            } else if (body instanceof InputStream) {
+                answer = HttpEntityConverter.toHttpEntity((InputStream) body, exchange);
+            } else if (body instanceof String) {
+                answer = HttpEntityConverter.toHttpEntity((String) body, exchange);
+            }
+        } catch (Exception e) {
+            throw new CamelExchangeException("Error creating RequestEntity from message body", exchange, e);
         }
 
-        HttpEntity answer = in.getBody(HttpEntity.class);
         if (answer == null) {
             try {
                 Object data = in.getBody();


[camel] 02/02: CAMEL-16222: PooledExchangeFactory experiment

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 5c336a91e9ad8f827a17ca99de3e59aa240ab770
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Feb 18 06:36:52 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../src/main/java/org/apache/camel/Ordered.java    |  2 +-
 .../camel/impl/engine/PooledExchangeFactory.java   | 70 +++++++++++++++++++++-
 .../camel/impl/engine/SimpleCamelContext.java      |  4 +-
 3 files changed, 73 insertions(+), 3 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/Ordered.java b/core/camel-api/src/main/java/org/apache/camel/Ordered.java
index beb50a5..df5cddec 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Ordered.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Ordered.java
@@ -29,7 +29,7 @@ public interface Ordered {
     /**
      * The lowest precedence
      */
-    int LOWEST = Integer.MAX_VALUE;
+    int LOWEST = Integer.MAX_VALUE - 1000; // reserve for internal use
 
     /**
      * Gets the order.
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index ced186e..88142c9 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -17,11 +17,22 @@
 package org.apache.camel.impl.engine;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.camel.*;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Experimental;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.NonManagedService;
+import org.apache.camel.StaticService;
 import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.support.DefaultExchange;
+import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.support.service.ServiceSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Pooled {@link ExchangeFactory} that reuses {@link Exchange} instance from a pool.
@@ -30,9 +41,16 @@ import org.apache.camel.support.service.ServiceSupport;
 public class PooledExchangeFactory extends ServiceSupport
         implements ExchangeFactory, CamelContextAware, StaticService, NonManagedService {
 
+    private static final Logger LOG = LoggerFactory.getLogger(PooledExchangeFactory.class);
+
+    private final ReleaseOnCompletion onCompletion = new ReleaseOnCompletion();
     private final ConcurrentLinkedQueue<Exchange> pool = new ConcurrentLinkedQueue<>();
+    private final AtomicLong acquired = new AtomicLong();
+    private final AtomicLong created = new AtomicLong();
+    private final AtomicLong released = new AtomicLong();
 
     private CamelContext camelContext;
+    private boolean statisticsEnabled = true;
 
     @Override
     public CamelContext getCamelContext() {
@@ -44,17 +62,33 @@ public class PooledExchangeFactory extends ServiceSupport
         this.camelContext = camelContext;
     }
 
+    public boolean isStatisticsEnabled() {
+        return statisticsEnabled;
+    }
+
+    public void setStatisticsEnabled(boolean statisticsEnabled) {
+        this.statisticsEnabled = statisticsEnabled;
+    }
+
     @Override
     public Exchange create() {
         Exchange exchange = pool.poll();
         if (exchange == null) {
+            if (statisticsEnabled) {
+                created.incrementAndGet();
+            }
             // create a new exchange as there was no free from the pool
             exchange = new DefaultExchange(camelContext);
         } else {
+            if (statisticsEnabled) {
+                acquired.incrementAndGet();
+            }
             // reset exchange before we use it
             ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
             ee.reset();
         }
+        // add on completion which will return the exchange when done
+        exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
         return exchange;
     }
 
@@ -62,23 +96,57 @@ public class PooledExchangeFactory extends ServiceSupport
     public Exchange create(Endpoint fromEndpoint) {
         Exchange exchange = pool.poll();
         if (exchange == null) {
+            if (statisticsEnabled) {
+                created.incrementAndGet();
+            }
             // create a new exchange as there was no free from the pool
             exchange = new DefaultExchange(fromEndpoint);
         } else {
+            if (statisticsEnabled) {
+                acquired.incrementAndGet();
+            }
             // need to mark this exchange from the given endpoint
             exchange.adapt(ExtendedExchange.class).setFromEndpoint(fromEndpoint);
         }
+        // add on completion which will return the exchange when done
+        exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
         return exchange;
     }
 
     @Override
     public void release(Exchange exchange) {
+        if (statisticsEnabled) {
+            released.incrementAndGet();
+        }
         pool.offer(exchange);
     }
 
     @Override
     protected void doStop() throws Exception {
         pool.clear();
+
+        if (statisticsEnabled) {
+            LOG.info("PooledExchangeFactory usage [created: {}, acquired: {}, released: {}]", created.get(), acquired.get(),
+                    released.get());
+        }
+
+        created.set(0);
+        acquired.set(0);
+        released.set(0);
+    }
+
+    private final class ReleaseOnCompletion extends SynchronizationAdapter {
+
+        @Override
+        public int getOrder() {
+            // should be very very last so set as highest value possible
+            return Integer.MAX_VALUE;
+        }
+
+        @Override
+        public void onDone(Exchange exchange) {
+            release(exchange);
+        }
     }
 
 }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
index b91710c..e373e43 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
@@ -551,7 +551,9 @@ public class SimpleCamelContext extends AbstractCamelContext {
                 ExchangeFactory.FACTORY,
                 ExchangeFactory.class);
 
-        return result.orElseGet(DefaultExchangeFactory::new);
+        // TODO: experiment
+        //        return result.orElseGet(DefaultExchangeFactory::new);
+        return result.orElseGet(PooledExchangeFactory::new);
     }
 
     @Override