You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/12/28 14:11:00 UTC
svn commit: r1053342 - in
/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox:
./ direct/ seda/ strategy/
Author: davsclaus
Date: Tue Dec 28 13:10:59 2010
New Revision: 1053342
URL: http://svn.apache.org/viewvc?rev=1053342&view=rev
Log:
CAMEL-3285: Polished code a bit due code review.
Modified:
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java
Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java Tue Dec 28 13:10:59 2010
@@ -30,7 +30,7 @@ import org.apache.camel.component.routeb
import org.apache.camel.impl.DefaultComponent;
public class RouteboxComponent extends DefaultComponent {
- RouteboxConfiguration config;
+ final RouteboxConfiguration config;
private final Map<String, BlockingQueue<Exchange>> queues = new HashMap<String, BlockingQueue<Exchange>>();
public RouteboxComponent() {
Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java Tue Dec 28 13:10:59 2010
@@ -27,7 +27,6 @@ import org.apache.camel.ProducerTemplate
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.routebox.strategy.RouteboxDispatchStrategy;
import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.DefaultProducerTemplate;
import org.apache.camel.spi.Registry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -57,7 +56,7 @@ public class RouteboxConfiguration {
public RouteboxConfiguration() {
}
- public RouteboxConfiguration(URI uri) throws Exception {
+ public RouteboxConfiguration(URI uri) {
this();
this.uri = uri;
}
@@ -71,7 +70,9 @@ public class RouteboxConfiguration {
setUri(uri);
setAuthority(uri.getAuthority());
- LOG.info("Authority: " + uri.getAuthority());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Authority: " + uri.getAuthority());
+ }
setEndpointName(getAuthority());
@@ -113,31 +114,28 @@ public class RouteboxConfiguration {
}
if (parameters.containsKey("innerRegistry")) {
- innerRegistry = (Registry) component.resolveAndRemoveReferenceParameter(parameters, "innerRegistry", Registry.class);
+ innerRegistry = component.resolveAndRemoveReferenceParameter(parameters, "innerRegistry", Registry.class);
}
if (isForkContext()) {
if (innerRegistry != null) {
- innerContext = (CamelContext) component.resolveAndRemoveReferenceParameter(parameters, "innerContext", CamelContext.class, new DefaultCamelContext(innerRegistry));
+ innerContext = component.resolveAndRemoveReferenceParameter(parameters, "innerContext", CamelContext.class, new DefaultCamelContext(innerRegistry));
} else {
- innerContext = (CamelContext) component.resolveAndRemoveReferenceParameter(parameters, "innerContext", CamelContext.class, new DefaultCamelContext());
+ innerContext = component.resolveAndRemoveReferenceParameter(parameters, "innerContext", CamelContext.class, new DefaultCamelContext());
}
-
} else {
innerContext = component.getCamelContext();
}
- //configureInnerContext();
- innerProducerTemplate = new DefaultProducerTemplate(innerContext);
- innerProducerTemplate.start();
+ innerProducerTemplate = innerContext.createProducerTemplate();
setQueueSize(component.getAndRemoveParameter(parameters, "size", Integer.class, 0));
consumerUri = component.resolveAndRemoveReferenceParameter(parameters, "consumerUri", URI.class, new URI("routebox:" + getEndpointName()));
producerUri = component.resolveAndRemoveReferenceParameter(parameters, "producerUri", URI.class, new URI("routebox:" + getEndpointName()));
dispatchStrategy = component.resolveAndRemoveReferenceParameter(parameters, "dispatchStrategy", RouteboxDispatchStrategy.class, null);
dispatchMap = (HashMap<String, String>) component.resolveAndRemoveReferenceParameter(parameters, "dispatchMap", HashMap.class, new HashMap<String, String>());
- if ((dispatchStrategy == null) && (dispatchMap == null)) {
- LOG.warn("No Routebox Dispatch Map or Strategy has been set. Routebox may not have more than one inner route");
+ if (dispatchStrategy == null && dispatchMap == null) {
+ LOG.warn("No Routebox Dispatch Map or Strategy has been set. Routebox may not have more than one inner route.");
}
}
Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java Tue Dec 28 13:10:59 2010
@@ -21,19 +21,24 @@ import java.util.concurrent.ExecutorServ
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.spi.ExceptionHandler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public abstract class RouteboxServiceSupport extends ServiceSupport {
- private static final transient Log LOG = LogFactory.getLog(RouteboxServiceSupport.class);
+ private final transient Log log = LogFactory.getLog(getClass());
+ private ExceptionHandler exceptionHandler;
private RouteboxEndpoint endpoint;
private ExecutorService executor;
- private int pendingExchanges;
- private boolean startedInnerContext;
-
+ private volatile boolean startedInnerContext;
+
public RouteboxServiceSupport(RouteboxEndpoint endpoint) {
this.endpoint = endpoint;
+ if (exceptionHandler == null) {
+ exceptionHandler = new LoggingExceptionHandler(getClass());
+ }
}
protected void doStopInnerContext() throws Exception {
@@ -48,8 +53,8 @@ public abstract class RouteboxServiceSup
List<RouteBuilder> routeBuildersList = endpoint.getConfig().getRouteBuilders();
if (!(routeBuildersList.isEmpty())) {
for (RouteBuilder routeBuilder : routeBuildersList) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding routebuilder " + routeBuilder + " to " + context.getName());
+ if (log.isDebugEnabled()) {
+ log.debug("Adding RouteBuilder " + routeBuilder + " to " + context.getName());
}
context.addRoutes(routeBuilder);
}
@@ -59,14 +64,6 @@ public abstract class RouteboxServiceSup
setStartedInnerContext(true);
}
- public void setPendingExchanges(int pendingExchanges) {
- this.pendingExchanges = pendingExchanges;
- }
-
- public int getPendingExchanges() {
- return pendingExchanges;
- }
-
public RouteboxEndpoint getRouteboxEndpoint() {
return endpoint;
}
@@ -83,14 +80,19 @@ public abstract class RouteboxServiceSup
this.executor = executor;
}
-
public void setStartedInnerContext(boolean startedInnerContext) {
this.startedInnerContext = startedInnerContext;
}
-
public boolean isStartedInnerContext() {
return startedInnerContext;
}
+ public void setExceptionHandler(ExceptionHandler exceptionHandler) {
+ this.exceptionHandler = exceptionHandler;
+ }
+
+ public ExceptionHandler getExceptionHandler() {
+ return exceptionHandler;
+ }
}
Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java Tue Dec 28 13:10:59 2010
@@ -17,24 +17,20 @@
package org.apache.camel.component.routebox.direct;
import org.apache.camel.AsyncProcessor;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.SuspendableService;
import org.apache.camel.component.routebox.RouteboxConsumer;
+import org.apache.camel.component.routebox.RouteboxEndpoint;
import org.apache.camel.component.routebox.RouteboxServiceSupport;
-import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
-import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.ShutdownAware;
public class RouteboxDirectConsumer extends RouteboxServiceSupport implements RouteboxConsumer, ShutdownAware, SuspendableService {
protected ProducerTemplate producer;
private final Processor processor;
private volatile AsyncProcessor asyncProcessor;
- private ExceptionHandler exceptionHandler;
public RouteboxDirectConsumer(RouteboxDirectEndpoint endpoint, Processor processor) {
super(endpoint);
@@ -44,33 +40,31 @@ public class RouteboxDirectConsumer exte
protected void doStart() throws Exception {
// add consumer to endpoint
- boolean existing = this == ((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer();
- if (!existing && ((RouteboxDirectEndpoint)getRouteboxEndpoint()).hasConsumer(this)) {
- throw new IllegalArgumentException("Cannot add a 2nd consumer to the same endpoint. Endpoint " + getRouteboxEndpoint() + " only allows one consumer.");
+ boolean existing = this == getEndpoint().getConsumer();
+ if (!existing && getEndpoint().hasConsumer(this)) {
+ throw new IllegalArgumentException("Cannot add a 2nd consumer to the same endpoint. Endpoint " + getEndpoint() + " only allows one consumer.");
}
if (!existing) {
- ((RouteboxDirectEndpoint)getRouteboxEndpoint()).addConsumer(this);
+ getEndpoint().addConsumer(this);
}
// now start the inner context
if (!isStartedInnerContext()) {
doStartInnerContext();
}
-
}
protected void doStop() throws Exception {
- ((RouteboxDirectEndpoint)getRouteboxEndpoint()).removeConsumer(this);
+ getEndpoint().removeConsumer(this);
// now stop the inner context
if (isStartedInnerContext()) {
doStopInnerContext();
}
-
}
protected void doSuspend() throws Exception {
- ((RouteboxDirectEndpoint)getRouteboxEndpoint()).removeConsumer(this);
+ getEndpoint().removeConsumer(this);
}
protected void doResume() throws Exception {
@@ -78,11 +72,6 @@ public class RouteboxDirectConsumer exte
doStart();
}
- public Exchange processRequest(Exchange exchange) {
- return exchange;
-
- }
-
/**
* Provides an {@link org.apache.camel.AsyncProcessor} interface to the configured
* processor on the consumer. If the processor does not implement the interface,
@@ -95,54 +84,24 @@ public class RouteboxDirectConsumer exte
return asyncProcessor;
}
- public ExceptionHandler getExceptionHandler() {
- if (exceptionHandler == null) {
- exceptionHandler = new LoggingExceptionHandler(getClass());
- }
- return exceptionHandler;
- }
-
- public void setExceptionHandler(ExceptionHandler exceptionHandler) {
- this.exceptionHandler = exceptionHandler;
- }
-
- /**
- * Handles the given exception using the {@link #getExceptionHandler()}
- *
- * @param t the exception to handle
- */
- protected void handleException(Throwable t) {
- Throwable newt = (t == null) ? new IllegalArgumentException("Handling [null] exception") : t;
- getExceptionHandler().handleException(newt);
- }
-
- /* (non-Javadoc)
- * @see org.apache.camel.spi.ShutdownAware#deferShutdown(org.apache.camel.ShutdownRunningTask)
- */
public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
// deny stopping on shutdown as we want direct consumers to run in case some other queues
// depend on this consumer to run, so it can complete its exchanges
return true;
}
- /* (non-Javadoc)
- * @see org.apache.camel.spi.ShutdownAware#getPendingExchangesSize()
- */
public int getPendingExchangesSize() {
// return 0 as we do not have an internal memory queue with a variable size
// of inflight messages.
return 0;
}
- /* (non-Javadoc)
- * @see org.apache.camel.spi.ShutdownAware#prepareShutdown()
- */
public void prepareShutdown() {
-
+ // noop
}
- public Endpoint getEndpoint() {
- return (Endpoint) getRouteboxEndpoint();
+ public RouteboxDirectEndpoint getEndpoint() {
+ return (RouteboxDirectEndpoint) getRouteboxEndpoint();
}
public Processor getProcessor() {
Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java Tue Dec 28 13:10:59 2010
@@ -27,7 +27,7 @@ import org.apache.camel.component.routeb
import org.apache.camel.component.routebox.RouteboxEndpoint;
public class RouteboxDirectEndpoint extends RouteboxEndpoint {
- private volatile Map<String, RouteboxDirectConsumer> consumers = new HashMap<String, RouteboxDirectConsumer>();
+ private final Map<String, RouteboxDirectConsumer> consumers = new HashMap<String, RouteboxDirectConsumer>();
public RouteboxDirectEndpoint(String endpointUri) {
super(endpointUri);
Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java Tue Dec 28 13:10:59 2010
@@ -26,9 +26,7 @@ import org.apache.camel.Producer;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.routebox.RouteboxServiceSupport;
import org.apache.camel.component.routebox.strategy.RouteboxDispatcher;
-import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
-import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,21 +34,20 @@ import org.apache.commons.logging.LogFac
public class RouteboxDirectProducer extends RouteboxServiceSupport implements Producer, AsyncProcessor {
private static final transient Log LOG = LogFactory.getLog(RouteboxDirectProducer.class);
protected ProducerTemplate producer;
- private ExceptionHandler exceptionHandler;
-
+
public RouteboxDirectProducer(RouteboxDirectEndpoint endpoint) {
super(endpoint);
producer = endpoint.getConfig().getInnerProducerTemplate();
}
public void process(Exchange exchange) throws Exception {
- Exchange result = null;
+ Exchange result;
if ((((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer() == null) && (getRouteboxEndpoint().getConfig().isSendToConsumer())) {
throw new CamelExchangeException("No consumers available on endpoint: " + getRouteboxEndpoint(), exchange);
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("**** Dispatching to Inner Route ****");
+ LOG.debug("Dispatching to Inner Route " + exchange);
}
RouteboxDispatcher dispatcher = new RouteboxDispatcher(producer);
result = dispatcher.dispatchSync(getRouteboxEndpoint(), exchange);
@@ -64,14 +61,14 @@ public class RouteboxDirectProducer exte
boolean flag = true;
if ((((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer() == null)
- && (((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConfig().isSendToConsumer())) {
+ && ((getRouteboxEndpoint()).getConfig().isSendToConsumer())) {
exchange.setException(new CamelExchangeException("No consumers available on endpoint: " + getRouteboxEndpoint(), exchange));
callback.done(true);
flag = true;
} else {
try {
if (LOG.isDebugEnabled()) {
- LOG.debug("**** Dispatching to Inner Route ****");
+ LOG.debug("Dispatching to Inner Route " + exchange);
}
RouteboxDispatcher dispatcher = new RouteboxDispatcher(producer);
@@ -97,70 +94,46 @@ public class RouteboxDirectProducer exte
}
protected void doStart() throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Starting producer: " + this);
- }
-
- if (!((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConfig().isSendToConsumer()) {
+ if (!(getRouteboxEndpoint()).getConfig().isSendToConsumer()) {
// start an inner context
if (!isStartedInnerContext()) {
doStartInnerContext();
}
}
-
}
protected void doStop() throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Stopping producer: " + this);
- }
-
- if (!((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConfig().isSendToConsumer()) {
+ if (!(getRouteboxEndpoint()).getConfig().isSendToConsumer()) {
// stop the inner context
if (isStartedInnerContext()) {
doStopInnerContext();
}
}
-
}
- @Override
- public String toString() {
- return "Producer[" + getRouteboxEndpoint()
- .getEndpointUri() + "]";
- }
-
public Endpoint getEndpoint() {
return getRouteboxEndpoint();
}
public Exchange createExchange() {
- return getRouteboxEndpoint()
- .createExchange();
+ return getRouteboxEndpoint().createExchange();
}
public Exchange createExchange(ExchangePattern pattern) {
- return getRouteboxEndpoint()
- .createExchange(pattern);
+ return getRouteboxEndpoint().createExchange(pattern);
}
public Exchange createExchange(Exchange exchange) {
- return getRouteboxEndpoint()
- .createExchange(exchange);
+ return getRouteboxEndpoint().createExchange(exchange);
}
public boolean isSingleton() {
return true;
}
- public ExceptionHandler getExceptionHandler() {
- if (exceptionHandler == null) {
- exceptionHandler = new LoggingExceptionHandler(getClass());
- }
- return exceptionHandler;
+ @Override
+ public String toString() {
+ return "Producer[" + getRouteboxEndpoint().getEndpointUri() + "]";
}
- public void setExceptionHandler(ExceptionHandler exceptionHandler) {
- this.exceptionHandler = exceptionHandler;
- }
}
Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java Tue Dec 28 13:10:59 2010
@@ -41,23 +41,13 @@ public class RouteboxSedaConsumer extend
private static final transient Log LOG = LogFactory.getLog(RouteboxSedaConsumer.class);
protected AsyncProcessor processor;
protected ProducerTemplate producer;
- private int pendingExchanges;
- private ExceptionHandler exceptionHandler;
-
+
public RouteboxSedaConsumer(RouteboxSedaEndpoint endpoint, Processor processor) {
super(endpoint);
this.setProcessor(AsyncProcessorTypeConverter.convert(processor));
- producer = endpoint.getConfig().getInnerProducerTemplate();
- producer.setMaximumCacheSize(endpoint.getConfig().getThreads());
- if (exceptionHandler == null) {
- exceptionHandler = new LoggingExceptionHandler(getClass());
- }
+ this.producer = endpoint.getConfig().getInnerProducerTemplate();
}
-
- /* (non-Javadoc)
- * @see org.apache.camel.impl.ServiceSupport#doStart()
- */
@Override
protected void doStart() throws Exception {
((RouteboxSedaEndpoint)getRouteboxEndpoint()).onStarted(this);
@@ -65,30 +55,24 @@ public class RouteboxSedaConsumer extend
// Create a URI link from the primary context to routes in the new inner context
int poolSize = getRouteboxEndpoint().getConfig().getThreads();
- setExecutor(((RouteboxSedaEndpoint)getRouteboxEndpoint()).getCamelContext().getExecutorServiceStrategy()
- .newFixedThreadPool(this, ((RouteboxSedaEndpoint)getRouteboxEndpoint()).getEndpointUri(), poolSize));
+ setExecutor(getRouteboxEndpoint().getCamelContext().getExecutorServiceStrategy()
+ .newFixedThreadPool(this, getRouteboxEndpoint().getEndpointUri(), poolSize));
for (int i = 0; i < poolSize; i++) {
- getExecutor().execute((Runnable) this);
+ getExecutor().execute(this);
}
}
- /* (non-Javadoc)
- * @see org.apache.camel.impl.ServiceSupport#doStop()
- */
@Override
protected void doStop() throws Exception {
((RouteboxSedaEndpoint)getRouteboxEndpoint()).onStopped(this);
// Shutdown the executor
- ((RouteboxSedaEndpoint)getRouteboxEndpoint()).getCamelContext().getExecutorServiceStrategy().shutdown(getExecutor());
+ getRouteboxEndpoint().getCamelContext().getExecutorServiceStrategy().shutdown(getExecutor());
setExecutor(null);
doStopInnerContext();
}
- /* (non-Javadoc)
- * @see java.lang.Runnable#run()
- */
- public void run() {
+ public void run() {
BlockingQueue<Exchange> queue = ((RouteboxSedaEndpoint)getRouteboxEndpoint()).getQueue();
while (queue != null && isRunAllowed()) {
try {
@@ -104,13 +88,13 @@ public class RouteboxSedaConsumer extend
}
private void dispatchToInnerRoute(BlockingQueue<Exchange> queue, final Exchange exchange) throws InterruptedException {
- Exchange result = null;
+ Exchange result;
if (exchange != null) {
if (isRunAllowed()) {
try {
if (LOG.isDebugEnabled()) {
- LOG.debug("**** Dispatching to Inner Route ****");
+ LOG.debug("Dispatching to inner route: " + exchange);
}
RouteboxDispatcher dispatcher = new RouteboxDispatcher(producer);
result = dispatcher.dispatchAsync(getRouteboxEndpoint(), exchange);
@@ -131,33 +115,20 @@ public class RouteboxSedaConsumer extend
}
}
-
- /* (non-Javadoc)
- * @see org.apache.camel.Consumer#getEndpoint()
- */
public Endpoint getEndpoint() {
- return (Endpoint) getRouteboxEndpoint();
+ return getRouteboxEndpoint();
}
- /* (non-Javadoc)
- * @see org.apache.camel.spi.ShutdownAware#deferShutdown(org.apache.camel.ShutdownRunningTask)
- */
public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
return false;
}
- /* (non-Javadoc)
- * @see org.apache.camel.spi.ShutdownAware#getPendingExchangesSize()
- */
public int getPendingExchangesSize() {
- return getPendingExchanges();
+ // TODO: Get size of queue
+ return 0;
}
- /* (non-Javadoc)
- * @see org.apache.camel.spi.ShutdownAware#prepareShutdown()
- */
public void prepareShutdown() {
-
}
public void setProcessor(AsyncProcessor processor) {
@@ -168,20 +139,4 @@ public class RouteboxSedaConsumer extend
return processor;
}
- public void setPendingExchanges(int pendingExchanges) {
- this.pendingExchanges = pendingExchanges;
- }
-
- public int getPendingExchanges() {
- return pendingExchanges;
- }
-
- public void setExceptionHandler(ExceptionHandler exceptionHandler) {
- this.exceptionHandler = exceptionHandler;
- }
-
- public ExceptionHandler getExceptionHandler() {
- return exceptionHandler;
- }
-
}
Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java Tue Dec 28 13:10:59 2010
@@ -111,16 +111,10 @@ public class RouteboxSedaEndpoint extend
return queue;
}
- /* (non-Javadoc)
- * @see org.apache.camel.MultipleConsumersSupport#isMultipleConsumersSupported()
- */
public boolean isMultipleConsumersSupported() {
return true;
}
- /* (non-Javadoc)
- * @see org.apache.camel.spi.BrowsableEndpoint#getExchanges()
- */
public List<Exchange> getExchanges() {
return new ArrayList<Exchange>(getQueue());
}
Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java Tue Dec 28 13:10:59 2010
@@ -25,13 +25,15 @@ import org.apache.camel.Exchange;
* A strategy for identifying the route consumer in the routebox where the exchange should to be dispatched
*/
public interface RouteboxDispatchStrategy {
+
/**
* Receives an incoming exchange and consumer list and identifies the inner route consumer for dispatching the exchange
*
- * @param innerRouteConsumers the list of possible real-time inner route consumers available
+ * @param destinations the list of possible real-time inner route consumers available
* to where the exchange can be dispatched in the routebox
* @param exchange the incoming exchange
* @return a selected consumer to whom the exchange can be directed
+ * @throws Exception is thrown if error
*/
URI selectDestinationUri(List<URI> destinations, Exchange exchange) throws Exception;
}
Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java Tue Dec 28 13:10:59 2010
@@ -25,7 +25,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
-import org.apache.camel.CamelException;
+import org.apache.camel.CamelExchangeException;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
@@ -40,7 +40,6 @@ import org.apache.camel.model.RouteDefin
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
public class RouteboxDispatcher {
private static final transient Log LOG = LogFactory.getLog(RouteboxDispatcher.class);
private ProducerTemplate producer;
@@ -51,11 +50,11 @@ public class RouteboxDispatcher {
}
public Exchange dispatchSync(RouteboxEndpoint endpoint, Exchange exchange) throws Exception {
- URI dispatchUri = null;
- Exchange reply = null;
+ URI dispatchUri;
+ Exchange reply;
if (LOG.isDebugEnabled()) {
- LOG.debug("Dispatching exchange" + exchange + "to endpoint " + endpoint.getEndpointUri());
+ LOG.debug("Dispatching exchange " + exchange + " to endpoint " + endpoint.getEndpointUri());
}
dispatchUri = selectDispatchUri(endpoint, exchange);
@@ -63,18 +62,18 @@ public class RouteboxDispatcher {
if (exchange.getPattern() == ExchangePattern.InOnly) {
reply = producer.send(dispatchUri.toASCIIString(), exchange);
} else {
- reply = (Exchange) issueRequest(endpoint, ExchangePattern.InOut, exchange.getIn().getBody(), exchange.getIn().getHeaders());
+ reply = issueRequest(endpoint, ExchangePattern.InOut, exchange.getIn().getBody(), exchange.getIn().getHeaders());
}
return reply;
}
public Exchange dispatchAsync(RouteboxEndpoint endpoint, Exchange exchange) throws Exception {
- URI dispatchUri = null;
- Exchange reply = null;
+ URI dispatchUri;
+ Exchange reply;
if (LOG.isDebugEnabled()) {
- LOG.debug("Dispatching exchange" + exchange + "to endpoint " + endpoint.getEndpointUri());
+ LOG.debug("Dispatching exchange " + exchange + " to endpoint " + endpoint.getEndpointUri());
}
dispatchUri = selectDispatchUri(endpoint, exchange);
@@ -91,26 +90,27 @@ public class RouteboxDispatcher {
}
protected URI selectDispatchUri(RouteboxEndpoint endpoint, Exchange exchange) throws Exception {
- URI dispatchUri = null;
+ URI dispatchUri;
List<URI> consumerUris = getInnerContextConsumerList(endpoint.getConfig().getInnerContext());
if (consumerUris.isEmpty()) {
- throw new CamelException("No routes found for dispatch in Routebox");
+ throw new CamelExchangeException("No routes found to dispatch in Routebox at " + endpoint, exchange);
} else if (consumerUris.size() == 1) {
dispatchUri = consumerUris.get(0);
} else {
if (!endpoint.getConfig().getDispatchMap().isEmpty()) {
- //apply URI string found in dispatch Map
- if (endpoint.getConfig().getDispatchMap().containsKey(exchange.getIn().getHeader("ROUTE_DISPATCH_KEY"))) {
- dispatchUri = new URI(endpoint.getConfig().getDispatchMap().get(exchange.getIn().getHeader("ROUTE_DISPATCH_KEY")));
+ // apply URI string found in dispatch Map
+ String key = exchange.getIn().getHeader("ROUTE_DISPATCH_KEY", String.class);
+ if (endpoint.getConfig().getDispatchMap().containsKey(key)) {
+ dispatchUri = new URI(endpoint.getConfig().getDispatchMap().get(key));
} else {
- throw new CamelException("No matching entry found in Dispatch Map for ROUTE_DISPATCH_KEY: " + exchange.getIn().getHeader("ROUTE_DISPATCH_KEY"));
+ throw new CamelExchangeException("No matching entry found in Dispatch Map for ROUTE_DISPATCH_KEY: " + key, exchange);
}
} else {
- //apply dispatch strategy
+ // apply dispatch strategy
dispatchUri = endpoint.getConfig().getDispatchStrategy().selectDestinationUri(consumerUris, exchange);
if (dispatchUri == null) {
- throw new CamelException("No matching inner routes found for Operation");
+ throw new CamelExchangeException("No matching inner routes found for Operation", exchange);
}
}
}
@@ -138,9 +138,7 @@ public class RouteboxDispatcher {
Exchange exchange = producer.send(endpoint, pattern, new Processor() {
public void process(Exchange exchange) throws Exception {
Message in = exchange.getIn();
- for (Map.Entry<String, Object> header : headers.entrySet()) {
- in.setHeader(header.getKey(), header.getValue());
- }
+ in.getHeaders().putAll(headers);
in.setBody(body);
}
});