You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/04/11 10:43:21 UTC

svn commit: r1324638 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/processor/

Author: davsclaus
Date: Wed Apr 11 08:43:21 2012
New Revision: 1324638

URL: http://svn.apache.org/viewvc?rev=1324638&view=rev
Log:
CAMEL-5126: Improved error message if invalid configuration of throttler EIP. CAMEL-5163: Fixed issue if throttler/delayer expression evalution threw exception, then error handler does not react.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerInvalidConfiguredTest.java
      - copied, changed from r1324578, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerNullEvalTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java?rev=1324638&r1=1324637&r2=1324638&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java Wed Apr 11 08:43:21 2012
@@ -129,7 +129,7 @@ public class ExpressionNode extends Proc
     @Override
     protected void preCreateProcessor() {
         Expression exp = expression;
-        if (expression.getExpressionValue() != null) {
+        if (expression != null && expression.getExpressionValue() != null) {
             exp = expression.getExpressionValue();
         }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java?rev=1324638&r1=1324637&r2=1324638&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java Wed Apr 11 08:43:21 2012
@@ -88,7 +88,12 @@ public class ThrottleDefinition extends 
 
         // should be default 1000 millis
         long period = getTimePeriodMillis() != null ? getTimePeriodMillis() : 1000L;
+
+        // max requests per period is mandatory
         Expression maxRequestsExpression = createMaxRequestsPerPeriodExpression(routeContext);
+        if (maxRequestsExpression == null) {
+            throw new IllegalArgumentException("MaxRequestsPerPeriod expression must be provided on " + this);
+        }
 
         Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool);
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=1324638&r1=1324637&r2=1324638&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java Wed Apr 11 08:43:21 2012
@@ -95,11 +95,18 @@ public abstract class DelayProcessorSupp
         }
 
         // calculate delay and wait
-        long delay = calculateDelay(exchange);
-        if (delay <= 0) {
-            // no delay then continue routing
-            log.trace("No delay for exchangeId: {}", exchange.getExchangeId());
-            return super.process(exchange, callback);
+        long delay;
+        try {
+            delay = calculateDelay(exchange);
+            if (delay <= 0) {
+                // no delay then continue routing
+                log.trace("No delay for exchangeId: {}", exchange.getExchangeId());
+                return super.process(exchange, callback);
+            }
+        } catch (Throwable e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
         }
 
         if (!isAsyncDelayed() || exchange.isTransacted()) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java?rev=1324638&r1=1324637&r2=1324638&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java Wed Apr 11 08:43:21 2012
@@ -22,6 +22,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
+import org.apache.camel.RuntimeExchangeException;
 import org.apache.camel.Traceable;
 import org.apache.camel.util.ObjectHelper;
 
@@ -37,7 +38,7 @@ import org.apache.camel.util.ObjectHelpe
  * @version 
  */
 public class Throttler extends DelayProcessorSupport implements Traceable {
-    private long maximumRequestsPerPeriod;
+    private volatile long maximumRequestsPerPeriod;
     private Expression maxRequestsPerPeriodExpression;
     private long timePeriodMillis = 1000;
     private volatile TimeSlot slot;
@@ -101,7 +102,14 @@ public class Throttler extends DelayProc
     // -----------------------------------------------------------------------
 
     protected long calculateDelay(Exchange exchange) {
-        Long longValue = maxRequestsPerPeriodExpression.evaluate(exchange, Long.class);
+        // evaluate as Object first to see if we get any result at all
+        Object result = maxRequestsPerPeriodExpression.evaluate(exchange, Object.class);
+        if (result == null) {
+            throw new RuntimeExchangeException("The max requests per period expression was evaluated as null: " + maxRequestsPerPeriodExpression, exchange);
+        }
+
+        // then must convert value to long
+        Long longValue = exchange.getContext().getTypeConverter().convertTo(Long.class, result);
         if (longValue != null) {
             // log if we changed max period after initial setting
             if (maximumRequestsPerPeriod > 0 && longValue.longValue() != maximumRequestsPerPeriod) {

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerInvalidConfiguredTest.java (from r1324578, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerInvalidConfiguredTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerInvalidConfiguredTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java&r1=1324578&r2=1324638&rev=1324638&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerInvalidConfiguredTest.java Wed Apr 11 08:43:21 2012
@@ -16,146 +16,36 @@
  */
 package org.apache.camel.processor;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.FailedToCreateRouteException;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.DefaultExchange;
-import org.apache.camel.processor.Throttler.TimeSlot;
-
-import static org.apache.camel.builder.Builder.constant;
 
 /**
  * @version 
  */
-public class ThrottlerTest extends ContextTestSupport {
-    private static final int INTERVAL = 500;
-    protected int messageCount = 9;
-
-    public void testSendLotsOfMessagesButOnly3GetThrough() throws Exception {
-        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(3);
-        resultEndpoint.setResultWaitTime(5000);
-
-        for (int i = 0; i < messageCount; i++) {
-            template.sendBody("seda:a", "<message>" + i + "</message>");
-        }
-
-        // lets pause to give the requests time to be processed
-        // to check that the throttle really does kick in
-        resultEndpoint.assertIsSatisfied();
-    }
-    
-    public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() throws Exception {
-        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(messageCount);
+public class ThrottlerInvalidConfiguredTest extends ContextTestSupport {
 
-        ExecutorService executor = Executors.newFixedThreadPool(messageCount);
-
-        long start = System.currentTimeMillis();
-        for (int i = 0; i < messageCount; i++) {
-            executor.execute(new Runnable() {
-                public void run() {
-                    template.sendBody("direct:a", "<message>payload</message>");
-                }
-            });
+    public void testInvalid() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // null is invalid
+                from("seda:a").throttle(null).to("mock:result");
+            }
+        });
+        try {
+            context.start();
+            fail("Should have thrown exception");
+        } catch (FailedToCreateRouteException e) {
+            IllegalArgumentException cause = assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+            assertTrue(cause.getMessage().startsWith("MaxRequestsPerPeriod expression must be provided"));
         }
-
-        // let's wait for the exchanges to arrive
-        resultEndpoint.assertIsSatisfied();
-
-        // now assert that they have actually been throttled
-        long minimumTime = (messageCount - 1) * INTERVAL;
-        // add a little slack
-        long delta = System.currentTimeMillis() - start + 200;
-        assertTrue("Should take at least " + minimumTime + "ms, was: " + delta, delta >= minimumTime);
-        executor.shutdownNow();
-    }
-
-    public void testTimeSlotCalculus() throws Exception {
-        Throttler throttler = new Throttler(context, null, constant(3), 1000, null, false);
-        // calculate will assign a new slot
-        throttler.calculateDelay(new DefaultExchange(context));
-        TimeSlot slot = throttler.nextSlot();
-        // start a new time slot
-        assertNotNull(slot);
-        // make sure the same slot is used (3 exchanges per slot)
-        assertSame(slot, throttler.nextSlot());
-        assertTrue(slot.isFull());
-        
-        TimeSlot next = throttler.nextSlot();
-        // now we should have a new slot that starts somewhere in the future
-        assertNotSame(slot, next);
-        assertFalse(next.isActive());
+        context.stop();
     }
 
-    public void testConfigurationWithConstantExpression() throws Exception {
-        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(messageCount);
-
-        ExecutorService executor = Executors.newFixedThreadPool(messageCount);
-
-        long start = System.currentTimeMillis();
-        for (int i = 0; i < messageCount; i++) {
-            executor.execute(new Runnable() {
-                public void run() {
-                    template.sendBody("direct:expressionConstant", "<message>payload</message>");
-                }
-            });
-        }
-
-        // let's wait for the exchanges to arrive
-        resultEndpoint.assertIsSatisfied();
-
-        // now assert that they have actually been throttled
-        long minimumTime = (messageCount - 1) * INTERVAL;
-        // add a little slack
-        long delta = System.currentTimeMillis() - start + 200;
-        assertTrue("Should take at least " + minimumTime + "ms, was: " + delta, delta >= minimumTime);
-        executor.shutdownNow();
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
     }
 
-    public void testConfigurationWithHeaderExpression() throws Exception {
-        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(messageCount);
-
-        ExecutorService executor = Executors.newFixedThreadPool(messageCount);
-
-        long start = System.currentTimeMillis();
-        for (int i = 0; i < messageCount; i++) {
-            executor.execute(new Runnable() {
-                public void run() {
-                    template.sendBodyAndHeader("direct:expressionHeader", "<message>payload</message>", "throttleValue", 1);
-                }
-            });
-        }
-
-        // let's wait for the exchanges to arrive
-        resultEndpoint.assertIsSatisfied();
-
-        // now assert that they have actually been throttled
-        long minimumTime = (messageCount - 1) * INTERVAL;
-        // add a little slack
-        long delta = System.currentTimeMillis() - start + 200;
-        assertTrue("Should take at least " + minimumTime + "ms, was: " + delta, delta >= minimumTime);
-        executor.shutdownNow();
-    }
-    
-    protected RouteBuilder createRouteBuilder() {
-        return new RouteBuilder() {
-            public void configure() {
-                // START SNIPPET: ex
-                from("seda:a").throttle(3).timePeriodMillis(10000).to("log:result", "mock:result");
-                // END SNIPPET: ex
-                
-                from("direct:a").throttle(1).timePeriodMillis(INTERVAL).to("log:result", "mock:result");
-                
-                from("direct:expressionConstant").throttle(constant(1)).timePeriodMillis(INTERVAL).to("log:result", "mock:result");
-                
-                from("direct:expressionHeader").throttle(header("throttleValue")).timePeriodMillis(INTERVAL).to("log:result", "mock:result");
-            }
-        };
-    }
 }
\ No newline at end of file

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerNullEvalTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerNullEvalTest.java?rev=1324638&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerNullEvalTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerNullEvalTest.java Wed Apr 11 08:43:21 2012
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version 
+ */
+public class ThrottlerNullEvalTest extends ContextTestSupport {
+
+    public void testNullEvalTest() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
+        getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom");
+
+        template.sendBodyAndHeader("seda:a", "Hello World", "max", 2);
+        template.sendBodyAndHeader("seda:a", "Kaboom", "max", null);
+        template.sendBodyAndHeader("seda:a", "Bye World", "max", 2);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testNoHeaderTest() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
+        getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom");
+
+        template.sendBodyAndHeader("seda:a", "Hello World", "max", 2);
+        template.sendBody("seda:a", "Kaboom");
+        template.sendBodyAndHeader("seda:a", "Bye World", "max", 2);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dead"));
+
+                from("seda:a").throttle(header("max")).to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file