You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/11/03 12:05:19 UTC

svn commit: r1405320 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/component/controlbus/ test/java/org/apache/camel/component/controlbus/

Author: davsclaus
Date: Sat Nov  3 11:05:18 2012
New Revision: 1405320

URL: http://svn.apache.org/viewvc?rev=1405320&view=rev
Log:
CAMEL-5651: More stuff to control bus component.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteAsyncTest.java
      - copied, changed from r1405304, camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusComponent.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusProducer.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusComponent.java?rev=1405320&r1=1405319&r2=1405320&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusComponent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusComponent.java Sat Nov  3 11:05:18 2012
@@ -17,6 +17,7 @@
 package org.apache.camel.component.controlbus;
 
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.impl.DefaultComponent;
@@ -26,10 +27,12 @@ import org.apache.camel.impl.DefaultComp
  */
 public class ControlBusComponent extends DefaultComponent {
 
-    // TODO: allow to use a thread pool for tasks so you dont have to wait
     // TODO: management command, to use the JMX mbeans easier
     // TODO: Bulk status in POJO / JSON format
     // TODO: a header with the action to do instead of uri, as we may want to be lenient
+    // TODO: JMX stats and operations of in-flight tasks, and history of done etc
+
+    private ExecutorService executorService;
 
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
@@ -48,4 +51,20 @@ public class ControlBusComponent extends
         setProperties(answer, parameters);
         return answer;
     }
+
+    synchronized ExecutorService getExecutorService() {
+        if (executorService == null) {
+            executorService = getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "ControlBus");
+        }
+        return executorService;
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (executorService != null) {
+            getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+            executorService = null;
+        }
+        super.doStop();
+    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusEndpoint.java?rev=1405320&r1=1405319&r2=1405320&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusEndpoint.java Sat Nov  3 11:05:18 2012
@@ -18,11 +18,13 @@ package org.apache.camel.component.contr
 
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
+import org.apache.camel.LoggingLevel;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.spi.Language;
+import org.apache.camel.util.CamelLogger;
 
 /**
  * The control bus endpoint.
@@ -32,6 +34,8 @@ public class ControlBusEndpoint extends 
     private Language language;
     private String routeId;
     private String action;
+    private boolean async;
+    private LoggingLevel loggingLevel = LoggingLevel.INFO;
 
     public ControlBusEndpoint(String endpointUri, Component component) {
         super(endpointUri, component);
@@ -39,7 +43,8 @@ public class ControlBusEndpoint extends 
 
     @Override
     public Producer createProducer() throws Exception {
-        return new ControlBusProducer(this);
+        CamelLogger logger = new CamelLogger(ControlBusProducer.class.getName(), loggingLevel);
+        return new ControlBusProducer(this, logger);
     }
 
     @Override
@@ -53,6 +58,11 @@ public class ControlBusEndpoint extends 
         return false;
     }
 
+    @Override
+    public ControlBusComponent getComponent() {
+        return (ControlBusComponent) super.getComponent();
+    }
+
     public Language getLanguage() {
         return language;
     }
@@ -76,4 +86,20 @@ public class ControlBusEndpoint extends 
     public void setAction(String action) {
         this.action = action;
     }
+
+    public boolean isAsync() {
+        return async;
+    }
+
+    public void setAsync(boolean async) {
+        this.async = async;
+    }
+
+    public LoggingLevel getLoggingLevel() {
+        return loggingLevel;
+    }
+
+    public void setLoggingLevel(LoggingLevel loggingLevel) {
+        this.loggingLevel = loggingLevel;
+    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusProducer.java?rev=1405320&r1=1405319&r2=1405320&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusProducer.java Sat Nov  3 11:05:18 2012
@@ -23,6 +23,7 @@ import org.apache.camel.Expression;
 import org.apache.camel.ServiceStatus;
 import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.camel.spi.Language;
+import org.apache.camel.util.CamelLogger;
 import org.apache.camel.util.ExchangeHelper;
 
 /**
@@ -30,8 +31,11 @@ import org.apache.camel.util.ExchangeHel
  */
 public class ControlBusProducer extends DefaultAsyncProducer {
 
-    public ControlBusProducer(Endpoint endpoint) {
+    private final CamelLogger logger;
+
+    public ControlBusProducer(Endpoint endpoint, CamelLogger logger) {
         super(endpoint);
+        this.logger = logger;
     }
 
     @Override
@@ -49,7 +53,7 @@ public class ControlBusProducer extends 
             }
         } else if (getEndpoint().getAction() != null) {
             try {
-                processByAction(exchange, getEndpoint().getRouteId(), getEndpoint().getAction());
+                processByAction(exchange);
             } catch (Exception e) {
                 exchange.setException(e);
             }
@@ -60,28 +64,104 @@ public class ControlBusProducer extends 
     }
 
     protected void processByLanguage(Exchange exchange, Language language) throws Exception {
-        // create dummy exchange
-        Exchange dummy = ExchangeHelper.createCopy(exchange, true);
+        LanguageTask task = new LanguageTask(exchange, language);
+        if (getEndpoint().isAsync()) {
+            getEndpoint().getComponent().getExecutorService().submit(task);
+        } else {
+            task.run();
+        }
+    }
+
+    protected void processByAction(Exchange exchange) throws Exception {
+        ActionTask task = new ActionTask(exchange);
+        if (getEndpoint().isAsync()) {
+            getEndpoint().getComponent().getExecutorService().submit(task);
+        } else {
+            task.run();
+        }
+    }
+
+    /**
+     * Tasks to run when processing by language.
+     */
+    private final class LanguageTask implements Runnable {
+
+        private final Exchange exchange;
+        private final Language language;
+
+        private LanguageTask(Exchange exchange, Language language) {
+            this.exchange = exchange;
+            this.language = language;
+        }
+
+        @Override
+        public void run() {
+            String task = null;
+            Object result = null;
+
+            try {
+                // create dummy exchange
+                Exchange dummy = ExchangeHelper.createCopy(exchange, true);
 
-        String body = dummy.getIn().getMandatoryBody(String.class);
-        if (body != null) {
-            Expression exp = language.createExpression(body);
-            Object out = exp.evaluate(dummy, Object.class);
-            if (out != null) {
-                exchange.getIn().setBody(out);
+                task = dummy.getIn().getMandatoryBody(String.class);
+                if (task != null) {
+                    Expression exp = language.createExpression(task);
+                    result = exp.evaluate(dummy, Object.class);
+                }
+
+                if (result != null && !getEndpoint().isAsync()) {
+                    // can only set result on exchange if sync
+                    exchange.getIn().setBody(result);
+                }
+
+                if (task != null) {
+                    logger.log("ControlBus task done [" + task + "] with result -> " + (result != null ? result : "void"));
+                }
+            } catch (Exception e) {
+                logger.log("Error executing ControlBus task [" + task + "]. This exception will be ignored.", e);
             }
         }
     }
 
-    protected void processByAction(Exchange exchange, String id, String action) throws Exception {
-        if ("start".equals(action)) {
-            getEndpoint().getCamelContext().startRoute(id);
-        } else if ("stop".equals(action)) {
-            getEndpoint().getCamelContext().stopRoute(id);
-        } else if ("status".equals(action)) {
-            ServiceStatus status = getEndpoint().getCamelContext().getRouteStatus(id);
-            if (status != null) {
-                exchange.getIn().setBody(status.name());
+    /**
+     * Tasks to run when processing by route action.
+     */
+    private final class ActionTask implements Runnable {
+
+        private final Exchange exchange;
+
+        private ActionTask(Exchange exchange) {
+            this.exchange = exchange;
+        }
+
+        @Override
+        public void run() {
+            String action = getEndpoint().getAction();
+            String id = getEndpoint().getRouteId();
+
+            Object result = null;
+            String task = action + " route " + id;
+
+            try {
+                if ("start".equals(action)) {
+                    getEndpoint().getCamelContext().startRoute(id);
+                } else if ("stop".equals(action)) {
+                    getEndpoint().getCamelContext().stopRoute(id);
+                } else if ("status".equals(action)) {
+                    ServiceStatus status = getEndpoint().getCamelContext().getRouteStatus(id);
+                    if (status != null) {
+                        result = status.name();
+                    }
+                }
+
+                if (result != null && !getEndpoint().isAsync()) {
+                    // can only set result on exchange if sync
+                    exchange.getIn().setBody(result);
+                }
+
+                logger.log("ControlBus task done [" + task + "] with result -> " + (result != null ? result : "void"));
+            } catch (Exception e) {
+                logger.log("Error executing ControlBus task [" + task + "]. This exception will be ignored.", e);
             }
         }
     }

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteAsyncTest.java (from r1405304, camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteAsyncTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteAsyncTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteTest.java&r1=1405304&r2=1405320&rev=1405320&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteAsyncTest.java Sat Nov  3 11:05:18 2012
@@ -22,9 +22,9 @@ import org.apache.camel.builder.RouteBui
 /**
  *
  */
-public class ControlBusStartRouteTest extends ContextTestSupport {
+public class ControlBusStartRouteAsyncTest extends ContextTestSupport {
 
-    public void testControlBusStartStop() throws Exception {
+    public void testControlBusAsync() throws Exception {
         assertEquals("Stopped", context.getRouteStatus("foo").name());
 
         // store a pending message
@@ -32,26 +32,13 @@ public class ControlBusStartRouteTest ex
         template.sendBody("seda:foo", "Hello World");
 
         // start the route using control bus
-        template.sendBody("controlbus:route?routeId=foo&action=start", null);
+        template.sendBody("controlbus:route?routeId=foo&action=start&async=true", null);
 
         assertMockEndpointsSatisfied();
 
-        // now stop the route, using a header
-        template.sendBody("controlbus:route?routeId=foo&action=stop", null);
-
-        assertEquals("Stopped", context.getRouteStatus("foo").name());
-    }
-
-    public void testControlBusStatus() throws Exception {
-        assertEquals("Stopped", context.getRouteStatus("foo").name());
-
-        String status = template.requestBody("controlbus:route?routeId=foo&action=status", null, String.class);
-        assertEquals("Stopped", status);
-
-        context.startRoute("foo");
-
-        status = template.requestBody("controlbus:route?routeId=foo&action=status", null, String.class);
-        assertEquals("Started", status);
+        // get status using async, but we cannot get result as we run task async
+        String status = template.requestBody("controlbus:route?routeId=foo&action=status&async=true", null, String.class);
+        assertNull("Cannot get result if async", status);
     }
 
     @Override

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteTest.java?rev=1405320&r1=1405319&r2=1405320&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteTest.java Sat Nov  3 11:05:18 2012
@@ -54,6 +54,18 @@ public class ControlBusStartRouteTest ex
         assertEquals("Started", status);
     }
 
+    public void testControlBusStatusLevelWarn() throws Exception {
+        assertEquals("Stopped", context.getRouteStatus("foo").name());
+
+        String status = template.requestBody("controlbus:route?routeId=foo&action=status&loggingLevel=WARN", null, String.class);
+        assertEquals("Stopped", status);
+
+        context.startRoute("foo");
+
+        status = template.requestBody("controlbus:route?routeId=foo&action=status&loggingLevel=WARN", null, String.class);
+        assertEquals("Started", status);
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {