You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by on...@apache.org on 2018/07/04 07:25:01 UTC

[camel] branch master updated (2f789a7 -> 350b188)

This is an automated email from the ASF dual-hosted git repository.

onders pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 2f789a7  if channel.getExistStatus() is null, then calling
     new d0d8b10  CAMEL-6840 - add grouped throttling feature
     new 6164867  CAMEL-6840 - use right lock while calculating and setting max request per period when grouped throttling used
     new 350b188  CAMEL-6840 - add javadoc to describe usage of asyncExecutor in case of async routing and use the shared thread pool

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 camel-core/src/main/docs/eips/throttle-eip.adoc    |   3 +-
 .../management/mbean/ManagedThrottlerMBean.java    |   4 +-
 .../org/apache/camel/model/ExpressionNode.java     |   2 +
 .../apache/camel/model/ProcessorDefinition.java    |  42 ++++++
 .../org/apache/camel/model/ThrottleDefinition.java |  39 +++++-
 .../java/org/apache/camel/processor/Throttler.java | 155 ++++++++++++++++++---
 ...llEvalTest.java => ThrottlingGroupingTest.java} |  63 ++++-----
 ...lTest.java => SpringThrottlerGroupingTest.java} |   7 +-
 ...stom-strategy.xml => ThrottlerGroupingTest.xml} |  42 ++++--
 9 files changed, 284 insertions(+), 73 deletions(-)
 copy camel-core/src/test/java/org/apache/camel/processor/{ThrottlerNullEvalTest.java => ThrottlingGroupingTest.java} (56%)
 copy components/camel-spring/src/test/java/org/apache/camel/spring/processor/{SpringThrottlerMethodCallTest.java => SpringThrottlerGroupingTest.java} (82%)
 copy components/camel-spring/src/test/resources/org/apache/camel/spring/processor/{aggregator-custom-strategy.xml => ThrottlerGroupingTest.xml} (59%)


[camel] 01/03: CAMEL-6840 - add grouped throttling feature

Posted by on...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

onders pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d0d8b103fab5a0292406ce67e5c26dc4a4d1595f
Author: Sezgin <on...@nokia.com>
AuthorDate: Thu Jun 14 20:26:59 2018 +0300

    CAMEL-6840 - add grouped throttling feature
---
 camel-core/src/main/docs/eips/throttle-eip.adoc    |   3 +-
 .../management/mbean/ManagedThrottlerMBean.java    |   4 +-
 .../org/apache/camel/model/ExpressionNode.java     |   2 +
 .../apache/camel/model/ProcessorDefinition.java    |  42 ++++++
 .../org/apache/camel/model/ThrottleDefinition.java |  39 ++++-
 .../java/org/apache/camel/processor/Throttler.java | 158 ++++++++++++++++++---
 .../camel/processor/ThrottlingGroupingTest.java    |  77 ++++++++++
 .../processor/SpringThrottlerGroupingTest.java     |  31 ++++
 .../spring/processor/ThrottlerGroupingTest.xml     |  66 +++++++++
 9 files changed, 394 insertions(+), 28 deletions(-)

diff --git a/camel-core/src/main/docs/eips/throttle-eip.adoc b/camel-core/src/main/docs/eips/throttle-eip.adoc
index 71da959..aa0582b 100644
--- a/camel-core/src/main/docs/eips/throttle-eip.adoc
+++ b/camel-core/src/main/docs/eips/throttle-eip.adoc
@@ -6,11 +6,12 @@ The Throttler Pattern allows you to ensure that a specific endpoint does not get
 === Options
 
 // eip options: START
-The Throttle EIP supports 5 options which are listed below:
+The Throttle EIP supports 6 options which are listed below:
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
+| *correlationExpression* | The expression used to calculate the correlation key to use for throttle grouping. The Exchange which has the same correlation key is throttled together. |  | NamespaceAware Expression
 | *executorServiceRef* | To use a custom thread pool (ScheduledExecutorService) by the throttler. |  | String
 | *timePeriodMillis* | Sets the time period during which the maximum request count is valid for | 1000 | Long
 | *asyncDelayed* | Enables asynchronous delay which means the thread will not block while delaying. | false | Boolean
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
index 673c13e..6e993b0 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
@@ -20,10 +20,10 @@ import org.apache.camel.api.management.ManagedAttribute;
 
 public interface ManagedThrottlerMBean extends ManagedProcessorMBean {
 
-    @ManagedAttribute(description = "Maximum requires per period")
+    @ManagedAttribute(description = "Maximum requests per period")
     long getMaximumRequestsPerPeriod();
 
-    @ManagedAttribute(description = "Maximum requires per period")
+    @ManagedAttribute(description = "Maximum requests per period")
     void setMaximumRequestsPerPeriod(long maximumRequestsPerPeriod);
 
     @ManagedAttribute(description = "Time period in millis")
diff --git a/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java b/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
index d2ac38e..5a1d7b1 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
@@ -21,6 +21,7 @@ import java.util.List;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlElementRef;
+import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.Expression;
 import org.apache.camel.Predicate;
@@ -36,6 +37,7 @@ import org.apache.camel.spi.RouteContext;
  * @version
  */
 @XmlAccessorType(XmlAccessType.FIELD)
+@XmlTransient
 public abstract class ExpressionNode extends ProcessorDefinition<ExpressionNode> {
     @XmlElementRef
     private ExpressionDefinition expression;
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 005270e..9108d78 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -2284,6 +2284,48 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
         addOutput(answer);
         return answer;
     }
+
+    /**
+     * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
+     * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded,
+     * or that we don't exceed an agreed SLA with some external service.
+     * The additional correlationExpressionKey parameter is used to group exchanges, so that
+     * exchanges sharing the same key are throttled together. This gives key-based throttling
+     * instead of overall throttling.
+     * <p/>
+     * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10
+     * will default ensure at most 10 messages per second.
+     *
+     * @param maximumRequestCount  an expression to calculate the maximum request count
+     * @param correlationExpressionKey  is a correlation key that can throttle by the given key instead of overall throttling
+     * @return the builder
+     */
+    public ThrottleDefinition throttle(Expression maximumRequestCount, long correlationExpressionKey) {
+        ThrottleDefinition answer = new ThrottleDefinition(maximumRequestCount, ExpressionBuilder.constantExpression(correlationExpressionKey));
+        addOutput(answer);
+        return answer;
+    }
+
+    /**
+     * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
+     * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded,
+     * or that we don't exceed an agreed SLA with some external service.
+     * The additional correlationExpressionKey parameter is used to group exchanges, so that
+     * exchanges sharing the same key are throttled together. This gives key-based throttling
+     * instead of overall throttling.
+     * <p/>
+     * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10
+     * will default ensure at most 10 messages per second.
+     *
+     * @param maximumRequestCount  an expression to calculate the maximum request count
+     * @param correlationExpressionKey  is a correlation key as an expression that can throttle by the given key instead of overall throttling
+     * @return the builder
+     */
+    public ThrottleDefinition throttle(Expression maximumRequestCount, Expression correlationExpressionKey) {
+        ThrottleDefinition answer = new ThrottleDefinition(maximumRequestCount, correlationExpressionKey);
+        addOutput(answer);
+        return answer;
+    }
     
     /**
      * <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
diff --git a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
index 613d2b3..eeb1645 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
@@ -21,8 +21,10 @@ import java.util.concurrent.ScheduledExecutorService;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
+import javax.xml.bind.annotation.XmlType;
 
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
@@ -40,9 +42,12 @@ import org.apache.camel.spi.RouteContext;
 @Metadata(label = "eip,routing")
 @XmlRootElement(name = "throttle")
 @XmlAccessorType(XmlAccessType.FIELD)
+@XmlType(propOrder = {"expression", "correlationExpression", "outputs"})
 public class ThrottleDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<ThrottleDefinition> {
     // TODO: Camel 3.0 Should not support outputs
 
+    @XmlElement(name = "correlationExpression")
+    private ExpressionSubElementDefinition correlationExpression;
     @XmlTransient
     private ExecutorService executorService;
     @XmlAttribute
@@ -55,7 +60,7 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
     private Boolean callerRunsWhenRejected;
     @XmlAttribute
     private Boolean rejectExecution;
-    
+
     public ThrottleDefinition() {
     }
 
@@ -63,6 +68,18 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
         super(maximumRequestsPerPeriod);
     }
 
+    public ThrottleDefinition(Expression maximumRequestsPerPeriod, Expression correlationExpression) {
+        this(ExpressionNodeHelper.toExpressionDefinition(maximumRequestsPerPeriod), correlationExpression);
+    }
+
+    private ThrottleDefinition(ExpressionDefinition maximumRequestsPerPeriod, Expression correlationExpression) {
+        super(maximumRequestsPerPeriod);
+
+        ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition();
+        cor.setExpressionType(ExpressionNodeHelper.toExpressionDefinition(correlationExpression));
+        setCorrelationExpression(cor);
+    }
+
     @Override
     public String toString() {
         return "Throttle[" + description() + " -> " + getOutputs() + "]";
@@ -93,9 +110,14 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
         if (maxRequestsExpression == null) {
             throw new IllegalArgumentException("MaxRequestsPerPeriod expression must be provided on " + this);
         }
+        
+        Expression correlation = null;
+        if (correlationExpression != null) {
+            correlation = correlationExpression.createExpression(routeContext);
+        }
 
         boolean reject = getRejectExecution() != null && getRejectExecution();
-        Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, reject);
+        Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, reject, correlation);
 
         answer.setAsyncDelayed(async);
         if (getCallerRunsWhenRejected() == null) {
@@ -104,6 +126,7 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
         } else {
             answer.setCallerRunsWhenRejected(getCallerRunsWhenRejected());
         }
+
         return answer;
     }
 
@@ -256,4 +279,16 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
     public void setRejectExecution(Boolean rejectExecution) {
         this.rejectExecution = rejectExecution;
     }
+
+    /**
+     * The expression used to calculate the correlation key to use for throttle grouping.
+     * The Exchange which has the same correlation key is throttled together.
+     */
+    public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) {
+        this.correlationExpression = correlationExpression;
+    }
+
+    public ExpressionSubElementDefinition getCorrelationExpression() {
+        return correlationExpression;
+    }
 }
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
index 543ec9a..f84498d 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
@@ -16,8 +16,13 @@
  */
 package org.apache.camel.processor;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -31,7 +36,11 @@ import org.apache.camel.RuntimeExchangeException;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.CamelContextHelper;
+import org.apache.camel.util.LRUCache;
+import org.apache.camel.util.LRUCacheFactory;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,20 +75,25 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
 
     private final Logger log = LoggerFactory.getLogger(Throttler.class);
     private final CamelContext camelContext;
-    private final DelayQueue<ThrottlePermit> delayQueue = new DelayQueue<>();
     private final ExecutorService asyncExecutor;
     private final boolean shutdownAsyncExecutor;
 
     private volatile long timePeriodMillis;
-    private volatile int throttleRate;
     private String id;
+    private volatile Integer throttleRate = new Integer(0);
     private Expression maxRequestsPerPeriodExpression;
     private boolean rejectExecution;
     private boolean asyncDelayed;
     private boolean callerRunsWhenRejected = true;
+    private final DelayQueue<ThrottlePermit> delayQueue = new DelayQueue<>();
+    // below 4 fields added for (throttling grouping)
+    private Expression correlationExpression;
+    private Map<Integer, DelayQueue<ThrottlePermit>> delayQueueCache;
+    private Map<Integer, Integer> throttleRatesMap = new HashMap<>();
+    private ExecutorService delayQueueCacheExecutorService;    
 
     public Throttler(final CamelContext camelContext, final Processor processor, final Expression maxRequestsPerPeriodExpression, final long timePeriodMillis,
-                     final ExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution) {
+                     final ExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution, Expression correlation) {
         super(processor);
         this.camelContext = camelContext;
         this.rejectExecution = rejectExecution;
@@ -93,6 +107,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
         }
         this.timePeriodMillis = timePeriodMillis;
         this.asyncExecutor = asyncExecutor;
+        this.correlationExpression = correlation;
     }
 
     @Override
@@ -111,13 +126,22 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
                 throw new RejectedExecutionException("Run is not allowed");
             }
 
-            calculateAndSetMaxRequestsPerPeriod(exchange);
-            ThrottlePermit permit = delayQueue.poll();
+            DelayQueue<ThrottlePermit> delayQ = null;
+            Integer key = null;
+            if (correlationExpression != null) {
+                key = correlationExpression.evaluate(exchange, Integer.class);
+                delayQ = locateDelayQueue(key, doneSync);
+            } else {
+                delayQ = delayQueue;
+            }
+            
+            calculateAndSetMaxRequestsPerPeriod(delayQ, exchange, key);
+            ThrottlePermit permit = delayQ.poll();
 
             if (permit == null) {
                 if (isRejectExecution()) {
                     throw new ThrottlerRejectedExecutionException("Exceeded the max throttle rate of "
-                            + throttleRate + " within " + timePeriodMillis + "ms");
+                            + ((correlationExpression != null) ? throttleRatesMap.get(key) : throttleRate) + " within " + timePeriodMillis + "ms");
                 } else {
                     // delegate to async pool
                     if (isAsyncDelayed() && !exchange.isTransacted() && state == State.SYNC) {
@@ -131,11 +155,11 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
                     if (log.isTraceEnabled()) {
                         start = System.currentTimeMillis();
                     }
-                    permit = delayQueue.take();
+                    permit = delayQ.take();
                     if (log.isTraceEnabled()) {
                         elapsed = System.currentTimeMillis() - start;
                     }
-                    enqueuePermit(permit, exchange);
+                    enqueuePermit(permit, exchange, delayQ);
 
                     if (state == State.ASYNC) {
                         if (log.isTraceEnabled()) {
@@ -147,7 +171,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
                     }
                 }
             } else {
-                enqueuePermit(permit, exchange);
+                enqueuePermit(permit, exchange, delayQ);
 
                 if (state == State.ASYNC) {
                     if (log.isTraceEnabled()) {
@@ -192,6 +216,28 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
         }
     }
 
+    private DelayQueue<ThrottlePermit> locateDelayQueue(final Integer key, final boolean doneSync) throws InterruptedException, ExecutionException {        
+        CompletableFuture<DelayQueue<ThrottlePermit>> futureDelayQueue = new CompletableFuture<>();
+
+        if (!doneSync) {
+            delayQueueCacheExecutorService.submit(() -> {
+                futureDelayQueue.complete(findDelayQueue(key));
+            });
+        }
+        DelayQueue<ThrottlePermit> currentQueue = (!doneSync) ? futureDelayQueue.get() : findDelayQueue(key);   
+        return currentQueue;
+    }
+
+    private DelayQueue<ThrottlePermit> findDelayQueue(Integer key) {
+        DelayQueue<ThrottlePermit> currentDelayQueue = delayQueueCache.get(key);
+        if (currentDelayQueue == null) {
+            currentDelayQueue = new DelayQueue<>();
+            throttleRatesMap.put(key, 0);
+            delayQueueCache.put(key, currentDelayQueue);
+        }
+        return currentDelayQueue;
+    }
+
     /**
      * Delegate blocking on the DelayQueue to an asyncExecutor. Except if the executor rejects the submission
      * and isCallerRunsWhenRejected() is enabled, then this method will delegate back to process(), but not
@@ -222,8 +268,10 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
 
     /**
      * Returns a permit to the DelayQueue, first resetting it's delay to be relative to now.
+     * @throws ExecutionException 
+     * @throws InterruptedException 
      */
-    protected void enqueuePermit(final ThrottlePermit permit, final Exchange exchange) {
+    protected void enqueuePermit(final ThrottlePermit permit, final Exchange exchange, DelayQueue<ThrottlePermit> delayQueue) throws InterruptedException, ExecutionException {
         permit.setDelayMs(getTimePeriodMillis());
         delayQueue.put(permit);
         // try and incur the least amount of overhead while releasing permits back to the queue
@@ -235,23 +283,32 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
     /**
      * Evaluates the maxRequestsPerPeriodExpression and adjusts the throttle rate up or down.
      */
-    protected void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange) throws Exception {
+    protected void calculateAndSetMaxRequestsPerPeriod(DelayQueue<ThrottlePermit> delayQueue, final Exchange exchange, final Integer key) throws Exception {
         Integer newThrottle = maxRequestsPerPeriodExpression.evaluate(exchange, Integer.class);
 
         if (newThrottle != null && newThrottle < 0) {
             throw new IllegalStateException("The maximumRequestsPerPeriod must be a positive number, was: " + newThrottle);
         }
 
-        synchronized (this) {
-            if (newThrottle == null && throttleRate == 0) {
+        Object lockOnSync = null;
+        Integer currentThrottleRate = throttleRate;
+        if (correlationExpression != null) {
+            currentThrottleRate = throttleRatesMap.get(key);
+            lockOnSync = new Integer(currentThrottleRate.intValue());
+        } else {
+            lockOnSync = this;
+        }
+        synchronized (lockOnSync) {
+            if (newThrottle == null && currentThrottleRate == 0) {
                 throw new RuntimeExchangeException("The maxRequestsPerPeriodExpression was evaluated as null: " + maxRequestsPerPeriodExpression, exchange);
             }
 
             if (newThrottle != null) {
-                if (newThrottle != throttleRate) {
+                if (newThrottle != currentThrottleRate) {
+                    // get the queue from the cache
                     // decrease
-                    if (throttleRate > newThrottle) {
-                        int delta = throttleRate - newThrottle;
+                    if (currentThrottleRate > newThrottle) {
+                        int delta = currentThrottleRate - newThrottle;
 
                         // discard any permits that are needed to decrease throttling
                         while (delta > 0) {
@@ -259,39 +316,89 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
                             delta--;
                             log.trace("Permit discarded due to throttling rate decrease, triggered by ExchangeId: {}", exchange.getExchangeId());
                         }
-                        log.debug("Throttle rate decreased from {} to {}, triggered by ExchangeId: {}", throttleRate, newThrottle, exchange.getExchangeId());
+                        log.debug("Throttle rate decreased from {} to {}, triggered by ExchangeId: {}", currentThrottleRate, newThrottle, exchange.getExchangeId());
 
                     // increase
-                    } else if (newThrottle > throttleRate) {
-                        int delta = newThrottle - throttleRate;
+                    } else if (newThrottle > currentThrottleRate) {
+                        int delta = newThrottle - currentThrottleRate;
                         for (int i = 0; i < delta; i++) {
                             delayQueue.put(new ThrottlePermit(-1));
                         }
-                        if (throttleRate == 0) {
+                        if (currentThrottleRate == 0) {
                             log.debug("Initial throttle rate set to {}, triggered by ExchangeId: {}", newThrottle, exchange.getExchangeId());
                         } else {
-                            log.debug("Throttle rate increase from {} to {}, triggered by ExchangeId: {}", throttleRate, newThrottle, exchange.getExchangeId());
+                            log.debug("Throttle rate increase from {} to {}, triggered by ExchangeId: {}", currentThrottleRate, newThrottle, exchange.getExchangeId());
                         }
                     }
-                    throttleRate = newThrottle;
+                    if (correlationExpression != null) {
+                        throttleRatesMap.put(key, newThrottle);
+                    } else {
+                    	throttleRate = newThrottle;
+                    }
                 }
             }
         }
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     protected void doStart() throws Exception {
         if (isAsyncDelayed()) {
             ObjectHelper.notNull(asyncExecutor, "executorService", this);
         }
+        if (camelContext != null) {
+            int maxSize = CamelContextHelper.getMaximumSimpleCacheSize(camelContext);
+            if (maxSize > 0) {
+                delayQueueCache = LRUCacheFactory.newLRUCache(16, maxSize, false);
+                log.debug("DelayQueues cache size: {}", maxSize);
+            } else {
+                delayQueueCache = LRUCacheFactory.newLRUCache(100);
+                log.debug("Defaulting DelayQueues cache size: {}", 100);
+            }
+        }
+        if (delayQueueCache != null) {
+            ServiceHelper.startService(delayQueueCache);
+        }
+        if (delayQueueCacheExecutorService == null) {
+            String name = getClass().getSimpleName() + "-DelayQueueLocatorTask";
+            delayQueueCacheExecutorService = createDelayQueueCacheExecutorService(name);
+        }
         super.doStart();
     }
+    
+    /**
+     * Strategy to create the thread pool for locating the right DelayQueue from the cache as a background task
+     *
+     * @param name  the suggested name for the background thread
+     * @return the thread pool
+     */
+    protected synchronized ExecutorService createDelayQueueCacheExecutorService(String name) {
+        // use a cached thread pool so that each on-the-fly task has a dedicated thread to process completions as they come in
+        return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name);
+    }
 
+    @SuppressWarnings("rawtypes")
     @Override
     protected void doShutdown() throws Exception {
         if (shutdownAsyncExecutor && asyncExecutor != null) {
             camelContext.getExecutorServiceManager().shutdownNow(asyncExecutor);
         }
+        if (delayQueueCacheExecutorService != null) {
+            camelContext.getExecutorServiceManager().shutdownNow(delayQueueCacheExecutorService);
+        }
+        if (delayQueueCache != null) {
+            ServiceHelper.stopService(delayQueueCache);
+            if (log.isDebugEnabled()) {
+                if (delayQueueCache instanceof LRUCache) {
+                    log.debug("Clearing deleay queues cache[size={}, hits={}, misses={}, evicted={}]",
+                            delayQueueCache.size(), ((LRUCache) delayQueueCache).getHits(), ((LRUCache) delayQueueCache).getMisses(), ((LRUCache) delayQueueCache).getEvicted());
+                }
+            }
+            delayQueueCache.clear();
+        }
+        if (throttleRatesMap != null && throttleRatesMap.size() > 0) {
+            throttleRatesMap.clear();
+        }
         super.doShutdown();
     }
 
@@ -365,9 +472,14 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
 
     /**
      * Gets the current maximum request per period value.
+     * If grouped throttling is applied with a correlationExpression,
+     * then the highest maximum-requests-per-period value among the groups is returned.
      */
     public int getCurrentMaximumRequestsPerPeriod() {
-        return throttleRate;
+        if (correlationExpression == null) {
+            return throttleRate;
+        }
+        return Collections.max(throttleRatesMap.entrySet(), (entry1, entry2) -> entry1.getValue() - entry2.getValue()).getValue();
     }
 
     /**
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
new file mode 100644
index 0000000..09f1160
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version 
+ */
+public class ThrottlingGroupingTest extends ContextTestSupport {
+
+    public void testGroupingWithSingleConstant() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
+        getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom");
+
+        template.sendBodyAndHeader("seda:a", "Kaboom", "max", null);
+        template.sendBodyAndHeader("seda:a", "Hello World", "max", 2);
+        template.sendBodyAndHeader("seda:a", "Bye World", "max", 2);
+
+        assertMockEndpointsSatisfied();
+    }
+    
+    public void testGroupingWithDynamicHeaderExpression() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:result2").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom", "Saloon");
+        getMockEndpoint("mock:resultdynamic").expectedBodiesReceived("Hello Dynamic World", "Bye Dynamic World");
+        
+        Map<String, Object> headers = new HashMap<String, Object>();
+
+        template.sendBodyAndHeaders("seda:a", "Kaboom", headers);
+        template.sendBodyAndHeaders("seda:a", "Saloon", headers);
+        
+        headers.put("max", "2");
+        template.sendBodyAndHeaders("seda:a", "Hello World", headers);
+        template.sendBodyAndHeaders("seda:b", "Bye World", headers);
+        headers.put("max", "2");
+        headers.put("key", "1");
+        template.sendBodyAndHeaders("seda:c", "Hello Dynamic World", headers);
+        headers.put("key", "2");
+        template.sendBodyAndHeaders("seda:c", "Bye Dynamic World", headers);
+        
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dead"));
+
+                from("seda:a").throttle(header("max"), 1).to("mock:result");
+                from("seda:b").throttle(header("max"), 2).to("mock:result2");
+                from("seda:c").throttle(header("max"), header("key")).to("mock:resultdynamic");
+            }
+        };
+    }
+}
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThrottlerGroupingTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThrottlerGroupingTest.java
new file mode 100644
index 0000000..e321838
--- /dev/null
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThrottlerGroupingTest.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.ThrottlingGroupingTest;
+import org.junit.Ignore;
+
+import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+public class SpringThrottlerGroupingTest extends ThrottlingGroupingTest {
+
+    protected CamelContext createCamelContext() throws Exception {
+        return createSpringCamelContext(this,
+                "org/apache/camel/spring/processor/ThrottlerGroupingTest.xml");
+    }
+}
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlerGroupingTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlerGroupingTest.xml
new file mode 100644
index 0000000..c3019cd
--- /dev/null
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlerGroupingTest.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+    ">
+  <camelContext xmlns="http://camel.apache.org/schema/spring">
+    <errorHandler id="dlc" deadLetterUri="mock:dead" type="DeadLetterChannel"/>
+    <route errorHandlerRef="dlc">
+      <from uri="seda:a"/>
+      <throttle timePeriodMillis="1000">
+        <header>max</header>
+        <correlationExpression>
+          <constant>1</constant>
+        </correlationExpression>
+        <to uri="log:result"/>
+        <to uri="mock:result"/>
+      </throttle>
+    </route>
+    
+    <route errorHandlerRef="dlc">
+      <from uri="seda:b"/>
+      <throttle timePeriodMillis="1000">
+        <header>max</header>
+        <correlationExpression>
+          <constant>2</constant>
+        </correlationExpression>        
+        <to uri="log:result"/>
+        <to uri="mock:result2"/>
+      </throttle>
+    </route>
+    
+    <route errorHandlerRef="dlc">
+      <from uri="seda:c"/>
+      <throttle timePeriodMillis="1000">
+        <header>max</header>
+        <correlationExpression>
+          <header>key</header>
+        </correlationExpression>
+        <to uri="log:result"/>
+        <to uri="mock:resultdynamic"/>
+      </throttle>
+    </route>
+
+  </camelContext>
+
+</beans>


[camel] 03/03: CAMEL-6840 - add javadoc to describe usage of asyncExecutor in case of async routing and use the shared thread pool

Posted by on...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

onders pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 350b1885584d32d9105c0abe926779c63501d791
Author: Sezgin <on...@nokia.com>
AuthorDate: Thu Jun 28 15:28:36 2018 +0300

    CAMEL-6840 - add javadoc to describe usage of asyncExecutor in case of async routing and use the shared thread pool
---
 .../java/org/apache/camel/processor/Throttler.java | 84 +++++++++++-----------
 1 file changed, 41 insertions(+), 43 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
index b435fcb..a1b10c8 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
@@ -86,11 +86,10 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
     private boolean asyncDelayed;
     private boolean callerRunsWhenRejected = true;
     private final DelayQueue<ThrottlePermit> delayQueue = new DelayQueue<>();
-    // below 4 fields added for (throttling grouping)
+    // below 3 fields added for (throttling grouping)
     private Expression correlationExpression;
     private Map<Integer, DelayQueue<ThrottlePermit>> delayQueueCache;
-    private Map<Integer, Integer> throttleRatesMap = new HashMap<>();
-    private ExecutorService delayQueueCacheExecutorService;    
+    private Map<Integer, Integer> throttleRatesMap = new HashMap<>();    
 
     public Throttler(final CamelContext camelContext, final Processor processor, final Expression maxRequestsPerPeriodExpression, final long timePeriodMillis,
                      final ExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution, Expression correlation) {
@@ -216,11 +215,24 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
         }
     }
 
+    /**
+     * 
+     * Finds the right delay queue into which to put a permit expiring at the exchange's arrival timestamp + timePeriodMillis.
+     * With asynchronous routing, a new group (a correlationExpression value seen for the first time) may first
+     * be hit after a long series of exchanges with a different correlationExpression value that are on hold in
+     * their own delay queue. We then need to keep finding the delay queue for those exchanges while a new, empty
+     * delay queue is created for the group hit for the first time. That is why, during asynchronous routing,
+     * locating the delay queue for a frequently hit group is better done asynchronously with the asyncExecutor.
+     * 
+     * @param key is the evaluated value of correlationExpression
+     * @param doneSync is a flag indicating if the exchange is routed asynchronously or not
+     * @return the DelayQueue into which the permit (with its expiry) for the exchange is to be put
+     */
     private DelayQueue<ThrottlePermit> locateDelayQueue(final Integer key, final boolean doneSync) throws InterruptedException, ExecutionException {        
         CompletableFuture<DelayQueue<ThrottlePermit>> futureDelayQueue = new CompletableFuture<>();
 
         if (!doneSync) {
-            delayQueueCacheExecutorService.submit(() -> {
+            asyncExecutor.submit(() -> {
                 futureDelayQueue.complete(findDelayQueue(key));
             });
         }
@@ -345,36 +357,23 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
         if (isAsyncDelayed()) {
             ObjectHelper.notNull(asyncExecutor, "executorService", this);
         }
-        if (camelContext != null) {
-            int maxSize = CamelContextHelper.getMaximumSimpleCacheSize(camelContext);
-            if (maxSize > 0) {
-                delayQueueCache = LRUCacheFactory.newLRUCache(16, maxSize, false);
-                log.debug("DelayQueues cache size: {}", maxSize);
-            } else {
-                delayQueueCache = LRUCacheFactory.newLRUCache(100);
-                log.debug("Defaulting DelayQueues cache size: {}", 100);
+        if (correlationExpression != null) {
+            if (camelContext != null) {
+                int maxSize = CamelContextHelper.getMaximumSimpleCacheSize(camelContext);
+                if (maxSize > 0) {
+                    delayQueueCache = LRUCacheFactory.newLRUCache(16, maxSize, false);
+                    log.debug("DelayQueues cache size: {}", maxSize);
+                } else {
+                    delayQueueCache = LRUCacheFactory.newLRUCache(100);
+                    log.debug("Defaulting DelayQueues cache size: {}", 100);
+                }
+            }
+            if (delayQueueCache != null) {
+                ServiceHelper.startService(delayQueueCache);
             }
-        }
-        if (delayQueueCache != null) {
-            ServiceHelper.startService(delayQueueCache);
-        }
-        if (delayQueueCacheExecutorService == null) {
-            String name = getClass().getSimpleName() + "-DelayQueueLocatorTask";
-            delayQueueCacheExecutorService = createDelayQueueCacheExecutorService(name);
         }
         super.doStart();
     }
-    
-    /**
-     * Strategy to create the thread pool for locating right DelayQueue from the case as a background task
-     *
-     * @param name  the suggested name for the background thread
-     * @return the thread pool
-     */
-    protected synchronized ExecutorService createDelayQueueCacheExecutorService(String name) {
-        // use a cached thread pool so we each on-the-fly task has a dedicated thread to process completions as they come in
-        return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name);
-    }
 
     @SuppressWarnings("rawtypes")
     @Override
@@ -382,21 +381,20 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
         if (shutdownAsyncExecutor && asyncExecutor != null) {
             camelContext.getExecutorServiceManager().shutdownNow(asyncExecutor);
         }
-        if (delayQueueCacheExecutorService != null) {
-            camelContext.getExecutorServiceManager().shutdownNow(delayQueueCacheExecutorService);
-        }
-        if (delayQueueCache != null) {
-            ServiceHelper.stopService(delayQueueCache);
-            if (log.isDebugEnabled()) {
-                if (delayQueueCache instanceof LRUCache) {
-                    log.debug("Clearing deleay queues cache[size={}, hits={}, misses={}, evicted={}]",
-                            delayQueueCache.size(), ((LRUCache) delayQueueCache).getHits(), ((LRUCache) delayQueueCache).getMisses(), ((LRUCache) delayQueueCache).getEvicted());
+        if (correlationExpression != null) {
+            if (delayQueueCache != null) {
+                ServiceHelper.stopService(delayQueueCache);
+                if (log.isDebugEnabled()) {
+                    if (delayQueueCache instanceof LRUCache) {
+                        log.debug("Clearing deleay queues cache[size={}, hits={}, misses={}, evicted={}]",
+                                delayQueueCache.size(), ((LRUCache) delayQueueCache).getHits(), ((LRUCache) delayQueueCache).getMisses(), ((LRUCache) delayQueueCache).getEvicted());
+                    }
                 }
+                delayQueueCache.clear();
+            }
+            if (throttleRatesMap != null && throttleRatesMap.size() > 0) {
+                throttleRatesMap.clear();
             }
-            delayQueueCache.clear();
-        }
-        if (throttleRatesMap != null && throttleRatesMap.size() > 0) {
-            throttleRatesMap.clear();
         }
         super.doShutdown();
     }


[camel] 02/03: CAMEL-6840 - use right lock while calculating and setting max request per period when grouped throttling used

Posted by on...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

onders pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 6164867ed9fe115754d917f872bd63db01d603d1
Author: Sezgin <on...@nokia.com>
AuthorDate: Wed Jun 20 09:20:51 2018 +0300

    CAMEL-6840 - use right lock while calculating and setting max request per period when grouped throttling used
---
 .../src/main/java/org/apache/camel/processor/Throttler.java      | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
index f84498d..b435fcb 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
@@ -290,14 +290,13 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
             throw new IllegalStateException("The maximumRequestsPerPeriod must be a positive number, was: " + newThrottle);
         }
 
-        Object lockOnSync = null;
+        Object lockOnSync = this;
         Integer currentThrottleRate = throttleRate;
         if (correlationExpression != null) {
             currentThrottleRate = throttleRatesMap.get(key);
-            lockOnSync = new Integer(currentThrottleRate.intValue());
-        } else {
-            lockOnSync = this;
+            lockOnSync = key;
         }
+
         synchronized (lockOnSync) {
             if (newThrottle == null && currentThrottleRate == 0) {
                 throw new RuntimeExchangeException("The maxRequestsPerPeriodExpression was evaluated as null: " + maxRequestsPerPeriodExpression, exchange);
@@ -333,7 +332,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
                     if (correlationExpression != null) {
                         throttleRatesMap.put(key, newThrottle);
                     } else {
-                    	throttleRate = newThrottle;
+                        throttleRate = newThrottle;
                     }
                 }
             }