You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/03/27 11:30:57 UTC
svn commit: r522838 [1/2] - in /activemq/camel/trunk: camel-core/
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/builder/
camel-core/src/main/java/org/apache/camel/component/pojo/
camel-core/src/main/java/org/apach...
Author: jstrachan
Date: Tue Mar 27 02:30:52 2007
New Revision: 522838
URL: http://svn.apache.org/viewvc?view=rev&rev=522838
Log:
added a lifecycle interface (Service) and added a Producer and Consumer interfaces for actually communicating with an endpoint (so that you can add multiple consumers, or deal with resource specific stuff like JDBC connections etc)
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Consumer.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/FailedToCreateProducerException.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Service.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoConsumer.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointConsumer.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java (with props)
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java (with props)
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java (with props)
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (with props)
activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java (with props)
activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (with props)
Modified:
activemq/camel/trunk/camel-core/pom.xml
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/EndpointResolver.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ChoiceBuilder.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FilterBuilder.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/InterceptorBuilder.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ProcessorFactory.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CompositeProcessor.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorProcessor.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingErrorHandler.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingLevel.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/pojo/PojoRouteTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java
activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java
activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiComponent.java
activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiEndpoint.java
activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java
activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/ToJbiProcessor.java
activemq/camel/trunk/camel-jbi/src/test/java/org/apache/camel/component/jbi/JbiTestSupport.java
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java
activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
activemq/camel/trunk/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java
activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
Modified: activemq/camel/trunk/camel-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/pom.xml?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/pom.xml (original)
+++ activemq/camel/trunk/camel-core/pom.xml Tue Mar 27 02:30:52 2007
@@ -83,6 +83,19 @@
</executions>
</plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <childDelegation>false</childDelegation>
+ <useFile>true</useFile>
+ <includes>
+ <include>**/*Test.*</include>
+ </includes>
+ <excludes>
+ <exclude>**/PojoRouteTest.*</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
</plugins>
</build>
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Tue Mar 27 02:30:52 2007
@@ -19,20 +19,19 @@
import org.apache.camel.builder.RouteBuilder;
-import java.util.List;
-import java.util.Map;
import java.util.Collection;
+import java.util.List;
import java.util.concurrent.Callable;
/**
- * Interface used to represent the context used to configure routes and the
+ * Interface used to represent the context used to configure routes and the
* policies to use during message exchanges between endpoints.
*
* @version $Revision$
*/
public interface CamelContext {
-
- // Component Management Methods
+
+ // Component Management Methods
//-----------------------------------------------------------------------
/**
@@ -41,9 +40,10 @@
public void addComponent(String componentName, Component component);
public Component getComponent(String componentName);
-
+
/**
* Removes a previously added component.
+ *
* @param componentName
* @return the previously added component or null if it had not been previously added.
*/
@@ -51,14 +51,14 @@
/**
* Gets the a previously added component by name or lazily creates the component
- * using the factory Callback.
- *
+ * using the factory Callback.
+ *
* @param componentName
- * @param factory used to create a new component instance if the component was not previously added.
+ * @param factory used to create a new component instance if the component was not previously added.
* @return
*/
public Component getOrCreateComponent(String componentName, Callable<Component> factory);
-
+
// Endpoint Management Methods
//-----------------------------------------------------------------------
@@ -66,41 +66,39 @@
* Resolves the given URI to an endpoint
*/
public Endpoint resolveEndpoint(String uri);
-
+
/**
* Activates all the starting endpoints in that were added as routes.
*/
public void activateEndpoints() throws Exception;
-
+
/**
* Deactivates all the starting endpoints in that were added as routes.
*/
- public void deactivateEndpoints() ;
-
+ public void deactivateEndpoints() throws Exception;
/**
* Returns the collection of all active endpoints currently registered
*/
Collection<Endpoint> getEndpoints();
-
+
// Route Management Methods
//-----------------------------------------------------------------------
- public List<Route> getRoutes() ;
-
- public void setRoutes(List<Route> routes);
- public void setRoutes(RouteBuilder builder);
- public void setRoutes(RouteFactory factory);
-
- public void addRoutes(List<Route> routes);
- public void addRoutes(RouteBuilder builder);
- public void addRoutes(RouteFactory factory);
+ public List<Route> getRoutes();
+
+ public void setRoutes(List<Route> routes);
+
+ public void addRoutes(List<Route> routes);
+
+ public void addRoutes(RouteBuilder builder) throws Exception;
+
+ public void addRoutes(RouteFactory factory) throws Exception;
// Properties
//-----------------------------------------------------------------------
public EndpointResolver getEndpointResolver();
-
+
public ExchangeConverter getExchangeConverter();
public TypeConverter getTypeConverter();
-
}
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Consumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Consumer.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Consumer.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Consumer.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,26 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * Represents a consumer of an endpoint
+ *
+ * @version $Revision$
+ */
+public interface Consumer<E extends Exchange> extends Service {
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Consumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Consumer.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Consumer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java Tue Mar 27 02:30:52 2007
@@ -23,17 +23,12 @@
*
* @version $Revision$
*/
-public interface Endpoint<E> extends Processor<E> {
+public interface Endpoint<E extends Exchange> {
/**
* Returns the string representation of the endpoint URI
*/
public String getEndpointUri();
-
- /**
- * Sends an outbound exchange to the endpoint
- */
- void onExchange(E exchange);
/**
* Create a new exchange for communicating with this endpoint
@@ -69,4 +64,18 @@
* @return the context which created the endpoint
*/
CamelContext getContext();
+
+ /**
+ * Creates a new producer which is used send messages into the endpoint
+ *
+ * @return a newly created producer
+ */
+ Producer<E> createProducer() throws Exception;
+
+ /**
+ * Creates a new consumer which consumes messages from the endpoint using the given processor
+ *
+ * @return a newly created consumer
+ */
+ Consumer<E> createConsumer(Processor<E> processor) throws Exception;
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/EndpointResolver.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/EndpointResolver.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/EndpointResolver.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/EndpointResolver.java Tue Mar 27 02:30:52 2007
@@ -21,7 +21,7 @@
*
* @version $Revision$
*/
-public interface EndpointResolver<E> {
+public interface EndpointResolver<E extends Exchange> {
/**
* Resolves the component for a given uri or returns null if now component handles it.
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/FailedToCreateProducerException.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/FailedToCreateProducerException.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/FailedToCreateProducerException.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/FailedToCreateProducerException.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,34 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * @version $Revision$
+ */
+public class FailedToCreateProducerException extends RuntimeCamelException {
+ private final Endpoint endpoint;
+
+ public FailedToCreateProducerException(Endpoint endpoint, Throwable cause) {
+ super("Failed to create Producer for endpoint: " + endpoint + ". Reason: "+ cause, cause);
+ this.endpoint = endpoint;
+ }
+
+ public Endpoint getEndpoint() {
+ return endpoint;
+ }
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/FailedToCreateProducerException.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/FailedToCreateProducerException.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/FailedToCreateProducerException.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,40 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * Provides a channel on which clients can create and invoke exchanges on the endpoint
+ *
+ * @version $Revision$
+ */
+public interface Producer<E extends Exchange> extends Processor<E>, Service {
+ Endpoint<E> getEndpoint();
+
+ /**
+ * Creates a new exchange to send to this endpoint
+ *
+ * @return a newly created exchange
+ */
+ E createExchange();
+
+ /**
+ * Creates a new exchange for communicating with this exchange using the given exchange to pre-populate the values
+ * of the headers and messages
+ */
+ E createExchange(E exchange);
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Service.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Service.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Service.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Service.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,30 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * Represents the core lifecycle API for POJOs which can be started and stopped
+ *
+ * @version $Revision$
+ */
+public interface Service {
+
+ void start() throws Exception;
+
+ void stop() throws Exception;
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Service.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Service.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Service.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java Tue Mar 27 02:30:52 2007
@@ -24,6 +24,7 @@
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.processor.LoggingLevel;
+import org.apache.camel.processor.SendProcessor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -184,7 +185,7 @@
@Fluent
public DeadLetterChannelBuilder<E> deadLetterChannel(@FluentArg("endpoint") Endpoint<E> deadLetterEndpoint) {
- return new DeadLetterChannelBuilder<E>(deadLetterEndpoint);
+ return new DeadLetterChannelBuilder<E>(new SendProcessor<E>(deadLetterEndpoint));
}
// Properties
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ChoiceBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ChoiceBuilder.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ChoiceBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ChoiceBuilder.java Tue Mar 27 02:30:52 2007
@@ -68,7 +68,7 @@
}
@Override
- public Processor<E> createProcessor() {
+ public Processor<E> createProcessor() throws Exception {
List<FilterProcessor<E>> filters = new ArrayList<FilterProcessor<E>>();
for (WhenBuilder<E> predicateBuilder : predicateBuilders) {
filters.add(predicateBuilder.createProcessor());
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java Tue Mar 27 02:30:52 2007
@@ -53,7 +53,7 @@
return answer;
}
- public Processor<E> createErrorHandler(Processor<E> processor) {
+ public Processor<E> createErrorHandler(Processor<E> processor) throws Exception {
Processor<E> deadLetter = getDeadLetterFactory().createProcessor();
return new DeadLetterChannel<E>(processor, deadLetter, getRedeliveryPolicy());
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java Tue Mar 27 02:30:52 2007
@@ -32,5 +32,5 @@
/**
* Creates the error handler interceptor
*/
- Processor<E> createErrorHandler(Processor<E> processor);
+ Processor<E> createErrorHandler(Processor<E> processor) throws Exception;
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FilterBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FilterBuilder.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FilterBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FilterBuilder.java Tue Mar 27 02:30:52 2007
@@ -52,7 +52,7 @@
return predicate;
}
- public FilterProcessor<E> createProcessor() {
+ public FilterProcessor<E> createProcessor() throws Exception {
// lets create a single processor for all child predicates
Processor<E> childProcessor = super.createProcessor();
return new FilterProcessor<E>(predicate, childProcessor);
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java Tue Mar 27 02:30:52 2007
@@ -248,7 +248,7 @@
processors.add(processor);
}
- public Processor<E> createProcessor() {
+ public Processor<E> createProcessor() throws Exception {
List<Processor<E>> answer = new ArrayList<Processor<E>>();
for (ProcessorFactory<E> processFactory : processFactories) {
@@ -272,7 +272,7 @@
/**
* Creates the processor and wraps it in any necessary interceptors and error handlers
*/
- protected Processor<E> makeProcessor(ProcessorFactory<E> processFactory) {
+ protected Processor<E> makeProcessor(ProcessorFactory<E> processFactory) throws Exception {
Processor<E> processor = processFactory.createProcessor();
return getErrorHandlerBuilder().createErrorHandler(processor);
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/InterceptorBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/InterceptorBuilder.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/InterceptorBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/InterceptorBuilder.java Tue Mar 27 02:30:52 2007
@@ -49,7 +49,7 @@
return target;
}
- public Processor<E> createProcessor() {
+ public Processor<E> createProcessor() throws Exception {
// The target is required.
if( target == null )
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java Tue Mar 27 02:30:52 2007
@@ -38,7 +38,7 @@
}
@Override
- public Processor<E> createProcessor() {
+ public Processor<E> createProcessor() throws Exception {
return new MulticastProcessor<E>(endpoints);
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java Tue Mar 27 02:30:52 2007
@@ -38,7 +38,7 @@
}
@Override
- public Processor<E> createProcessor() {
+ public Processor<E> createProcessor() throws Exception {
return new Pipeline<E>(endpoints);
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ProcessorFactory.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ProcessorFactory.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ProcessorFactory.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ProcessorFactory.java Tue Mar 27 02:30:52 2007
@@ -26,6 +26,6 @@
*/
public interface ProcessorFactory<E extends Exchange> {
- public Processor<E> createProcessor();
+ public Processor<E> createProcessor() throws Exception;
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java Tue Mar 27 02:30:52 2007
@@ -99,29 +99,29 @@
/**
* Returns the routing map from inbound endpoints to processors
*/
- public List<Route<E>> getRouteList() {
+ public List<Route<E>> getRouteList() throws Exception {
checkInitialized();
return routes;
}
/**
- * Returns the destinationBuilders which have been created
+ * Returns the builders which have been created
*/
- public List<FromBuilder<E>> getDestinationBuilders() {
+ public List<FromBuilder<E>> getFromBuilders() throws Exception {
checkInitialized();
return fromBuilders;
}
// Implementation methods
//-----------------------------------------------------------------------
- protected void checkInitialized() {
+ protected void checkInitialized() throws Exception {
if (initalized.compareAndSet(false, true)) {
configure();
populateRoutes(routes);
}
}
- protected void populateRoutes(List<Route<E>> routes) {
+ protected void populateRoutes(List<Route<E>> routes) throws Exception {
for (FromBuilder<E> builder : fromBuilders) {
Endpoint<E> from = builder.getFrom();
Processor<E> processor = makeProcessor(from, builder);
@@ -140,7 +140,7 @@
* @param builder the builder which is the factory of the processor
* @return
*/
- protected Processor<E> makeProcessor(Endpoint<E> from, FromBuilder<E> builder) {
+ protected Processor<E> makeProcessor(Endpoint<E> from, FromBuilder<E> builder) throws Exception {
return builder.createProcessor();
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java Tue Mar 27 02:30:52 2007
@@ -36,7 +36,7 @@
this.valueBuilder = valueBuilder;
}
- public Processor<E> createProcessor() {
+ public Processor<E> createProcessor() throws Exception {
// lets create a single processor for all child predicates
Processor<E> destination = super.createProcessor();
Expression<E> expression = valueBuilder.getExpression();
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoConsumer.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoConsumer.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoConsumer.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,65 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pojo;
+
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+
+import java.lang.reflect.Proxy;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * @version $Revision$
+ */
+public class PojoConsumer extends DefaultConsumer<PojoExchange> {
+ private final Object pojo;
+
+ public PojoConsumer(Endpoint<PojoExchange> endpoint, Processor<PojoExchange> processor, Object pojo) {
+ super(endpoint, processor);
+ this.pojo = pojo;
+ }
+
+
+ /**
+ * Creates a Proxy object that can be used to deliver inbound PojoExchanges.
+ *
+ * @param interfaces
+ * @return
+ */
+ public Object createInboundProxy(Class interfaces[]) {
+ return Proxy.newProxyInstance(pojo.getClass().getClassLoader(), interfaces, new InvocationHandler() {
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ if (!isStarted()) {
+ throw new IllegalStateException("The endpoint is not active: " + getEndpoint().getEndpointUri());
+ }
+ PojoInvocation invocation = new PojoInvocation(proxy, method, args);
+ PojoExchange exchange = getEndpoint().createExchange();
+ exchange.setInvocation(invocation);
+ getProcessor().onExchange(exchange);
+ Throwable fault = exchange.getException();
+ if (fault != null) {
+ throw new InvocationTargetException(fault);
+ }
+ return exchange.getOut().getBody();
+ }
+ });
+ }
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoConsumer.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoConsumer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java Tue Mar 27 02:30:52 2007
@@ -17,7 +17,11 @@
package org.apache.camel.component.pojo;
import org.apache.camel.CamelContext;
+import org.apache.camel.Producer;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultProducer;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
@@ -40,10 +44,22 @@
this.pojo = pojo;
}
+ public Producer<PojoExchange> createProducer() throws Exception {
+ return startService(new DefaultProducer<PojoExchange>(this) {
+ public void onExchange(PojoExchange exchange) {
+ invoke(exchange);
+ }
+ });
+ }
+
+ public Consumer<PojoExchange> createConsumer(Processor<PojoExchange> processor) throws Exception {
+ return startService(new PojoConsumer(this, processor, pojo));
+ }
+
/**
* This causes us to invoke the endpoint Pojo using reflection.
*/
- public void onExchange(PojoExchange exchange) {
+ public void invoke(PojoExchange exchange) {
PojoInvocation invocation = exchange.getInvocation();
try {
Object response = invocation.getMethod().invoke(pojo, invocation.getArgs());
@@ -74,28 +90,4 @@
component.unregisterActivation(getEndpointUri());
}
- /**
- * Creates a Proxy object that can be used to deliver inbound PojoExchanges.
- *
- * @param interfaces
- * @return
- */
- public Object createInboundProxy(Class interfaces[]) {
- return Proxy.newProxyInstance(pojo.getClass().getClassLoader(), interfaces, new InvocationHandler() {
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- if (!activated.get()) {
- throw new IllegalStateException("The endpoint is not active: " + getEndpointUri());
- }
- PojoInvocation invocation = new PojoInvocation(proxy, method, args);
- PojoExchange exchange = createExchange();
- exchange.setInvocation(invocation);
- getInboundProcessor().onExchange(exchange);
- Throwable fault = exchange.getException();
- if (fault != null) {
- throw new InvocationTargetException(fault);
- }
- return exchange.getOut().getBody();
- }
- });
- }
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java Tue Mar 27 02:30:52 2007
@@ -19,13 +19,13 @@
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.Consumer;
+import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultProducer;
-import java.util.Queue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
* Represents a queue endpoint that uses a {@link BlockingQueue}
@@ -36,20 +36,22 @@
*/
public class QueueEndpoint<E extends Exchange> extends DefaultEndpoint<E> {
private BlockingQueue<E> queue;
- private org.apache.camel.component.queue.QueueEndpoint.Activation activation;
public QueueEndpoint(String uri, CamelContext container, BlockingQueue<E> queue) {
super(uri, container);
this.queue = queue;
}
- public void onExchange(E exchange) {
- queue.add(exchange);
+ public Producer<E> createProducer() throws Exception {
+ return startService(new DefaultProducer<E>(this) {
+ public void onExchange(E exchange) {
+ queue.add(exchange);
+ }
+ });
}
- public void setInboundProcessor(Processor<E> processor) {
- // TODO lets start a thread to process inbound requests
- // if we don't already have one
+ public Consumer<E> createConsumer(Processor<E> processor) throws Exception {
+ return startService(new QueueEndpointConsumer<E>(this, processor));
}
public E createExchange() {
@@ -61,59 +63,5 @@
public BlockingQueue<E> getQueue() {
return queue;
}
-
- class Activation implements Runnable {
- AtomicBoolean stop = new AtomicBoolean();
- private Thread thread;
-
- public void run() {
- while(!stop.get()) {
- E exchange=null;
- try {
- exchange = queue.poll(100, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- break;
- }
- if( exchange !=null ) {
- try {
- getInboundProcessor().onExchange(exchange);
- } catch (Throwable e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- public void start() {
- thread = new Thread(this, getEndpointUri());
- thread.setDaemon(true);
- thread.start();
- }
-
- public void stop() throws InterruptedException {
- stop.set(true);
- thread.join();
- }
-
- @Override
- public String toString() {
- return "Activation: "+getEndpointUri();
- }
- }
- @Override
- protected void doActivate() {
- activation = new Activation();
- activation.start();
- }
-
- @Override
- protected void doDeactivate() {
- try {
- activation.stop();
- activation=null;
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
}
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointConsumer.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointConsumer.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointConsumer.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.queue;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @version $Revision$
+ */
+public class QueueEndpointConsumer<E extends Exchange> extends ServiceSupport implements Consumer<E>, Runnable {
+ private QueueEndpoint<E> endpoint;
+ private Processor<E> processor;
+ private Thread thread;
+
+ public QueueEndpointConsumer(QueueEndpoint<E> endpoint, Processor<E> processor) {
+ this.endpoint = endpoint;
+ this.processor = processor;
+ }
+
+ @Override
+ public String toString() {
+ return "QueueEndpointConsumer: " + endpoint.getEndpointUri();
+ }
+
+ public void run() {
+ while (!isStopping()) {
+ E exchange;
+ try {
+ exchange = endpoint.getQueue().poll(100, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e) {
+ break;
+ }
+ if (exchange != null) {
+ try {
+ processor.onExchange(exchange);
+ }
+ catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ protected void doStart() throws Exception {
+ thread = new Thread(this, endpoint.getEndpointUri());
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ protected void doStop() throws Exception {
+ thread.join();
+ }
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointConsumer.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointConsumer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Tue Mar 27 02:30:52 2007
@@ -18,7 +18,10 @@
package org.apache.camel.impl;
import org.apache.camel.*;
+import org.apache.camel.util.ServiceHelper;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import java.util.ArrayList;
import java.util.Collection;
@@ -27,6 +30,7 @@
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Represents the context used to configure routes and the policies to use.
@@ -34,14 +38,17 @@
* @version $Revision: 520517 $
* @org.apache.xbean.XBean element="container" rootElement="true"
*/
-public class DefaultCamelContext implements CamelContext {
+public class DefaultCamelContext implements CamelContext, Service {
+ private static final transient Log log = LogFactory.getLog(DefaultCamelContext.class);
private Map<String, Endpoint> endpoints = new HashMap<String, Endpoint>();
private Map<String, Component> components = new HashMap<String, Component>();
private List<EndpointResolver> resolvers = new CopyOnWriteArrayList<EndpointResolver>();
private List<Route> routes;
+ private List<Service> servicesToClose = new ArrayList<Service>();
private TypeConverter typeConverter;
private EndpointResolver endpointResolver;
private ExchangeConverter exchangeConverter;
+ private AtomicBoolean started = new AtomicBoolean(false);
/**
* Adds a component to the container.
@@ -107,6 +114,16 @@
// Endpoint Management Methods
//-----------------------------------------------------------------------
+ public void start() throws Exception {
+ activateEndpoints();
+ }
+
+ public void stop() throws Exception {
+ deactivateEndpoints();
+ }
+
+ // Endpoint Management Methods
+ //-----------------------------------------------------------------------
public Collection<Endpoint> getEndpoints() {
synchronized (endpoints) {
@@ -161,17 +178,26 @@
*/
public void activateEndpoints() throws Exception {
for (Route<Exchange> route : routes) {
- route.getEndpoint().activate(route.getProcessor());
+ Processor<Exchange> processor = route.getProcessor();
+ Consumer<Exchange> consumer = route.getEndpoint().createConsumer(processor);
+ if (consumer != null) {
+ consumer.start();
+ servicesToClose.add(consumer);
+ }
+ if (processor instanceof Service) {
+ Service service = (Service) processor;
+ service.start();
+ servicesToClose.add(service);
+ }
}
}
/**
* Deactivates all the starting endpoints in that were added as routes.
*/
- public void deactivateEndpoints() {
- for (Route<Exchange> route : routes) {
- route.getEndpoint().deactivate();
- }
+ public void deactivateEndpoints() throws Exception {
+ ServiceHelper.stopServices(servicesToClose);
+
}
// Route Management Methods
@@ -184,21 +210,6 @@
this.routes = routes;
}
- public void setRoutes(RouteBuilder builder) {
- // lets now add the routes from the builder
- builder.setContext(this);
- setRoutes(builder.getRouteList());
- }
-
- public void setRoutes(final RouteFactory factory) {
- RouteBuilder builder = new RouteBuilder(this) {
- public void configure() {
- factory.build(this);
- }
- };
- setRoutes(builder);
- }
-
public void addRoutes(List<Route> routes) {
if (this.routes == null) {
this.routes = new ArrayList<Route>(routes);
@@ -208,13 +219,13 @@
}
}
- public void addRoutes(RouteBuilder builder) {
+ public void addRoutes(RouteBuilder builder) throws Exception {
// lets now add the routes from the builder
builder.setContext(this);
addRoutes(builder.getRouteList());
}
- public void addRoutes(final RouteFactory factory) {
+ public void addRoutes(final RouteFactory factory) throws Exception {
RouteBuilder builder = new RouteBuilder(this) {
public void configure() {
factory.build(this);
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * @version $Revision$
+ */
+public class DefaultConsumer<E extends Exchange> extends ServiceSupport implements Consumer<E> {
+ private Endpoint<E> endpoint;
+ private Processor<E> processor;
+
+ public DefaultConsumer(Endpoint<E> endpoint, Processor<E> processor) {
+ this.endpoint = endpoint;
+ this.processor = processor;
+ }
+
+ public Endpoint<E> getEndpoint() {
+ return endpoint;
+ }
+
+ public Processor<E> getProcessor() {
+ return processor;
+ }
+
+ protected void doStop() throws Exception {
+ ServiceHelper.stopServices(processor);
+ }
+
+ protected void doStart() throws Exception {
+ ServiceHelper.startServices(processor);
+ }
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Tue Mar 27 02:30:52 2007
@@ -20,6 +20,9 @@
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.Consumer;
+import org.apache.camel.Service;
import org.apache.camel.util.ObjectHelper;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -125,5 +128,10 @@
* Called at most once by the container to deactivate the endpoint
*/
protected void doDeactivate() {
+ }
+
+ protected <T extends Service> T startService(T service) throws Exception {
+ service.start();
+ return service;
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java Tue Mar 27 02:30:52 2007
@@ -20,6 +20,7 @@
import org.apache.camel.Component;
import org.apache.camel.Endpoint;
import org.apache.camel.EndpointResolver;
+import org.apache.camel.Exchange;
import org.apache.camel.util.FactoryFinder;
import org.apache.camel.util.ObjectHelper;
@@ -34,7 +35,7 @@
*
* @version $Revision$
*/
-public class DefaultEndpointResolver<E> implements EndpointResolver<E> {
+public class DefaultEndpointResolver<E extends Exchange> implements EndpointResolver<E> {
static final private FactoryFinder endpointResolverFactory = new FactoryFinder("META-INF/services/org/apache/camel/EndpointResolver/");
public Endpoint<E> resolveEndpoint(CamelContext container, String uri) throws Exception {
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.Producer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+
+/**
+ * A default implementation of @{link Producer} for implementation inheritence
+ *
+ * @version $Revision$
+ */
+public abstract class DefaultProducer<E extends Exchange> extends ServiceSupport implements Producer<E> {
+ private Endpoint<E> endpoint;
+
+ public DefaultProducer(Endpoint<E> endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ public Endpoint<E> getEndpoint() {
+ return endpoint;
+ }
+
+ public E createExchange() {
+ return endpoint.createExchange();
+ }
+
+ public E createExchange(E exchange) {
+ return endpoint.createExchange(exchange);
+ }
+
+ protected void doStart() throws Exception {
+ }
+
+ protected void doStop() throws Exception {
+ }
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,83 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.Service;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A useful base class which ensures that a service is only initialized once and provides some helper methods for
+ * enquiring of its status
+ *
+ * @version $Revision$
+ */
+public abstract class ServiceSupport implements Service {
+ private AtomicBoolean started = new AtomicBoolean(false);
+ private AtomicBoolean stopping = new AtomicBoolean(false);
+ private AtomicBoolean stopped = new AtomicBoolean(false);
+
+ public void start() throws Exception {
+ if (started.compareAndSet(false, true)) {
+ doStart();
+ }
+ }
+
+ public void stop() throws Exception {
+ if (stopped.compareAndSet(false, true)) {
+ stopping.set(true);
+ try {
+ doStop();
+ }
+ finally {
+ stopped.set(true);
+ started.set(false);
+ stopping.set(false);
+ }
+ }
+ }
+
+ /**
+ * @return true if this service has been started
+ */
+ public boolean isStarted() {
+ return started.get();
+ }
+
+ /**
+ * @return true if this service is in the process of closing
+ */
+ public boolean isStopping() {
+ return stopping.get();
+ }
+
+
+ /**
+ * @return true if this service is closed
+ */
+ public boolean isStopped() {
+ return stopped.get();
+ }
+
+ protected abstract void doStart() throws Exception;
+
+ protected abstract void doStop() throws Exception;
+
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CompositeProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CompositeProcessor.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CompositeProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CompositeProcessor.java Tue Mar 27 02:30:52 2007
@@ -18,6 +18,8 @@
package org.apache.camel.processor;
import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.util.ServiceHelper;
import java.util.Collection;
@@ -26,7 +28,7 @@
*
* @version $Revision$
*/
-public class CompositeProcessor<E> implements Processor<E> {
+public class CompositeProcessor<E> extends ServiceSupport implements Processor<E> {
private final Collection<Processor<E>> processors;
public CompositeProcessor(Collection<Processor<E>> processors) {
@@ -58,5 +60,13 @@
public Collection<Processor<E>> getProcessors() {
return processors;
+ }
+
+ protected void doStart() throws Exception {
+ ServiceHelper.startServices(processors);
+ }
+
+ protected void doStop() throws Exception {
+ ServiceHelper.stopServices(processors);
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Tue Mar 27 02:30:52 2007
@@ -19,6 +19,8 @@
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -29,9 +31,8 @@
*
* @version $Revision$
*/
-public class DeadLetterChannel<E extends Exchange> implements ErrorHandler<E> {
+public class DeadLetterChannel<E extends Exchange> extends ServiceSupport implements ErrorHandler<E> {
public static final String REDELIVERY_COUNT_HEADER = "org.apache.camel.redeliveryCount";
-
private static final transient Log log = LogFactory.getLog(DeadLetterChannel.class);
private Processor<E> output;
private Processor<E> deadLetter;
@@ -79,7 +80,6 @@
deadLetter.onExchange(exchange);
}
-
// Properties
//-------------------------------------------------------------------------
@@ -116,7 +116,7 @@
* Sets the message header name to be used to append the redelivery count value when a message has been redelivered
*
* @param redeliveryCountHeader the header name to use to append the redelivery count or null if you wish to disable
- * this feature
+ * this feature
*/
public void setRedeliveryCountHeader(String redeliveryCountHeader) {
this.redeliveryCountHeader = redeliveryCountHeader;
@@ -133,9 +133,9 @@
protected void sleep(long redeliveryDelay) {
if (redeliveryDelay > 0) {
- if (log.isDebugEnabled()) {
- log.debug("Sleeping for: " + redeliveryDelay + " until attempting redelivery");
- }
+ if (log.isDebugEnabled()) {
+ log.debug("Sleeping for: " + redeliveryDelay + " until attempting redelivery");
+ }
try {
Thread.sleep(redeliveryDelay);
}
@@ -145,5 +145,13 @@
}
}
}
+ }
+
+ protected void doStart() throws Exception {
+ ServiceHelper.startServices(output, deadLetter);
+ }
+
+ protected void doStop() throws Exception {
+ ServiceHelper.stopServices(deadLetter, output);
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java Tue Mar 27 02:30:52 2007
@@ -19,11 +19,13 @@
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.util.ServiceHelper;
/**
* @version $Revision$
*/
-public class FilterProcessor<E> implements Processor<E> {
+public class FilterProcessor<E> extends ServiceSupport implements Processor<E> {
private Predicate<E> predicate;
private Processor<E> processor;
@@ -49,5 +51,13 @@
public Processor<E> getProcessor() {
return processor;
+ }
+
+ protected void doStart() throws Exception {
+ ServiceHelper.startServices(processor);
+ }
+
+ protected void doStop() throws Exception {
+ ServiceHelper.stopServices(processor);
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorProcessor.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorProcessor.java Tue Mar 27 02:30:52 2007
@@ -18,21 +18,22 @@
package org.apache.camel.processor;
import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.util.ServiceHelper;
/**
* @version $Revision: 519941 $
*/
-public class InterceptorProcessor<E> implements Processor<E> {
-
+public class InterceptorProcessor<E> extends ServiceSupport implements Processor<E> {
protected Processor<E> next;
- public InterceptorProcessor() {
+ public InterceptorProcessor() {
}
public void onExchange(E exchange) {
- if( next != null ) {
- next.onExchange(exchange);
- }
+ if (next != null) {
+ next.onExchange(exchange);
+ }
}
@Override
@@ -40,10 +41,19 @@
return "intercept(" + next + ")";
}
- public Processor<E> getNext() {
- return next;
- }
- public void setNext(Processor<E> next) {
- this.next = next;
- }
+ public Processor<E> getNext() {
+ return next;
+ }
+
+ public void setNext(Processor<E> next) {
+ this.next = next;
+ }
+
+ protected void doStart() throws Exception {
+ ServiceHelper.startServices(next);
+ }
+
+ protected void doStop() throws Exception {
+ ServiceHelper.stopServices(next);
+ }
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingErrorHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingErrorHandler.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingErrorHandler.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingErrorHandler.java Tue Mar 27 02:30:52 2007
@@ -19,6 +19,8 @@
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -27,7 +29,7 @@
*
* @version $Revision$
*/
-public class LoggingErrorHandler<E extends Exchange> implements ErrorHandler<E> {
+public class LoggingErrorHandler<E extends Exchange> extends ServiceSupport implements ErrorHandler<E> {
private Processor<E> output;
private Log log;
private LoggingLevel level;
@@ -123,5 +125,13 @@
protected Object logMessage(E exchange, RuntimeException e) {
return e + " while processing exchange: " + exchange;
+ }
+
+ protected void doStart() throws Exception {
+ ServiceHelper.startServices(output);
+ }
+
+ protected void doStop() throws Exception {
+ ServiceHelper.stopServices(output);
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingLevel.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingLevel.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingLevel.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingLevel.java Tue Mar 27 02:30:52 2007
@@ -21,7 +21,7 @@
* Used to configure the logging levels
*
* @version $Revision$
-*/
+ */
public enum LoggingLevel {
DEBUG, ERROR, FATAL, INFO, TRACE, WARN;
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Tue Mar 27 02:30:52 2007
@@ -20,7 +20,10 @@
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.ServiceSupport;
+import java.util.ArrayList;
import java.util.Collection;
/**
@@ -29,40 +32,74 @@
*
* @version $Revision$
*/
-public class MulticastProcessor<E extends Exchange> implements Processor<E> {
- private Collection<Endpoint<E>> endpoints;
+public class MulticastProcessor<E extends Exchange> extends ServiceSupport implements Processor<E> {
+ private Collection<Producer<E>> producers;
- public MulticastProcessor(Collection<Endpoint<E>> endpoints) {
- this.endpoints = endpoints;
+ /**
+ * A helper method to convert a list of endpoints into a list of processors
+ */
+ public static <E extends Exchange> Collection<Producer<E>> toProducers(Collection<Endpoint<E>> endpoints) throws Exception {
+ Collection<Producer<E>> answer = new ArrayList<Producer<E>>();
+ for (Endpoint<E> endpoint : endpoints) {
+ answer.add(endpoint.createProducer());
+ }
+ return answer;
+ }
+
+ public MulticastProcessor(Collection<Endpoint<E>> endpoints) throws Exception {
+ this.producers = toProducers(endpoints);
}
@Override
public String toString() {
- return "Multicast" + endpoints;
+ return "Multicast" + getEndpoints();
}
public void onExchange(E exchange) {
- for (Endpoint<E> endpoint : endpoints) {
- E copy = copyExchangeStrategy(endpoint, exchange);
- endpoint.onExchange(copy);
+ for (Producer<E> producer : producers) {
+ E copy = copyExchangeStrategy(producer, exchange);
+ producer.onExchange(copy);
+ }
+ }
+
+ protected void doStop() throws Exception {
+ for (Producer<E> producer : producers) {
+ producer.stop();
+ }
+ }
+
+ protected void doStart() throws Exception {
+ for (Producer<E> producer : producers) {
+ producer.start();
}
}
/**
- * Returns the endpoints to multicast to
+ * Returns the producers to multicast to
+ */
+ public Collection<Producer<E>> getProducers() {
+ return producers;
+ }
+
+ /**
+ * Returns the list of endpoints
*/
public Collection<Endpoint<E>> getEndpoints() {
- return endpoints;
+ Collection<Endpoint<E>> answer = new ArrayList<Endpoint<E>>();
+ for (Producer<E> producer : producers) {
+ answer.add(producer.getEndpoint());
+ }
+ return answer;
}
/**
* Strategy method to copy the exchange before sending to another endpoint. Derived classes such as the
* {@link Pipeline} will not clone the exchange
*
- * @param endpoint the endpoint that the exchange will be sent to
+ * @param producer the producer that will send the exchange
* @param exchange @return the current exchange if no copying is required such as for a pipeline otherwise a new copy of the exchange is returned.
*/
- protected E copyExchangeStrategy(Endpoint<E> endpoint, E exchange) {
- return endpoint.createExchange(exchange);
+ protected E copyExchangeStrategy(Producer<E> producer, E exchange) {
+ return producer.createExchange(exchange);
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Tue Mar 27 02:30:52 2007
@@ -20,45 +20,44 @@
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.Producer;
import java.util.Collection;
/**
* Creates a Pipeline pattern where the output of the previous step is sent as input to the next step when working
* with request/response message exchanges.
- *
+ *
* @version $Revision$
*/
-public class Pipeline<E extends Exchange> implements Processor<E> {
- private Collection<Endpoint<E>> endpoints;
-
- public Pipeline(Collection<Endpoint<E>> endpoints) {
- this.endpoints = endpoints;
+public class Pipeline<E extends Exchange> extends MulticastProcessor<E> implements Processor<E> {
+ public Pipeline(Collection<Endpoint<E>> endpoints) throws Exception {
+ super(endpoints);
}
public void onExchange(E exchange) {
E nextExchange = exchange;
boolean first = true;
- for (Endpoint<E> endpoint : endpoints) {
+ for (Producer<E> producer : getProducers()) {
if (first) {
first = false;
}
else {
- nextExchange = createNextExchange(endpoint, nextExchange);
+ nextExchange = createNextExchange(producer, nextExchange);
}
- endpoint.onExchange(nextExchange);
+ producer.onExchange(nextExchange);
}
}
/**
* Strategy method to create the next exchange from the
*
- * @param endpoint the endpoint the exchange will be sent to
+ * @param producer the producer used to send to the endpoint
* @param previousExchange the previous exchange
* @return a new exchange
*/
- protected E createNextExchange(Endpoint<E> endpoint, E previousExchange) {
- E answer = endpoint.createExchange(previousExchange);
+ protected E createNextExchange(Producer<E> producer, E previousExchange) {
+ E answer = producer.createExchange(previousExchange);
// now lets set the input of the next exchange to the output of the previous message if it is not null
Object output = previousExchange.getOut().getBody();
@@ -81,6 +80,6 @@
@Override
public String toString() {
- return "Pipeline" + endpoints;
+ return "Pipeline" + getEndpoints();
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Tue Mar 27 02:30:52 2007
@@ -21,9 +21,11 @@
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import static org.apache.camel.util.ObjectHelper.notNull;
+import org.apache.camel.util.ProducerCache;
import java.util.Iterator;
@@ -33,8 +35,9 @@
*
* @version $Revision$
*/
-public class RecipientList<E extends Exchange> implements Processor<E> {
+public class RecipientList<E extends Exchange> extends ServiceSupport implements Processor<E> {
private final Expression<E> expression;
+ private ProducerCache<E> producerCache = new ProducerCache<E>();
public RecipientList(Expression<E> expression) {
notNull(expression, "expression");
@@ -52,11 +55,18 @@
while (iter.hasNext()) {
Object recipient = iter.next();
Endpoint<E> endpoint = resolveEndpoint(exchange, recipient);
- endpoint.onExchange(exchange);
+ producerCache.getProducer(endpoint).onExchange(exchange);
}
}
protected Endpoint<E> resolveEndpoint(E exchange, Object recipient) {
return ExchangeHelper.resolveEndpoint(exchange, recipient);
+ }
+
+ protected void doStop() throws Exception {
+ producerCache.stop();
+ }
+
+ protected void doStart() throws Exception {
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java Tue Mar 27 02:30:52 2007
@@ -30,7 +30,6 @@
*/
public class RedeliveryPolicy implements Cloneable, Serializable {
protected static transient Random randomNumberGenerator;
-
protected int maximumRedeliveries = 6;
protected long initialRedeliveryDelay = 1000L;
protected double backOffMultiplier = 2;
@@ -39,7 +38,6 @@
protected double collisionAvoidanceFactor = 0.15d;
protected boolean useCollisionAvoidance = false;
-
public RedeliveryPolicy() {
}
@@ -116,7 +114,6 @@
setCollisionAvoidancePercent(collisionAvoidancePercent);
return this;
}
-
// Properties
//-------------------------------------------------------------------------
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java Tue Mar 27 02:30:52 2007
@@ -18,20 +18,43 @@
package org.apache.camel.processor;
import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.Service;
+import org.apache.camel.impl.ServiceSupport;
/**
* @version $Revision$
*/
-public class SendProcessor<E> implements Processor<E> {
+public class SendProcessor<E extends Exchange> extends ServiceSupport implements Processor<E>, Service {
private Endpoint<E> destination;
+ private Producer<E> producer;
public SendProcessor(Endpoint<E> destination) {
this.destination = destination;
}
+ protected void doStop() throws Exception {
+ if (producer != null) {
+ try {
+ producer.stop();
+ }
+ finally {
+ producer = null;
+ }
+ }
+ }
+
+ protected void doStart() throws Exception {
+ this.producer = destination.createProducer();
+ }
+
public void onExchange(E exchange) {
- destination.onExchange(exchange);
+ if (producer == null) {
+ throw new IllegalStateException("No producer, this processor has not been started!");
+ }
+ producer.onExchange(exchange);
}
public Endpoint<E> getDestination() {