You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/10/23 15:32:19 UTC

svn commit: r1187882 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/processor/WrapProcessor.java main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java test/java/org/apache/camel/management/ManagedCustomPolicyTest.java

Author: davsclaus
Date: Sun Oct 23 13:32:18 2011
New Revision: 1187882

URL: http://svn.apache.org/viewvc?rev=1187882&view=rev
Log:
CAMEL-4536: Fixed issue with registering processors when using policy in a Camel route.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCustomPolicyTest.java
      - copied, changed from r1187862, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRoutePolicyTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WrapProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WrapProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WrapProcessor.java?rev=1187882&r1=1187881&r2=1187882&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WrapProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WrapProcessor.java Sun Oct 23 13:32:18 2011
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.processor;
 
+import java.util.List;
+
 import org.apache.camel.Processor;
 import org.apache.camel.util.ServiceHelper;
 
@@ -38,6 +40,14 @@ public class WrapProcessor extends Deleg
     }
 
     @Override
+    public List<Processor> next() {
+        // must include the wrapped processor when navigating
+        List<Processor> list = super.next();
+        list.add(wrapped);
+        return list;
+    }
+
+    @Override
     protected void doStart() throws Exception {
         ServiceHelper.startService(wrapped);
         super.doStart();

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java?rev=1187882&r1=1187881&r2=1187882&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java Sun Oct 23 13:32:18 2011
@@ -41,9 +41,8 @@ public final class AsyncProcessorConvert
     }
 
     /**
-     * Creates a AsnycProcossor that delegates to the given processor.
-     * It is important that this implements DelegateProcessor
-     *
+     * Creates an {@link AsyncProcessor} that delegates to the given processor.
+     * It is important that this implements {@link DelegateProcessor}.
      */
     private static final class ProcessorToAsyncProcessorBridge implements DelegateProcessor, AsyncProcessor, Navigate<Processor>, Service {
         protected Processor processor;

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCustomPolicyTest.java (from r1187862, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRoutePolicyTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCustomPolicyTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCustomPolicyTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRoutePolicyTest.java&r1=1187862&r2=1187882&rev=1187882&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRoutePolicyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedCustomPolicyTest.java Sun Oct 23 13:32:18 2011
@@ -17,45 +17,44 @@
 package org.apache.camel.management;
 
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import org.apache.camel.ServiceStatus;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.spi.Policy;
+import org.apache.camel.spi.RouteContext;
 
 /**
  * @version 
  */
-public class ManagedRoutePolicyTest extends ManagementTestSupport {
+public class ManagedCustomPolicyTest extends ManagementTestSupport {
+
+    private final AtomicInteger counter = new AtomicInteger();
+
+    public void testPolicy() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+        template.sendBody("direct:start", "Hello World");
+        assertMockEndpointsSatisfied();
+
+        assertEquals(1, counter.get());
 
-    public void testRoutes() throws Exception {
         MBeanServer mbeanServer = getMBeanServer();
 
-        Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=routes,*"), null);
-        assertEquals(1, set.size());
+        Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=processors,*"), null);
+        assertEquals(3, set.size());
 
-        ObjectName on = set.iterator().next();
+        ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"foo\"");
+        assertTrue("Should be registered: foo",  mbeanServer.isRegistered(on));
 
-        boolean registered = mbeanServer.isRegistered(on);
-        assertEquals("Should be registered", true, registered);
+        on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"result\"");
+        assertTrue("Should be registered: result",  mbeanServer.isRegistered(on));
 
-        String uri = (String) mbeanServer.getAttribute(on, "EndpointUri");
-        // the route has this starting endpoint uri
-        assertEquals("direct://start", uri);
-
-        Integer val = (Integer) mbeanServer.getAttribute(on, "InflightExchanges");
-        // the route has no inflight exchanges
-        assertEquals(0, val.intValue());
-
-        // should be started
-        String state = (String) mbeanServer.getAttribute(on, "State");
-        assertEquals("Should be started", ServiceStatus.Started.name(), state);
-
-        // should have route policy
-        String policy = (String) mbeanServer.getAttribute(on, "RoutePolicyList");
-        assertNotNull(policy);
-        assertTrue("Should be a throttling, was: " + policy, policy.startsWith("ThrottlingInflightRoutePolicy"));
+        on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"bar\"");
+        assertTrue("Should be registered: bar",  mbeanServer.isRegistered(on));
     }
 
     @Override
@@ -63,10 +62,35 @@ public class ManagedRoutePolicyTest exte
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:start").routePolicy(new ThrottlingInflightRoutePolicy())
-                    .to("log:foo").to("mock:result");
+                // custom policy but processors should be registered
+                from("direct:start").policy(new MyPolicy())
+                    .to("log:foo").id("foo")
+                    .to("mock:result").id("result");
+
+                // no policy but processors should be registered
+                from("direct:bar")
+                    .to("log:bar").id("bar");
             }
         };
     }
 
+    private final class MyPolicy implements Policy {
+
+        @Override
+        public void beforeWrap(RouteContext routeContext, ProcessorDefinition<?> definition) {
+            // noop
+        }
+
+        @Override
+        public Processor wrap(RouteContext routeContext, final Processor processor) {
+            return new Processor() {
+                @Override
+                public void process(Exchange exchange) throws Exception {
+                    counter.incrementAndGet();
+                    processor.process(exchange);
+                }
+            };
+        }
+    }
+
 }