You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/10/23 15:32:19 UTC
svn commit: r1187882 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/processor/WrapProcessor.java
main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java
test/java/org/apache/camel/management/ManagedCustomPolicyTest.java
Author: davsclaus
Date: Sun Oct 23 13:32:18 2011
New Revision: 1187882
URL: http://svn.apache.org/viewvc?rev=1187882&view=rev
Log:
CAMEL-4536: Fixed issue with registering processors when using policy in a Camel route.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCustomPolicyTest.java
- copied, changed from r1187862, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRoutePolicyTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WrapProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WrapProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WrapProcessor.java?rev=1187882&r1=1187881&r2=1187882&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WrapProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WrapProcessor.java Sun Oct 23 13:32:18 2011
@@ -16,6 +16,8 @@
*/
package org.apache.camel.processor;
+import java.util.List;
+
import org.apache.camel.Processor;
import org.apache.camel.util.ServiceHelper;
@@ -38,6 +40,14 @@ public class WrapProcessor extends Deleg
}
@Override
+ public List<Processor> next() {
+ // must include the wrapped processor so it is visited when navigating
+ List<Processor> list = super.next();
+ list.add(wrapped);
+ return list;
+ }
+
+ @Override
protected void doStart() throws Exception {
ServiceHelper.startService(wrapped);
super.doStart();
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java?rev=1187882&r1=1187881&r2=1187882&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java Sun Oct 23 13:32:18 2011
@@ -41,9 +41,8 @@ public final class AsyncProcessorConvert
}
/**
- * Creates a AsnycProcossor that delegates to the given processor.
- * It is important that this implements DelegateProcessor
- *
+ * Creates an {@link AsyncProcessor} that delegates to the given processor.
+ * It is important that this implements {@link DelegateProcessor}
*/
private static final class ProcessorToAsyncProcessorBridge implements DelegateProcessor, AsyncProcessor, Navigate<Processor>, Service {
protected Processor processor;
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCustomPolicyTest.java (from r1187862, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRoutePolicyTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCustomPolicyTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCustomPolicyTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRoutePolicyTest.java&r1=1187862&r2=1187882&rev=1187882&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRoutePolicyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCustomPolicyTest.java Sun Oct 23 13:32:18 2011
@@ -17,45 +17,44 @@
package org.apache.camel.management;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import org.apache.camel.ServiceStatus;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.spi.Policy;
+import org.apache.camel.spi.RouteContext;
/**
* @version
*/
-public class ManagedRoutePolicyTest extends ManagementTestSupport {
+public class ManagedCustomPolicyTest extends ManagementTestSupport {
+
+ private final AtomicInteger counter = new AtomicInteger();
+
+ public void testPolicy() throws Exception {
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+ template.sendBody("direct:start", "Hello World");
+ assertMockEndpointsSatisfied();
+
+ assertEquals(1, counter.get());
- public void testRoutes() throws Exception {
MBeanServer mbeanServer = getMBeanServer();
- Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=routes,*"), null);
- assertEquals(1, set.size());
+ Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=processors,*"), null);
+ assertEquals(3, set.size());
- ObjectName on = set.iterator().next();
+ ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"foo\"");
+ assertTrue("Should be registered: foo", mbeanServer.isRegistered(on));
- boolean registered = mbeanServer.isRegistered(on);
- assertEquals("Should be registered", true, registered);
+ on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"result\"");
+ assertTrue("Should be registered: result", mbeanServer.isRegistered(on));
- String uri = (String) mbeanServer.getAttribute(on, "EndpointUri");
- // the route has this starting endpoint uri
- assertEquals("direct://start", uri);
-
- Integer val = (Integer) mbeanServer.getAttribute(on, "InflightExchanges");
- // the route has no inflight exchanges
- assertEquals(0, val.intValue());
-
- // should be started
- String state = (String) mbeanServer.getAttribute(on, "State");
- assertEquals("Should be started", ServiceStatus.Started.name(), state);
-
- // should have route policy
- String policy = (String) mbeanServer.getAttribute(on, "RoutePolicyList");
- assertNotNull(policy);
- assertTrue("Should be a throttling, was: " + policy, policy.startsWith("ThrottlingInflightRoutePolicy"));
+ on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"bar\"");
+ assertTrue("Should be registered: bar", mbeanServer.isRegistered(on));
}
@Override
@@ -63,10 +62,35 @@ public class ManagedRoutePolicyTest exte
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:start").routePolicy(new ThrottlingInflightRoutePolicy())
- .to("log:foo").to("mock:result");
+ // custom policy but processors should be registered
+ from("direct:start").policy(new MyPolicy())
+ .to("log:foo").id("foo")
+ .to("mock:result").id("result");
+
+ // no policy but processors should be registered
+ from("direct:bar")
+ .to("log:bar").id("bar");
}
};
}
+ private final class MyPolicy implements Policy {
+
+ @Override
+ public void beforeWrap(RouteContext routeContext, ProcessorDefinition<?> definition) {
+ // noop
+ }
+
+ @Override
+ public Processor wrap(RouteContext routeContext, final Processor processor) {
+ return new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ counter.incrementAndGet();
+ processor.process(exchange);
+ }
+ };
+ }
+ }
+
}