You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by on...@apache.org on 2018/06/13 14:36:51 UTC

[camel] 02/02: Revert "CAMEL-6840 make it possible grouped throttling"

This is an automated email from the ASF dual-hosted git repository.

onders pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit da2b62d0f166965454fbe348439a296af19ac6be
Author: Sezgin <on...@nokia.com>
AuthorDate: Wed Jun 13 17:32:59 2018 +0300

    Revert "CAMEL-6840 make it possible grouped throttling"
    
    This reverts commit a7a458826dbafe1f155f538cfcbc0957d296fad8.
---
 camel-core/src/main/docs/eips/throttle-eip.adoc    |   3 +-
 .../apache/camel/model/AggregateDefinition.java    |   2 +-
 .../apache/camel/model/ProcessorDefinition.java    |  42 --------
 .../org/apache/camel/model/ThrottleDefinition.java |  37 +------
 .../java/org/apache/camel/processor/Throttler.java | 107 ++-------------------
 .../camel/processor/ThrottlingGroupingTest.java    |  76 ---------------
 6 files changed, 12 insertions(+), 255 deletions(-)

diff --git a/camel-core/src/main/docs/eips/throttle-eip.adoc b/camel-core/src/main/docs/eips/throttle-eip.adoc
index 7ae5472..71da959 100644
--- a/camel-core/src/main/docs/eips/throttle-eip.adoc
+++ b/camel-core/src/main/docs/eips/throttle-eip.adoc
@@ -6,7 +6,7 @@ The Throttler Pattern allows you to ensure that a specific endpoint does not get
 === Options
 
 // eip options: START
-The Throttle EIP supports 6 options which are listed below:
+The Throttle EIP supports 5 options which are listed below:
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
@@ -16,7 +16,6 @@ The Throttle EIP supports 6 options which are listed below:
 | *asyncDelayed* | Enables asynchronous delay which means the thread will not block while delaying. | false | Boolean
 | *callerRunsWhenRejected* | Whether or not the caller should run the task when it was rejected by the thread pool. Is by default true | true | Boolean
 | *rejectExecution* | Whether or not throttler throws the ThrottlerRejectedExecutionException when the exchange exceeds the request limit Is by default false | false | Boolean
-| *correlationExpression* | The expression used to calculate the correlation key to use for throttle grouping. The Exchange which has the same correlation key is throttled together. |  | NamespaceAware Expression
 |===
 // eip options: END
 
diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index 1aa34c4c..2e60ec3 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -142,7 +142,7 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
         this(ExpressionNodeHelper.toExpressionDefinition(expression));
     }
 
-    private AggregateDefinition(ExpressionDefinition correlationExpression) {
+    public AggregateDefinition(ExpressionDefinition correlationExpression) {
         setExpression(correlationExpression);
 
         ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition();
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index e4622e7..005270e 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -2284,48 +2284,6 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
         addOutput(answer);
         return answer;
     }
-
-    /**
-     * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
-     * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded,
-     * or that we don't exceed an agreed SLA with some external service.
-     * Here another parameter correlationExpressionKey is introduced for the functionality which
-     * will throttle based on the key expression to group exchanges. This will make key-based throttling
-     * instead of overall throttling.
-     * <p/>
-     * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10
-     * will default ensure at most 10 messages per second.
-     *
-     * @param maximumRequestCount  an expression to calculate the maximum request count
-     * @param correlationExpressionKey  is a correlation key that can throttle by the given key instead of overall throttling
-     * @return the builder
-     */
-    public ThrottleDefinition throttle(long correlationExpressionKey, Expression maximumRequestCount) {
-        ThrottleDefinition answer = new ThrottleDefinition(ExpressionBuilder.constantExpression(correlationExpressionKey), maximumRequestCount);
-        addOutput(answer);
-        return answer;
-    }
-
-    /**
-     * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
-     * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded,
-     * or that we don't exceed an agreed SLA with some external service.
-     * Here another parameter correlationExpressionKey is introduced for the functionality which
-     * will throttle based on the key expression to group exchanges. This will make key-based throttling
-     * instead of overall throttling.
-     * <p/>
-     * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10
-     * will default ensure at most 10 messages per second.
-     *
-     * @param maximumRequestCount  an expression to calculate the maximum request count
-     * @param correlationExpressionKey  is a correlation key as an expression that can throttle by the given key instead of overall throttling
-     * @return the builder
-     */
-    public ThrottleDefinition throttle(Expression correlationExpressionKey, Expression maximumRequestCount) {
-        ThrottleDefinition answer = new ThrottleDefinition(correlationExpressionKey, maximumRequestCount);
-        addOutput(answer);
-        return answer;
-    }
     
     /**
      * <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
diff --git a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
index 7bd5213..613d2b3 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
@@ -21,7 +21,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
-import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
@@ -56,9 +55,7 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
     private Boolean callerRunsWhenRejected;
     @XmlAttribute
     private Boolean rejectExecution;
-    @XmlElement(name = "correlationExpression")
-    private ExpressionSubElementDefinition correlationExpression;
-
+    
     public ThrottleDefinition() {
     }
 
@@ -66,18 +63,6 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
         super(maximumRequestsPerPeriod);
     }
 
-    public ThrottleDefinition(Expression maximumRequestsPerPeriod, Expression correlationExpression) {
-        this(ExpressionNodeHelper.toExpressionDefinition(maximumRequestsPerPeriod), correlationExpression);
-    }
-
-    private ThrottleDefinition(ExpressionDefinition maximumRequestsPerPeriod, Expression correlationExpression) {
-        super(maximumRequestsPerPeriod);
-
-        ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition();
-        cor.setExpressionType(ExpressionNodeHelper.toExpressionDefinition(correlationExpression));
-        setCorrelationExpression(cor);
-    }
-
     @Override
     public String toString() {
         return "Throttle[" + description() + " -> " + getOutputs() + "]";
@@ -108,14 +93,9 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
         if (maxRequestsExpression == null) {
             throw new IllegalArgumentException("MaxRequestsPerPeriod expression must be provided on " + this);
         }
-        
-        Expression correlation = null;
-        if (correlationExpression != null) {
-            correlation = correlationExpression.createExpression(routeContext);
-        }
 
         boolean reject = getRejectExecution() != null && getRejectExecution();
-        Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, reject, correlation);
+        Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, reject);
 
         answer.setAsyncDelayed(async);
         if (getCallerRunsWhenRejected() == null) {
@@ -124,7 +104,6 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
         } else {
             answer.setCallerRunsWhenRejected(getCallerRunsWhenRejected());
         }
-
         return answer;
     }
 
@@ -277,16 +256,4 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
     public void setRejectExecution(Boolean rejectExecution) {
         this.rejectExecution = rejectExecution;
     }
-
-    /**
-     * The expression used to calculate the correlation key to use for throttle grouping.
-     * The Exchange which has the same correlation key is throttled together.
-     */
-    public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) {
-        this.correlationExpression = correlationExpression;
-    }
-
-    public ExpressionSubElementDefinition getCorrelationExpression() {
-        return correlationExpression;
-    }
 }
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
index 73d53f0..543ec9a 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
@@ -16,11 +16,8 @@
  */
 package org.apache.camel.processor;
 
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -34,11 +31,7 @@ import org.apache.camel.RuntimeExchangeException;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.util.AsyncProcessorHelper;
-import org.apache.camel.util.CamelContextHelper;
-import org.apache.camel.util.LRUCache;
-import org.apache.camel.util.LRUCacheFactory;
 import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,14 +61,12 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
 
     private static final String PROPERTY_EXCHANGE_QUEUED_TIMESTAMP = "CamelThrottlerExchangeQueuedTimestamp";
     private static final String PROPERTY_EXCHANGE_STATE = "CamelThrottlerExchangeState";
-    // (throttling grouping) defaulted as 1 because there will be only one queue which is similar to implementation
-    // when there is no grouping for throttling
-    private static final Integer NO_CORRELATION_QUEUE_ID = new Integer(1);
 
     private enum State { SYNC, ASYNC, ASYNC_REJECTED }
 
     private final Logger log = LoggerFactory.getLogger(Throttler.class);
     private final CamelContext camelContext;
+    private final DelayQueue<ThrottlePermit> delayQueue = new DelayQueue<>();
     private final ExecutorService asyncExecutor;
     private final boolean shutdownAsyncExecutor;
 
@@ -86,14 +77,9 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
     private boolean rejectExecution;
     private boolean asyncDelayed;
     private boolean callerRunsWhenRejected = true;
-    private Expression correlationExpression;
-    // below 2 fields added for (throttling grouping)
-    private Map<Integer, DelayQueue<ThrottlePermit>> delayQueueCache;
-    private ExecutorService delayQueueCacheExecutorService;
-    
 
     public Throttler(final CamelContext camelContext, final Processor processor, final Expression maxRequestsPerPeriodExpression, final long timePeriodMillis,
-                     final ExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution, Expression correlation) {
+                     final ExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution) {
         super(processor);
         this.camelContext = camelContext;
         this.rejectExecution = rejectExecution;
@@ -107,7 +93,6 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
         }
         this.timePeriodMillis = timePeriodMillis;
         this.asyncExecutor = asyncExecutor;
-        this.correlationExpression = correlation;
     }
 
     @Override
@@ -126,8 +111,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
                 throw new RejectedExecutionException("Run is not allowed");
             }
 
-            calculateAndSetMaxRequestsPerPeriod(exchange, doneSync);
-            DelayQueue<ThrottlePermit> delayQueue = locateDelayQueue(exchange, doneSync);
+            calculateAndSetMaxRequestsPerPeriod(exchange);
             ThrottlePermit permit = delayQueue.poll();
 
             if (permit == null) {
@@ -151,7 +135,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
                     if (log.isTraceEnabled()) {
                         elapsed = System.currentTimeMillis() - start;
                     }
-                    enqueuePermit(permit, exchange, doneSync);
+                    enqueuePermit(permit, exchange);
 
                     if (state == State.ASYNC) {
                         if (log.isTraceEnabled()) {
@@ -163,7 +147,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
                     }
                 }
             } else {
-                enqueuePermit(permit, exchange, doneSync);
+                enqueuePermit(permit, exchange);
 
                 if (state == State.ASYNC) {
                     if (log.isTraceEnabled()) {
@@ -208,34 +192,6 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
         }
     }
 
-    private DelayQueue<ThrottlePermit> locateDelayQueue(final Exchange exchange, final boolean doneSync) throws InterruptedException, ExecutionException {
-        Integer key;
-        CompletableFuture<DelayQueue<ThrottlePermit>> futureDelayQueue = new CompletableFuture<>();
-        
-        if (correlationExpression != null) {
-            key = correlationExpression.evaluate(exchange, Integer.class);
-        } else {
-            key = NO_CORRELATION_QUEUE_ID;
-        }
-        
-        if (!doneSync) {
-            delayQueueCacheExecutorService.submit(() -> {
-                futureDelayQueue.complete(findDelayQueue(key));
-            });
-        }
-            
-        return (!doneSync) ? futureDelayQueue.get() : findDelayQueue(key);
-    }
-
-    private DelayQueue<ThrottlePermit> findDelayQueue(Integer key) {
-        DelayQueue<ThrottlePermit> currentDelayQueue = delayQueueCache.get(key);
-        if (currentDelayQueue == null) {
-            currentDelayQueue = new DelayQueue<>();
-            delayQueueCache.put(key, currentDelayQueue);
-        }
-        return currentDelayQueue;
-    }
-
     /**
      * Delegate blocking on the DelayQueue to an asyncExecutor. Except if the executor rejects the submission
      * and isCallerRunsWhenRejected() is enabled, then this method will delegate back to process(), but not
@@ -266,12 +222,10 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
 
     /**
      * Returns a permit to the DelayQueue, first resetting it's delay to be relative to now.
-     * @throws ExecutionException 
-     * @throws InterruptedException 
      */
-    protected void enqueuePermit(final ThrottlePermit permit, final Exchange exchange, final boolean doneSync) throws InterruptedException, ExecutionException {
+    protected void enqueuePermit(final ThrottlePermit permit, final Exchange exchange) {
         permit.setDelayMs(getTimePeriodMillis());
-        locateDelayQueue(exchange, doneSync).put(permit);
+        delayQueue.put(permit);
         // try and incur the least amount of overhead while releasing permits back to the queue
         if (log.isTraceEnabled()) {
             log.trace("Permit released, for exchangeId: {}", exchange.getExchangeId());
@@ -281,7 +235,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
     /**
      * Evaluates the maxRequestsPerPeriodExpression and adjusts the throttle rate up or down.
      */
-    protected void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange, final boolean doneSync) throws Exception {
+    protected void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange) throws Exception {
         Integer newThrottle = maxRequestsPerPeriodExpression.evaluate(exchange, Integer.class);
 
         if (newThrottle != null && newThrottle < 0) {
@@ -295,8 +249,6 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
 
             if (newThrottle != null) {
                 if (newThrottle != throttleRate) {
-                    // get the queue from the cache
-                    DelayQueue<ThrottlePermit> delayQueue = locateDelayQueue(exchange, doneSync);
                     // decrease
                     if (throttleRate > newThrottle) {
                         int delta = throttleRate - newThrottle;
@@ -327,62 +279,19 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     protected void doStart() throws Exception {
         if (isAsyncDelayed()) {
             ObjectHelper.notNull(asyncExecutor, "executorService", this);
         }
-        if (camelContext != null) {
-            int maxSize = CamelContextHelper.getMaximumSimpleCacheSize(camelContext);
-            if (maxSize > 0) {
-                delayQueueCache = LRUCacheFactory.newLRUCache(16, maxSize, false);
-                log.debug("DelayQueues cache size: {}", maxSize);
-            } else {
-                delayQueueCache = LRUCacheFactory.newLRUCache(100);
-                log.debug("Defaulting DelayQueues cache size: {}", 100);
-            }
-        }
-        if (delayQueueCache != null) {
-            ServiceHelper.startService(delayQueueCache);
-        }
-        if (delayQueueCacheExecutorService == null) {
-            String name = getClass().getSimpleName() + "-DelayQueueLocatorTask";
-            delayQueueCacheExecutorService = createDelayQueueCacheExecutorService(name);
-        }
         super.doStart();
     }
-    
-    /**
-     * Strategy to create the thread pool for locating right DelayQueue from the case as a background task
-     *
-     * @param name  the suggested name for the background thread
-     * @return the thread pool
-     */
-    protected synchronized ExecutorService createDelayQueueCacheExecutorService(String name) {
-        // use a cached thread pool so we each on-the-fly task has a dedicated thread to process completions as they come in
-        return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name);
-    }
 
-    @SuppressWarnings("rawtypes")
     @Override
     protected void doShutdown() throws Exception {
         if (shutdownAsyncExecutor && asyncExecutor != null) {
             camelContext.getExecutorServiceManager().shutdownNow(asyncExecutor);
         }
-        if (delayQueueCacheExecutorService != null) {
-            camelContext.getExecutorServiceManager().shutdownNow(delayQueueCacheExecutorService);
-        }
-        if (delayQueueCache != null) {
-            ServiceHelper.stopService(delayQueueCache);
-            if (log.isDebugEnabled()) {
-                if (delayQueueCache instanceof LRUCache) {
-                    log.debug("Clearing deleay queues cache[size={}, hits={}, misses={}, evicted={}]",
-                            delayQueueCache.size(), ((LRUCache) delayQueueCache).getHits(), ((LRUCache) delayQueueCache).getMisses(), ((LRUCache) delayQueueCache).getEvicted());
-                }
-            }
-            delayQueueCache.clear();
-        }
         super.doShutdown();
     }
 
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
deleted file mode 100644
index 01cd378..0000000
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.builder.RouteBuilder;
-
-/**
- * @version 
- */
-public class ThrottlingGroupingTest extends ContextTestSupport {
-
-    public void testGroupingWithSingleConstant() throws Exception {
-        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
-        getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom");
-
-        template.sendBodyAndHeader("seda:a", "Kaboom", "max", null);
-        template.sendBodyAndHeader("seda:a", "Hello World", "max", 2);
-        template.sendBodyAndHeader("seda:a", "Bye World", "max", 2);
-
-        assertMockEndpointsSatisfied();
-    }
-    
-    public void testGroupingWithDynamicHeaderExpression() throws Exception {
-        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
-        getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom");
-        getMockEndpoint("mock:resultdynamic").expectedBodiesReceived("Hello Dynamic World", "Bye Dynamic World");
-        
-        Map<String, Object> headers = new HashMap<String, Object>();
-        headers.put("max", null);
-
-        template.sendBodyAndHeaders("seda:a", "Kaboom", headers);
-        
-        headers.put("max", "2");
-        template.sendBodyAndHeaders("seda:a", "Hello World", headers);
-        template.sendBodyAndHeaders("seda:b", "Bye World", headers);
-
-        headers.put("key", "1");
-        template.sendBodyAndHeaders("seda:c", "Hello Dynamic World", headers);
-        headers.put("key", "2");
-        template.sendBodyAndHeaders("seda:c", "Bye Dynamic World", headers);
-        
-        assertMockEndpointsSatisfied();
-    }
-
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                errorHandler(deadLetterChannel("mock:dead"));
-
-                from("seda:a").throttle(1, header("max")).to("mock:result");
-                from("seda:b").throttle(2, header("max")).to("mock:result");
-                from("seda:c").throttle(header("key"), header("max")).to("mock:resultdynamic");
-            }
-        };
-    }
-}

-- 
To stop receiving notification emails like this one, please contact
onders@apache.org.