You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/06/26 11:35:52 UTC

svn commit: r1139749 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/impl/ test/java/org/apache/camel/processor/

Author: davsclaus
Date: Sun Jun 26 09:35:51 2011
New Revision: 1139749

URL: http://svn.apache.org/viewvc?rev=1139749&view=rev
Log:
CAMEL-4149: Fixed throttling route policy in context scope may not resume route when hitting low watermark

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyContextScopeTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java?rev=1139749&r1=1139748&r2=1139749&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java Sun Jun 26 09:35:51 2011
@@ -155,16 +155,18 @@ public class DefaultRouteContext impleme
             RoutePolicyProcessor routePolicyProcessor = null;
             List<RoutePolicy> routePolicyList = getRoutePolicyList();
             if (routePolicyList != null && !routePolicyList.isEmpty()) {
-                routePolicyProcessor = new RoutePolicyProcessor(unitOfWorkProcessor, routePolicyList);
-
-                // add it as service if we have not already done that (eg possible if two routes have the same service)
-                if (!camelContext.hasService(routePolicyProcessor)) {
-                    try {
-                        camelContext.addService(routePolicyProcessor);
-                    } catch (Exception e) {
-                        throw ObjectHelper.wrapRuntimeCamelException(e);
+                for (RoutePolicy policy : routePolicyList) {
+                    // add policy as service if we have not already done that (eg possible if two routes have the same service)
+                    // this ensures Camel can control the lifecycle of the policy
+                    if (!camelContext.hasService(policy)) {
+                        try {
+                            camelContext.addService(policy);
+                        } catch (Exception e) {
+                            throw ObjectHelper.wrapRuntimeCamelException(e);
+                        }
                     }
                 }
+                routePolicyProcessor = new RoutePolicyProcessor(unitOfWorkProcessor, routePolicyList);
                 target = routePolicyProcessor;
             } else {
                 target = unitOfWorkProcessor;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java?rev=1139749&r1=1139748&r2=1139749&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java Sun Jun 26 09:35:51 2011
@@ -16,29 +16,48 @@
  */
 package org.apache.camel.impl;
 
+import java.util.EventObject;
+import java.util.LinkedHashSet;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.Route;
+import org.apache.camel.management.EventNotifierSupport;
+import org.apache.camel.management.event.ExchangeCompletedEvent;
 import org.apache.camel.processor.CamelLogger;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
 import org.slf4j.LoggerFactory;
 
 /**
  * A throttle based {@link org.apache.camel.spi.RoutePolicy} which is capable of dynamic
  * throttling a route based on number of current inflight exchanges.
+ * <p/>
+ * This implementation supports two scopes {@link ThrottlingScope#Context} and {@link ThrottlingScope#Route} (is default).
+ * If context scope is selected then this implementation will use a {@link org.apache.camel.spi.EventNotifier} to listen
+ * for events when {@link Exchange}s is done, and trigger the {@link #throttle(org.apache.camel.Route, org.apache.camel.Exchange)}
+ * method. If the route scope is selected then <b>no</b> {@link org.apache.camel.spi.EventNotifier} is in use, as there is already
+ * a {@link org.apache.camel.spi.Synchronization} callback on the current {@link Exchange} which triggers the
+ * {@link #throttle(org.apache.camel.Route, org.apache.camel.Exchange)} when the current {@link Exchange} is done.
  *
  * @version 
  */
-public class ThrottlingInflightRoutePolicy extends RoutePolicySupport {
+public class ThrottlingInflightRoutePolicy extends RoutePolicySupport implements CamelContextAware {
 
     public enum ThrottlingScope {
         Context, Route
     }
 
+    private final Set<Route> routes = new LinkedHashSet<Route>();
+    private ContextScopedEventNotifier eventNotifier;
+    private CamelContext camelContext;
     private final Lock lock = new ReentrantLock();
     private ThrottlingScope scope = ThrottlingScope.Route;
     private int maxInflightExchanges = 1000;
@@ -55,12 +74,45 @@ public class ThrottlingInflightRoutePoli
         return "ThrottlingInflightRoutePolicy[" + maxInflightExchanges + " / " + resumePercentOfMax + "% using scope " + scope + "]";
     }
 
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public void onInit(Route route) {
+        // we need to remember the routes we apply for
+        routes.add(route);
+    }
+
+    @Override
     public void onExchangeDone(Route route, Exchange exchange) {
+        // if route scoped then throttle directly
+        // as context scoped is handled using an EventNotifier instead
+        if (scope == ThrottlingScope.Route) {
+            throttle(route, exchange);
+        }
+    }
+
+    /**
+     * Throttles the route when {@link Exchange}s is done.
+     *
+     * @param route  the route
+     * @param exchange the exchange
+     */
+    protected void throttle(Route route, Exchange exchange) {
         // this works the best when this logic is executed when the exchange is done
         Consumer consumer = route.getConsumer();
 
         int size = getSize(consumer, exchange);
-        if (maxInflightExchanges > 0 && size > maxInflightExchanges) {
+        boolean stop = maxInflightExchanges > 0 && size > maxInflightExchanges;
+        if (log.isTraceEnabled()) {
+            log.trace("{} > 0 && {} > {} evaluated as {}", new Object[]{maxInflightExchanges, size, maxInflightExchanges, stop});
+        }
+        if (stop) {
             try {
                 lock.lock();
                 stopConsumer(size, consumer);
@@ -74,7 +126,11 @@ public class ThrottlingInflightRoutePoli
         // reload size in case a race condition with too many at once being invoked
         // so we need to ensure that we read the most current size and start the consumer if we are already to low
         size = getSize(consumer, exchange);
-        if (size <= resumeInflightExchanges) {
+        boolean start = size <= resumeInflightExchanges;
+        if (log.isTraceEnabled()) {
+            log.trace("{} <= {} evaluated as {}", new Object[]{size, resumeInflightExchanges, start});
+        }
+        if (start) {
             try {
                 lock.lock();
                 startConsumer(size, consumer);
@@ -185,16 +241,73 @@ public class ThrottlingInflightRoutePoli
     private void startConsumer(int size, Consumer consumer) throws Exception {
         boolean started = super.startConsumer(consumer);
         if (started) {
-            getLogger().log("Throttling consumer: " + size + " <= " + resumeInflightExchanges + " inflight exchange by resuming consumer.");
+            getLogger().log("Throttling consumer: " + size + " <= " + resumeInflightExchanges + " inflight exchange by resuming consumer: " + consumer);
         }
     }
 
     private void stopConsumer(int size, Consumer consumer) throws Exception {
         boolean stopped = super.stopConsumer(consumer);
         if (stopped) {
-            getLogger().log("Throttling consumer: " + size + " > " + maxInflightExchanges + " inflight exchange by suspending consumer.");
+            getLogger().log("Throttling consumer: " + size + " > " + maxInflightExchanges + " inflight exchange by suspending consumer: " + consumer);
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ObjectHelper.notNull(camelContext, "CamelContext", this);
+        if (scope == ThrottlingScope.Context) {
+            eventNotifier = new ContextScopedEventNotifier();
+            // must start the notifier before it can be used
+            ServiceHelper.startService(eventNotifier);
+            // we are in context scope, so we need to use an event notifier to keep track
+            // when any exchanges is done on the camel context.
+            // This ensures we can trigger accordingly to context scope
+            camelContext.getManagementStrategy().addEventNotifier(eventNotifier);
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ObjectHelper.notNull(camelContext, "CamelContext", this);
+        if (scope == ThrottlingScope.Context) {
+            camelContext.getManagementStrategy().removeEventNotifier(eventNotifier);
         }
     }
 
+    /**
+     * {@link org.apache.camel.spi.EventNotifier} to keep track on when {@link Exchange}
+     * is done, so we can throttle accordingly.
+     */
+    private class ContextScopedEventNotifier extends EventNotifierSupport {
+
+        @Override
+        public void notify(EventObject event) throws Exception {
+            // if context
+            ExchangeCompletedEvent completedEvent = (ExchangeCompletedEvent) event;
+            for (Route route : routes) {
+                throttle(route, completedEvent.getExchange());
+            }
+        }
+
+        @Override
+        public boolean isEnabled(EventObject event) {
+            return event instanceof ExchangeCompletedEvent;
+        }
+
+        @Override
+        protected void doStart() throws Exception {
+            // noop
+        }
+
+        @Override
+        protected void doStop() throws Exception {
+            // noop
+        }
+
+        @Override
+        public String toString() {
+            return "ContextScopedEventNotifier";
+        }
+    }
 
 }

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyContextScopeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyContextScopeTest.java?rev=1139749&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyContextScopeTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyContextScopeTest.java Sun Jun 26 09:35:51 2011
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+
+/**
+ * @version 
+ */
+public class ThrottlingInflightRoutePolicyContextScopeTest extends ContextTestSupport {
+
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    public void testThrottlingRoutePolicy() throws Exception {
+        // trigger one in flight from the start
+        template.sendBody("seda:bar", "Hello World");
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedBodiesReceived("A");
+        result.setMinimumResultWaitTime(1000);
+
+        // only 1 message will get completed as the throttler will suspend the consumer
+        // when A is done
+        template.sendBody("direct:start", "A");
+
+        // need a little slack to ensure the seda consumer will be suspended in between
+        Thread.sleep(2000);
+        template.sendBody("direct:start", "B");
+
+        result.assertIsSatisfied();
+
+        result.reset();
+        result.expectedBodiesReceived("B");
+
+        // trigger seda:bar to complete now, which should signal
+        // to the throttler to resume the seda:foo consumer, so B can get done
+        latch.countDown();
+
+        result.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                ThrottlingInflightRoutePolicy policy = new ThrottlingInflightRoutePolicy();
+                policy.setMaxInflightExchanges(1);
+                policy.setScope(ThrottlingInflightRoutePolicy.ThrottlingScope.Context);
+
+                from("seda:bar")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            latch.await();
+                        }
+                    }).to("mock:bar");
+
+                from("direct:start")
+                    .to("seda:foo");
+
+                from("seda:foo")
+                    .routePolicy(policy)
+                    .to("log:foo")
+                    .to("mock:result");
+            }
+        };
+    }
+}