You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/11/03 12:05:19 UTC
svn commit: r1405320 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/component/controlbus/
test/java/org/apache/camel/component/controlbus/
Author: davsclaus
Date: Sat Nov 3 11:05:18 2012
New Revision: 1405320
URL: http://svn.apache.org/viewvc?rev=1405320&view=rev
Log:
CAMEL-5651: More stuff to control bus component.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteAsyncTest.java
- copied, changed from r1405304, camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusComponent.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusProducer.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusComponent.java?rev=1405320&r1=1405319&r2=1405320&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusComponent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusComponent.java Sat Nov 3 11:05:18 2012
@@ -17,6 +17,7 @@
package org.apache.camel.component.controlbus;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.DefaultComponent;
@@ -26,10 +27,12 @@ import org.apache.camel.impl.DefaultComp
*/
public class ControlBusComponent extends DefaultComponent {
- // TODO: allow to use a thread pool for tasks so you dont have to wait
// TODO: management command, to use the JMX mbeans easier
// TODO: Bulk status in POJO / JSON format
// TODO: a header with the action to do instead of uri, as we may want to be lenient
+ // TODO: JMX stats and operations of in-flight tasks, and history of done etc
+
+ private ExecutorService executorService;
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
@@ -48,4 +51,20 @@ public class ControlBusComponent extends
setProperties(answer, parameters);
return answer;
}
+
+ synchronized ExecutorService getExecutorService() {
+ if (executorService == null) {
+ executorService = getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "ControlBus");
+ }
+ return executorService;
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (executorService != null) {
+ getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+ executorService = null;
+ }
+ super.doStop();
+ }
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusEndpoint.java?rev=1405320&r1=1405319&r2=1405320&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusEndpoint.java Sat Nov 3 11:05:18 2012
@@ -18,11 +18,13 @@ package org.apache.camel.component.contr
import org.apache.camel.Component;
import org.apache.camel.Consumer;
+import org.apache.camel.LoggingLevel;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.Language;
+import org.apache.camel.util.CamelLogger;
/**
* The control bus endpoint.
@@ -32,6 +34,8 @@ public class ControlBusEndpoint extends
private Language language;
private String routeId;
private String action;
+ private boolean async;
+ private LoggingLevel loggingLevel = LoggingLevel.INFO;
public ControlBusEndpoint(String endpointUri, Component component) {
super(endpointUri, component);
@@ -39,7 +43,8 @@ public class ControlBusEndpoint extends
@Override
public Producer createProducer() throws Exception {
- return new ControlBusProducer(this);
+ CamelLogger logger = new CamelLogger(ControlBusProducer.class.getName(), loggingLevel);
+ return new ControlBusProducer(this, logger);
}
@Override
@@ -53,6 +58,11 @@ public class ControlBusEndpoint extends
return false;
}
+ @Override
+ public ControlBusComponent getComponent() {
+ return (ControlBusComponent) super.getComponent();
+ }
+
public Language getLanguage() {
return language;
}
@@ -76,4 +86,20 @@ public class ControlBusEndpoint extends
public void setAction(String action) {
this.action = action;
}
+
+ public boolean isAsync() {
+ return async;
+ }
+
+ public void setAsync(boolean async) {
+ this.async = async;
+ }
+
+ public LoggingLevel getLoggingLevel() {
+ return loggingLevel;
+ }
+
+ public void setLoggingLevel(LoggingLevel loggingLevel) {
+ this.loggingLevel = loggingLevel;
+ }
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusProducer.java?rev=1405320&r1=1405319&r2=1405320&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/controlbus/ControlBusProducer.java Sat Nov 3 11:05:18 2012
@@ -23,6 +23,7 @@ import org.apache.camel.Expression;
import org.apache.camel.ServiceStatus;
import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.camel.spi.Language;
+import org.apache.camel.util.CamelLogger;
import org.apache.camel.util.ExchangeHelper;
/**
@@ -30,8 +31,11 @@ import org.apache.camel.util.ExchangeHel
*/
public class ControlBusProducer extends DefaultAsyncProducer {
- public ControlBusProducer(Endpoint endpoint) {
+ private final CamelLogger logger;
+
+ public ControlBusProducer(Endpoint endpoint, CamelLogger logger) {
super(endpoint);
+ this.logger = logger;
}
@Override
@@ -49,7 +53,7 @@ public class ControlBusProducer extends
}
} else if (getEndpoint().getAction() != null) {
try {
- processByAction(exchange, getEndpoint().getRouteId(), getEndpoint().getAction());
+ processByAction(exchange);
} catch (Exception e) {
exchange.setException(e);
}
@@ -60,28 +64,104 @@ public class ControlBusProducer extends
}
protected void processByLanguage(Exchange exchange, Language language) throws Exception {
- // create dummy exchange
- Exchange dummy = ExchangeHelper.createCopy(exchange, true);
+ LanguageTask task = new LanguageTask(exchange, language);
+ if (getEndpoint().isAsync()) {
+ getEndpoint().getComponent().getExecutorService().submit(task);
+ } else {
+ task.run();
+ }
+ }
+
+ protected void processByAction(Exchange exchange) throws Exception {
+ ActionTask task = new ActionTask(exchange);
+ if (getEndpoint().isAsync()) {
+ getEndpoint().getComponent().getExecutorService().submit(task);
+ } else {
+ task.run();
+ }
+ }
+
+ /**
+ * Tasks to run when processing by language.
+ */
+ private final class LanguageTask implements Runnable {
+
+ private final Exchange exchange;
+ private final Language language;
+
+ private LanguageTask(Exchange exchange, Language language) {
+ this.exchange = exchange;
+ this.language = language;
+ }
+
+ @Override
+ public void run() {
+ String task = null;
+ Object result = null;
+
+ try {
+ // create dummy exchange
+ Exchange dummy = ExchangeHelper.createCopy(exchange, true);
- String body = dummy.getIn().getMandatoryBody(String.class);
- if (body != null) {
- Expression exp = language.createExpression(body);
- Object out = exp.evaluate(dummy, Object.class);
- if (out != null) {
- exchange.getIn().setBody(out);
+ task = dummy.getIn().getMandatoryBody(String.class);
+ if (task != null) {
+ Expression exp = language.createExpression(task);
+ result = exp.evaluate(dummy, Object.class);
+ }
+
+ if (result != null && !getEndpoint().isAsync()) {
+ // can only set result on exchange if sync
+ exchange.getIn().setBody(result);
+ }
+
+ if (task != null) {
+ logger.log("ControlBus task done [" + task + "] with result -> " + (result != null ? result : "void"));
+ }
+ } catch (Exception e) {
+ logger.log("Error executing ControlBus task [" + task + "]. This exception will be ignored.", e);
}
}
}
- protected void processByAction(Exchange exchange, String id, String action) throws Exception {
- if ("start".equals(action)) {
- getEndpoint().getCamelContext().startRoute(id);
- } else if ("stop".equals(action)) {
- getEndpoint().getCamelContext().stopRoute(id);
- } else if ("status".equals(action)) {
- ServiceStatus status = getEndpoint().getCamelContext().getRouteStatus(id);
- if (status != null) {
- exchange.getIn().setBody(status.name());
+ /**
+ * Tasks to run when processing by route action.
+ */
+ private final class ActionTask implements Runnable {
+
+ private final Exchange exchange;
+
+ private ActionTask(Exchange exchange) {
+ this.exchange = exchange;
+ }
+
+ @Override
+ public void run() {
+ String action = getEndpoint().getAction();
+ String id = getEndpoint().getRouteId();
+
+ Object result = null;
+ String task = action + " route " + id;
+
+ try {
+ if ("start".equals(action)) {
+ getEndpoint().getCamelContext().startRoute(id);
+ } else if ("stop".equals(action)) {
+ getEndpoint().getCamelContext().stopRoute(id);
+ } else if ("status".equals(action)) {
+ ServiceStatus status = getEndpoint().getCamelContext().getRouteStatus(id);
+ if (status != null) {
+ result = status.name();
+ }
+ }
+
+ if (result != null && !getEndpoint().isAsync()) {
+ // can only set result on exchange if sync
+ exchange.getIn().setBody(result);
+ }
+
+ logger.log("ControlBus task done [" + task + "] with result -> " + (result != null ? result : "void"));
+ } catch (Exception e) {
+ logger.log("Error executing ControlBus task [" + task + "]. This exception will be ignored.", e);
}
}
}
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteAsyncTest.java (from r1405304, camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteAsyncTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteAsyncTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteTest.java&r1=1405304&r2=1405320&rev=1405320&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteAsyncTest.java Sat Nov 3 11:05:18 2012
@@ -22,9 +22,9 @@ import org.apache.camel.builder.RouteBui
/**
*
*/
-public class ControlBusStartRouteTest extends ContextTestSupport {
+public class ControlBusStartRouteAsyncTest extends ContextTestSupport {
- public void testControlBusStartStop() throws Exception {
+ public void testControlBusAsync() throws Exception {
assertEquals("Stopped", context.getRouteStatus("foo").name());
// store a pending message
@@ -32,26 +32,13 @@ public class ControlBusStartRouteTest ex
template.sendBody("seda:foo", "Hello World");
// start the route using control bus
- template.sendBody("controlbus:route?routeId=foo&action=start", null);
+ template.sendBody("controlbus:route?routeId=foo&action=start&async=true", null);
assertMockEndpointsSatisfied();
- // now stop the route, using a header
- template.sendBody("controlbus:route?routeId=foo&action=stop", null);
-
- assertEquals("Stopped", context.getRouteStatus("foo").name());
- }
-
- public void testControlBusStatus() throws Exception {
- assertEquals("Stopped", context.getRouteStatus("foo").name());
-
- String status = template.requestBody("controlbus:route?routeId=foo&action=status", null, String.class);
- assertEquals("Stopped", status);
-
- context.startRoute("foo");
-
- status = template.requestBody("controlbus:route?routeId=foo&action=status", null, String.class);
- assertEquals("Started", status);
+ // get status using async, but we cannot get result as we run task async
+ String status = template.requestBody("controlbus:route?routeId=foo&action=status&async=true", null, String.class);
+ assertNull("Cannot get result if async", status);
}
@Override
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteTest.java?rev=1405320&r1=1405319&r2=1405320&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/controlbus/ControlBusStartRouteTest.java Sat Nov 3 11:05:18 2012
@@ -54,6 +54,18 @@ public class ControlBusStartRouteTest ex
assertEquals("Started", status);
}
+ public void testControlBusStatusLevelWarn() throws Exception {
+ assertEquals("Stopped", context.getRouteStatus("foo").name());
+
+ String status = template.requestBody("controlbus:route?routeId=foo&action=status&loggingLevel=WARN", null, String.class);
+ assertEquals("Stopped", status);
+
+ context.startRoute("foo");
+
+ status = template.requestBody("controlbus:route?routeId=foo&action=status&loggingLevel=WARN", null, String.class);
+ assertEquals("Started", status);
+ }
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {