You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/06/26 11:35:52 UTC
svn commit: r1139749 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/impl/ test/java/org/apache/camel/processor/
Author: davsclaus
Date: Sun Jun 26 09:35:51 2011
New Revision: 1139749
URL: http://svn.apache.org/viewvc?rev=1139749&view=rev
Log:
CAMEL-4149: Fixed throttling route policy in context scope may not resume route when hitting low watermark
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyContextScopeTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java?rev=1139749&r1=1139748&r2=1139749&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java Sun Jun 26 09:35:51 2011
@@ -155,16 +155,18 @@ public class DefaultRouteContext impleme
RoutePolicyProcessor routePolicyProcessor = null;
List<RoutePolicy> routePolicyList = getRoutePolicyList();
if (routePolicyList != null && !routePolicyList.isEmpty()) {
- routePolicyProcessor = new RoutePolicyProcessor(unitOfWorkProcessor, routePolicyList);
-
- // add it as service if we have not already done that (eg possible if two routes have the same service)
- if (!camelContext.hasService(routePolicyProcessor)) {
- try {
- camelContext.addService(routePolicyProcessor);
- } catch (Exception e) {
- throw ObjectHelper.wrapRuntimeCamelException(e);
+ for (RoutePolicy policy : routePolicyList) {
+ // add policy as service if we have not already done that (eg possible if two routes have the same service)
+ // this ensures Camel can control the lifecycle of the policy
+ if (!camelContext.hasService(policy)) {
+ try {
+ camelContext.addService(policy);
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
}
}
+ routePolicyProcessor = new RoutePolicyProcessor(unitOfWorkProcessor, routePolicyList);
target = routePolicyProcessor;
} else {
target = unitOfWorkProcessor;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java?rev=1139749&r1=1139748&r2=1139749&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java Sun Jun 26 09:35:51 2011
@@ -16,29 +16,48 @@
*/
package org.apache.camel.impl;
+import java.util.EventObject;
+import java.util.LinkedHashSet;
+import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Route;
+import org.apache.camel.management.EventNotifierSupport;
+import org.apache.camel.management.event.ExchangeCompletedEvent;
import org.apache.camel.processor.CamelLogger;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
import org.slf4j.LoggerFactory;
/**
* A throttle based {@link org.apache.camel.spi.RoutePolicy} which is capable of dynamic
* throttling a route based on number of current inflight exchanges.
+ * <p/>
+ * This implementation supports two scopes {@link ThrottlingScope#Context} and {@link ThrottlingScope#Route} (is default).
+ * If context scope is selected then this implementation will use a {@link org.apache.camel.spi.EventNotifier} to listen
+ * for events when {@link Exchange}s is done, and trigger the {@link #throttle(org.apache.camel.Route, org.apache.camel.Exchange)}
+ * method. If the route scope is selected then <b>no</b> {@link org.apache.camel.spi.EventNotifier} is in use, as there is already
+ * a {@link org.apache.camel.spi.Synchronization} callback on the current {@link Exchange} which triggers the
+ * {@link #throttle(org.apache.camel.Route, org.apache.camel.Exchange)} when the current {@link Exchange} is done.
*
* @version
*/
-public class ThrottlingInflightRoutePolicy extends RoutePolicySupport {
+public class ThrottlingInflightRoutePolicy extends RoutePolicySupport implements CamelContextAware {
public enum ThrottlingScope {
Context, Route
}
+ private final Set<Route> routes = new LinkedHashSet<Route>();
+ private ContextScopedEventNotifier eventNotifier;
+ private CamelContext camelContext;
private final Lock lock = new ReentrantLock();
private ThrottlingScope scope = ThrottlingScope.Route;
private int maxInflightExchanges = 1000;
@@ -55,12 +74,45 @@ public class ThrottlingInflightRoutePoli
return "ThrottlingInflightRoutePolicy[" + maxInflightExchanges + " / " + resumePercentOfMax + "% using scope " + scope + "]";
}
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ @Override
+ public void onInit(Route route) {
+ // we need to remember the routes we apply for
+ routes.add(route);
+ }
+
+ @Override
public void onExchangeDone(Route route, Exchange exchange) {
+ // if route scoped then throttle directly
+ // as context scoped is handled using an EventNotifier instead
+ if (scope == ThrottlingScope.Route) {
+ throttle(route, exchange);
+ }
+ }
+
+ /**
+ * Throttles the route when {@link Exchange}s is done.
+ *
+ * @param route the route
+ * @param exchange the exchange
+ */
+ protected void throttle(Route route, Exchange exchange) {
// this works the best when this logic is executed when the exchange is done
Consumer consumer = route.getConsumer();
int size = getSize(consumer, exchange);
- if (maxInflightExchanges > 0 && size > maxInflightExchanges) {
+ boolean stop = maxInflightExchanges > 0 && size > maxInflightExchanges;
+ if (log.isTraceEnabled()) {
+ log.trace("{} > 0 && {} > {} evaluated as {}", new Object[]{maxInflightExchanges, size, maxInflightExchanges, stop});
+ }
+ if (stop) {
try {
lock.lock();
stopConsumer(size, consumer);
@@ -74,7 +126,11 @@ public class ThrottlingInflightRoutePoli
// reload size in case a race condition with too many at once being invoked
// so we need to ensure that we read the most current size and start the consumer if we are already to low
size = getSize(consumer, exchange);
- if (size <= resumeInflightExchanges) {
+ boolean start = size <= resumeInflightExchanges;
+ if (log.isTraceEnabled()) {
+ log.trace("{} <= {} evaluated as {}", new Object[]{size, resumeInflightExchanges, start});
+ }
+ if (start) {
try {
lock.lock();
startConsumer(size, consumer);
@@ -185,16 +241,73 @@ public class ThrottlingInflightRoutePoli
private void startConsumer(int size, Consumer consumer) throws Exception {
boolean started = super.startConsumer(consumer);
if (started) {
- getLogger().log("Throttling consumer: " + size + " <= " + resumeInflightExchanges + " inflight exchange by resuming consumer.");
+ getLogger().log("Throttling consumer: " + size + " <= " + resumeInflightExchanges + " inflight exchange by resuming consumer: " + consumer);
}
}
private void stopConsumer(int size, Consumer consumer) throws Exception {
boolean stopped = super.stopConsumer(consumer);
if (stopped) {
- getLogger().log("Throttling consumer: " + size + " > " + maxInflightExchanges + " inflight exchange by suspending consumer.");
+ getLogger().log("Throttling consumer: " + size + " > " + maxInflightExchanges + " inflight exchange by suspending consumer: " + consumer);
+ }
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ ObjectHelper.notNull(camelContext, "CamelContext", this);
+ if (scope == ThrottlingScope.Context) {
+ eventNotifier = new ContextScopedEventNotifier();
+ // must start the notifier before it can be used
+ ServiceHelper.startService(eventNotifier);
+ // we are in context scope, so we need to use an event notifier to keep track
+ // when any exchanges is done on the camel context.
+ // This ensures we can trigger accordingly to context scope
+ camelContext.getManagementStrategy().addEventNotifier(eventNotifier);
+ }
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ ObjectHelper.notNull(camelContext, "CamelContext", this);
+ if (scope == ThrottlingScope.Context) {
+ camelContext.getManagementStrategy().removeEventNotifier(eventNotifier);
}
}
+ /**
+ * {@link org.apache.camel.spi.EventNotifier} to keep track on when {@link Exchange}
+ * is done, so we can throttle accordingly.
+ */
+ private class ContextScopedEventNotifier extends EventNotifierSupport {
+
+ @Override
+ public void notify(EventObject event) throws Exception {
+ // if context
+ ExchangeCompletedEvent completedEvent = (ExchangeCompletedEvent) event;
+ for (Route route : routes) {
+ throttle(route, completedEvent.getExchange());
+ }
+ }
+
+ @Override
+ public boolean isEnabled(EventObject event) {
+ return event instanceof ExchangeCompletedEvent;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ // noop
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ // noop
+ }
+
+ @Override
+ public String toString() {
+ return "ContextScopedEventNotifier";
+ }
+ }
}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyContextScopeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyContextScopeTest.java?rev=1139749&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyContextScopeTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyContextScopeTest.java Sun Jun 26 09:35:51 2011
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+
+/**
+ * @version
+ */
+public class ThrottlingInflightRoutePolicyContextScopeTest extends ContextTestSupport {
+
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ public void testThrottlingRoutePolicy() throws Exception {
+ // trigger one in flight from the start
+ template.sendBody("seda:bar", "Hello World");
+
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedBodiesReceived("A");
+ result.setMinimumResultWaitTime(1000);
+
+ // only 1 message will get completed as the throttler will suspend the consumer
+ // when A is done
+ template.sendBody("direct:start", "A");
+
+ // need a little slack to ensure the seda consumer will be suspended in between
+ Thread.sleep(2000);
+ template.sendBody("direct:start", "B");
+
+ result.assertIsSatisfied();
+
+ result.reset();
+ result.expectedBodiesReceived("B");
+
+ // trigger seda:bar to complete now, which should signal
+ // to the throttler to resume the seda:foo consumer, so B can get done
+ latch.countDown();
+
+ result.assertIsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ ThrottlingInflightRoutePolicy policy = new ThrottlingInflightRoutePolicy();
+ policy.setMaxInflightExchanges(1);
+ policy.setScope(ThrottlingInflightRoutePolicy.ThrottlingScope.Context);
+
+ from("seda:bar")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ latch.await();
+ }
+ }).to("mock:bar");
+
+ from("direct:start")
+ .to("seda:foo");
+
+ from("seda:foo")
+ .routePolicy(policy)
+ .to("log:foo")
+ .to("mock:result");
+ }
+ };
+ }
+}