You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/12/28 14:11:00 UTC

svn commit: r1053342 - in /camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox: ./ direct/ seda/ strategy/

Author: davsclaus
Date: Tue Dec 28 13:10:59 2010
New Revision: 1053342

URL: http://svn.apache.org/viewvc?rev=1053342&view=rev
Log:
CAMEL-3285: Polished code a bit due to code review.

Modified:
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java
    camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java

Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxComponent.java Tue Dec 28 13:10:59 2010
@@ -30,7 +30,7 @@ import org.apache.camel.component.routeb
 import org.apache.camel.impl.DefaultComponent;
 
 public class RouteboxComponent extends DefaultComponent {
-    RouteboxConfiguration config;
+    final RouteboxConfiguration config;
     private final Map<String, BlockingQueue<Exchange>> queues = new HashMap<String, BlockingQueue<Exchange>>();
     
     public RouteboxComponent() {

Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxConfiguration.java Tue Dec 28 13:10:59 2010
@@ -27,7 +27,6 @@ import org.apache.camel.ProducerTemplate
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.routebox.strategy.RouteboxDispatchStrategy;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.DefaultProducerTemplate;
 import org.apache.camel.spi.Registry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -57,7 +56,7 @@ public class RouteboxConfiguration {
     public RouteboxConfiguration() {
     }
 
-    public RouteboxConfiguration(URI uri) throws Exception {
+    public RouteboxConfiguration(URI uri) {
         this();
         this.uri = uri;
     }
@@ -71,7 +70,9 @@ public class RouteboxConfiguration {
         
         setUri(uri);
         setAuthority(uri.getAuthority());
-        LOG.info("Authority: " + uri.getAuthority());
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Authority: " + uri.getAuthority());
+        }
         
         setEndpointName(getAuthority());
         
@@ -113,31 +114,28 @@ public class RouteboxConfiguration {
         }
         
         if (parameters.containsKey("innerRegistry")) {
-            innerRegistry = (Registry) component.resolveAndRemoveReferenceParameter(parameters, "innerRegistry", Registry.class);
+            innerRegistry = component.resolveAndRemoveReferenceParameter(parameters, "innerRegistry", Registry.class);
         }
         
         if (isForkContext()) {
             if (innerRegistry != null) {
-                innerContext = (CamelContext) component.resolveAndRemoveReferenceParameter(parameters, "innerContext", CamelContext.class, new DefaultCamelContext(innerRegistry));
+                innerContext = component.resolveAndRemoveReferenceParameter(parameters, "innerContext", CamelContext.class, new DefaultCamelContext(innerRegistry));
             } else {
-                innerContext = (CamelContext) component.resolveAndRemoveReferenceParameter(parameters, "innerContext", CamelContext.class, new DefaultCamelContext());
+                innerContext = component.resolveAndRemoveReferenceParameter(parameters, "innerContext", CamelContext.class, new DefaultCamelContext());
             }
-
         } else {
             innerContext = component.getCamelContext();
         }
         
-        //configureInnerContext();
-        innerProducerTemplate = new DefaultProducerTemplate(innerContext);
-        innerProducerTemplate.start();
+        innerProducerTemplate = innerContext.createProducerTemplate();
         setQueueSize(component.getAndRemoveParameter(parameters, "size", Integer.class, 0));
         consumerUri = component.resolveAndRemoveReferenceParameter(parameters, "consumerUri", URI.class, new URI("routebox:" + getEndpointName()));
         producerUri = component.resolveAndRemoveReferenceParameter(parameters, "producerUri", URI.class, new URI("routebox:" + getEndpointName()));        
         
         dispatchStrategy = component.resolveAndRemoveReferenceParameter(parameters, "dispatchStrategy", RouteboxDispatchStrategy.class, null);
         dispatchMap = (HashMap<String, String>) component.resolveAndRemoveReferenceParameter(parameters, "dispatchMap", HashMap.class, new HashMap<String, String>());
-        if ((dispatchStrategy == null) && (dispatchMap == null)) { 
-            LOG.warn("No Routebox Dispatch Map or Strategy has been set. Routebox may not have more than one inner route");
+        if (dispatchStrategy == null && dispatchMap == null) {
+            LOG.warn("No Routebox Dispatch Map or Strategy has been set. Routebox may not have more than one inner route.");
         }        
     }
 

Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/RouteboxServiceSupport.java Tue Dec 28 13:10:59 2010
@@ -21,19 +21,24 @@ import java.util.concurrent.ExecutorServ
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.spi.ExceptionHandler;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 public abstract class RouteboxServiceSupport extends ServiceSupport {
-    private static final transient Log LOG = LogFactory.getLog(RouteboxServiceSupport.class);
+    private final transient Log log = LogFactory.getLog(getClass());
+    private ExceptionHandler exceptionHandler;
     private RouteboxEndpoint endpoint;
     private ExecutorService executor;
-    private int pendingExchanges;
-    private boolean startedInnerContext;
-    
+    private volatile boolean startedInnerContext;
+
     public RouteboxServiceSupport(RouteboxEndpoint endpoint) {
         this.endpoint = endpoint;
+        if (exceptionHandler == null) {
+            exceptionHandler = new LoggingExceptionHandler(getClass());
+        }
     }
     
     protected void doStopInnerContext() throws Exception {
@@ -48,8 +53,8 @@ public abstract class RouteboxServiceSup
         List<RouteBuilder> routeBuildersList = endpoint.getConfig().getRouteBuilders();
         if (!(routeBuildersList.isEmpty())) {
             for (RouteBuilder routeBuilder : routeBuildersList) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Adding routebuilder " + routeBuilder + " to " + context.getName());
+                if (log.isDebugEnabled()) {
+                    log.debug("Adding RouteBuilder " + routeBuilder + " to " + context.getName());
                 }
                 context.addRoutes(routeBuilder);
             }
@@ -59,14 +64,6 @@ public abstract class RouteboxServiceSup
         setStartedInnerContext(true);
     }
 
-    public void setPendingExchanges(int pendingExchanges) {
-        this.pendingExchanges = pendingExchanges;
-    }
-
-    public int getPendingExchanges() {
-        return pendingExchanges;
-    }
-
     public RouteboxEndpoint getRouteboxEndpoint() {
         return endpoint;
     }
@@ -83,14 +80,19 @@ public abstract class RouteboxServiceSup
         this.executor = executor;
     }
 
-
     public void setStartedInnerContext(boolean startedInnerContext) {
         this.startedInnerContext = startedInnerContext;
     }
 
-
     public boolean isStartedInnerContext() {
         return startedInnerContext;
     }
 
+    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
+        this.exceptionHandler = exceptionHandler;
+    }
+
+    public ExceptionHandler getExceptionHandler() {
+        return exceptionHandler;
+    }
 }

Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java Tue Dec 28 13:10:59 2010
@@ -17,24 +17,20 @@
 package org.apache.camel.component.routebox.direct;
 
 import org.apache.camel.AsyncProcessor;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.ShutdownRunningTask;
 import org.apache.camel.SuspendableService;
 import org.apache.camel.component.routebox.RouteboxConsumer;
+import org.apache.camel.component.routebox.RouteboxEndpoint;
 import org.apache.camel.component.routebox.RouteboxServiceSupport;
-import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
-import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.ShutdownAware;
 
 public class RouteboxDirectConsumer extends RouteboxServiceSupport implements RouteboxConsumer, ShutdownAware, SuspendableService {
     protected ProducerTemplate producer;
     private final Processor processor;
     private volatile AsyncProcessor asyncProcessor;
-    private ExceptionHandler exceptionHandler;
 
     public RouteboxDirectConsumer(RouteboxDirectEndpoint endpoint, Processor processor) {
         super(endpoint);
@@ -44,33 +40,31 @@ public class RouteboxDirectConsumer exte
     
     protected void doStart() throws Exception {
         // add consumer to endpoint
-        boolean existing = this == ((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer();
-        if (!existing && ((RouteboxDirectEndpoint)getRouteboxEndpoint()).hasConsumer(this)) {
-            throw new IllegalArgumentException("Cannot add a 2nd consumer to the same endpoint. Endpoint " + getRouteboxEndpoint() + " only allows one consumer.");
+        boolean existing = this == getEndpoint().getConsumer();
+        if (!existing && getEndpoint().hasConsumer(this)) {
+            throw new IllegalArgumentException("Cannot add a 2nd consumer to the same endpoint. Endpoint " + getEndpoint() + " only allows one consumer.");
         }
         if (!existing) {
-            ((RouteboxDirectEndpoint)getRouteboxEndpoint()).addConsumer(this);
+            getEndpoint().addConsumer(this);
         }
         
         // now start the inner context
         if (!isStartedInnerContext()) {
             doStartInnerContext(); 
         }
-        
     }
 
     protected void doStop() throws Exception {
-        ((RouteboxDirectEndpoint)getRouteboxEndpoint()).removeConsumer(this);
+        getEndpoint().removeConsumer(this);
         
         // now stop the inner context
         if (isStartedInnerContext()) {
             doStopInnerContext();
         }
-
     }
 
     protected void doSuspend() throws Exception {
-        ((RouteboxDirectEndpoint)getRouteboxEndpoint()).removeConsumer(this);
+        getEndpoint().removeConsumer(this);
     }
 
     protected void doResume() throws Exception {
@@ -78,11 +72,6 @@ public class RouteboxDirectConsumer exte
         doStart();
     }
     
-    public Exchange processRequest(Exchange exchange) {
-        return exchange;
-        
-    }
-    
     /**
      * Provides an {@link org.apache.camel.AsyncProcessor} interface to the configured
      * processor on the consumer. If the processor does not implement the interface,
@@ -95,54 +84,24 @@ public class RouteboxDirectConsumer exte
         return asyncProcessor;
     }
 
-    public ExceptionHandler getExceptionHandler() {
-        if (exceptionHandler == null) {
-            exceptionHandler = new LoggingExceptionHandler(getClass());
-        }
-        return exceptionHandler;
-    }
-
-    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
-        this.exceptionHandler = exceptionHandler;
-    }
-
-    /**
-     * Handles the given exception using the {@link #getExceptionHandler()}
-     * 
-     * @param t the exception to handle
-     */
-    protected void handleException(Throwable t) {
-        Throwable newt = (t == null) ? new IllegalArgumentException("Handling [null] exception") : t;
-        getExceptionHandler().handleException(newt);
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.camel.spi.ShutdownAware#deferShutdown(org.apache.camel.ShutdownRunningTask)
-     */
     public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
         // deny stopping on shutdown as we want direct consumers to run in case some other queues
         // depend on this consumer to run, so it can complete its exchanges
         return true;
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.camel.spi.ShutdownAware#getPendingExchangesSize()
-     */
     public int getPendingExchangesSize() {
         // return 0 as we do not have an internal memory queue with a variable size
         // of inflight messages. 
         return 0;
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.camel.spi.ShutdownAware#prepareShutdown()
-     */
     public void prepareShutdown() {
-        
+        // noop
     }
     
-    public Endpoint getEndpoint() {
-        return (Endpoint) getRouteboxEndpoint();
+    public RouteboxDirectEndpoint getEndpoint() {
+        return (RouteboxDirectEndpoint) getRouteboxEndpoint();
     }
 
     public Processor getProcessor() {

Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectEndpoint.java Tue Dec 28 13:10:59 2010
@@ -27,7 +27,7 @@ import org.apache.camel.component.routeb
 import org.apache.camel.component.routebox.RouteboxEndpoint;
 
 public class RouteboxDirectEndpoint extends RouteboxEndpoint {
-    private volatile Map<String, RouteboxDirectConsumer> consumers = new HashMap<String, RouteboxDirectConsumer>();
+    private final Map<String, RouteboxDirectConsumer> consumers = new HashMap<String, RouteboxDirectConsumer>();
 
     public RouteboxDirectEndpoint(String endpointUri) {
         super(endpointUri);

Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java Tue Dec 28 13:10:59 2010
@@ -26,9 +26,7 @@ import org.apache.camel.Producer;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.component.routebox.RouteboxServiceSupport;
 import org.apache.camel.component.routebox.strategy.RouteboxDispatcher;
-import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
-import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,21 +34,20 @@ import org.apache.commons.logging.LogFac
 public class RouteboxDirectProducer extends RouteboxServiceSupport implements Producer, AsyncProcessor {
     private static final transient Log LOG = LogFactory.getLog(RouteboxDirectProducer.class);
     protected ProducerTemplate producer;
-    private ExceptionHandler exceptionHandler;
-    
+
     public RouteboxDirectProducer(RouteboxDirectEndpoint endpoint) {
         super(endpoint);
         producer = endpoint.getConfig().getInnerProducerTemplate();
     }
 
     public void process(Exchange exchange) throws Exception {
-        Exchange result = null;
+        Exchange result;
         
         if ((((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer() == null) && (getRouteboxEndpoint().getConfig().isSendToConsumer())) {
             throw new CamelExchangeException("No consumers available on endpoint: " + getRouteboxEndpoint(), exchange);
         } else {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("**** Dispatching to Inner Route ****");
+                LOG.debug("Dispatching to Inner Route " + exchange);
             }
             RouteboxDispatcher dispatcher = new RouteboxDispatcher(producer);
             result = dispatcher.dispatchSync(getRouteboxEndpoint(), exchange);
@@ -64,14 +61,14 @@ public class RouteboxDirectProducer exte
         boolean flag = true;
         
         if ((((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer() == null) 
-            && (((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConfig().isSendToConsumer())) {
+            && ((getRouteboxEndpoint()).getConfig().isSendToConsumer())) {
             exchange.setException(new CamelExchangeException("No consumers available on endpoint: " + getRouteboxEndpoint(), exchange));
             callback.done(true);
             flag = true;
         } else {
             try {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("**** Dispatching to Inner Route ****");
+                    LOG.debug("Dispatching to Inner Route " + exchange);
                 }
                 
                 RouteboxDispatcher dispatcher = new RouteboxDispatcher(producer);
@@ -97,70 +94,46 @@ public class RouteboxDirectProducer exte
     }
     
     protected void doStart() throws Exception {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Starting producer: " + this);
-        }
-        
-        if (!((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConfig().isSendToConsumer()) {
+        if (!(getRouteboxEndpoint()).getConfig().isSendToConsumer()) {
             // start an inner context
             if (!isStartedInnerContext()) {
                 doStartInnerContext(); 
             }
         }
-        
     }
 
     protected void doStop() throws Exception {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Stopping producer: " + this);
-        }
-        
-        if (!((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConfig().isSendToConsumer()) {        
+        if (!(getRouteboxEndpoint()).getConfig().isSendToConsumer()) {
             // stop the inner context
             if (isStartedInnerContext()) {
                 doStopInnerContext();
             }
         }
-
     }
     
-    @Override
-    public String toString() {
-        return "Producer[" + getRouteboxEndpoint()
-                .getEndpointUri() + "]";
-    }
-
     public Endpoint getEndpoint() {
         return getRouteboxEndpoint();
     }
 
     public Exchange createExchange() {
-        return getRouteboxEndpoint()
-                .createExchange();
+        return getRouteboxEndpoint().createExchange();
     }
 
     public Exchange createExchange(ExchangePattern pattern) {
-        return getRouteboxEndpoint()
-                .createExchange(pattern);
+        return getRouteboxEndpoint().createExchange(pattern);
     }
 
     public Exchange createExchange(Exchange exchange) {
-        return getRouteboxEndpoint()
-                .createExchange(exchange);
+        return getRouteboxEndpoint().createExchange(exchange);
     }
 
     public boolean isSingleton() {
         return true;
     }
 
-    public ExceptionHandler getExceptionHandler() {
-        if (exceptionHandler == null) {
-            exceptionHandler = new LoggingExceptionHandler(getClass());
-        }
-        return exceptionHandler;
+    @Override
+    public String toString() {
+        return "Producer[" + getRouteboxEndpoint().getEndpointUri() + "]";
     }
 
-    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
-        this.exceptionHandler = exceptionHandler;
-    }
 }

Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java Tue Dec 28 13:10:59 2010
@@ -41,23 +41,13 @@ public class RouteboxSedaConsumer extend
     private static final transient Log LOG = LogFactory.getLog(RouteboxSedaConsumer.class);
     protected AsyncProcessor processor;
     protected ProducerTemplate producer;
-    private int pendingExchanges;
-    private ExceptionHandler exceptionHandler;
-    
+
     public RouteboxSedaConsumer(RouteboxSedaEndpoint endpoint, Processor processor) {
         super(endpoint);
         this.setProcessor(AsyncProcessorTypeConverter.convert(processor));
-        producer = endpoint.getConfig().getInnerProducerTemplate();
-        producer.setMaximumCacheSize(endpoint.getConfig().getThreads());
-        if (exceptionHandler == null) {
-            exceptionHandler = new LoggingExceptionHandler(getClass());
-        }
+        this.producer = endpoint.getConfig().getInnerProducerTemplate();
     }
 
-
-    /* (non-Javadoc)
-     * @see org.apache.camel.impl.ServiceSupport#doStart()
-     */
     @Override
     protected void doStart() throws Exception {
         ((RouteboxSedaEndpoint)getRouteboxEndpoint()).onStarted(this);
@@ -65,30 +55,24 @@ public class RouteboxSedaConsumer extend
         
         // Create a URI link from the primary context to routes in the new inner context
         int poolSize = getRouteboxEndpoint().getConfig().getThreads();
-        setExecutor(((RouteboxSedaEndpoint)getRouteboxEndpoint()).getCamelContext().getExecutorServiceStrategy()
-                        .newFixedThreadPool(this, ((RouteboxSedaEndpoint)getRouteboxEndpoint()).getEndpointUri(), poolSize));
+        setExecutor(getRouteboxEndpoint().getCamelContext().getExecutorServiceStrategy()
+                .newFixedThreadPool(this, getRouteboxEndpoint().getEndpointUri(), poolSize));
         for (int i = 0; i < poolSize; i++) {
-            getExecutor().execute((Runnable) this);
+            getExecutor().execute(this);
         }
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.camel.impl.ServiceSupport#doStop()
-     */
     @Override
     protected void doStop() throws Exception {
         ((RouteboxSedaEndpoint)getRouteboxEndpoint()).onStopped(this);
         // Shutdown the executor
-        ((RouteboxSedaEndpoint)getRouteboxEndpoint()).getCamelContext().getExecutorServiceStrategy().shutdown(getExecutor());
+        getRouteboxEndpoint().getCamelContext().getExecutorServiceStrategy().shutdown(getExecutor());
         setExecutor(null);
         
         doStopInnerContext(); 
     }
     
-    /* (non-Javadoc)
-     * @see java.lang.Runnable#run()
-     */
-    public void run() {       
+    public void run() {
         BlockingQueue<Exchange> queue = ((RouteboxSedaEndpoint)getRouteboxEndpoint()).getQueue();
         while (queue != null && isRunAllowed()) {
             try {
@@ -104,13 +88,13 @@ public class RouteboxSedaConsumer extend
     }
     
     private void dispatchToInnerRoute(BlockingQueue<Exchange> queue, final Exchange exchange) throws InterruptedException {
-        Exchange result = null;
+        Exchange result;
 
         if (exchange != null) {
             if (isRunAllowed()) {
                 try {
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug("**** Dispatching to Inner Route ****");
+                        LOG.debug("Dispatching to inner route: " + exchange);
                     }
                     RouteboxDispatcher dispatcher = new RouteboxDispatcher(producer);
                     result = dispatcher.dispatchAsync(getRouteboxEndpoint(), exchange); 
@@ -131,33 +115,20 @@ public class RouteboxSedaConsumer extend
         }
     }
     
-    
-    /* (non-Javadoc)
-     * @see org.apache.camel.Consumer#getEndpoint()
-     */
     public Endpoint getEndpoint() {
-        return (Endpoint) getRouteboxEndpoint();
+        return getRouteboxEndpoint();
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.camel.spi.ShutdownAware#deferShutdown(org.apache.camel.ShutdownRunningTask)
-     */
     public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
         return false;
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.camel.spi.ShutdownAware#getPendingExchangesSize()
-     */
     public int getPendingExchangesSize() {
-        return getPendingExchanges();
+        // TODO: Get size of queue
+        return 0;
     }
     
-    /* (non-Javadoc)
-     * @see org.apache.camel.spi.ShutdownAware#prepareShutdown()
-     */
     public void prepareShutdown() {
-        
     }
     
     public void setProcessor(AsyncProcessor processor) {
@@ -168,20 +139,4 @@ public class RouteboxSedaConsumer extend
         return processor;
     }
 
-    public void setPendingExchanges(int pendingExchanges) {
-        this.pendingExchanges = pendingExchanges;
-    }
-
-    public int getPendingExchanges() {
-        return pendingExchanges;
-    }
-
-    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
-        this.exceptionHandler = exceptionHandler;
-    }
-
-    public ExceptionHandler getExceptionHandler() {
-        return exceptionHandler;
-    }
-    
 }

Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java Tue Dec 28 13:10:59 2010
@@ -111,16 +111,10 @@ public class RouteboxSedaEndpoint extend
         return queue;
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.camel.MultipleConsumersSupport#isMultipleConsumersSupported()
-     */
     public boolean isMultipleConsumersSupported() {
         return true;
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.camel.spi.BrowsableEndpoint#getExchanges()
-     */
     public List<Exchange> getExchanges() {
         return new ArrayList<Exchange>(getQueue());
     }

Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatchStrategy.java Tue Dec 28 13:10:59 2010
@@ -25,13 +25,15 @@ import org.apache.camel.Exchange;
  * A strategy for identifying the route consumer in the routebox where the exchange should to be dispatched
  */
 public interface RouteboxDispatchStrategy {
+
     /**
      * Receives an incoming exchange and consumer list and identifies the inner route consumer for dispatching the exchange
      *
-     * @param innerRouteConsumers the list of possible real-time inner route consumers available 
+     * @param destinations the list of possible real-time inner route consumers available
      *        to where the exchange can be dispatched in the routebox
      * @param exchange the incoming exchange
      * @return a selected consumer to whom the exchange can be directed
+     * @throws Exception is thrown if error
      */
     URI selectDestinationUri(List<URI> destinations, Exchange exchange) throws Exception;
 } 

Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java?rev=1053342&r1=1053341&r2=1053342&view=diff
==============================================================================
--- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java (original)
+++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/strategy/RouteboxDispatcher.java Tue Dec 28 13:10:59 2010
@@ -25,7 +25,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.CamelException;
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.CamelExecutionException;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -40,7 +40,6 @@ import org.apache.camel.model.RouteDefin
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-
 public class RouteboxDispatcher {
     private static final transient Log LOG = LogFactory.getLog(RouteboxDispatcher.class);
     private ProducerTemplate producer;
@@ -51,11 +50,11 @@ public class RouteboxDispatcher {
     }
 
     public Exchange dispatchSync(RouteboxEndpoint endpoint, Exchange exchange) throws Exception {
-        URI dispatchUri = null;
-        Exchange reply = null;
+        URI dispatchUri;
+        Exchange reply;
         
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Dispatching exchange" + exchange + "to endpoint " + endpoint.getEndpointUri());
+            LOG.debug("Dispatching exchange " + exchange + " to endpoint " + endpoint.getEndpointUri());
         }
         
         dispatchUri = selectDispatchUri(endpoint, exchange);
@@ -63,18 +62,18 @@ public class RouteboxDispatcher {
         if (exchange.getPattern() == ExchangePattern.InOnly) {
             reply = producer.send(dispatchUri.toASCIIString(), exchange);
         } else {
-            reply = (Exchange) issueRequest(endpoint, ExchangePattern.InOut, exchange.getIn().getBody(), exchange.getIn().getHeaders());        
+            reply = issueRequest(endpoint, ExchangePattern.InOut, exchange.getIn().getBody(), exchange.getIn().getHeaders());
         }
 
         return reply;
     }
     
     public Exchange dispatchAsync(RouteboxEndpoint endpoint, Exchange exchange) throws Exception {
-        URI dispatchUri = null;
-        Exchange reply = null;
+        URI dispatchUri;
+        Exchange reply;
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Dispatching exchange" + exchange + "to endpoint " + endpoint.getEndpointUri());
+            LOG.debug("Dispatching exchange " + exchange + " to endpoint " + endpoint.getEndpointUri());
         }
         
         dispatchUri = selectDispatchUri(endpoint, exchange);
@@ -91,26 +90,27 @@ public class RouteboxDispatcher {
     }
     
     protected URI selectDispatchUri(RouteboxEndpoint endpoint, Exchange exchange) throws Exception {
-        URI dispatchUri = null;
+        URI dispatchUri;
         
         List<URI> consumerUris = getInnerContextConsumerList(endpoint.getConfig().getInnerContext());
         if (consumerUris.isEmpty()) {
-            throw new CamelException("No routes found for dispatch in Routebox");
+            throw new CamelExchangeException("No routes found to dispatch in Routebox at " + endpoint, exchange);
         } else if (consumerUris.size() == 1) {
             dispatchUri = consumerUris.get(0);
         } else {
             if (!endpoint.getConfig().getDispatchMap().isEmpty()) {
-                //apply URI string found in dispatch Map
-                if (endpoint.getConfig().getDispatchMap().containsKey(exchange.getIn().getHeader("ROUTE_DISPATCH_KEY"))) {             
-                    dispatchUri = new URI(endpoint.getConfig().getDispatchMap().get(exchange.getIn().getHeader("ROUTE_DISPATCH_KEY")));
+                // apply URI string found in dispatch Map
+                String key = exchange.getIn().getHeader("ROUTE_DISPATCH_KEY", String.class);
+                if (endpoint.getConfig().getDispatchMap().containsKey(key)) {
+                    dispatchUri = new URI(endpoint.getConfig().getDispatchMap().get(key));
                 } else {
-                    throw new CamelException("No matching entry found in Dispatch Map for ROUTE_DISPATCH_KEY: " + exchange.getIn().getHeader("ROUTE_DISPATCH_KEY"));
+                    throw new CamelExchangeException("No matching entry found in Dispatch Map for ROUTE_DISPATCH_KEY: " + key, exchange);
                 }
             } else {
-                //apply dispatch strategy
+                // apply dispatch strategy
                 dispatchUri = endpoint.getConfig().getDispatchStrategy().selectDestinationUri(consumerUris, exchange);
                 if (dispatchUri == null) {
-                    throw new CamelException("No matching inner routes found for Operation");
+                    throw new CamelExchangeException("No matching inner routes found for Operation", exchange);
                 }
             }
         }
@@ -138,9 +138,7 @@ public class RouteboxDispatcher {
         Exchange exchange = producer.send(endpoint, pattern, new Processor() {
             public void process(Exchange exchange) throws Exception {
                 Message in = exchange.getIn();
-                for (Map.Entry<String, Object> header : headers.entrySet()) {
-                    in.setHeader(header.getKey(), header.getValue());
-                }
+                in.getHeaders().putAll(headers);
                 in.setBody(body);
             }
         });