You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by on...@apache.org on 2018/07/04 07:25:02 UTC

[camel] 01/03: CAMEL-6840 - add grouped throttling feature

This is an automated email from the ASF dual-hosted git repository.

onders pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d0d8b103fab5a0292406ce67e5c26dc4a4d1595f
Author: Sezgin <on...@nokia.com>
AuthorDate: Thu Jun 14 20:26:59 2018 +0300

    CAMEL-6840 - add grouped throttling feature
---
 camel-core/src/main/docs/eips/throttle-eip.adoc    |   3 +-
 .../management/mbean/ManagedThrottlerMBean.java    |   4 +-
 .../org/apache/camel/model/ExpressionNode.java     |   2 +
 .../apache/camel/model/ProcessorDefinition.java    |  42 ++++++
 .../org/apache/camel/model/ThrottleDefinition.java |  39 ++++-
 .../java/org/apache/camel/processor/Throttler.java | 158 ++++++++++++++++++---
 .../camel/processor/ThrottlingGroupingTest.java    |  77 ++++++++++
 .../processor/SpringThrottlerGroupingTest.java     |  31 ++++
 .../spring/processor/ThrottlerGroupingTest.xml     |  66 +++++++++
 9 files changed, 394 insertions(+), 28 deletions(-)

diff --git a/camel-core/src/main/docs/eips/throttle-eip.adoc b/camel-core/src/main/docs/eips/throttle-eip.adoc
index 71da959..aa0582b 100644
--- a/camel-core/src/main/docs/eips/throttle-eip.adoc
+++ b/camel-core/src/main/docs/eips/throttle-eip.adoc
@@ -6,11 +6,12 @@ The Throttler Pattern allows you to ensure that a specific endpoint does not get
 === Options
 
 // eip options: START
-The Throttle EIP supports 5 options which are listed below:
+The Throttle EIP supports 6 options which are listed below:
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
+| *correlationExpression* | The expression used to calculate the correlation key to use for throttle grouping. The Exchange which has the same correlation key is throttled together. |  | NamespaceAware Expression
 | *executorServiceRef* | To use a custom thread pool (ScheduledExecutorService) by the throttler. |  | String
 | *timePeriodMillis* | Sets the time period during which the maximum request count is valid for | 1000 | Long
 | *asyncDelayed* | Enables asynchronous delay which means the thread will not block while delaying. | false | Boolean
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
index 673c13e..6e993b0 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
@@ -20,10 +20,10 @@ import org.apache.camel.api.management.ManagedAttribute;
 
 public interface ManagedThrottlerMBean extends ManagedProcessorMBean {
 
-    @ManagedAttribute(description = "Maximum requires per period")
+    @ManagedAttribute(description = "Maximum requests per period")
     long getMaximumRequestsPerPeriod();
 
-    @ManagedAttribute(description = "Maximum requires per period")
+    @ManagedAttribute(description = "Maximum requests per period")
     void setMaximumRequestsPerPeriod(long maximumRequestsPerPeriod);
 
     @ManagedAttribute(description = "Time period in millis")
diff --git a/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java b/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
index d2ac38e..5a1d7b1 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
@@ -21,6 +21,7 @@ import java.util.List;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlElementRef;
+import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.Expression;
 import org.apache.camel.Predicate;
@@ -36,6 +37,7 @@ import org.apache.camel.spi.RouteContext;
  * @version
  */
 @XmlAccessorType(XmlAccessType.FIELD)
+@XmlTransient
 public abstract class ExpressionNode extends ProcessorDefinition<ExpressionNode> {
     @XmlElementRef
     private ExpressionDefinition expression;
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 005270e..9108d78 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -2284,6 +2284,48 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
         addOutput(answer);
         return answer;
     }
+
+    /**
+     * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
+     * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded,
+     * or that we don't exceed an agreed SLA with some external service.
+     * Here another parameter correlationExpressionKey is introduced for the functionality which
+     * will throttle based on the key expression to group exchanges. This will make key-based throttling
+     * instead of overall throttling.
+     * <p/>
+     * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10
+     * will default ensure at most 10 messages per second.
+     *
+     * @param maximumRequestCount  an expression to calculate the maximum request count
+     * @param correlationExpressionKey  is a correlation key that can throttle by the given key instead of overall throttling
+     * @return the builder
+     */
+    public ThrottleDefinition throttle(Expression maximumRequestCount, long correlationExpressionKey) {
+        ThrottleDefinition answer = new ThrottleDefinition(maximumRequestCount, ExpressionBuilder.constantExpression(correlationExpressionKey));
+        addOutput(answer);
+        return answer;
+    }
+
+    /**
+     * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
+     * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded,
+     * or that we don't exceed an agreed SLA with some external service.
+     * Here another parameter correlationExpressionKey is introduced for the functionality which
+     * will throttle based on the key expression to group exchanges. This will make key-based throttling
+     * instead of overall throttling.
+     * <p/>
+     * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10
+     * will default ensure at most 10 messages per second.
+     *
+     * @param maximumRequestCount  an expression to calculate the maximum request count
+     * @param correlationExpressionKey  is a correlation key as an expression that can throttle by the given key instead of overall throttling
+     * @return the builder
+     */
+    public ThrottleDefinition throttle(Expression maximumRequestCount, Expression correlationExpressionKey) {
+        ThrottleDefinition answer = new ThrottleDefinition(maximumRequestCount, correlationExpressionKey);
+        addOutput(answer);
+        return answer;
+    }
     
     /**
      * <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
diff --git a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
index 613d2b3..eeb1645 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
@@ -21,8 +21,10 @@ import java.util.concurrent.ScheduledExecutorService;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
+import javax.xml.bind.annotation.XmlType;
 
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
@@ -40,9 +42,12 @@ import org.apache.camel.spi.RouteContext;
 @Metadata(label = "eip,routing")
 @XmlRootElement(name = "throttle")
 @XmlAccessorType(XmlAccessType.FIELD)
+@XmlType(propOrder = {"expression", "correlationExpression", "outputs"})
 public class ThrottleDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<ThrottleDefinition> {
     // TODO: Camel 3.0 Should not support outputs
 
+    @XmlElement(name = "correlationExpression")
+    private ExpressionSubElementDefinition correlationExpression;
     @XmlTransient
     private ExecutorService executorService;
     @XmlAttribute
@@ -55,7 +60,7 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
     private Boolean callerRunsWhenRejected;
     @XmlAttribute
     private Boolean rejectExecution;
-    
+
     public ThrottleDefinition() {
     }
 
@@ -63,6 +68,18 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
         super(maximumRequestsPerPeriod);
     }
 
+    public ThrottleDefinition(Expression maximumRequestsPerPeriod, Expression correlationExpression) {
+        this(ExpressionNodeHelper.toExpressionDefinition(maximumRequestsPerPeriod), correlationExpression);
+    }
+
+    private ThrottleDefinition(ExpressionDefinition maximumRequestsPerPeriod, Expression correlationExpression) {
+        super(maximumRequestsPerPeriod);
+
+        ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition();
+        cor.setExpressionType(ExpressionNodeHelper.toExpressionDefinition(correlationExpression));
+        setCorrelationExpression(cor);
+    }
+
     @Override
     public String toString() {
         return "Throttle[" + description() + " -> " + getOutputs() + "]";
@@ -93,9 +110,14 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
         if (maxRequestsExpression == null) {
             throw new IllegalArgumentException("MaxRequestsPerPeriod expression must be provided on " + this);
         }
+        
+        Expression correlation = null;
+        if (correlationExpression != null) {
+            correlation = correlationExpression.createExpression(routeContext);
+        }
 
         boolean reject = getRejectExecution() != null && getRejectExecution();
-        Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, reject);
+        Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, reject, correlation);
 
         answer.setAsyncDelayed(async);
         if (getCallerRunsWhenRejected() == null) {
@@ -104,6 +126,7 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
         } else {
             answer.setCallerRunsWhenRejected(getCallerRunsWhenRejected());
         }
+
         return answer;
     }
 
@@ -256,4 +279,16 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
     public void setRejectExecution(Boolean rejectExecution) {
         this.rejectExecution = rejectExecution;
     }
+
+    /**
+     * The expression used to calculate the correlation key to use for throttle grouping.
+     * The Exchange which has the same correlation key is throttled together.
+     */
+    public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) {
+        this.correlationExpression = correlationExpression;
+    }
+
+    public ExpressionSubElementDefinition getCorrelationExpression() {
+        return correlationExpression;
+    }
 }
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
index 543ec9a..f84498d 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
@@ -16,8 +16,13 @@
  */
 package org.apache.camel.processor;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -31,7 +36,11 @@ import org.apache.camel.RuntimeExchangeException;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.CamelContextHelper;
+import org.apache.camel.util.LRUCache;
+import org.apache.camel.util.LRUCacheFactory;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,20 +75,25 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
 
     private final Logger log = LoggerFactory.getLogger(Throttler.class);
     private final CamelContext camelContext;
-    private final DelayQueue<ThrottlePermit> delayQueue = new DelayQueue<>();
     private final ExecutorService asyncExecutor;
     private final boolean shutdownAsyncExecutor;
 
     private volatile long timePeriodMillis;
-    private volatile int throttleRate;
     private String id;
+    private volatile Integer throttleRate = new Integer(0);
     private Expression maxRequestsPerPeriodExpression;
     private boolean rejectExecution;
     private boolean asyncDelayed;
     private boolean callerRunsWhenRejected = true;
+    private final DelayQueue<ThrottlePermit> delayQueue = new DelayQueue<>();
+    // below 4 fields added for (throttling grouping)
+    private Expression correlationExpression;
+    private Map<Integer, DelayQueue<ThrottlePermit>> delayQueueCache;
+    private Map<Integer, Integer> throttleRatesMap = new HashMap<>();
+    private ExecutorService delayQueueCacheExecutorService;    
 
     public Throttler(final CamelContext camelContext, final Processor processor, final Expression maxRequestsPerPeriodExpression, final long timePeriodMillis,
-                     final ExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution) {
+                     final ExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution, Expression correlation) {
         super(processor);
         this.camelContext = camelContext;
         this.rejectExecution = rejectExecution;
@@ -93,6 +107,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
         }
         this.timePeriodMillis = timePeriodMillis;
         this.asyncExecutor = asyncExecutor;
+        this.correlationExpression = correlation;
     }
 
     @Override
@@ -111,13 +126,22 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
                 throw new RejectedExecutionException("Run is not allowed");
             }
 
-            calculateAndSetMaxRequestsPerPeriod(exchange);
-            ThrottlePermit permit = delayQueue.poll();
+            DelayQueue<ThrottlePermit> delayQ = null;
+            Integer key = null;
+            if (correlationExpression != null) {
+                key = correlationExpression.evaluate(exchange, Integer.class);
+                delayQ = locateDelayQueue(key, doneSync);
+            } else {
+                delayQ = delayQueue;
+            }
+            
+            calculateAndSetMaxRequestsPerPeriod(delayQ, exchange, key);
+            ThrottlePermit permit = delayQ.poll();
 
             if (permit == null) {
                 if (isRejectExecution()) {
                     throw new ThrottlerRejectedExecutionException("Exceeded the max throttle rate of "
-                            + throttleRate + " within " + timePeriodMillis + "ms");
+                            + ((correlationExpression != null) ? throttleRatesMap.get(key) : throttleRate) + " within " + timePeriodMillis + "ms");
                 } else {
                     // delegate to async pool
                     if (isAsyncDelayed() && !exchange.isTransacted() && state == State.SYNC) {
@@ -131,11 +155,11 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
                     if (log.isTraceEnabled()) {
                         start = System.currentTimeMillis();
                     }
-                    permit = delayQueue.take();
+                    permit = delayQ.take();
                     if (log.isTraceEnabled()) {
                         elapsed = System.currentTimeMillis() - start;
                     }
-                    enqueuePermit(permit, exchange);
+                    enqueuePermit(permit, exchange, delayQ);
 
                     if (state == State.ASYNC) {
                         if (log.isTraceEnabled()) {
@@ -147,7 +171,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
                     }
                 }
             } else {
-                enqueuePermit(permit, exchange);
+                enqueuePermit(permit, exchange, delayQ);
 
                 if (state == State.ASYNC) {
                     if (log.isTraceEnabled()) {
@@ -192,6 +216,28 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
         }
     }
 
+    private DelayQueue<ThrottlePermit> locateDelayQueue(final Integer key, final boolean doneSync) throws InterruptedException, ExecutionException {        
+        CompletableFuture<DelayQueue<ThrottlePermit>> futureDelayQueue = new CompletableFuture<>();
+
+        if (!doneSync) {
+            delayQueueCacheExecutorService.submit(() -> {
+                futureDelayQueue.complete(findDelayQueue(key));
+            });
+        }
+        DelayQueue<ThrottlePermit> currentQueue = (!doneSync) ? futureDelayQueue.get() : findDelayQueue(key);   
+        return currentQueue;
+    }
+
+    private DelayQueue<ThrottlePermit> findDelayQueue(Integer key) {
+        DelayQueue<ThrottlePermit> currentDelayQueue = delayQueueCache.get(key);
+        if (currentDelayQueue == null) {
+            currentDelayQueue = new DelayQueue<>();
+            throttleRatesMap.put(key, 0);
+            delayQueueCache.put(key, currentDelayQueue);
+        }
+        return currentDelayQueue;
+    }
+
     /**
      * Delegate blocking on the DelayQueue to an asyncExecutor. Except if the executor rejects the submission
      * and isCallerRunsWhenRejected() is enabled, then this method will delegate back to process(), but not
@@ -222,8 +268,10 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
 
     /**
      * Returns a permit to the DelayQueue, first resetting it's delay to be relative to now.
+     * @throws ExecutionException 
+     * @throws InterruptedException 
      */
-    protected void enqueuePermit(final ThrottlePermit permit, final Exchange exchange) {
+    protected void enqueuePermit(final ThrottlePermit permit, final Exchange exchange, DelayQueue<ThrottlePermit> delayQueue) throws InterruptedException, ExecutionException {
         permit.setDelayMs(getTimePeriodMillis());
         delayQueue.put(permit);
         // try and incur the least amount of overhead while releasing permits back to the queue
@@ -235,23 +283,32 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
     /**
      * Evaluates the maxRequestsPerPeriodExpression and adjusts the throttle rate up or down.
      */
-    protected void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange) throws Exception {
+    protected void calculateAndSetMaxRequestsPerPeriod(DelayQueue<ThrottlePermit> delayQueue, final Exchange exchange, final Integer key) throws Exception {
         Integer newThrottle = maxRequestsPerPeriodExpression.evaluate(exchange, Integer.class);
 
         if (newThrottle != null && newThrottle < 0) {
             throw new IllegalStateException("The maximumRequestsPerPeriod must be a positive number, was: " + newThrottle);
         }
 
-        synchronized (this) {
-            if (newThrottle == null && throttleRate == 0) {
+        Object lockOnSync = null;
+        Integer currentThrottleRate = throttleRate;
+        if (correlationExpression != null) {
+            currentThrottleRate = throttleRatesMap.get(key);
+            lockOnSync = new Integer(currentThrottleRate.intValue());
+        } else {
+            lockOnSync = this;
+        }
+        synchronized (lockOnSync) {
+            if (newThrottle == null && currentThrottleRate == 0) {
                 throw new RuntimeExchangeException("The maxRequestsPerPeriodExpression was evaluated as null: " + maxRequestsPerPeriodExpression, exchange);
             }
 
             if (newThrottle != null) {
-                if (newThrottle != throttleRate) {
+                if (newThrottle != currentThrottleRate) {
+                    // get the queue from the cache
                     // decrease
-                    if (throttleRate > newThrottle) {
-                        int delta = throttleRate - newThrottle;
+                    if (currentThrottleRate > newThrottle) {
+                        int delta = currentThrottleRate - newThrottle;
 
                         // discard any permits that are needed to decrease throttling
                         while (delta > 0) {
@@ -259,39 +316,89 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
                             delta--;
                             log.trace("Permit discarded due to throttling rate decrease, triggered by ExchangeId: {}", exchange.getExchangeId());
                         }
-                        log.debug("Throttle rate decreased from {} to {}, triggered by ExchangeId: {}", throttleRate, newThrottle, exchange.getExchangeId());
+                        log.debug("Throttle rate decreased from {} to {}, triggered by ExchangeId: {}", currentThrottleRate, newThrottle, exchange.getExchangeId());
 
                     // increase
-                    } else if (newThrottle > throttleRate) {
-                        int delta = newThrottle - throttleRate;
+                    } else if (newThrottle > currentThrottleRate) {
+                        int delta = newThrottle - currentThrottleRate;
                         for (int i = 0; i < delta; i++) {
                             delayQueue.put(new ThrottlePermit(-1));
                         }
-                        if (throttleRate == 0) {
+                        if (currentThrottleRate == 0) {
                             log.debug("Initial throttle rate set to {}, triggered by ExchangeId: {}", newThrottle, exchange.getExchangeId());
                         } else {
-                            log.debug("Throttle rate increase from {} to {}, triggered by ExchangeId: {}", throttleRate, newThrottle, exchange.getExchangeId());
+                            log.debug("Throttle rate increase from {} to {}, triggered by ExchangeId: {}", currentThrottleRate, newThrottle, exchange.getExchangeId());
                         }
                     }
-                    throttleRate = newThrottle;
+                    if (correlationExpression != null) {
+                        throttleRatesMap.put(key, newThrottle);
+                    } else {
+                    	throttleRate = newThrottle;
+                    }
                 }
             }
         }
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     protected void doStart() throws Exception {
         if (isAsyncDelayed()) {
             ObjectHelper.notNull(asyncExecutor, "executorService", this);
         }
+        if (camelContext != null) {
+            int maxSize = CamelContextHelper.getMaximumSimpleCacheSize(camelContext);
+            if (maxSize > 0) {
+                delayQueueCache = LRUCacheFactory.newLRUCache(16, maxSize, false);
+                log.debug("DelayQueues cache size: {}", maxSize);
+            } else {
+                delayQueueCache = LRUCacheFactory.newLRUCache(100);
+                log.debug("Defaulting DelayQueues cache size: {}", 100);
+            }
+        }
+        if (delayQueueCache != null) {
+            ServiceHelper.startService(delayQueueCache);
+        }
+        if (delayQueueCacheExecutorService == null) {
+            String name = getClass().getSimpleName() + "-DelayQueueLocatorTask";
+            delayQueueCacheExecutorService = createDelayQueueCacheExecutorService(name);
+        }
         super.doStart();
     }
+    
+    /**
+     * Strategy to create the thread pool for locating right DelayQueue from the case as a background task
+     *
+     * @param name  the suggested name for the background thread
+     * @return the thread pool
+     */
+    protected synchronized ExecutorService createDelayQueueCacheExecutorService(String name) {
+        // use a cached thread pool so we each on-the-fly task has a dedicated thread to process completions as they come in
+        return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name);
+    }
 
+    @SuppressWarnings("rawtypes")
     @Override
     protected void doShutdown() throws Exception {
         if (shutdownAsyncExecutor && asyncExecutor != null) {
             camelContext.getExecutorServiceManager().shutdownNow(asyncExecutor);
         }
+        if (delayQueueCacheExecutorService != null) {
+            camelContext.getExecutorServiceManager().shutdownNow(delayQueueCacheExecutorService);
+        }
+        if (delayQueueCache != null) {
+            ServiceHelper.stopService(delayQueueCache);
+            if (log.isDebugEnabled()) {
+                if (delayQueueCache instanceof LRUCache) {
+                    log.debug("Clearing deleay queues cache[size={}, hits={}, misses={}, evicted={}]",
+                            delayQueueCache.size(), ((LRUCache) delayQueueCache).getHits(), ((LRUCache) delayQueueCache).getMisses(), ((LRUCache) delayQueueCache).getEvicted());
+                }
+            }
+            delayQueueCache.clear();
+        }
+        if (throttleRatesMap != null && throttleRatesMap.size() > 0) {
+            throttleRatesMap.clear();
+        }
         super.doShutdown();
     }
 
@@ -365,9 +472,14 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
 
     /**
      * Gets the current maximum request per period value.
+     * If it is grouped throttling applied with correlationExpression 
+     * than the max per period within the group will return
      */
     public int getCurrentMaximumRequestsPerPeriod() {
-        return throttleRate;
+        if (correlationExpression == null) {
+            return throttleRate;
+        }
+        return Collections.max(throttleRatesMap.entrySet(), (entry1, entry2) -> entry1.getValue() - entry2.getValue()).getValue();
     }
 
     /**
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
new file mode 100644
index 0000000..09f1160
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version 
+ */
+public class ThrottlingGroupingTest extends ContextTestSupport {
+
+    public void testGroupingWithSingleConstant() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
+        getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom");
+
+        template.sendBodyAndHeader("seda:a", "Kaboom", "max", null);
+        template.sendBodyAndHeader("seda:a", "Hello World", "max", 2);
+        template.sendBodyAndHeader("seda:a", "Bye World", "max", 2);
+
+        assertMockEndpointsSatisfied();
+    }
+    
+    public void testGroupingWithDynamicHeaderExpression() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:result2").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom", "Saloon");
+        getMockEndpoint("mock:resultdynamic").expectedBodiesReceived("Hello Dynamic World", "Bye Dynamic World");
+        
+        Map<String, Object> headers = new HashMap<String, Object>();
+
+        template.sendBodyAndHeaders("seda:a", "Kaboom", headers);
+        template.sendBodyAndHeaders("seda:a", "Saloon", headers);
+        
+        headers.put("max", "2");
+        template.sendBodyAndHeaders("seda:a", "Hello World", headers);
+        template.sendBodyAndHeaders("seda:b", "Bye World", headers);
+        headers.put("max", "2");
+        headers.put("key", "1");
+        template.sendBodyAndHeaders("seda:c", "Hello Dynamic World", headers);
+        headers.put("key", "2");
+        template.sendBodyAndHeaders("seda:c", "Bye Dynamic World", headers);
+        
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dead"));
+
+                from("seda:a").throttle(header("max"), 1).to("mock:result");
+                from("seda:b").throttle(header("max"), 2).to("mock:result2");
+                from("seda:c").throttle(header("max"), header("key")).to("mock:resultdynamic");
+            }
+        };
+    }
+}
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThrottlerGroupingTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThrottlerGroupingTest.java
new file mode 100644
index 0000000..e321838
--- /dev/null
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThrottlerGroupingTest.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.ThrottlingGroupingTest;
+import org.junit.Ignore;
+
+import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+public class SpringThrottlerGroupingTest extends ThrottlingGroupingTest {
+
+    protected CamelContext createCamelContext() throws Exception {
+        return createSpringCamelContext(this,
+                "org/apache/camel/spring/processor/ThrottlerGroupingTest.xml");
+    }
+}
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlerGroupingTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlerGroupingTest.xml
new file mode 100644
index 0000000..c3019cd
--- /dev/null
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlerGroupingTest.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+    ">
+  <camelContext xmlns="http://camel.apache.org/schema/spring">
+    <errorHandler id="dlc" deadLetterUri="mock:dead" type="DeadLetterChannel"/>
+    <route errorHandlerRef="dlc">
+      <from uri="seda:a"/>
+      <throttle timePeriodMillis="1000">
+        <header>max</header>
+        <correlationExpression>
+          <constant>1</constant>
+        </correlationExpression>
+        <to uri="log:result"/>
+        <to uri="mock:result"/>
+      </throttle>
+    </route>
+    
+    <route errorHandlerRef="dlc">
+      <from uri="seda:b"/>
+      <throttle timePeriodMillis="1000">
+        <header>max</header>
+        <correlationExpression>
+          <constant>2</constant>
+        </correlationExpression>        
+        <to uri="log:result"/>
+        <to uri="mock:result2"/>
+      </throttle>
+    </route>
+    
+    <route errorHandlerRef="dlc">
+      <from uri="seda:c"/>
+      <throttle timePeriodMillis="1000">
+        <header>max</header>
+        <correlationExpression>
+          <header>key</header>
+        </correlationExpression>
+        <to uri="log:result"/>
+        <to uri="mock:resultdynamic"/>
+      </throttle>
+    </route>
+
+  </camelContext>
+
+</beans>