You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by on...@apache.org on 2018/07/31 19:44:30 UTC
[camel] branch master updated: CAMEL-6840 - add more regression
which is similar to already existing ThrottlerTests where grouping is added
and fix some CS errors in camel-core
This is an automated email from the ASF dual-hosted git repository.
onders pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 1b5040f CAMEL-6840 - add more regression which is similar to already existing ThrottlerTests where grouping is added and fix some CS errors in camel-core
new 338d058 CAMEL-6840 - add more regression which is similar to already existing ThrottlerTests where grouping is added and fix some CS errors in camel-core
1b5040f is described below
commit 1b5040f2b4288d4b1b042d2d7c5456aead8975d7
Author: onders <on...@apache.org>
AuthorDate: Tue Jul 31 22:37:21 2018 +0300
CAMEL-6840 - add more regression which is similar to already existing ThrottlerTests where grouping is added and fix some CS errors in camel-core
---
.../apache/camel/builder/ExpressionBuilder.java | 2 +-
.../org/apache/camel/impl/FileStateRepository.java | 2 +-
.../java/org/apache/camel/processor/Splitter.java | 2 -
.../idempotent/FileIdempotentRepository.java | 2 +-
.../camel/support/TokenPairExpressionIterator.java | 2 +-
.../support/TokenXMLPairExpressionIterator.java | 2 +-
.../org/apache/camel/util/GroupTokenIterator.java | 2 +-
.../apache/camel/util/IntrospectionSupport.java | 6 +-
.../main/java/org/apache/camel/util/Scanner.java | 10 +-
.../java/org/apache/camel/util/SkipIterator.java | 1 -
.../camel/processor/ThrottlingGroupingTest.java | 134 +++++++++++++++++++++
.../apache/camel/util/GroupTokenIteratorTest.java | 1 -
.../processor/SpringThrottlerGroupingTest.java | 2 -
.../spring/processor/ThrottlerGroupingTest.xml | 39 ++++++
14 files changed, 187 insertions(+), 20 deletions(-)
diff --git a/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java b/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java
index 11f640b..36c4ced 100644
--- a/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java
+++ b/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java
@@ -27,7 +27,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import org.apache.camel.util.Scanner;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicReference;
@@ -71,6 +70,7 @@ import org.apache.camel.util.IOHelper;
import org.apache.camel.util.MessageHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.OgnlHelper;
+import org.apache.camel.util.Scanner;
import org.apache.camel.util.SkipIterator;
import org.apache.camel.util.StringHelper;
diff --git a/camel-core/src/main/java/org/apache/camel/impl/FileStateRepository.java b/camel-core/src/main/java/org/apache/camel/impl/FileStateRepository.java
index bac7f95..feda49c 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/FileStateRepository.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/FileStateRepository.java
@@ -21,7 +21,6 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import org.apache.camel.util.Scanner;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.api.management.ManagedAttribute;
@@ -32,6 +31,7 @@ import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.FileUtil;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.Scanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
index 6eb6833..1833cd5 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
@@ -23,7 +23,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import org.apache.camel.util.Scanner;
import java.util.concurrent.ExecutorService;
import org.apache.camel.AsyncCallback;
@@ -42,7 +41,6 @@ import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
-
import static org.apache.camel.util.ObjectHelper.notNull;
/**
diff --git a/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java b/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
index eef5c6d..c6758b9 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.camel.util.Scanner;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.api.management.ManagedAttribute;
@@ -35,6 +34,7 @@ import org.apache.camel.util.IOHelper;
import org.apache.camel.util.LRUCache;
import org.apache.camel.util.LRUCacheFactory;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.Scanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java b/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java
index 0ccfdd3..d9964b9 100644
--- a/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java
+++ b/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java
@@ -20,13 +20,13 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
-import org.apache.camel.util.Scanner;
import org.apache.camel.Exchange;
import org.apache.camel.InvalidPayloadException;
import org.apache.camel.language.simple.SimpleLanguage;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.Scanner;
/**
* {@link org.apache.camel.Expression} to walk a {@link org.apache.camel.Message} body
diff --git a/camel-core/src/main/java/org/apache/camel/support/TokenXMLPairExpressionIterator.java b/camel-core/src/main/java/org/apache/camel/support/TokenXMLPairExpressionIterator.java
index 39664c9..23b3b2f 100644
--- a/camel-core/src/main/java/org/apache/camel/support/TokenXMLPairExpressionIterator.java
+++ b/camel-core/src/main/java/org/apache/camel/support/TokenXMLPairExpressionIterator.java
@@ -20,13 +20,13 @@ import java.io.InputStream;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
-import org.apache.camel.util.Scanner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.camel.Exchange;
import org.apache.camel.language.simple.SimpleLanguage;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.Scanner;
/**
* {@link org.apache.camel.Expression} to walk a {@link org.apache.camel.Message} XML body
diff --git a/camel-core/src/main/java/org/apache/camel/util/GroupTokenIterator.java b/camel-core/src/main/java/org/apache/camel/util/GroupTokenIterator.java
index 0fa0a5c..f6349a9 100644
--- a/camel-core/src/main/java/org/apache/camel/util/GroupTokenIterator.java
+++ b/camel-core/src/main/java/org/apache/camel/util/GroupTokenIterator.java
@@ -21,9 +21,9 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
-import org.apache.camel.util.Scanner;
import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.NoTypeConversionAvailableException;
diff --git a/camel-core/src/main/java/org/apache/camel/util/IntrospectionSupport.java b/camel-core/src/main/java/org/apache/camel/util/IntrospectionSupport.java
index 008c6e1..f1738db 100644
--- a/camel-core/src/main/java/org/apache/camel/util/IntrospectionSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/util/IntrospectionSupport.java
@@ -44,9 +44,9 @@ import org.apache.camel.Component;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.TypeConverter;
import org.apache.camel.component.properties.PropertiesComponent;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import static org.apache.camel.util.ObjectHelper.isAssignableFrom;
/**
@@ -133,8 +133,8 @@ public final class IntrospectionSupport {
*/
public static void stop() {
if (LOG.isDebugEnabled() && CACHE instanceof LRUCache) {
- LRUCache CACHE = (LRUCache) IntrospectionSupport.CACHE;
- LOG.debug("Clearing cache[size={}, hits={}, misses={}, evicted={}]", new Object[]{CACHE.size(), CACHE.getHits(), CACHE.getMisses(), CACHE.getEvicted()});
+ LRUCache localCache = (LRUCache) IntrospectionSupport.CACHE;
+ LOG.debug("Clearing cache[size={}, hits={}, misses={}, evicted={}]", new Object[]{localCache.size(), localCache.getHits(), localCache.getMisses(), localCache.getEvicted()});
}
CACHE.clear();
diff --git a/camel-core/src/main/java/org/apache/camel/util/Scanner.java b/camel-core/src/main/java/org/apache/camel/util/Scanner.java
index a042c57..43919eb 100644
--- a/camel-core/src/main/java/org/apache/camel/util/Scanner.java
+++ b/camel-core/src/main/java/org/apache/camel/util/Scanner.java
@@ -55,11 +55,11 @@ public final class Scanner implements Iterator<String>, Closeable {
private Matcher matcher;
private CharBuffer buf;
private int position;
- private boolean inputExhausted = false;
- private boolean needInput = false;
- private boolean skipped = false;
+ private boolean inputExhausted;
+ private boolean needInput;
+ private boolean skipped;
private int savedPosition = -1;
- private boolean closed = false;
+ private boolean closed;
private IOException lastIOException;
public Scanner(InputStream source, String charsetName, String pattern) {
@@ -194,7 +194,7 @@ public final class Scanner implements Iterator<String>, Closeable {
private void throwFor() {
skipped = false;
- if ((inputExhausted) && (position == buf.limit())) {
+ if (inputExhausted && position == buf.limit()) {
throw new NoSuchElementException();
} else {
throw new InputMismatchException();
diff --git a/camel-core/src/main/java/org/apache/camel/util/SkipIterator.java b/camel-core/src/main/java/org/apache/camel/util/SkipIterator.java
index 9c9b622..dc43715 100644
--- a/camel-core/src/main/java/org/apache/camel/util/SkipIterator.java
+++ b/camel-core/src/main/java/org/apache/camel/util/SkipIterator.java
@@ -19,7 +19,6 @@ package org.apache.camel.util;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
-import org.apache.camel.util.Scanner;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.CamelContext;
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
index 09f1160..9a39781 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
@@ -18,14 +18,21 @@ package org.apache.camel.processor;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
/**
* @version
*/
public class ThrottlingGroupingTest extends ContextTestSupport {
+ private static final int INTERVAL = 500;
+ private static final int MESSAGE_COUNT = 9;
+ private static final int TOLERANCE = 50;
public void testGroupingWithSingleConstant() throws Exception {
getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
@@ -60,6 +67,127 @@ public class ThrottlingGroupingTest extends ContextTestSupport {
assertMockEndpointsSatisfied();
}
+
+    /**
+     * Sends nine messages to <tt>seda:ga</tt>, alternating the <tt>key</tt> header
+     * between "1" and "2", and expects a message count of 3 on <tt>mock:gresult</tt>
+     * using a result wait time of 2000 ms.
+     */
+    public void testSendLotsOfMessagesButOnly3GetThroughWithin2Seconds() throws Exception {
+        MockEndpoint mock = resolveMandatoryEndpoint("mock:gresult", MockEndpoint.class);
+        mock.expectedMessageCount(3);
+        mock.setResultWaitTime(2000);
+
+        // the same map instance is reused for every send; only the "key" entry is overwritten
+        Map<String, Object> headers = new HashMap<String, Object>();
+        for (int n = 0; n < 9; n++) {
+            // even-numbered messages carry key "1", odd-numbered messages carry key "2"
+            headers.put("key", n % 2 == 0 ? "1" : "2");
+            template.sendBodyAndHeaders("seda:ga", "<message>" + n + "</message>", headers);
+        }
+
+        // asserting on the mock gives the requests time to be processed,
+        // which is how we check that the throttle really does kick in
+        mock.assertIsSatisfied();
+    }
+
+    /**
+     * Asserts that the measured delivery time lies inside the window calculated
+     * for the given throttle settings.
+     * <p/>
+     * The lower bound is {@link #calculateMinimum} less {@code TOLERANCE}. The upper bound is
+     * {@link #calculateMaximum} plus twice {@code TOLERANCE} plus 500 ms of head-room for slow
+     * CI boxes. The value that is logged and shown in the failure message is exactly the
+     * value that is enforced.
+     *
+     * @param elapsedTimeMs the measured time in milliseconds
+     * @param throttle      the throttle value passed on to the min/max calculation
+     * @param intervalMs    the throttle time period in milliseconds
+     * @param messageCount  the number of exchanges that were sent
+     */
+    private void assertThrottlerTiming(final long elapsedTimeMs, final int throttle, final int intervalMs, final int messageCount) {
+        // now assert that they have actually been throttled (use +/- TOLERANCE as slack)
+        long minimum = calculateMinimum(intervalMs, throttle, messageCount) - TOLERANCE;
+        // the enforced limit has always been the calculated maximum plus slack, plus 500 for slow
+        // CI boxes, plus TOLERANCE once more; keep that limit but hold it in one variable so the
+        // log line and the failure message report the bound that is really checked
+        long maximum = calculateMaximum(intervalMs, throttle, messageCount) + 2 * TOLERANCE + 500;
+        log.info("Sent {} exchanges in {}ms, with throttle rate of {} per {}ms. Calculated min {}ms and max {}ms", new Object[]{messageCount, elapsedTimeMs, throttle, intervalMs, minimum, maximum});
+
+        assertTrue("Should take at least " + minimum + "ms, was: " + elapsedTimeMs, elapsedTimeMs >= minimum);
+        assertTrue("Should take at most " + maximum + "ms, was: " + elapsedTimeMs, elapsedTimeMs <= maximum);
+    }
+
+    /**
+     * Submits {@code messageCount} send tasks to a fixed thread pool and returns the time,
+     * in milliseconds, from just before the first task is submitted until the receiving
+     * endpoint (when one is given) has been asserted.
+     *
+     * @param messageCount      number of messages to send; also set as the expected count on the mock
+     * @param endpointUri       the endpoint every message is sent to
+     * @param threadPoolSize    size of the fixed thread pool running the send tasks
+     * @param receivingEndpoint the mock to wait on, or <tt>null</tt> to skip expectation and waiting
+     * @return the elapsed time in milliseconds
+     * @throws InterruptedException declared by the signature for the waiting assertion
+     */
+    private long sendMessagesAndAwaitDelivery(final int messageCount, final String endpointUri, final int threadPoolSize, final MockEndpoint receivingEndpoint) throws InterruptedException {
+        final ExecutorService pool = Executors.newFixedThreadPool(threadPoolSize);
+        try {
+            if (receivingEndpoint != null) {
+                receivingEndpoint.expectedMessageCount(messageCount);
+            }
+
+            // NOTE(review): the key depends on messageCount, not on the loop index, so every
+            // message sent by one call carries the same "key" header value. If alternating
+            // keys were intended, switching to the index would spread the messages over two
+            // keys - verify the timing expectations of the callers before changing this.
+            final String key = messageCount % 2 == 0 ? "1" : "2";
+
+            final long startedAt = System.nanoTime();
+            for (int n = 0; n < messageCount; n++) {
+                pool.execute(new Runnable() {
+                    public void run() {
+                        // each task builds its own header map
+                        Map<String, Object> headers = new HashMap<String, Object>();
+                        headers.put("key", key);
+                        template.sendBodyAndHeaders(endpointUri, "<message>payload</message>", headers);
+                    }
+                });
+            }
+
+            // let's wait for the exchanges to arrive
+            if (receivingEndpoint != null) {
+                receivingEndpoint.assertIsSatisfied();
+            }
+            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedAt);
+        } finally {
+            // always stop the pool, also when the assertion above fails
+            pool.shutdownNow();
+        }
+    }
+
+    /**
+     * Sends {@code MESSAGE_COUNT} messages to <tt>direct:ga</tt> from a pool with one thread
+     * per message and checks the elapsed time against a throttle value of 5 per {@code INTERVAL}.
+     */
+    public void testSendLotsOfMessagesSimultaneouslyButOnlyGetThroughAsConstantThrottleValue() throws Exception {
+        final MockEndpoint mock = resolveMandatoryEndpoint("mock:gresult", MockEndpoint.class);
+        // pool size equals the message count
+        final long elapsedMs = sendMessagesAndAwaitDelivery(MESSAGE_COUNT, "direct:ga", MESSAGE_COUNT, mock);
+        // 5 mirrors the constant throttle value configured on the direct:ga route
+        assertThrottlerTiming(elapsedMs, 5, INTERVAL, MESSAGE_COUNT);
+    }
+
+    /**
+     * Runs the header-expression scenario with a throttle value of 5 per {@code INTERVAL}
+     * for {@code MESSAGE_COUNT} messages, using a pool with one thread per message.
+     */
+    public void testConfigurationWithHeaderExpression() throws Exception {
+        final MockEndpoint mock = resolveMandatoryEndpoint("mock:gresult", MockEndpoint.class);
+        mock.expectedMessageCount(MESSAGE_COUNT);
+
+        final ExecutorService pool = Executors.newFixedThreadPool(MESSAGE_COUNT);
+        try {
+            sendMessagesWithHeaderExpression(pool, mock, 5, INTERVAL, MESSAGE_COUNT);
+        } finally {
+            // stop the pool whether or not the assertions passed
+            pool.shutdownNow();
+        }
+    }
+
+ private long calculateMinimum(final long periodMs, final long throttleRate, final long messageCount) {
+ if (messageCount % throttleRate > 0) {
+ return (long) Math.floor((double)messageCount / (double)throttleRate) * periodMs;
+ } else {
+ return (long) (Math.floor((double)messageCount / (double)throttleRate) * periodMs) - periodMs;
+ }
+ }
+
+ private long calculateMaximum(final long periodMs, final long throttleRate, final long messageCount) {
+ return ((long)Math.ceil((double)messageCount / (double)throttleRate)) * periodMs;
+ }
+
+    /**
+     * Submits {@code messageCount} send tasks for <tt>direct:gexpressionHeader</tt> to the given
+     * executor, waits for the mock to be satisfied and then asserts the elapsed time.
+     * Every message carries the throttle value in the <tt>throttleValue</tt> header.
+     *
+     * @param executor       runs the send tasks; it is not shut down here
+     * @param resultEndpoint the mock expected to receive {@code messageCount} messages
+     * @param throttle       the value put into the <tt>throttleValue</tt> header
+     * @param intervalMs     the time period used for the timing assertion
+     * @param messageCount   the number of messages to send
+     * @throws InterruptedException declared by the signature for the waiting assertion
+     */
+    private void sendMessagesWithHeaderExpression(final ExecutorService executor, final MockEndpoint resultEndpoint, final int throttle, final int intervalMs, final int messageCount)
+        throws InterruptedException {
+        resultEndpoint.expectedMessageCount(messageCount);
+
+        // NOTE(review): the key depends on messageCount, not on the loop index, so every
+        // message sent by one call carries the same "key" header value. If alternating keys
+        // were intended, verify the timing expectations before switching to the index.
+        final String key = messageCount % 2 == 0 ? "1" : "2";
+
+        final long startedAt = System.nanoTime();
+        for (int n = 0; n < messageCount; n++) {
+            executor.execute(new Runnable() {
+                public void run() {
+                    // each task builds its own header map
+                    Map<String, Object> headers = new HashMap<String, Object>();
+                    headers.put("throttleValue", throttle);
+                    headers.put("key", key);
+                    template.sendBodyAndHeaders("direct:gexpressionHeader", "<message>payload</message>", headers);
+                }
+            });
+        }
+
+        // let's wait for the exchanges to arrive
+        resultEndpoint.assertIsSatisfied();
+        final long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedAt);
+        assertThrottlerTiming(elapsedMs, throttle, intervalMs, messageCount);
+    }
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
@@ -71,6 +199,12 @@ public class ThrottlingGroupingTest extends ContextTestSupport {
from("seda:a").throttle(header("max"), 1).to("mock:result");
from("seda:b").throttle(header("max"), 2).to("mock:result2");
from("seda:c").throttle(header("max"), header("key")).to("mock:resultdynamic");
+
+ from("seda:ga").throttle(constant(3), header("key")).timePeriodMillis(1000).to("log:gresult", "mock:gresult");
+
+ from("direct:ga").throttle(constant(5), header("key")).timePeriodMillis(INTERVAL).to("log:gresult", "mock:gresult");
+
+ from("direct:gexpressionHeader").throttle(header("throttleValue"), header("key")).timePeriodMillis(INTERVAL).to("log:gresult", "mock:gresult");
}
};
}
diff --git a/camel-core/src/test/java/org/apache/camel/util/GroupTokenIteratorTest.java b/camel-core/src/test/java/org/apache/camel/util/GroupTokenIteratorTest.java
index dfff48c..a36bf1f 100644
--- a/camel-core/src/test/java/org/apache/camel/util/GroupTokenIteratorTest.java
+++ b/camel-core/src/test/java/org/apache/camel/util/GroupTokenIteratorTest.java
@@ -19,7 +19,6 @@ package org.apache.camel.util;
import java.io.ByteArrayInputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
-import org.apache.camel.util.Scanner;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThrottlerGroupingTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThrottlerGroupingTest.java
index e321838..7d82398 100644
--- a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThrottlerGroupingTest.java
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThrottlerGroupingTest.java
@@ -18,8 +18,6 @@ package org.apache.camel.spring.processor;
import org.apache.camel.CamelContext;
import org.apache.camel.processor.ThrottlingGroupingTest;
-import org.junit.Ignore;
-
import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
public class SpringThrottlerGroupingTest extends ThrottlingGroupingTest {
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlerGroupingTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlerGroupingTest.xml
index c3019cd..d383782 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlerGroupingTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlerGroupingTest.xml
@@ -60,6 +60,45 @@
<to uri="mock:resultdynamic"/>
</throttle>
</route>
+
+ <route errorHandlerRef="dlc">
+ <from uri="seda:ga"/>
+ <!-- throttle 3 messages per 1 sec; the correlationExpression below uses the "key" header (presumably the limit applies per distinct header value; confirm against the Throttler documentation) -->
+ <throttle timePeriodMillis="1000">
+ <constant>3</constant>
+ <correlationExpression>
+ <header>key</header>
+ </correlationExpression>
+ <to uri="log:gresult"/>
+ <to uri="mock:gresult"/>
+ </throttle>
+ </route>
+
+ <route errorHandlerRef="dlc">
+ <from uri="direct:ga"/>
+ <!-- throttle 5 messages per 0.5 sec; the correlationExpression below uses the "key" header (presumably the limit applies per distinct header value; confirm against the Throttler documentation) -->
+ <throttle timePeriodMillis="500">
+ <constant>5</constant>
+ <correlationExpression>
+ <header>key</header>
+ </correlationExpression>
+ <to uri="log:gresult"/>
+ <to uri="mock:gresult"/>
+ </throttle>
+ </route>
+
+ <route errorHandlerRef="dlc">
+ <from uri="direct:gexpressionHeader"/>
+ <throttle timePeriodMillis="500">
+ <!-- use the "throttleValue" header to determine how many messages to throttle per 0.5 sec; the correlationExpression below uses the "key" header (presumably the limit applies per distinct header value; confirm against the Throttler documentation) -->
+ <header>throttleValue</header>
+ <correlationExpression>
+ <header>key</header>
+ </correlationExpression>
+ <to uri="log:gresult"/>
+ <to uri="mock:gresult"/>
+ </throttle>
+ </route>
</camelContext>