You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/04/11 10:43:21 UTC
svn commit: r1324638 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/
test/java/org/apache/camel/processor/
Author: davsclaus
Date: Wed Apr 11 08:43:21 2012
New Revision: 1324638
URL: http://svn.apache.org/viewvc?rev=1324638&view=rev
Log:
CAMEL-5126: Improved error message for invalid configuration of the throttler EIP. CAMEL-5163: Fixed an issue where the error handler did not react if the throttler/delayer expression evaluation threw an exception.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerInvalidConfiguredTest.java
- copied, changed from r1324578, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerNullEvalTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java?rev=1324638&r1=1324637&r2=1324638&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java Wed Apr 11 08:43:21 2012
@@ -129,7 +129,7 @@ public class ExpressionNode extends Proc
@Override
protected void preCreateProcessor() {
Expression exp = expression;
- if (expression.getExpressionValue() != null) {
+ if (expression != null && expression.getExpressionValue() != null) {
exp = expression.getExpressionValue();
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java?rev=1324638&r1=1324637&r2=1324638&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java Wed Apr 11 08:43:21 2012
@@ -88,7 +88,12 @@ public class ThrottleDefinition extends
// should be default 1000 millis
long period = getTimePeriodMillis() != null ? getTimePeriodMillis() : 1000L;
+
+ // max requests per period is mandatory
Expression maxRequestsExpression = createMaxRequestsPerPeriodExpression(routeContext);
+ if (maxRequestsExpression == null) {
+ throw new IllegalArgumentException("MaxRequestsPerPeriod expression must be provided on " + this);
+ }
Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=1324638&r1=1324637&r2=1324638&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java Wed Apr 11 08:43:21 2012
@@ -95,11 +95,18 @@ public abstract class DelayProcessorSupp
}
// calculate delay and wait
- long delay = calculateDelay(exchange);
- if (delay <= 0) {
- // no delay then continue routing
- log.trace("No delay for exchangeId: {}", exchange.getExchangeId());
- return super.process(exchange, callback);
+ long delay;
+ try {
+ delay = calculateDelay(exchange);
+ if (delay <= 0) {
+ // no delay then continue routing
+ log.trace("No delay for exchangeId: {}", exchange.getExchangeId());
+ return super.process(exchange, callback);
+ }
+ } catch (Throwable e) {
+ exchange.setException(e);
+ callback.done(true);
+ return true;
}
if (!isAsyncDelayed() || exchange.isTransacted()) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java?rev=1324638&r1=1324637&r2=1324638&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java Wed Apr 11 08:43:21 2012
@@ -22,6 +22,7 @@ import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
+import org.apache.camel.RuntimeExchangeException;
import org.apache.camel.Traceable;
import org.apache.camel.util.ObjectHelper;
@@ -37,7 +38,7 @@ import org.apache.camel.util.ObjectHelpe
* @version
*/
public class Throttler extends DelayProcessorSupport implements Traceable {
- private long maximumRequestsPerPeriod;
+ private volatile long maximumRequestsPerPeriod;
private Expression maxRequestsPerPeriodExpression;
private long timePeriodMillis = 1000;
private volatile TimeSlot slot;
@@ -101,7 +102,14 @@ public class Throttler extends DelayProc
// -----------------------------------------------------------------------
protected long calculateDelay(Exchange exchange) {
- Long longValue = maxRequestsPerPeriodExpression.evaluate(exchange, Long.class);
+ // evaluate as Object first to see if we get any result at all
+ Object result = maxRequestsPerPeriodExpression.evaluate(exchange, Object.class);
+ if (result == null) {
+ throw new RuntimeExchangeException("The max requests per period expression was evaluated as null: " + maxRequestsPerPeriodExpression, exchange);
+ }
+
+ // then must convert value to long
+ Long longValue = exchange.getContext().getTypeConverter().convertTo(Long.class, result);
if (longValue != null) {
// log if we changed max period after initial setting
if (maximumRequestsPerPeriod > 0 && longValue.longValue() != maximumRequestsPerPeriod) {
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerInvalidConfiguredTest.java (from r1324578, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerInvalidConfiguredTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerInvalidConfiguredTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java&r1=1324578&r2=1324638&rev=1324638&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerInvalidConfiguredTest.java Wed Apr 11 08:43:21 2012
@@ -16,146 +16,36 @@
*/
package org.apache.camel.processor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.FailedToCreateRouteException;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.DefaultExchange;
-import org.apache.camel.processor.Throttler.TimeSlot;
-
-import static org.apache.camel.builder.Builder.constant;
/**
* @version
*/
-public class ThrottlerTest extends ContextTestSupport {
- private static final int INTERVAL = 500;
- protected int messageCount = 9;
-
- public void testSendLotsOfMessagesButOnly3GetThrough() throws Exception {
- MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
- resultEndpoint.expectedMessageCount(3);
- resultEndpoint.setResultWaitTime(5000);
-
- for (int i = 0; i < messageCount; i++) {
- template.sendBody("seda:a", "<message>" + i + "</message>");
- }
-
- // lets pause to give the requests time to be processed
- // to check that the throttle really does kick in
- resultEndpoint.assertIsSatisfied();
- }
-
- public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() throws Exception {
- MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
- resultEndpoint.expectedMessageCount(messageCount);
+public class ThrottlerInvalidConfiguredTest extends ContextTestSupport {
- ExecutorService executor = Executors.newFixedThreadPool(messageCount);
-
- long start = System.currentTimeMillis();
- for (int i = 0; i < messageCount; i++) {
- executor.execute(new Runnable() {
- public void run() {
- template.sendBody("direct:a", "<message>payload</message>");
- }
- });
+ public void testInvalid() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // null is invalid
+ from("seda:a").throttle(null).to("mock:result");
+ }
+ });
+ try {
+ context.start();
+ fail("Should have thrown exception");
+ } catch (FailedToCreateRouteException e) {
+ IllegalArgumentException cause = assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+ assertTrue(cause.getMessage().startsWith("MaxRequestsPerPeriod expression must be provided"));
}
-
- // let's wait for the exchanges to arrive
- resultEndpoint.assertIsSatisfied();
-
- // now assert that they have actually been throttled
- long minimumTime = (messageCount - 1) * INTERVAL;
- // add a little slack
- long delta = System.currentTimeMillis() - start + 200;
- assertTrue("Should take at least " + minimumTime + "ms, was: " + delta, delta >= minimumTime);
- executor.shutdownNow();
- }
-
- public void testTimeSlotCalculus() throws Exception {
- Throttler throttler = new Throttler(context, null, constant(3), 1000, null, false);
- // calculate will assign a new slot
- throttler.calculateDelay(new DefaultExchange(context));
- TimeSlot slot = throttler.nextSlot();
- // start a new time slot
- assertNotNull(slot);
- // make sure the same slot is used (3 exchanges per slot)
- assertSame(slot, throttler.nextSlot());
- assertTrue(slot.isFull());
-
- TimeSlot next = throttler.nextSlot();
- // now we should have a new slot that starts somewhere in the future
- assertNotSame(slot, next);
- assertFalse(next.isActive());
+ context.stop();
}
- public void testConfigurationWithConstantExpression() throws Exception {
- MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
- resultEndpoint.expectedMessageCount(messageCount);
-
- ExecutorService executor = Executors.newFixedThreadPool(messageCount);
-
- long start = System.currentTimeMillis();
- for (int i = 0; i < messageCount; i++) {
- executor.execute(new Runnable() {
- public void run() {
- template.sendBody("direct:expressionConstant", "<message>payload</message>");
- }
- });
- }
-
- // let's wait for the exchanges to arrive
- resultEndpoint.assertIsSatisfied();
-
- // now assert that they have actually been throttled
- long minimumTime = (messageCount - 1) * INTERVAL;
- // add a little slack
- long delta = System.currentTimeMillis() - start + 200;
- assertTrue("Should take at least " + minimumTime + "ms, was: " + delta, delta >= minimumTime);
- executor.shutdownNow();
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
}
- public void testConfigurationWithHeaderExpression() throws Exception {
- MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
- resultEndpoint.expectedMessageCount(messageCount);
-
- ExecutorService executor = Executors.newFixedThreadPool(messageCount);
-
- long start = System.currentTimeMillis();
- for (int i = 0; i < messageCount; i++) {
- executor.execute(new Runnable() {
- public void run() {
- template.sendBodyAndHeader("direct:expressionHeader", "<message>payload</message>", "throttleValue", 1);
- }
- });
- }
-
- // let's wait for the exchanges to arrive
- resultEndpoint.assertIsSatisfied();
-
- // now assert that they have actually been throttled
- long minimumTime = (messageCount - 1) * INTERVAL;
- // add a little slack
- long delta = System.currentTimeMillis() - start + 200;
- assertTrue("Should take at least " + minimumTime + "ms, was: " + delta, delta >= minimumTime);
- executor.shutdownNow();
- }
-
- protected RouteBuilder createRouteBuilder() {
- return new RouteBuilder() {
- public void configure() {
- // START SNIPPET: ex
- from("seda:a").throttle(3).timePeriodMillis(10000).to("log:result", "mock:result");
- // END SNIPPET: ex
-
- from("direct:a").throttle(1).timePeriodMillis(INTERVAL).to("log:result", "mock:result");
-
- from("direct:expressionConstant").throttle(constant(1)).timePeriodMillis(INTERVAL).to("log:result", "mock:result");
-
- from("direct:expressionHeader").throttle(header("throttleValue")).timePeriodMillis(INTERVAL).to("log:result", "mock:result");
- }
- };
- }
}
\ No newline at end of file
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerNullEvalTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerNullEvalTest.java?rev=1324638&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerNullEvalTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerNullEvalTest.java Wed Apr 11 08:43:21 2012
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version
+ */
+public class ThrottlerNullEvalTest extends ContextTestSupport {
+
+ public void testNullEvalTest() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
+ getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom");
+
+ template.sendBodyAndHeader("seda:a", "Hello World", "max", 2);
+ template.sendBodyAndHeader("seda:a", "Kaboom", "max", null);
+ template.sendBodyAndHeader("seda:a", "Bye World", "max", 2);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testNoHeaderTest() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
+ getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom");
+
+ template.sendBodyAndHeader("seda:a", "Hello World", "max", 2);
+ template.sendBody("seda:a", "Kaboom");
+ template.sendBodyAndHeader("seda:a", "Bye World", "max", 2);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ errorHandler(deadLetterChannel("mock:dead"));
+
+ from("seda:a").throttle(header("max")).to("mock:result");
+ }
+ };
+ }
+}
\ No newline at end of file