You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/03/27 11:30:57 UTC

svn commit: r522838 [1/2] - in /activemq/camel/trunk: camel-core/ camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/builder/ camel-core/src/main/java/org/apache/camel/component/pojo/ camel-core/src/main/java/org/apach...

Author: jstrachan
Date: Tue Mar 27 02:30:52 2007
New Revision: 522838

URL: http://svn.apache.org/viewvc?view=rev&rev=522838
Log:
Added a lifecycle interface (Service) and added Producer and Consumer interfaces for actually communicating with an endpoint (so that you can add multiple consumers, or deal with resource-specific stuff like JDBC connections, etc.)

Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Consumer.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/FailedToCreateProducerException.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Service.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoConsumer.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointConsumer.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java   (with props)
    activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java   (with props)
    activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java   (with props)
    activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java   (with props)
    activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java   (with props)
    activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java   (with props)
Modified:
    activemq/camel/trunk/camel-core/pom.xml
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/EndpointResolver.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ChoiceBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FilterBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/InterceptorBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ProcessorFactory.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CompositeProcessor.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorProcessor.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingErrorHandler.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingLevel.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/pojo/PojoRouteTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java
    activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java
    activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
    activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java
    activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java
    activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiComponent.java
    activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiEndpoint.java
    activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java
    activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/ToJbiProcessor.java
    activemq/camel/trunk/camel-jbi/src/test/java/org/apache/camel/component/jbi/JbiTestSupport.java
    activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
    activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
    activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java
    activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
    activemq/camel/trunk/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java
    activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java

Modified: activemq/camel/trunk/camel-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/pom.xml?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/pom.xml (original)
+++ activemq/camel/trunk/camel-core/pom.xml Tue Mar 27 02:30:52 2007
@@ -83,6 +83,19 @@
         </executions>
       </plugin>
 
+           <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <childDelegation>false</childDelegation>
+          <useFile>true</useFile>
+          <includes>
+            <include>**/*Test.*</include>
+          </includes>
+          <excludes>
+            <exclude>**/PojoRouteTest.*</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Tue Mar 27 02:30:52 2007
@@ -19,20 +19,19 @@
 
 import org.apache.camel.builder.RouteBuilder;
 
-import java.util.List;
-import java.util.Map;
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.Callable;
 
 /**
- * Interface used to represent the context used to configure routes and the 
+ * Interface used to represent the context used to configure routes and the
  * policies to use during message exchanges between endpoints.
  *
  * @version $Revision$
  */
 public interface CamelContext {
-    
-	// Component Management Methods
+
+    // Component Management Methods
     //-----------------------------------------------------------------------
 
     /**
@@ -41,9 +40,10 @@
     public void addComponent(String componentName, Component component);
 
     public Component getComponent(String componentName);
-    
+
     /**
      * Removes a previously added component.
+     *
      * @param componentName
      * @return the previously added component or null if it had not been previously added.
      */
@@ -51,14 +51,14 @@
 
     /**
      * Gets the a previously added component by name or lazily creates the component
-     * using the factory Callback. 
-     * 
+     * using the factory Callback.
+     *
      * @param componentName
-     * @param factory used to create a new component instance if the component was not previously added.
+     * @param factory       used to create a new component instance if the component was not previously added.
      * @return
      */
     public Component getOrCreateComponent(String componentName, Callable<Component> factory);
-    
+
     // Endpoint Management Methods
     //-----------------------------------------------------------------------
 
@@ -66,41 +66,39 @@
      * Resolves the given URI to an endpoint
      */
     public Endpoint resolveEndpoint(String uri);
-    
+
     /**
      * Activates all the starting endpoints in that were added as routes.
      */
     public void activateEndpoints() throws Exception;
-    
+
     /**
      * Deactivates all the starting endpoints in that were added as routes.
      */
-    public void deactivateEndpoints() ;
-
+    public void deactivateEndpoints() throws Exception;
 
     /**
      * Returns the collection of all active endpoints currently registered
      */
     Collection<Endpoint> getEndpoints();
-    
+
     // Route Management Methods
     //-----------------------------------------------------------------------
-	public List<Route> getRoutes() ;
-	
-	public void setRoutes(List<Route> routes);
-    public void setRoutes(RouteBuilder builder);
-    public void setRoutes(RouteFactory factory);
-
-	public void addRoutes(List<Route> routes);
-    public void addRoutes(RouteBuilder builder);
-    public void addRoutes(RouteFactory factory);
+    public List<Route> getRoutes();
+
+    public void setRoutes(List<Route> routes);
+
+    public void addRoutes(List<Route> routes);
+
+    public void addRoutes(RouteBuilder builder) throws Exception;
+
+    public void addRoutes(RouteFactory factory) throws Exception;
 
     // Properties
     //-----------------------------------------------------------------------
     public EndpointResolver getEndpointResolver();
-    
+
     public ExchangeConverter getExchangeConverter();
 
     public TypeConverter getTypeConverter();
-
 }

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Consumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Consumer.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Consumer.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Consumer.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,26 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * Represents a consumer of an endpoint
+ *
+ * @version $Revision$
+ */
+public interface Consumer<E extends Exchange> extends Service {
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Consumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Consumer.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Consumer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java Tue Mar 27 02:30:52 2007
@@ -23,17 +23,12 @@
  *
  * @version $Revision$
  */
-public interface Endpoint<E> extends Processor<E> {
+public interface Endpoint<E extends Exchange> {
 
     /**
      * Returns the string representation of the endpoint URI
      */
     public String getEndpointUri();
-
-    /**
-     * Sends an outbound exchange to the endpoint
-     */
-    void onExchange(E exchange);
     
     /**
      * Create a new exchange for communicating with this endpoint
@@ -69,4 +64,18 @@
      * @return the context which created the endpoint
      */
     CamelContext getContext();
+
+    /**
+     * Creates a new producer which is used to send messages into the endpoint
+     *
+     * @return a newly created producer
+     */
+    Producer<E> createProducer() throws Exception;
+
+    /**
+     * Creates a new consumer which consumes messages from the endpoint using the given processor
+     *
+     * @return a newly created consumer
+     */
+    Consumer<E> createConsumer(Processor<E> processor) throws Exception;
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/EndpointResolver.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/EndpointResolver.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/EndpointResolver.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/EndpointResolver.java Tue Mar 27 02:30:52 2007
@@ -21,7 +21,7 @@
  *
  * @version $Revision$
  */
-public interface EndpointResolver<E> {
+public interface EndpointResolver<E extends Exchange> {
 
     /**
      * Resolves the component for a given uri or returns null if now component handles it.

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/FailedToCreateProducerException.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/FailedToCreateProducerException.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/FailedToCreateProducerException.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/FailedToCreateProducerException.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,34 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * @version $Revision$
+ */
+public class FailedToCreateProducerException extends RuntimeCamelException {
+    private final Endpoint endpoint;
+
+    public FailedToCreateProducerException(Endpoint endpoint, Throwable cause) {
+        super("Failed to create Producer for endpoint: " + endpoint + ". Reason: "+ cause, cause);
+        this.endpoint = endpoint;
+    }
+
+    public Endpoint getEndpoint() {
+        return endpoint;
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/FailedToCreateProducerException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/FailedToCreateProducerException.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/FailedToCreateProducerException.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,40 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * Provides a channel on which clients can create and invoke exchanges on the endpoint
+ *
+ * @version $Revision$
+ */
+public interface Producer<E extends Exchange> extends Processor<E>, Service {
+    Endpoint<E> getEndpoint();
+
+    /**
+     * Creates a new exchange to send to this endpoint
+     *
+     * @return a newly created exchange
+     */
+    E createExchange();
+
+    /**
+     * Creates a new exchange for communicating with this endpoint using the given exchange to pre-populate the values
+     * of the headers and messages
+     */
+    E createExchange(E exchange);
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Service.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Service.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Service.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Service.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,30 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * Represents the core lifecycle API for POJOs which can be started and stopped
+ *
+ * @version $Revision$
+ */
+public interface Service {
+
+    void start() throws Exception;
+
+    void stop() throws Exception;
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Service.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Service.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Service.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java Tue Mar 27 02:30:52 2007
@@ -24,6 +24,7 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.processor.LoggingLevel;
+import org.apache.camel.processor.SendProcessor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -184,7 +185,7 @@
 
     @Fluent
     public DeadLetterChannelBuilder<E> deadLetterChannel(@FluentArg("endpoint") Endpoint<E> deadLetterEndpoint) {
-        return new DeadLetterChannelBuilder<E>(deadLetterEndpoint);
+        return new DeadLetterChannelBuilder<E>(new SendProcessor<E>(deadLetterEndpoint));
     }
 
     // Properties

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ChoiceBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ChoiceBuilder.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ChoiceBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ChoiceBuilder.java Tue Mar 27 02:30:52 2007
@@ -68,7 +68,7 @@
     }
 
     @Override
-    public Processor<E> createProcessor() {
+    public Processor<E> createProcessor() throws Exception {
         List<FilterProcessor<E>> filters = new ArrayList<FilterProcessor<E>>();
         for (WhenBuilder<E> predicateBuilder : predicateBuilders) {
             filters.add(predicateBuilder.createProcessor());

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java Tue Mar 27 02:30:52 2007
@@ -53,7 +53,7 @@
         return answer;
     }
 
-    public Processor<E> createErrorHandler(Processor<E> processor) {
+    public Processor<E> createErrorHandler(Processor<E> processor) throws Exception {
         Processor<E> deadLetter = getDeadLetterFactory().createProcessor();
         return new DeadLetterChannel<E>(processor, deadLetter, getRedeliveryPolicy());
     }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java Tue Mar 27 02:30:52 2007
@@ -32,5 +32,5 @@
     /**
      * Creates the error handler interceptor
      */
-    Processor<E> createErrorHandler(Processor<E> processor);
+    Processor<E> createErrorHandler(Processor<E> processor) throws Exception;
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FilterBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FilterBuilder.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FilterBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FilterBuilder.java Tue Mar 27 02:30:52 2007
@@ -52,7 +52,7 @@
         return predicate;
     }
 
-    public FilterProcessor<E> createProcessor() {
+    public FilterProcessor<E> createProcessor() throws Exception {
         // lets create a single processor for all child predicates
         Processor<E> childProcessor = super.createProcessor();
         return new FilterProcessor<E>(predicate, childProcessor);

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java Tue Mar 27 02:30:52 2007
@@ -248,7 +248,7 @@
         processors.add(processor);
     }
 
-    public Processor<E> createProcessor() {
+    public Processor<E> createProcessor() throws Exception {
         List<Processor<E>> answer = new ArrayList<Processor<E>>();
 
         for (ProcessorFactory<E> processFactory : processFactories) {
@@ -272,7 +272,7 @@
     /**
      * Creates the processor and wraps it in any necessary interceptors and error handlers
      */
-    protected Processor<E> makeProcessor(ProcessorFactory<E> processFactory) {
+    protected Processor<E> makeProcessor(ProcessorFactory<E> processFactory) throws Exception {
         Processor<E> processor = processFactory.createProcessor();
         return getErrorHandlerBuilder().createErrorHandler(processor);
     }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/InterceptorBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/InterceptorBuilder.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/InterceptorBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/InterceptorBuilder.java Tue Mar 27 02:30:52 2007
@@ -49,7 +49,7 @@
         return target;
     }
 
-    public Processor<E> createProcessor() {
+    public Processor<E> createProcessor() throws Exception {
     	
     	// The target is required.
     	if( target == null ) 

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java Tue Mar 27 02:30:52 2007
@@ -38,7 +38,7 @@
     }
 
     @Override
-    public Processor<E> createProcessor() {
+    public Processor<E> createProcessor() throws Exception {
         return new MulticastProcessor<E>(endpoints);
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java Tue Mar 27 02:30:52 2007
@@ -38,7 +38,7 @@
     }
 
     @Override
-    public Processor<E> createProcessor() {
+    public Processor<E> createProcessor() throws Exception {
         return new Pipeline<E>(endpoints);
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ProcessorFactory.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ProcessorFactory.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ProcessorFactory.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ProcessorFactory.java Tue Mar 27 02:30:52 2007
@@ -26,6 +26,6 @@
  */
 public interface ProcessorFactory<E extends Exchange> {
 
-    public Processor<E> createProcessor();
+    public Processor<E> createProcessor() throws Exception;
 
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java Tue Mar 27 02:30:52 2007
@@ -99,29 +99,29 @@
     /**
      * Returns the routing map from inbound endpoints to processors
      */
-    public List<Route<E>> getRouteList() {
+    public List<Route<E>> getRouteList() throws Exception {
         checkInitialized();
         return routes;
     }
 
     /**
-     * Returns the destinationBuilders which have been created
+     * Returns the builders which have been created
      */
-    public List<FromBuilder<E>> getDestinationBuilders() {
+    public List<FromBuilder<E>> getFromBuilders() throws Exception {
         checkInitialized();
         return fromBuilders;
     }
 
     // Implementation methods
     //-----------------------------------------------------------------------
-    protected void checkInitialized() {
+    protected void checkInitialized() throws Exception {
         if (initalized.compareAndSet(false, true)) {
             configure();
             populateRoutes(routes);
         }
     }
 
-    protected void populateRoutes(List<Route<E>> routes) {
+    protected void populateRoutes(List<Route<E>> routes) throws Exception {
         for (FromBuilder<E> builder : fromBuilders) {
             Endpoint<E> from = builder.getFrom();
             Processor<E> processor = makeProcessor(from, builder);
@@ -140,7 +140,7 @@
      * @param builder the builder which is the factory of the processor
      * @return
      */
-    protected Processor<E> makeProcessor(Endpoint<E> from, FromBuilder<E> builder) {
+    protected Processor<E> makeProcessor(Endpoint<E> from, FromBuilder<E> builder) throws Exception {
         return builder.createProcessor();
     }
 

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java Tue Mar 27 02:30:52 2007
@@ -36,7 +36,7 @@
         this.valueBuilder = valueBuilder;
     }
 
-    public Processor<E> createProcessor() {
+    public Processor<E> createProcessor() throws Exception {
         // lets create a single processor for all child predicates
         Processor<E> destination = super.createProcessor();
         Expression<E> expression = valueBuilder.getExpression();

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoConsumer.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoConsumer.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoConsumer.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,65 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pojo;
+
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+
+import java.lang.reflect.Proxy;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * @version $Revision$
+ */
+public class PojoConsumer extends DefaultConsumer<PojoExchange> {
+    private final Object pojo;
+
+    public PojoConsumer(Endpoint<PojoExchange> endpoint, Processor<PojoExchange> processor, Object pojo) {
+        super(endpoint, processor);
+        this.pojo = pojo;
+    }
+
+
+    /**
+     * Creates a Proxy object that can be used to deliver inbound PojoExchanges.
+     *
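+     * @param interfaces the interfaces which the returned proxy implements
+     * @return a proxy which turns each method call into a PojoExchange, passes it to this consumer's processor and returns the body of the out message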
+     */
+    public Object createInboundProxy(Class interfaces[]) {
+        return Proxy.newProxyInstance(pojo.getClass().getClassLoader(), interfaces, new InvocationHandler() {
+            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+                if (!isStarted()) {
+                    throw new IllegalStateException("The endpoint is not active: " + getEndpoint().getEndpointUri());
+                }
+                PojoInvocation invocation = new PojoInvocation(proxy, method, args);
+                PojoExchange exchange = getEndpoint().createExchange();
+                exchange.setInvocation(invocation);
+                getProcessor().onExchange(exchange);
+                Throwable fault = exchange.getException();
+                if (fault != null) {
+                    throw new InvocationTargetException(fault);
+                }
+                return exchange.getOut().getBody();
+            }
+        });
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoConsumer.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoConsumer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java Tue Mar 27 02:30:52 2007
@@ -17,7 +17,11 @@
 package org.apache.camel.component.pojo;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.Producer;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultProducer;
 
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
@@ -40,10 +44,22 @@
         this.pojo = pojo;
     }
 
+    public Producer<PojoExchange> createProducer() throws Exception {
+        return startService(new DefaultProducer<PojoExchange>(this) {
+            public void onExchange(PojoExchange exchange) {
+                invoke(exchange);
+            }
+        });
+    }
+
+    public Consumer<PojoExchange> createConsumer(Processor<PojoExchange> processor) throws Exception {
+        return startService(new PojoConsumer(this, processor, pojo));
+    }
+
     /**
      * This causes us to invoke the endpoint Pojo using reflection.
      */
-    public void onExchange(PojoExchange exchange) {
+    public void invoke(PojoExchange exchange) {
         PojoInvocation invocation = exchange.getInvocation();
         try {
             Object response = invocation.getMethod().invoke(pojo, invocation.getArgs());
@@ -74,28 +90,4 @@
         component.unregisterActivation(getEndpointUri());
     }
 
-    /**
-     * Creates a Proxy object that can be used to deliver inbound PojoExchanges.
-     *
-     * @param interfaces
-     * @return
-     */
-    public Object createInboundProxy(Class interfaces[]) {
-        return Proxy.newProxyInstance(pojo.getClass().getClassLoader(), interfaces, new InvocationHandler() {
-            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
-                if (!activated.get()) {
-                    throw new IllegalStateException("The endpoint is not active: " + getEndpointUri());
-                }
-                PojoInvocation invocation = new PojoInvocation(proxy, method, args);
-                PojoExchange exchange = createExchange();
-                exchange.setInvocation(invocation);
-                getInboundProcessor().onExchange(exchange);
-                Throwable fault = exchange.getException();
-                if (fault != null) {
-                    throw new InvocationTargetException(fault);
-                }
-                return exchange.getOut().getBody();
-            }
-        });
-    }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java Tue Mar 27 02:30:52 2007
@@ -19,13 +19,13 @@
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.Consumer;
+import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultProducer;
 
-import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Represents a queue endpoint that uses a {@link BlockingQueue}
@@ -36,20 +36,22 @@
  */
 public class QueueEndpoint<E extends Exchange> extends DefaultEndpoint<E> {
     private BlockingQueue<E> queue;
-	private org.apache.camel.component.queue.QueueEndpoint.Activation activation;
 
     public QueueEndpoint(String uri, CamelContext container, BlockingQueue<E> queue) {
         super(uri, container);
         this.queue = queue;
     }
 
-    public void onExchange(E exchange) {
-        queue.add(exchange);
+    public Producer<E> createProducer() throws Exception {
+        return startService(new DefaultProducer<E>(this) {
+            public void onExchange(E exchange) {
+                queue.add(exchange);
+            }
+        });
     }
 
-    public void setInboundProcessor(Processor<E> processor) {
-        // TODO lets start a thread to process inbound requests
-        // if we don't already have one
+    public Consumer<E> createConsumer(Processor<E> processor) throws Exception {
+        return startService(new QueueEndpointConsumer<E>(this, processor));
     }
 
     public E createExchange() {
@@ -61,59 +63,5 @@
     public BlockingQueue<E> getQueue() {
         return queue;
     }
-    
-    class Activation implements Runnable {
-		AtomicBoolean stop = new AtomicBoolean();
-		private Thread thread;
-		
-		public void run() {
-			while(!stop.get()) {
-				E exchange=null;
-				try {
-					exchange = queue.poll(100, TimeUnit.MILLISECONDS);
-				} catch (InterruptedException e) {
-					break;
-				}
-				if( exchange !=null ) {
-					try {
-						getInboundProcessor().onExchange(exchange);
-					} catch (Throwable e) {
-						e.printStackTrace();
-					}
-				}
-			}
-		}
-
-		public void start() {
-			thread = new Thread(this, getEndpointUri());
-			thread.setDaemon(true);
-			thread.start();
-		}
-
-		public void stop() throws InterruptedException {
-			stop.set(true);
-			thread.join();
-		}
-		
-		@Override
-		public String toString() {
-			return "Activation: "+getEndpointUri();
-		}
-    }
 
-    @Override
-    protected void doActivate() {
-		activation = new Activation();
-		activation.start();
-    }
-    
-    @Override
-    protected void doDeactivate() {
-		try {
-			activation.stop();
-			activation=null;
-		} catch (InterruptedException e) {
-			throw new RuntimeException(e);
-		}
-    }
 }

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointConsumer.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointConsumer.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointConsumer.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.queue;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @version $Revision$
+ */
+public class QueueEndpointConsumer<E extends Exchange> extends ServiceSupport implements Consumer<E>, Runnable {
+    private QueueEndpoint<E> endpoint;
+    private Processor<E> processor;
+    private Thread thread;
+
+    public QueueEndpointConsumer(QueueEndpoint<E> endpoint, Processor<E> processor) {
+        this.endpoint = endpoint;
+        this.processor = processor;
+    }
+
+    @Override
+    public String toString() {
+        return "QueueEndpointConsumer: " + endpoint.getEndpointUri();
+    }
+
+    public void run() {
+        while (!isStopping()) {
+            E exchange;
+            try {
+                exchange = endpoint.getQueue().poll(100, TimeUnit.MILLISECONDS);
+            }
+            catch (InterruptedException e) {
+                break;
+            }
+            if (exchange != null) {
+                try {
+                    processor.onExchange(exchange);
+                }
+                catch (Throwable e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    protected void doStart() throws Exception {
+        thread = new Thread(this, endpoint.getEndpointUri());
+        thread.setDaemon(true);
+        thread.start();
+    }
+
+    protected void doStop() throws Exception {
+        thread.join();
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointConsumer.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointConsumer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Tue Mar 27 02:30:52 2007
@@ -18,7 +18,10 @@
 package org.apache.camel.impl;
 
 import org.apache.camel.*;
+import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -27,6 +30,7 @@
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Represents the context used to configure routes and the policies to use.
@@ -34,14 +38,17 @@
  * @version $Revision: 520517 $
  * @org.apache.xbean.XBean element="container" rootElement="true"
  */
-public class DefaultCamelContext implements CamelContext {
+public class DefaultCamelContext implements CamelContext, Service {
+    private static final transient Log log = LogFactory.getLog(DefaultCamelContext.class);
     private Map<String, Endpoint> endpoints = new HashMap<String, Endpoint>();
     private Map<String, Component> components = new HashMap<String, Component>();
     private List<EndpointResolver> resolvers = new CopyOnWriteArrayList<EndpointResolver>();
     private List<Route> routes;
+    private List<Service> servicesToClose = new ArrayList<Service>();
     private TypeConverter typeConverter;
     private EndpointResolver endpointResolver;
     private ExchangeConverter exchangeConverter;
+    private AtomicBoolean started = new AtomicBoolean(false);
 
     /**
      * Adds a component to the container.
@@ -107,6 +114,16 @@
 
     // Endpoint Management Methods
     //-----------------------------------------------------------------------
+    public void start() throws Exception {
+        activateEndpoints();
+    }
+
+    public void stop() throws Exception {
+        deactivateEndpoints();
+    }
+
+    // Endpoint Management Methods
+    //-----------------------------------------------------------------------
 
     public Collection<Endpoint> getEndpoints() {
         synchronized (endpoints) {
@@ -161,17 +178,26 @@
      */
     public void activateEndpoints() throws Exception {
         for (Route<Exchange> route : routes) {
-            route.getEndpoint().activate(route.getProcessor());
+            Processor<Exchange> processor = route.getProcessor();
+            Consumer<Exchange> consumer = route.getEndpoint().createConsumer(processor);
+            if (consumer != null) {
+                consumer.start();
+                servicesToClose.add(consumer);
+            }
+            if (processor instanceof Service) {
+                Service service = (Service) processor;
+                service.start();
+                servicesToClose.add(service);
+            }
         }
     }
 
     /**
      * Deactivates all the starting endpoints in that were added as routes.
      */
-    public void deactivateEndpoints() {
-        for (Route<Exchange> route : routes) {
-            route.getEndpoint().deactivate();
-        }
+    public void deactivateEndpoints() throws Exception {
+        ServiceHelper.stopServices(servicesToClose);
+
     }
 
     // Route Management Methods
@@ -184,21 +210,6 @@
         this.routes = routes;
     }
 
-    public void setRoutes(RouteBuilder builder) {
-        // lets now add the routes from the builder
-        builder.setContext(this);
-        setRoutes(builder.getRouteList());
-    }
-
-    public void setRoutes(final RouteFactory factory) {
-        RouteBuilder builder = new RouteBuilder(this) {
-            public void configure() {
-                factory.build(this);
-            }
-        };
-        setRoutes(builder);
-    }
-
     public void addRoutes(List<Route> routes) {
         if (this.routes == null) {
             this.routes = new ArrayList<Route>(routes);
@@ -208,13 +219,13 @@
         }
     }
 
-    public void addRoutes(RouteBuilder builder) {
+    public void addRoutes(RouteBuilder builder) throws Exception {
         // lets now add the routes from the builder
         builder.setContext(this);
         addRoutes(builder.getRouteList());
     }
 
-    public void addRoutes(final RouteFactory factory) {
+    public void addRoutes(final RouteFactory factory) throws Exception {
         RouteBuilder builder = new RouteBuilder(this) {
             public void configure() {
                 factory.build(this);

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * @version $Revision$
+ */
+public class DefaultConsumer<E extends Exchange> extends ServiceSupport implements Consumer<E> {
+    private Endpoint<E> endpoint;
+    private Processor<E> processor;
+
+    public DefaultConsumer(Endpoint<E> endpoint, Processor<E> processor) {
+        this.endpoint = endpoint;
+        this.processor = processor;
+    }
+
+    public Endpoint<E> getEndpoint() {
+        return endpoint;
+    }
+
+    public Processor<E> getProcessor() {
+        return processor;
+    }
+
+    protected void doStop() throws Exception {
+        ServiceHelper.stopServices(processor);
+    }
+
+    protected void doStart() throws Exception {
+        ServiceHelper.startServices(processor);
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Tue Mar 27 02:30:52 2007
@@ -20,6 +20,9 @@
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.Consumer;
+import org.apache.camel.Service;
 import org.apache.camel.util.ObjectHelper;
 
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -125,5 +128,10 @@
      * Called at most once by the container to deactivate the endpoint
      */
     protected void doDeactivate() {
+    }
+
+    protected <T extends Service> T startService(T service) throws Exception {
+        service.start();
+        return service;
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java Tue Mar 27 02:30:52 2007
@@ -20,6 +20,7 @@
 import org.apache.camel.Component;
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointResolver;
+import org.apache.camel.Exchange;
 import org.apache.camel.util.FactoryFinder;
 import org.apache.camel.util.ObjectHelper;
 
@@ -34,7 +35,7 @@
  *
  * @version $Revision$
  */
-public class DefaultEndpointResolver<E> implements EndpointResolver<E> {
+public class DefaultEndpointResolver<E extends Exchange> implements EndpointResolver<E> {
     static final private FactoryFinder endpointResolverFactory = new FactoryFinder("META-INF/services/org/apache/camel/EndpointResolver/");
     
     public Endpoint<E> resolveEndpoint(CamelContext container, String uri) throws Exception {

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.Producer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+
+/**
+ * A default implementation of {@link Producer} for implementation inheritance
+ *
+ * @version $Revision$
+ */
+public abstract class DefaultProducer<E extends Exchange> extends ServiceSupport implements Producer<E> {
+    private Endpoint<E> endpoint;
+
+    public DefaultProducer(Endpoint<E> endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    public Endpoint<E> getEndpoint() {
+        return endpoint;
+    }
+
+    public E createExchange() {
+        return endpoint.createExchange();
+    }
+
+    public E createExchange(E exchange) {
+        return endpoint.createExchange(exchange);
+    }
+
+    protected void doStart() throws Exception {
+    }
+
+    protected void doStop() throws Exception {
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,83 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.Service;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A useful base class which ensures that a service is only initialized once and provides some helper methods for
+ * enquiring of its status
+ * 
+ * @version $Revision$
+ */
+public abstract class ServiceSupport implements Service {
+    private AtomicBoolean started = new AtomicBoolean(false);
+    private AtomicBoolean stopping = new AtomicBoolean(false);
+    private AtomicBoolean stopped = new AtomicBoolean(false);
+
+    public void start() throws Exception {
+        if (started.compareAndSet(false, true)) {
+            doStart();
+        }
+    }
+
+    public void stop() throws Exception {
+        if (stopped.compareAndSet(false, true)) {
+            stopping.set(true);
+            try {
+                doStop();
+            }
+            finally {
+                stopped.set(true);
+                started.set(false);
+                stopping.set(false);
+            }
+        }
+    }
+
+    /**
+     * @return true if this service has been started
+     */
+    public boolean isStarted() {
+        return started.get();
+    }
+
+    /**
+     * @return true if this service is in the process of stopping
+     */
+    public boolean isStopping() {
+        return stopping.get();
+    }
+
+
+    /**
+     * @return true if this service has been stopped
+     */
+    public boolean isStopped() {
+        return stopped.get();
+    }
+
+    protected abstract void doStart() throws Exception;
+    
+    protected abstract void doStop() throws Exception;
+
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CompositeProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CompositeProcessor.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CompositeProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CompositeProcessor.java Tue Mar 27 02:30:52 2007
@@ -18,6 +18,8 @@
 package org.apache.camel.processor;
 
 import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.util.ServiceHelper;
 
 import java.util.Collection;
 
@@ -26,7 +28,7 @@
  *
  * @version $Revision$
  */
-public class CompositeProcessor<E> implements Processor<E> {
+public class CompositeProcessor<E> extends ServiceSupport implements Processor<E> {
     private final Collection<Processor<E>> processors;
 
     public CompositeProcessor(Collection<Processor<E>> processors) {
@@ -58,5 +60,13 @@
 
     public Collection<Processor<E>> getProcessors() {
         return processors;
+    }
+
+    protected void doStart() throws Exception {
+        ServiceHelper.startServices(processors);
+    }
+
+    protected void doStop() throws Exception {
+        ServiceHelper.stopServices(processors);
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Tue Mar 27 02:30:52 2007
@@ -19,6 +19,8 @@
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -29,9 +31,8 @@
  *
  * @version $Revision$
  */
-public class DeadLetterChannel<E extends Exchange> implements ErrorHandler<E> {
+public class DeadLetterChannel<E extends Exchange> extends ServiceSupport implements ErrorHandler<E> {
     public static final String REDELIVERY_COUNT_HEADER = "org.apache.camel.redeliveryCount";
-
     private static final transient Log log = LogFactory.getLog(DeadLetterChannel.class);
     private Processor<E> output;
     private Processor<E> deadLetter;
@@ -79,7 +80,6 @@
         deadLetter.onExchange(exchange);
     }
 
-
     // Properties
     //-------------------------------------------------------------------------
 
@@ -116,7 +116,7 @@
      * Sets the message header name to be used to append the redelivery count value when a message has been redelivered
      *
      * @param redeliveryCountHeader the header name to use to append the redelivery count or null if you wish to disable
-     * this feature
+     *                              this feature
      */
     public void setRedeliveryCountHeader(String redeliveryCountHeader) {
         this.redeliveryCountHeader = redeliveryCountHeader;
@@ -133,9 +133,9 @@
 
     protected void sleep(long redeliveryDelay) {
         if (redeliveryDelay > 0) {
-        if (log.isDebugEnabled()) {
-            log.debug("Sleeping for: " + redeliveryDelay + " until attempting redelivery");
-        }
+            if (log.isDebugEnabled()) {
+                log.debug("Sleeping for: " + redeliveryDelay + " until attempting redelivery");
+            }
             try {
                 Thread.sleep(redeliveryDelay);
             }
@@ -145,5 +145,13 @@
                 }
             }
         }
+    }
+
+    protected void doStart() throws Exception {
+        ServiceHelper.startServices(output, deadLetter);
+    }
+
+    protected void doStop() throws Exception {
+        ServiceHelper.stopServices(deadLetter, output);
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java Tue Mar 27 02:30:52 2007
@@ -19,11 +19,13 @@
 
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.util.ServiceHelper;
 
 /**
  * @version $Revision$
  */
-public class FilterProcessor<E> implements Processor<E> {
+public class FilterProcessor<E> extends ServiceSupport implements Processor<E> {
     private Predicate<E> predicate;
     private Processor<E> processor;
 
@@ -49,5 +51,13 @@
 
     public Processor<E> getProcessor() {
         return processor;
+    }
+
+    protected void doStart() throws Exception {
+        ServiceHelper.startServices(processor);
+    }
+
+    protected void doStop() throws Exception {
+        ServiceHelper.stopServices(processor);
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorProcessor.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorProcessor.java Tue Mar 27 02:30:52 2007
@@ -18,21 +18,22 @@
 package org.apache.camel.processor;
 
 import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.util.ServiceHelper;
 
 /**
  * @version $Revision: 519941 $
  */
-public class InterceptorProcessor<E> implements Processor<E> {
-
+public class InterceptorProcessor<E> extends ServiceSupport implements Processor<E> {
     protected Processor<E> next;
 
-	public InterceptorProcessor() {
+    public InterceptorProcessor() {
     }
 
     public void onExchange(E exchange) {
-       if( next != null ) {
-    	   next.onExchange(exchange);
-       }
+        if (next != null) {
+            next.onExchange(exchange);
+        }
     }
 
     @Override
@@ -40,10 +41,19 @@
         return "intercept(" + next + ")";
     }
 
-	public Processor<E> getNext() {
-		return next;
-	}
-	public void setNext(Processor<E> next) {
-		this.next = next;
-	}
+    public Processor<E> getNext() {
+        return next;
+    }
+
+    public void setNext(Processor<E> next) {
+        this.next = next;
+    }
+
+    protected void doStart() throws Exception {
+        ServiceHelper.startServices(next);
+    }
+
+    protected void doStop() throws Exception {
+        ServiceHelper.stopServices(next);
+    }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingErrorHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingErrorHandler.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingErrorHandler.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingErrorHandler.java Tue Mar 27 02:30:52 2007
@@ -19,6 +19,8 @@
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -27,7 +29,7 @@
  *
  * @version $Revision$
  */
-public class LoggingErrorHandler<E extends Exchange> implements ErrorHandler<E> {
+public class LoggingErrorHandler<E extends Exchange> extends ServiceSupport implements ErrorHandler<E> {
     private Processor<E> output;
     private Log log;
     private LoggingLevel level;
@@ -123,5 +125,13 @@
 
     protected Object logMessage(E exchange, RuntimeException e) {
         return e + " while processing exchange: " + exchange;
+    }
+
+    protected void doStart() throws Exception {
+        ServiceHelper.startServices(output);
+    }
+
+    protected void doStop() throws Exception {
+        ServiceHelper.stopServices(output);
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingLevel.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingLevel.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingLevel.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingLevel.java Tue Mar 27 02:30:52 2007
@@ -21,7 +21,7 @@
  * Used to configure the logging levels
  *
  * @version $Revision$
-*/
+ */
 public enum LoggingLevel {
     DEBUG, ERROR, FATAL, INFO, TRACE, WARN;
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Tue Mar 27 02:30:52 2007
@@ -20,7 +20,10 @@
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.ServiceSupport;
 
+import java.util.ArrayList;
 import java.util.Collection;
 
 /**
@@ -29,40 +32,74 @@
  *
  * @version $Revision$
  */
-public class MulticastProcessor<E extends Exchange> implements Processor<E> {
-    private Collection<Endpoint<E>> endpoints;
+public class MulticastProcessor<E extends Exchange> extends ServiceSupport implements Processor<E> {
+    private Collection<Producer<E>> producers;
 
-    public MulticastProcessor(Collection<Endpoint<E>> endpoints) {
-        this.endpoints = endpoints;
+    /**
+     * A helper method to convert a collection of endpoints into a collection of producers
+     */
+    public static <E extends Exchange> Collection<Producer<E>> toProducers(Collection<Endpoint<E>> endpoints) throws Exception {
+        Collection<Producer<E>> answer = new ArrayList<Producer<E>>();
+        for (Endpoint<E> endpoint : endpoints) {
+            answer.add(endpoint.createProducer());
+        }
+        return answer;
+    }
+
+    public MulticastProcessor(Collection<Endpoint<E>> endpoints) throws Exception {
+        this.producers = toProducers(endpoints);
     }
 
     @Override
     public String toString() {
-        return "Multicast" + endpoints;
+        return "Multicast" + getEndpoints();
     }
 
     public void onExchange(E exchange) {
-        for (Endpoint<E> endpoint : endpoints) {
-            E copy = copyExchangeStrategy(endpoint, exchange);
-            endpoint.onExchange(copy);
+        for (Producer<E> producer : producers) {
+            E copy = copyExchangeStrategy(producer, exchange);
+            producer.onExchange(copy);
+        }
+    }
+
+    protected void doStop() throws Exception {
+        for (Producer<E> producer : producers) {
+            producer.stop();
+        }
+    }
+
+    protected void doStart() throws Exception {
+        for (Producer<E> producer : producers) {
+            producer.start();
         }
     }
 
     /**
-     * Returns the endpoints to multicast to
+     * Returns the producers to multicast to
+     */
+    public Collection<Producer<E>> getProducers() {
+        return producers;
+    }
+
+    /**
+     * Returns the list of endpoints
      */
     public Collection<Endpoint<E>> getEndpoints() {
-        return endpoints;
+        Collection<Endpoint<E>> answer = new ArrayList<Endpoint<E>>();
+        for (Producer<E> producer : producers) {
+            answer.add(producer.getEndpoint());
+        }
+        return answer;
     }
 
     /**
      * Strategy method to copy the exchange before sending to another endpoint. Derived classes such as the
      * {@link Pipeline} will not clone the exchange
      *
-     * @param endpoint the endpoint that the exchange will be sent to
+     * @param producer the producer that will send the exchange
      * @param exchange @return the current exchange if no copying is required such as for a pipeline otherwise a new copy of the exchange is returned.
      */
-    protected E copyExchangeStrategy(Endpoint<E> endpoint, E exchange) {
-        return endpoint.createExchange(exchange);
+    protected E copyExchangeStrategy(Producer<E> producer, E exchange) {
+        return producer.createExchange(exchange);
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Tue Mar 27 02:30:52 2007
@@ -20,45 +20,44 @@
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.Producer;
 
 import java.util.Collection;
 
 /**
  * Creates a Pipeline pattern where the output of the previous step is sent as input to the next step when working
  * with request/response message exchanges.
- *  
+ *
  * @version $Revision$
  */
-public class Pipeline<E extends Exchange> implements Processor<E> {
-    private Collection<Endpoint<E>> endpoints;
-
-    public Pipeline(Collection<Endpoint<E>> endpoints) {
-        this.endpoints = endpoints;
+public class Pipeline<E extends Exchange> extends MulticastProcessor<E> implements Processor<E> {
+    public Pipeline(Collection<Endpoint<E>> endpoints) throws Exception {
+        super(endpoints);
     }
 
     public void onExchange(E exchange) {
         E nextExchange = exchange;
         boolean first = true;
-        for (Endpoint<E> endpoint : endpoints) {
+        for (Producer<E> producer : getProducers()) {
             if (first) {
                 first = false;
             }
             else {
-                nextExchange = createNextExchange(endpoint, nextExchange);
+                nextExchange = createNextExchange(producer, nextExchange);
             }
-            endpoint.onExchange(nextExchange);
+            producer.onExchange(nextExchange);
         }
     }
 
     /**
      * Strategy method to create the next exchange from the
      *
-     * @param endpoint the endpoint the exchange will be sent to
+     * @param producer         the producer used to send to the endpoint
      * @param previousExchange the previous exchange
      * @return a new exchange
      */
-    protected E createNextExchange(Endpoint<E> endpoint, E previousExchange) {
-        E answer = endpoint.createExchange(previousExchange);
+    protected E createNextExchange(Producer<E> producer, E previousExchange) {
+        E answer = producer.createExchange(previousExchange);
 
         // now lets set the input of the next exchange to the output of the previous message if it is not null
         Object output = previousExchange.getOut().getBody();
@@ -81,6 +80,6 @@
 
     @Override
     public String toString() {
-        return "Pipeline" + endpoints;
+        return "Pipeline" + getEndpoints();
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Tue Mar 27 02:30:52 2007
@@ -21,9 +21,11 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 import static org.apache.camel.util.ObjectHelper.notNull;
+import org.apache.camel.util.ProducerCache;
 
 import java.util.Iterator;
 
@@ -33,8 +35,9 @@
  *
  * @version $Revision$
  */
-public class RecipientList<E extends Exchange> implements Processor<E> {
+public class RecipientList<E extends Exchange> extends ServiceSupport implements Processor<E> {
     private final Expression<E> expression;
+    private ProducerCache<E> producerCache = new ProducerCache<E>();
 
     public RecipientList(Expression<E> expression) {
         notNull(expression, "expression");
@@ -52,11 +55,18 @@
         while (iter.hasNext()) {
             Object recipient = iter.next();
             Endpoint<E> endpoint = resolveEndpoint(exchange, recipient);
-            endpoint.onExchange(exchange);
+            producerCache.getProducer(endpoint).onExchange(exchange);
         }
     }
 
     protected Endpoint<E> resolveEndpoint(E exchange, Object recipient) {
         return ExchangeHelper.resolveEndpoint(exchange, recipient);
+    }
+
+    protected void doStop() throws Exception {
+        producerCache.stop();
+    }
+
+    protected void doStart() throws Exception {
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java Tue Mar 27 02:30:52 2007
@@ -30,7 +30,6 @@
  */
 public class RedeliveryPolicy implements Cloneable, Serializable {
     protected static transient Random randomNumberGenerator;
-
     protected int maximumRedeliveries = 6;
     protected long initialRedeliveryDelay = 1000L;
     protected double backOffMultiplier = 2;
@@ -39,7 +38,6 @@
     protected double collisionAvoidanceFactor = 0.15d;
     protected boolean useCollisionAvoidance = false;
 
-
     public RedeliveryPolicy() {
     }
 
@@ -116,7 +114,6 @@
         setCollisionAvoidancePercent(collisionAvoidancePercent);
         return this;
     }
-       
 
     // Properties
     //-------------------------------------------------------------------------

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java Tue Mar 27 02:30:52 2007
@@ -18,20 +18,43 @@
 package org.apache.camel.processor;
 
 import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.Service;
+import org.apache.camel.impl.ServiceSupport;
 
 /**
  * @version $Revision$
  */
-public class SendProcessor<E> implements Processor<E> {
+public class SendProcessor<E extends Exchange> extends ServiceSupport implements Processor<E>, Service {
     private Endpoint<E> destination;
+    private Producer<E> producer;
 
     public SendProcessor(Endpoint<E> destination) {
         this.destination = destination;
     }
 
+    protected void doStop() throws Exception {
+        if (producer != null) {
+            try {
+                producer.stop();
+            }
+            finally {
+                producer = null;
+            }
+        }
+    }
+
+    protected void doStart() throws Exception {
+        this.producer = destination.createProducer();
+    }
+
     public void onExchange(E exchange) {
-        destination.onExchange(exchange);
+        if (producer == null) {
+            throw new IllegalStateException("No producer, this processor has not been started!");
+        }
+        producer.onExchange(exchange);
     }
 
     public Endpoint<E> getDestination() {