You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by on...@apache.org on 2018/06/10 09:04:39 UTC
[camel] 01/02: CAMEL-6840 make it possible grouped throttling
This is an automated email from the ASF dual-hosted git repository.
onders pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit a7a458826dbafe1f155f538cfcbc0957d296fad8
Author: Sezgin <on...@nokia.com>
AuthorDate: Tue Jun 5 16:02:19 2018 +0300
CAMEL-6840 make it possible grouped throttling
---
camel-core/src/main/docs/eips/throttle-eip.adoc | 3 +-
.../apache/camel/model/AggregateDefinition.java | 2 +-
.../apache/camel/model/ProcessorDefinition.java | 42 ++++++++
.../org/apache/camel/model/ThrottleDefinition.java | 37 ++++++-
.../java/org/apache/camel/processor/Throttler.java | 107 +++++++++++++++++++--
.../camel/processor/ThrottlingGroupingTest.java | 76 +++++++++++++++
6 files changed, 255 insertions(+), 12 deletions(-)
diff --git a/camel-core/src/main/docs/eips/throttle-eip.adoc b/camel-core/src/main/docs/eips/throttle-eip.adoc
index 71da959..7ae5472 100644
--- a/camel-core/src/main/docs/eips/throttle-eip.adoc
+++ b/camel-core/src/main/docs/eips/throttle-eip.adoc
@@ -6,7 +6,7 @@ The Throttler Pattern allows you to ensure that a specific endpoint does not get
=== Options
// eip options: START
-The Throttle EIP supports 5 options which are listed below:
+The Throttle EIP supports 6 options which are listed below:
[width="100%",cols="2,5,^1,2",options="header"]
|===
@@ -16,6 +16,7 @@ The Throttle EIP supports 5 options which are listed below:
| *asyncDelayed* | Enables asynchronous delay which means the thread will not block while delaying. | false | Boolean
| *callerRunsWhenRejected* | Whether or not the caller should run the task when it was rejected by the thread pool. Is by default true | true | Boolean
| *rejectExecution* | Whether or not throttler throws the ThrottlerRejectedExecutionException when the exchange exceeds the request limit Is by default false | false | Boolean
+| *correlationExpression* | The expression used to calculate the correlation key to use for throttle grouping. The Exchange which has the same correlation key is throttled together. | | NamespaceAware Expression
|===
// eip options: END
diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index 2e60ec3..1aa34c4c 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -142,7 +142,7 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
this(ExpressionNodeHelper.toExpressionDefinition(expression));
}
- public AggregateDefinition(ExpressionDefinition correlationExpression) {
+ private AggregateDefinition(ExpressionDefinition correlationExpression) {
setExpression(correlationExpression);
ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition();
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 005270e..e4622e7 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -2284,6 +2284,48 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
addOutput(answer);
return answer;
}
+
+ /**
+ * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
+ * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded,
+ * or that we don't exceed an agreed SLA with some external service.
+ * Here another parameter correlationExpressionKey is introduced for the functionality which
+ * will throttle based on the key expression to group exchanges. This will make key-based throttling
+ * instead of overall throttling.
+ * <p/>
+ * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10
+ * will default ensure at most 10 messages per second.
+ *
+ * @param maximumRequestCount an expression to calculate the maximum request count
+ * @param correlationExpressionKey is a correlation key that can throttle by the given key instead of overall throttling
+ * @return the builder
+ */
+ public ThrottleDefinition throttle(long correlationExpressionKey, Expression maximumRequestCount) {
+ ThrottleDefinition answer = new ThrottleDefinition(ExpressionBuilder.constantExpression(correlationExpressionKey), maximumRequestCount);
+ addOutput(answer);
+ return answer;
+ }
+
+ /**
+ * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
+ * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded,
+ * or that we don't exceed an agreed SLA with some external service.
+ * Here another parameter correlationExpressionKey is introduced for the functionality which
+ * will throttle based on the key expression to group exchanges. This will make key-based throttling
+ * instead of overall throttling.
+ * <p/>
+ * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10
+ * will default ensure at most 10 messages per second.
+ *
+ * @param maximumRequestCount an expression to calculate the maximum request count
+ * @param correlationExpressionKey is a correlation key as an expression that can throttle by the given key instead of overall throttling
+ * @return the builder
+ */
+ public ThrottleDefinition throttle(Expression correlationExpressionKey, Expression maximumRequestCount) {
+ ThrottleDefinition answer = new ThrottleDefinition(correlationExpressionKey, maximumRequestCount);
+ addOutput(answer);
+ return answer;
+ }
/**
* <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
diff --git a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
index 613d2b3..7bd5213 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ScheduledExecutorService;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
@@ -55,7 +56,9 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
private Boolean callerRunsWhenRejected;
@XmlAttribute
private Boolean rejectExecution;
-
+ @XmlElement(name = "correlationExpression")
+ private ExpressionSubElementDefinition correlationExpression;
+
public ThrottleDefinition() {
}
@@ -63,6 +66,18 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
super(maximumRequestsPerPeriod);
}
+ public ThrottleDefinition(Expression maximumRequestsPerPeriod, Expression correlationExpression) {
+ this(ExpressionNodeHelper.toExpressionDefinition(maximumRequestsPerPeriod), correlationExpression);
+ }
+
+ private ThrottleDefinition(ExpressionDefinition maximumRequestsPerPeriod, Expression correlationExpression) {
+ super(maximumRequestsPerPeriod);
+
+ ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition();
+ cor.setExpressionType(ExpressionNodeHelper.toExpressionDefinition(correlationExpression));
+ setCorrelationExpression(cor);
+ }
+
@Override
public String toString() {
return "Throttle[" + description() + " -> " + getOutputs() + "]";
@@ -93,9 +108,14 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
if (maxRequestsExpression == null) {
throw new IllegalArgumentException("MaxRequestsPerPeriod expression must be provided on " + this);
}
+
+ Expression correlation = null;
+ if (correlationExpression != null) {
+ correlation = correlationExpression.createExpression(routeContext);
+ }
boolean reject = getRejectExecution() != null && getRejectExecution();
- Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, reject);
+ Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, reject, correlation);
answer.setAsyncDelayed(async);
if (getCallerRunsWhenRejected() == null) {
@@ -104,6 +124,7 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
} else {
answer.setCallerRunsWhenRejected(getCallerRunsWhenRejected());
}
+
return answer;
}
@@ -256,4 +277,16 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
public void setRejectExecution(Boolean rejectExecution) {
this.rejectExecution = rejectExecution;
}
+
+ /**
+ * The expression used to calculate the correlation key to use for throttle grouping.
+ * The Exchange which has the same correlation key is throttled together.
+ */
+ public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) {
+ this.correlationExpression = correlationExpression;
+ }
+
+ public ExpressionSubElementDefinition getCorrelationExpression() {
+ return correlationExpression;
+ }
}
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
index 543ec9a..73d53f0 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
@@ -16,8 +16,11 @@
*/
package org.apache.camel.processor;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@@ -31,7 +34,11 @@ import org.apache.camel.RuntimeExchangeException;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.CamelContextHelper;
+import org.apache.camel.util.LRUCache;
+import org.apache.camel.util.LRUCacheFactory;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,12 +68,14 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
private static final String PROPERTY_EXCHANGE_QUEUED_TIMESTAMP = "CamelThrottlerExchangeQueuedTimestamp";
private static final String PROPERTY_EXCHANGE_STATE = "CamelThrottlerExchangeState";
+ // (throttling grouping) defaulted as 1 because there will be only one queue which is similar to implementation
+ // when there is no grouping for throttling
+ private static final Integer NO_CORRELATION_QUEUE_ID = new Integer(1);
private enum State { SYNC, ASYNC, ASYNC_REJECTED }
private final Logger log = LoggerFactory.getLogger(Throttler.class);
private final CamelContext camelContext;
- private final DelayQueue<ThrottlePermit> delayQueue = new DelayQueue<>();
private final ExecutorService asyncExecutor;
private final boolean shutdownAsyncExecutor;
@@ -77,9 +86,14 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
private boolean rejectExecution;
private boolean asyncDelayed;
private boolean callerRunsWhenRejected = true;
+ private Expression correlationExpression;
+ // below 2 fields added for (throttling grouping)
+ private Map<Integer, DelayQueue<ThrottlePermit>> delayQueueCache;
+ private ExecutorService delayQueueCacheExecutorService;
+
public Throttler(final CamelContext camelContext, final Processor processor, final Expression maxRequestsPerPeriodExpression, final long timePeriodMillis,
- final ExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution) {
+ final ExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution, Expression correlation) {
super(processor);
this.camelContext = camelContext;
this.rejectExecution = rejectExecution;
@@ -93,6 +107,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
}
this.timePeriodMillis = timePeriodMillis;
this.asyncExecutor = asyncExecutor;
+ this.correlationExpression = correlation;
}
@Override
@@ -111,7 +126,8 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
throw new RejectedExecutionException("Run is not allowed");
}
- calculateAndSetMaxRequestsPerPeriod(exchange);
+ calculateAndSetMaxRequestsPerPeriod(exchange, doneSync);
+ DelayQueue<ThrottlePermit> delayQueue = locateDelayQueue(exchange, doneSync);
ThrottlePermit permit = delayQueue.poll();
if (permit == null) {
@@ -135,7 +151,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
if (log.isTraceEnabled()) {
elapsed = System.currentTimeMillis() - start;
}
- enqueuePermit(permit, exchange);
+ enqueuePermit(permit, exchange, doneSync);
if (state == State.ASYNC) {
if (log.isTraceEnabled()) {
@@ -147,7 +163,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
}
}
} else {
- enqueuePermit(permit, exchange);
+ enqueuePermit(permit, exchange, doneSync);
if (state == State.ASYNC) {
if (log.isTraceEnabled()) {
@@ -192,6 +208,34 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
}
}
+ private DelayQueue<ThrottlePermit> locateDelayQueue(final Exchange exchange, final boolean doneSync) throws InterruptedException, ExecutionException {
+ Integer key;
+ CompletableFuture<DelayQueue<ThrottlePermit>> futureDelayQueue = new CompletableFuture<>();
+
+ if (correlationExpression != null) {
+ key = correlationExpression.evaluate(exchange, Integer.class);
+ } else {
+ key = NO_CORRELATION_QUEUE_ID;
+ }
+
+ if (!doneSync) {
+ delayQueueCacheExecutorService.submit(() -> {
+ futureDelayQueue.complete(findDelayQueue(key));
+ });
+ }
+
+ return (!doneSync) ? futureDelayQueue.get() : findDelayQueue(key);
+ }
+
+ private DelayQueue<ThrottlePermit> findDelayQueue(Integer key) {
+ DelayQueue<ThrottlePermit> currentDelayQueue = delayQueueCache.get(key);
+ if (currentDelayQueue == null) {
+ currentDelayQueue = new DelayQueue<>();
+ delayQueueCache.put(key, currentDelayQueue);
+ }
+ return currentDelayQueue;
+ }
+
/**
* Delegate blocking on the DelayQueue to an asyncExecutor. Except if the executor rejects the submission
* and isCallerRunsWhenRejected() is enabled, then this method will delegate back to process(), but not
@@ -222,10 +266,12 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
/**
* Returns a permit to the DelayQueue, first resetting it's delay to be relative to now.
+ * @throws ExecutionException
+ * @throws InterruptedException
*/
- protected void enqueuePermit(final ThrottlePermit permit, final Exchange exchange) {
+ protected void enqueuePermit(final ThrottlePermit permit, final Exchange exchange, final boolean doneSync) throws InterruptedException, ExecutionException {
permit.setDelayMs(getTimePeriodMillis());
- delayQueue.put(permit);
+ locateDelayQueue(exchange, doneSync).put(permit);
// try and incur the least amount of overhead while releasing permits back to the queue
if (log.isTraceEnabled()) {
log.trace("Permit released, for exchangeId: {}", exchange.getExchangeId());
@@ -235,7 +281,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
/**
* Evaluates the maxRequestsPerPeriodExpression and adjusts the throttle rate up or down.
*/
- protected void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange) throws Exception {
+ protected void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange, final boolean doneSync) throws Exception {
Integer newThrottle = maxRequestsPerPeriodExpression.evaluate(exchange, Integer.class);
if (newThrottle != null && newThrottle < 0) {
@@ -249,6 +295,8 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
if (newThrottle != null) {
if (newThrottle != throttleRate) {
+ // get the queue from the cache
+ DelayQueue<ThrottlePermit> delayQueue = locateDelayQueue(exchange, doneSync);
// decrease
if (throttleRate > newThrottle) {
int delta = throttleRate - newThrottle;
@@ -279,19 +327,62 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
}
}
+ @SuppressWarnings("unchecked")
@Override
protected void doStart() throws Exception {
if (isAsyncDelayed()) {
ObjectHelper.notNull(asyncExecutor, "executorService", this);
}
+ if (camelContext != null) {
+ int maxSize = CamelContextHelper.getMaximumSimpleCacheSize(camelContext);
+ if (maxSize > 0) {
+ delayQueueCache = LRUCacheFactory.newLRUCache(16, maxSize, false);
+ log.debug("DelayQueues cache size: {}", maxSize);
+ } else {
+ delayQueueCache = LRUCacheFactory.newLRUCache(100);
+ log.debug("Defaulting DelayQueues cache size: {}", 100);
+ }
+ }
+ if (delayQueueCache != null) {
+ ServiceHelper.startService(delayQueueCache);
+ }
+ if (delayQueueCacheExecutorService == null) {
+ String name = getClass().getSimpleName() + "-DelayQueueLocatorTask";
+ delayQueueCacheExecutorService = createDelayQueueCacheExecutorService(name);
+ }
super.doStart();
}
+
+ /**
+ * Strategy to create the thread pool for locating right DelayQueue from the case as a background task
+ *
+ * @param name the suggested name for the background thread
+ * @return the thread pool
+ */
+ protected synchronized ExecutorService createDelayQueueCacheExecutorService(String name) {
+ // use a cached thread pool so we each on-the-fly task has a dedicated thread to process completions as they come in
+ return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name);
+ }
+ @SuppressWarnings("rawtypes")
@Override
protected void doShutdown() throws Exception {
if (shutdownAsyncExecutor && asyncExecutor != null) {
camelContext.getExecutorServiceManager().shutdownNow(asyncExecutor);
}
+ if (delayQueueCacheExecutorService != null) {
+ camelContext.getExecutorServiceManager().shutdownNow(delayQueueCacheExecutorService);
+ }
+ if (delayQueueCache != null) {
+ ServiceHelper.stopService(delayQueueCache);
+ if (log.isDebugEnabled()) {
+ if (delayQueueCache instanceof LRUCache) {
+ log.debug("Clearing deleay queues cache[size={}, hits={}, misses={}, evicted={}]",
+ delayQueueCache.size(), ((LRUCache) delayQueueCache).getHits(), ((LRUCache) delayQueueCache).getMisses(), ((LRUCache) delayQueueCache).getEvicted());
+ }
+ }
+ delayQueueCache.clear();
+ }
super.doShutdown();
}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
new file mode 100644
index 0000000..01cd378
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version
+ */
+public class ThrottlingGroupingTest extends ContextTestSupport {
+
+ public void testGroupingWithSingleConstant() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
+ getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom");
+
+ template.sendBodyAndHeader("seda:a", "Kaboom", "max", null);
+ template.sendBodyAndHeader("seda:a", "Hello World", "max", 2);
+ template.sendBodyAndHeader("seda:a", "Bye World", "max", 2);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testGroupingWithDynamicHeaderExpression() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
+ getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom");
+ getMockEndpoint("mock:resultdynamic").expectedBodiesReceived("Hello Dynamic World", "Bye Dynamic World");
+
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put("max", null);
+
+ template.sendBodyAndHeaders("seda:a", "Kaboom", headers);
+
+ headers.put("max", "2");
+ template.sendBodyAndHeaders("seda:a", "Hello World", headers);
+ template.sendBodyAndHeaders("seda:b", "Bye World", headers);
+
+ headers.put("key", "1");
+ template.sendBodyAndHeaders("seda:c", "Hello Dynamic World", headers);
+ headers.put("key", "2");
+ template.sendBodyAndHeaders("seda:c", "Bye Dynamic World", headers);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ errorHandler(deadLetterChannel("mock:dead"));
+
+ from("seda:a").throttle(1, header("max")).to("mock:result");
+ from("seda:b").throttle(2, header("max")).to("mock:result");
+ from("seda:c").throttle(header("key"), header("max")).to("mock:resultdynamic");
+ }
+ };
+ }
+}
--
To stop receiving notification emails like this one, please contact
onders@apache.org.