You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by on...@apache.org on 2018/06/13 14:36:49 UTC

[camel] branch master updated (0f5b58a -> da2b62d)

This is an automated email from the ASF dual-hosted git repository.

onders pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 0f5b58a  Upgrade Chronicle Bytes to version 1.16.9
     new ef8756a  Revert "CAMEL-6840 - fix for java docs and review comments"
     new da2b62d  Revert "CAMEL-6840 make it possible grouped throttling"

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 camel-core/src/main/docs/eips/throttle-eip.adoc    |   3 +-
 .../apache/camel/model/AggregateDefinition.java    |   2 +-
 .../apache/camel/model/ProcessorDefinition.java    |  42 --------
 .../org/apache/camel/model/ThrottleDefinition.java |  37 +------
 .../java/org/apache/camel/processor/Throttler.java | 107 ++-------------------
 .../camel/processor/ThrottlingGroupingTest.java    |  76 ---------------
 6 files changed, 12 insertions(+), 255 deletions(-)
 delete mode 100644 camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java

-- 
To stop receiving notification emails like this one, please contact
onders@apache.org.

[camel] 02/02: Revert "CAMEL-6840 make it possible grouped throttling"

Posted by on...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

onders pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit da2b62d0f166965454fbe348439a296af19ac6be
Author: Sezgin <on...@nokia.com>
AuthorDate: Wed Jun 13 17:32:59 2018 +0300

    Revert "CAMEL-6840 make it possible grouped throttling"
    
    This reverts commit a7a458826dbafe1f155f538cfcbc0957d296fad8.
---
 camel-core/src/main/docs/eips/throttle-eip.adoc    |   3 +-
 .../apache/camel/model/AggregateDefinition.java    |   2 +-
 .../apache/camel/model/ProcessorDefinition.java    |  42 --------
 .../org/apache/camel/model/ThrottleDefinition.java |  37 +------
 .../java/org/apache/camel/processor/Throttler.java | 107 ++-------------------
 .../camel/processor/ThrottlingGroupingTest.java    |  76 ---------------
 6 files changed, 12 insertions(+), 255 deletions(-)

diff --git a/camel-core/src/main/docs/eips/throttle-eip.adoc b/camel-core/src/main/docs/eips/throttle-eip.adoc
index 7ae5472..71da959 100644
--- a/camel-core/src/main/docs/eips/throttle-eip.adoc
+++ b/camel-core/src/main/docs/eips/throttle-eip.adoc
@@ -6,7 +6,7 @@ The Throttler Pattern allows you to ensure that a specific endpoint does not get
 === Options
 
 // eip options: START
-The Throttle EIP supports 6 options which are listed below:
+The Throttle EIP supports 5 options which are listed below:
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
@@ -16,7 +16,6 @@ The Throttle EIP supports 6 options which are listed below:
 | *asyncDelayed* | Enables asynchronous delay which means the thread will not block while delaying. | false | Boolean
 | *callerRunsWhenRejected* | Whether or not the caller should run the task when it was rejected by the thread pool. Is by default true | true | Boolean
 | *rejectExecution* | Whether or not throttler throws the ThrottlerRejectedExecutionException when the exchange exceeds the request limit Is by default false | false | Boolean
-| *correlationExpression* | The expression used to calculate the correlation key to use for throttle grouping. The Exchange which has the same correlation key is throttled together. |  | NamespaceAware Expression
 |===
 // eip options: END
 
diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index 1aa34c4c..2e60ec3 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -142,7 +142,7 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
         this(ExpressionNodeHelper.toExpressionDefinition(expression));
     }
 
-    private AggregateDefinition(ExpressionDefinition correlationExpression) {
+    public AggregateDefinition(ExpressionDefinition correlationExpression) {
         setExpression(correlationExpression);
 
         ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition();
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index e4622e7..005270e 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -2284,48 +2284,6 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
         addOutput(answer);
         return answer;
     }
-
-    /**
-     * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
-     * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded,
-     * or that we don't exceed an agreed SLA with some external service.
-     * Here another parameter correlationExpressionKey is introduced for the functionality which
-     * will throttle based on the key expression to group exchanges. This will make key-based throttling
-     * instead of overall throttling.
-     * <p/>
-     * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10
-     * will default ensure at most 10 messages per second.
-     *
-     * @param maximumRequestCount  an expression to calculate the maximum request count
-     * @param correlationExpressionKey  is a correlation key that can throttle by the given key instead of overall throttling
-     * @return the builder
-     */
-    public ThrottleDefinition throttle(long correlationExpressionKey, Expression maximumRequestCount) {
-        ThrottleDefinition answer = new ThrottleDefinition(ExpressionBuilder.constantExpression(correlationExpressionKey), maximumRequestCount);
-        addOutput(answer);
-        return answer;
-    }
-
-    /**
-     * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
-     * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded,
-     * or that we don't exceed an agreed SLA with some external service.
-     * Here another parameter correlationExpressionKey is introduced for the functionality which
-     * will throttle based on the key expression to group exchanges. This will make key-based throttling
-     * instead of overall throttling.
-     * <p/>
-     * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10
-     * will default ensure at most 10 messages per second.
-     *
-     * @param maximumRequestCount  an expression to calculate the maximum request count
-     * @param correlationExpressionKey  is a correlation key as an expression that can throttle by the given key instead of overall throttling
-     * @return the builder
-     */
-    public ThrottleDefinition throttle(Expression correlationExpressionKey, Expression maximumRequestCount) {
-        ThrottleDefinition answer = new ThrottleDefinition(correlationExpressionKey, maximumRequestCount);
-        addOutput(answer);
-        return answer;
-    }
     
     /**
      * <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
diff --git a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
index 7bd5213..613d2b3 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
@@ -21,7 +21,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
-import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
@@ -56,9 +55,7 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
     private Boolean callerRunsWhenRejected;
     @XmlAttribute
     private Boolean rejectExecution;
-    @XmlElement(name = "correlationExpression")
-    private ExpressionSubElementDefinition correlationExpression;
-
+    
     public ThrottleDefinition() {
     }
 
@@ -66,18 +63,6 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
         super(maximumRequestsPerPeriod);
     }
 
-    public ThrottleDefinition(Expression maximumRequestsPerPeriod, Expression correlationExpression) {
-        this(ExpressionNodeHelper.toExpressionDefinition(maximumRequestsPerPeriod), correlationExpression);
-    }
-
-    private ThrottleDefinition(ExpressionDefinition maximumRequestsPerPeriod, Expression correlationExpression) {
-        super(maximumRequestsPerPeriod);
-
-        ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition();
-        cor.setExpressionType(ExpressionNodeHelper.toExpressionDefinition(correlationExpression));
-        setCorrelationExpression(cor);
-    }
-
     @Override
     public String toString() {
         return "Throttle[" + description() + " -> " + getOutputs() + "]";
@@ -108,14 +93,9 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
         if (maxRequestsExpression == null) {
             throw new IllegalArgumentException("MaxRequestsPerPeriod expression must be provided on " + this);
         }
-        
-        Expression correlation = null;
-        if (correlationExpression != null) {
-            correlation = correlationExpression.createExpression(routeContext);
-        }
 
         boolean reject = getRejectExecution() != null && getRejectExecution();
-        Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, reject, correlation);
+        Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, reject);
 
         answer.setAsyncDelayed(async);
         if (getCallerRunsWhenRejected() == null) {
@@ -124,7 +104,6 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
         } else {
             answer.setCallerRunsWhenRejected(getCallerRunsWhenRejected());
         }
-
         return answer;
     }
 
@@ -277,16 +256,4 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
     public void setRejectExecution(Boolean rejectExecution) {
         this.rejectExecution = rejectExecution;
     }
-
-    /**
-     * The expression used to calculate the correlation key to use for throttle grouping.
-     * The Exchange which has the same correlation key is throttled together.
-     */
-    public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) {
-        this.correlationExpression = correlationExpression;
-    }
-
-    public ExpressionSubElementDefinition getCorrelationExpression() {
-        return correlationExpression;
-    }
 }
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
index 73d53f0..543ec9a 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
@@ -16,11 +16,8 @@
  */
 package org.apache.camel.processor;
 
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -34,11 +31,7 @@ import org.apache.camel.RuntimeExchangeException;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.util.AsyncProcessorHelper;
-import org.apache.camel.util.CamelContextHelper;
-import org.apache.camel.util.LRUCache;
-import org.apache.camel.util.LRUCacheFactory;
 import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,14 +61,12 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
 
     private static final String PROPERTY_EXCHANGE_QUEUED_TIMESTAMP = "CamelThrottlerExchangeQueuedTimestamp";
     private static final String PROPERTY_EXCHANGE_STATE = "CamelThrottlerExchangeState";
-    // (throttling grouping) defaulted as 1 because there will be only one queue which is similar to implementation
-    // when there is no grouping for throttling
-    private static final Integer NO_CORRELATION_QUEUE_ID = new Integer(1);
 
     private enum State { SYNC, ASYNC, ASYNC_REJECTED }
 
     private final Logger log = LoggerFactory.getLogger(Throttler.class);
     private final CamelContext camelContext;
+    private final DelayQueue<ThrottlePermit> delayQueue = new DelayQueue<>();
     private final ExecutorService asyncExecutor;
     private final boolean shutdownAsyncExecutor;
 
@@ -86,14 +77,9 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
     private boolean rejectExecution;
     private boolean asyncDelayed;
     private boolean callerRunsWhenRejected = true;
-    private Expression correlationExpression;
-    // below 2 fields added for (throttling grouping)
-    private Map<Integer, DelayQueue<ThrottlePermit>> delayQueueCache;
-    private ExecutorService delayQueueCacheExecutorService;
-    
 
     public Throttler(final CamelContext camelContext, final Processor processor, final Expression maxRequestsPerPeriodExpression, final long timePeriodMillis,
-                     final ExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution, Expression correlation) {
+                     final ExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution) {
         super(processor);
         this.camelContext = camelContext;
         this.rejectExecution = rejectExecution;
@@ -107,7 +93,6 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
         }
         this.timePeriodMillis = timePeriodMillis;
         this.asyncExecutor = asyncExecutor;
-        this.correlationExpression = correlation;
     }
 
     @Override
@@ -126,8 +111,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
                 throw new RejectedExecutionException("Run is not allowed");
             }
 
-            calculateAndSetMaxRequestsPerPeriod(exchange, doneSync);
-            DelayQueue<ThrottlePermit> delayQueue = locateDelayQueue(exchange, doneSync);
+            calculateAndSetMaxRequestsPerPeriod(exchange);
             ThrottlePermit permit = delayQueue.poll();
 
             if (permit == null) {
@@ -151,7 +135,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
                     if (log.isTraceEnabled()) {
                         elapsed = System.currentTimeMillis() - start;
                     }
-                    enqueuePermit(permit, exchange, doneSync);
+                    enqueuePermit(permit, exchange);
 
                     if (state == State.ASYNC) {
                         if (log.isTraceEnabled()) {
@@ -163,7 +147,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
                     }
                 }
             } else {
-                enqueuePermit(permit, exchange, doneSync);
+                enqueuePermit(permit, exchange);
 
                 if (state == State.ASYNC) {
                     if (log.isTraceEnabled()) {
@@ -208,34 +192,6 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
         }
     }
 
-    private DelayQueue<ThrottlePermit> locateDelayQueue(final Exchange exchange, final boolean doneSync) throws InterruptedException, ExecutionException {
-        Integer key;
-        CompletableFuture<DelayQueue<ThrottlePermit>> futureDelayQueue = new CompletableFuture<>();
-        
-        if (correlationExpression != null) {
-            key = correlationExpression.evaluate(exchange, Integer.class);
-        } else {
-            key = NO_CORRELATION_QUEUE_ID;
-        }
-        
-        if (!doneSync) {
-            delayQueueCacheExecutorService.submit(() -> {
-                futureDelayQueue.complete(findDelayQueue(key));
-            });
-        }
-            
-        return (!doneSync) ? futureDelayQueue.get() : findDelayQueue(key);
-    }
-
-    private DelayQueue<ThrottlePermit> findDelayQueue(Integer key) {
-        DelayQueue<ThrottlePermit> currentDelayQueue = delayQueueCache.get(key);
-        if (currentDelayQueue == null) {
-            currentDelayQueue = new DelayQueue<>();
-            delayQueueCache.put(key, currentDelayQueue);
-        }
-        return currentDelayQueue;
-    }
-
     /**
      * Delegate blocking on the DelayQueue to an asyncExecutor. Except if the executor rejects the submission
      * and isCallerRunsWhenRejected() is enabled, then this method will delegate back to process(), but not
@@ -266,12 +222,10 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
 
     /**
      * Returns a permit to the DelayQueue, first resetting it's delay to be relative to now.
-     * @throws ExecutionException 
-     * @throws InterruptedException 
      */
-    protected void enqueuePermit(final ThrottlePermit permit, final Exchange exchange, final boolean doneSync) throws InterruptedException, ExecutionException {
+    protected void enqueuePermit(final ThrottlePermit permit, final Exchange exchange) {
         permit.setDelayMs(getTimePeriodMillis());
-        locateDelayQueue(exchange, doneSync).put(permit);
+        delayQueue.put(permit);
         // try and incur the least amount of overhead while releasing permits back to the queue
         if (log.isTraceEnabled()) {
             log.trace("Permit released, for exchangeId: {}", exchange.getExchangeId());
@@ -281,7 +235,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
     /**
      * Evaluates the maxRequestsPerPeriodExpression and adjusts the throttle rate up or down.
      */
-    protected void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange, final boolean doneSync) throws Exception {
+    protected void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange) throws Exception {
         Integer newThrottle = maxRequestsPerPeriodExpression.evaluate(exchange, Integer.class);
 
         if (newThrottle != null && newThrottle < 0) {
@@ -295,8 +249,6 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
 
             if (newThrottle != null) {
                 if (newThrottle != throttleRate) {
-                    // get the queue from the cache
-                    DelayQueue<ThrottlePermit> delayQueue = locateDelayQueue(exchange, doneSync);
                     // decrease
                     if (throttleRate > newThrottle) {
                         int delta = throttleRate - newThrottle;
@@ -327,62 +279,19 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     protected void doStart() throws Exception {
         if (isAsyncDelayed()) {
             ObjectHelper.notNull(asyncExecutor, "executorService", this);
         }
-        if (camelContext != null) {
-            int maxSize = CamelContextHelper.getMaximumSimpleCacheSize(camelContext);
-            if (maxSize > 0) {
-                delayQueueCache = LRUCacheFactory.newLRUCache(16, maxSize, false);
-                log.debug("DelayQueues cache size: {}", maxSize);
-            } else {
-                delayQueueCache = LRUCacheFactory.newLRUCache(100);
-                log.debug("Defaulting DelayQueues cache size: {}", 100);
-            }
-        }
-        if (delayQueueCache != null) {
-            ServiceHelper.startService(delayQueueCache);
-        }
-        if (delayQueueCacheExecutorService == null) {
-            String name = getClass().getSimpleName() + "-DelayQueueLocatorTask";
-            delayQueueCacheExecutorService = createDelayQueueCacheExecutorService(name);
-        }
         super.doStart();
     }
-    
-    /**
-     * Strategy to create the thread pool for locating right DelayQueue from the case as a background task
-     *
-     * @param name  the suggested name for the background thread
-     * @return the thread pool
-     */
-    protected synchronized ExecutorService createDelayQueueCacheExecutorService(String name) {
-        // use a cached thread pool so we each on-the-fly task has a dedicated thread to process completions as they come in
-        return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name);
-    }
 
-    @SuppressWarnings("rawtypes")
     @Override
     protected void doShutdown() throws Exception {
         if (shutdownAsyncExecutor && asyncExecutor != null) {
             camelContext.getExecutorServiceManager().shutdownNow(asyncExecutor);
         }
-        if (delayQueueCacheExecutorService != null) {
-            camelContext.getExecutorServiceManager().shutdownNow(delayQueueCacheExecutorService);
-        }
-        if (delayQueueCache != null) {
-            ServiceHelper.stopService(delayQueueCache);
-            if (log.isDebugEnabled()) {
-                if (delayQueueCache instanceof LRUCache) {
-                    log.debug("Clearing deleay queues cache[size={}, hits={}, misses={}, evicted={}]",
-                            delayQueueCache.size(), ((LRUCache) delayQueueCache).getHits(), ((LRUCache) delayQueueCache).getMisses(), ((LRUCache) delayQueueCache).getEvicted());
-                }
-            }
-            delayQueueCache.clear();
-        }
         super.doShutdown();
     }
 
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
deleted file mode 100644
index 01cd378..0000000
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.builder.RouteBuilder;
-
-/**
- * @version 
- */
-public class ThrottlingGroupingTest extends ContextTestSupport {
-
-    public void testGroupingWithSingleConstant() throws Exception {
-        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
-        getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom");
-
-        template.sendBodyAndHeader("seda:a", "Kaboom", "max", null);
-        template.sendBodyAndHeader("seda:a", "Hello World", "max", 2);
-        template.sendBodyAndHeader("seda:a", "Bye World", "max", 2);
-
-        assertMockEndpointsSatisfied();
-    }
-    
-    public void testGroupingWithDynamicHeaderExpression() throws Exception {
-        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
-        getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom");
-        getMockEndpoint("mock:resultdynamic").expectedBodiesReceived("Hello Dynamic World", "Bye Dynamic World");
-        
-        Map<String, Object> headers = new HashMap<String, Object>();
-        headers.put("max", null);
-
-        template.sendBodyAndHeaders("seda:a", "Kaboom", headers);
-        
-        headers.put("max", "2");
-        template.sendBodyAndHeaders("seda:a", "Hello World", headers);
-        template.sendBodyAndHeaders("seda:b", "Bye World", headers);
-
-        headers.put("key", "1");
-        template.sendBodyAndHeaders("seda:c", "Hello Dynamic World", headers);
-        headers.put("key", "2");
-        template.sendBodyAndHeaders("seda:c", "Bye Dynamic World", headers);
-        
-        assertMockEndpointsSatisfied();
-    }
-
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                errorHandler(deadLetterChannel("mock:dead"));
-
-                from("seda:a").throttle(1, header("max")).to("mock:result");
-                from("seda:b").throttle(2, header("max")).to("mock:result");
-                from("seda:c").throttle(header("key"), header("max")).to("mock:resultdynamic");
-            }
-        };
-    }
-}

-- 
To stop receiving notification emails like this one, please contact
onders@apache.org.

[camel] 01/02: Revert "CAMEL-6840 - fix for java docs and review comments"

Posted by on...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

onders pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ef8756a3c8bbe3b1167d7b71098ed01fb3a625dc
Author: Sezgin <on...@nokia.com>
AuthorDate: Wed Jun 13 17:31:09 2018 +0300

    Revert "CAMEL-6840 - fix for java docs and review comments"
    
    This reverts commit 53ca1fbcf160e70982f396c8c455907f196c6bdb.
---
 camel-core/src/main/docs/eips/throttle-eip.adoc                       | 2 +-
 .../src/main/java/org/apache/camel/model/ProcessorDefinition.java     | 4 ++--
 .../src/main/java/org/apache/camel/model/ThrottleDefinition.java      | 4 ++--
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/camel-core/src/main/docs/eips/throttle-eip.adoc b/camel-core/src/main/docs/eips/throttle-eip.adoc
index aa0582b..7ae5472 100644
--- a/camel-core/src/main/docs/eips/throttle-eip.adoc
+++ b/camel-core/src/main/docs/eips/throttle-eip.adoc
@@ -11,12 +11,12 @@ The Throttle EIP supports 6 options which are listed below:
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
-| *correlationExpression* | The expression used to calculate the correlation key to use for throttle grouping. The Exchange which has the same correlation key is throttled together. |  | NamespaceAware Expression
 | *executorServiceRef* | To use a custom thread pool (ScheduledExecutorService) by the throttler. |  | String
 | *timePeriodMillis* | Sets the time period during which the maximum request count is valid for | 1000 | Long
 | *asyncDelayed* | Enables asynchronous delay which means the thread will not block while delaying. | false | Boolean
 | *callerRunsWhenRejected* | Whether or not the caller should run the task when it was rejected by the thread pool. Is by default true | true | Boolean
 | *rejectExecution* | Whether or not throttler throws the ThrottlerRejectedExecutionException when the exchange exceeds the request limit Is by default false | false | Boolean
+| *correlationExpression* | The expression used to calculate the correlation key to use for throttle grouping. The Exchange which has the same correlation key is throttled together. |  | NamespaceAware Expression
 |===
 // eip options: END
 
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 8a5fbf1..e4622e7 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -2296,8 +2296,8 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10
      * will default ensure at most 10 messages per second.
      *
-     * @param correlationExpressionKey  is a correlation key that can throttle by the given key instead of overall throttling
      * @param maximumRequestCount  an expression to calculate the maximum request count
+     * @param correlationExpressionKey  is a correlation key that can throttle by the given key instead of overall throttling
      * @return the builder
      */
     public ThrottleDefinition throttle(long correlationExpressionKey, Expression maximumRequestCount) {
@@ -2317,8 +2317,8 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10
      * will default ensure at most 10 messages per second.
      *
-     * @param correlationExpressionKey  is a correlation key as an expression that can throttle by the given key instead of overall throttling
      * @param maximumRequestCount  an expression to calculate the maximum request count
+     * @param correlationExpressionKey  is a correlation key as an expression that can throttle by the given key instead of overall throttling
      * @return the builder
      */
     public ThrottleDefinition throttle(Expression correlationExpressionKey, Expression maximumRequestCount) {
diff --git a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
index 06ac79c..7bd5213 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
@@ -44,8 +44,6 @@ import org.apache.camel.spi.RouteContext;
 public class ThrottleDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<ThrottleDefinition> {
     // TODO: Camel 3.0 Should not support outputs
 
-    @XmlElement(name = "correlationExpression")
-    private ExpressionSubElementDefinition correlationExpression;
     @XmlTransient
     private ExecutorService executorService;
     @XmlAttribute
@@ -58,6 +56,8 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
     private Boolean callerRunsWhenRejected;
     @XmlAttribute
     private Boolean rejectExecution;
+    @XmlElement(name = "correlationExpression")
+    private ExpressionSubElementDefinition correlationExpression;
 
     public ThrottleDefinition() {
     }

-- 
To stop receiving notification emails like this one, please contact
onders@apache.org.