You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/03/21 13:33:36 UTC
svn commit: r520860 - in /activemq/camel/trunk/camel-core/src:
main/java/org/apache/camel/ main/java/org/apache/camel/builder/
main/java/org/apache/camel/impl/ main/java/org/apache/camel/processor/
main/java/org/apache/camel/util/ test/java/org/apache/...
Author: jstrachan
Date: Wed Mar 21 05:33:32 2007
New Revision: 520860
URL: http://svn.apache.org/viewvc?view=rev&rev=520860
Log:
added support for DeadLetterChannel, Multicast and Pipeline patterns along with adding an error handler to the RouteBuilder so folks can configure the dead letter policy and so forth
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (with props)
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PredicateBuilder.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/RouteBuilderTest.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java Wed Mar 21 05:33:32 2007
@@ -40,6 +40,12 @@
E createExchange();
/**
+ * Creates a new exchange for communicating with this endpoint, using the given exchange to pre-populate the values
+ * of the headers and messages
+ */
+ E createExchange(E exchange);
+
+ /**
* Called by the container to Activate the endpoint. Once activated,
* the endpoint will start delivering inbound message exchanges
* that are received to the specified processor.
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Wed Mar 21 05:33:32 2007
@@ -83,4 +83,11 @@
* destination
*/
Exchange copy();
+
+ /**
+ * Copies the data into this exchange from the given exchange
+ *
+ * @param source the exchange from which headers and messages will be copied
+ */
+ void copyFrom(Exchange source);
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java Wed Mar 21 05:33:32 2007
@@ -18,6 +18,7 @@
import org.apache.camel.Expression;
import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
/**
* Base class for implementation inheritance
@@ -26,6 +27,14 @@
*/
public abstract class BuilderSupport<E extends Exchange> {
+ private ErrorHandlerBuilder<E> errorHandlerBuilder;
+
+ protected BuilderSupport() {
+ }
+
+ // Builder methods
+ //-------------------------------------------------------------------------
+
/**
* Returns a predicate and value builder for headers on an exchange
*/
@@ -66,5 +75,28 @@
return new ValueBuilder<E>(expression);
}
+
+ // Properties
+ //-------------------------------------------------------------------------
+
+ protected BuilderSupport(BuilderSupport<E> parent) {
+ if (parent.errorHandlerBuilder != null) {
+ this.errorHandlerBuilder = parent.errorHandlerBuilder.copy();
+ }
+ }
+
+ public ErrorHandlerBuilder<E> getErrorHandlerBuilder() {
+ if (errorHandlerBuilder == null) {
+ errorHandlerBuilder = new DeadLetterChannelBuilder<E>();
+ }
+ return errorHandlerBuilder;
+ }
+
+ /**
+ * Sets the error handler to use with processors created by this builder
+ */
+ public void setErrorHandlerBuilder(ErrorHandlerBuilder<E> errorHandlerBuilder) {
+ this.errorHandlerBuilder = errorHandlerBuilder;
+ }
}
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java?view=auto&rev=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java Wed Mar 21 05:33:32 2007
@@ -0,0 +1,128 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Expression;
+import org.apache.camel.processor.RedeliveryPolicy;
+import org.apache.camel.processor.DeadLetterChannel;
+import org.apache.camel.processor.RecipientList;
+
+/**
+ * A builder of a <a href="http://activemq.apache.org/camel/dead-letter-channel.html">Dead Letter Channel</a>
+ *
+ * @version $Revision$
+ */
+public class DeadLetterChannelBuilder<E extends Exchange> extends BuilderSupport<E> implements ErrorHandlerBuilder<E> {
+ private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+ private ProcessorFactory<E> deadLetterFactory;
+ private Processor<E> defaultDeadLetterEndpoint;
+ private Expression<E> defaultDeadLetterEndpointExpression;
+ private String defaultDeadLetterEndpointUri = "log:org.apache.camel.DeadLetterChannel:error";
+
+ public DeadLetterChannelBuilder() {
+ }
+
+ public DeadLetterChannelBuilder(ProcessorFactory<E> deadLetterFactory) {
+ this.deadLetterFactory = deadLetterFactory;
+ }
+
+ public ErrorHandlerBuilder<E> copy() {
+ DeadLetterChannelBuilder<E> answer = new DeadLetterChannelBuilder<E>(deadLetterFactory);
+ answer.setRedeliveryPolicy(getRedeliveryPolicy().copy());
+ return answer;
+ }
+
+ public Processor<E> createErrorHandler(Processor<E> processor) {
+ Processor<E> deadLetter = getDeadLetterFactory().createProcessor();
+ return new DeadLetterChannel<E>(processor, deadLetter, getRedeliveryPolicy());
+ }
+
+ public RedeliveryPolicy getRedeliveryPolicy() {
+ return redeliveryPolicy;
+ }
+
+ /**
+ * Sets the redelivery policy
+ */
+ public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
+ this.redeliveryPolicy = redeliveryPolicy;
+ }
+
+ public ProcessorFactory<E> getDeadLetterFactory() {
+ if (deadLetterFactory == null) {
+ deadLetterFactory = new ProcessorFactory<E>() {
+ public Processor<E> createProcessor() {
+ return getDefaultDeadLetterEndpoint();
+ }
+ };
+ }
+ return deadLetterFactory;
+ }
+
+ /**
+ * Sets the default dead letter queue factory
+ */
+ public void setDeadLetterFactory(ProcessorFactory<E> deadLetterFactory) {
+ this.deadLetterFactory = deadLetterFactory;
+ }
+
+ public Processor<E> getDefaultDeadLetterEndpoint() {
+ if (defaultDeadLetterEndpoint == null) {
+ defaultDeadLetterEndpoint = new RecipientList<E>(getDefaultDeadLetterEndpointExpression());
+ }
+ return defaultDeadLetterEndpoint;
+ }
+
+ /**
+ * Sets the default dead letter endpoint used
+ */
+ public void setDefaultDeadLetterEndpoint(Processor<E> defaultDeadLetterEndpoint) {
+ this.defaultDeadLetterEndpoint = defaultDeadLetterEndpoint;
+ }
+
+ public Expression<E> getDefaultDeadLetterEndpointExpression() {
+ if (defaultDeadLetterEndpointExpression == null) {
+ defaultDeadLetterEndpointExpression = ExpressionBuilder.constantExpression(getDefaultDeadLetterEndpointUri());
+ }
+ return defaultDeadLetterEndpointExpression;
+ }
+
+ /**
+ * Sets the expression used to decide the dead letter channel endpoint for an exchange
+ * if no factory is provided via {@link #setDeadLetterFactory(ProcessorFactory)}
+ */
+ public void setDefaultDeadLetterEndpointExpression(Expression<E> defaultDeadLetterEndpointExpression) {
+ this.defaultDeadLetterEndpointExpression = defaultDeadLetterEndpointExpression;
+ }
+
+ public String getDefaultDeadLetterEndpointUri() {
+ return defaultDeadLetterEndpointUri;
+ }
+
+ /**
+ * Sets the default dead letter endpoint URI used if no factory is provided via {@link #setDeadLetterFactory(ProcessorFactory)}
+ * and no expression is provided via {@link #setDefaultDeadLetterEndpointExpression(Expression)}
+ *
+ * @param defaultDeadLetterEndpointUri the default URI if no deadletter factory or expression is provided
+ */
+ public void setDefaultDeadLetterEndpointUri(String defaultDeadLetterEndpointUri) {
+ this.defaultDeadLetterEndpointUri = defaultDeadLetterEndpointUri;
+ }
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java?view=auto&rev=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java Wed Mar 21 05:33:32 2007
@@ -0,0 +1,36 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+/**
+ * @version $Revision$
+ */
+public interface ErrorHandlerBuilder<E extends Exchange> {
+ /**
+ * Creates a copy of this builder
+ */
+ ErrorHandlerBuilder<E> copy();
+
+ /**
+ * Creates the error handler interceptor
+ */
+ Processor<E> createErrorHandler(Processor<E> processor);
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java Wed Mar 21 05:33:32 2007
@@ -81,7 +81,7 @@
@Override
public String toString() {
- return "Body";
+ return "body";
}
};
}
@@ -98,7 +98,7 @@
@Override
public String toString() {
- return "BodyAs[" + type.getName() + "]";
+ return "bodyAs[" + type.getName() + "]";
}
};
}
@@ -114,7 +114,7 @@
@Override
public String toString() {
- return "OutBody";
+ return "outBody";
}
};
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java Wed Mar 21 05:33:32 2007
@@ -20,11 +20,14 @@
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.processor.InterceptorProcessor;
+import org.apache.camel.processor.MulticastProcessor;
+import org.apache.camel.processor.Pipeline;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import java.util.ArrayList;
import java.util.List;
+import java.util.Collection;
/**
* @version $Revision$
@@ -52,6 +55,23 @@
return getBuilder().endpoint(uri);
}
+ public List<Endpoint<E>> endpoints(String... uris) {
+ List<Endpoint<E>> endpoints = new ArrayList<Endpoint<E>>();
+ for (String uri : uris) {
+ endpoints.add(endpoint(uri));
+ }
+ return endpoints;
+ }
+
+ public List<Endpoint<E>> endpoints(Endpoint<E>... uris) {
+ List<Endpoint<E>> endpoints = new ArrayList<Endpoint<E>>();
+ for (Endpoint<E> uri : uris) {
+ endpoints.add(uri);
+ }
+ return endpoints;
+ }
+
+
/**
* Sends the exchange to the given endpoint URI
*/
@@ -70,29 +90,52 @@
/**
- * Sends the exchange to the given endpoint URI
+ * Sends the exchange to a list of endpoints using the {@link MulticastProcessor} pattern
*/
public ProcessorFactory<E> to(String... uris) {
- ProcessorFactory<E> answer = null;
- for (String uri : uris) {
- answer = to(endpoint(uri));
- }
- return answer;
+ return to(endpoints(uris));
}
/**
- * Sends the exchange to the given endpoint
+ * Sends the exchange to a list of endpoints using the {@link MulticastProcessor} pattern
*/
public ProcessorFactory<E> to(Endpoint<E>... endpoints) {
- ProcessorFactory<E> answer = null;
- for (Endpoint<E> endpoint : endpoints) {
- answer = to(endpoint);
- }
- return answer;
+ return to(endpoints(endpoints));
}
/**
+ * Sends the exchange to a collection of endpoints using the {@link MulticastProcessor} pattern
+ */
+ public ProcessorFactory<E> to(Collection<Endpoint<E>> endpoints) {
+ return addProcessBuilder(new MulticastBuilder<E>(this, endpoints));
+ }
+
+ /**
+ * Creates a {@link Pipeline} of the list of endpoints so that the message will get processed by each endpoint in turn
+ * and for request/response the output of one endpoint will be the input of the next endpoint
+ */
+ public ProcessorFactory<E> pipeline(String... uris) {
+ return pipeline(endpoints(uris));
+ }
+
+ /**
+ * Creates a {@link Pipeline} of the list of endpoints so that the message will get processed by each endpoint in turn
+ * and for request/response the output of one endpoint will be the input of the next endpoint
+ */
+ public ProcessorFactory<E> pipeline(Endpoint<E>... endpoints) {
+ return pipeline(endpoints(endpoints));
+ }
+
+ /**
+ * Creates a {@link Pipeline} of the list of endpoints so that the message will get processed by each endpoint in turn
+ * and for request/response the output of one endpoint will be the input of the next endpoint
+ */
+ public ProcessorFactory<E> pipeline(Collection<Endpoint<E>> endpoints) {
+ return addProcessBuilder(new PipelineBuilder<E>(this, endpoints));
+ }
+
+ /**
* Adds the custom processor to this destination
*/
public ConstantProcessorBuilder<E> process(Processor<E> processor) {
@@ -161,8 +204,9 @@
return from;
}
- public void addProcessBuilder(ProcessorFactory<E> processFactory) {
+ public ProcessorFactory<E> addProcessBuilder(ProcessorFactory<E> processFactory) {
processFactories.add(processFactory);
+ return processFactory;
}
public void addProcessor(Processor<E> processor) {
@@ -173,7 +217,7 @@
List<Processor<E>> answer = new ArrayList<Processor<E>>();
for (ProcessorFactory<E> processFactory : processFactories) {
- Processor<E> processor = processFactory.createProcessor();
+ Processor<E> processor = makeProcessor(processFactory);
if (processor == null) {
throw new IllegalArgumentException("No processor created for processBuilder: " + processFactory);
}
@@ -188,6 +232,14 @@
else {
return new CompositeProcessor<E>(answer);
}
+ }
+
+ /**
+ * Creates the processor and wraps it in any necessary interceptors and error handlers
+ */
+ protected Processor<E> makeProcessor(ProcessorFactory<E> processFactory) {
+ Processor<E> processor = processFactory.createProcessor();
+ return getErrorHandlerBuilder().createErrorHandler(processor);
}
public List<Processor<E>> getProcessors() {
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java?view=auto&rev=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java Wed Mar 21 05:33:32 2007
@@ -0,0 +1,44 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.processor.MulticastProcessor;
+
+import java.util.Collection;
+
+/**
+ * A builder for the {@link MulticastProcessor} pattern
+ *
+ * @version $Revision$
+ */
+public class MulticastBuilder<E extends Exchange> extends FromBuilder<E> {
+ private final Collection<Endpoint<E>> endpoints;
+
+ public MulticastBuilder(FromBuilder<E> parent, Collection<Endpoint<E>> endpoints) {
+ super(parent);
+ this.endpoints = endpoints;
+ }
+
+ @Override
+ public Processor<E> createProcessor() {
+ return new MulticastProcessor<E>(endpoints);
+ }
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java?view=auto&rev=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java Wed Mar 21 05:33:32 2007
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import org.apache.camel.processor.MulticastProcessor;
+import org.apache.camel.processor.Pipeline;
+import org.apache.camel.Exchange;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+
+import java.util.Collection;
+
+/**
+ * A builder for the {@link Pipeline} pattern
+ *
+ * @version $Revision$
+ */
+public class PipelineBuilder<E extends Exchange> extends FromBuilder<E> {
+ private final Collection<Endpoint<E>> endpoints;
+
+ public PipelineBuilder(FromBuilder<E> parent, Collection<Endpoint<E>> endpoints) {
+ super(parent);
+ this.endpoints = endpoints;
+ }
+
+ @Override
+ public Processor<E> createProcessor() {
+ return new Pipeline<E>(endpoints);
+ }
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PredicateBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PredicateBuilder.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PredicateBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PredicateBuilder.java Wed Mar 21 05:33:32 2007
@@ -38,6 +38,11 @@
public boolean evaluate(E exchange) {
return left.evaluate(exchange) && right.evaluate(exchange);
}
+
+ @Override
+ public String toString() {
+ return "(" + left + ") and (" + right + ")";
+ }
};
}
@@ -51,10 +56,14 @@
public boolean evaluate(E exchange) {
return left.evaluate(exchange) || right.evaluate(exchange);
}
+
+ @Override
+ public String toString() {
+ return "(" + left + ") or (" + right + ")";
+ }
};
}
-
public static <E extends Exchange> Predicate<E> isEqualTo(final Expression<E> left, final Expression<E> right) {
notNull(left, "left");
notNull(right, "right");
@@ -65,6 +74,11 @@
Object value2 = right.evaluate(exchange);
return ObjectHelper.equals(value1, value2);
}
+
+ @Override
+ public String toString() {
+ return left + " == " + right;
+ }
};
}
@@ -78,6 +92,11 @@
Object value2 = right.evaluate(exchange);
return !ObjectHelper.equals(value1, value2);
}
+
+ @Override
+ public String toString() {
+ return left + " != " + right;
+ }
};
}
@@ -91,6 +110,11 @@
Object value2 = right.evaluate(exchange);
return ObjectHelper.compare(value1, value2) < 0;
}
+
+ @Override
+ public String toString() {
+ return left + " < " + right;
+ }
};
}
@@ -104,6 +128,11 @@
Object value2 = right.evaluate(exchange);
return ObjectHelper.compare(value1, value2) <= 0;
}
+
+ @Override
+ public String toString() {
+ return left + " <= " + right;
+ }
};
}
@@ -117,6 +146,11 @@
Object value2 = right.evaluate(exchange);
return ObjectHelper.compare(value1, value2) > 0;
}
+
+ @Override
+ public String toString() {
+ return left + " > " + right;
+ }
};
}
@@ -130,6 +164,11 @@
Object value2 = right.evaluate(exchange);
return ObjectHelper.compare(value1, value2) >= 0;
}
+
+ @Override
+ public String toString() {
+ return left + " >= " + right;
+ }
};
}
@@ -142,6 +181,11 @@
Object value = expression.evaluate(exchange);
return type.isInstance(value);
}
+
+ @Override
+ public String toString() {
+ return expression + " instanceof " + type.getName();
+ }
};
}
@@ -153,6 +197,11 @@
Object value = expression.evaluate(exchange);
return value == null;
}
+
+ @Override
+ public String toString() {
+ return expression + " == null";
+ }
};
}
@@ -164,6 +213,11 @@
Object value = expression.evaluate(exchange);
return value != null;
}
+
+ @Override
+ public String toString() {
+ return expression + " != null";
+ }
};
}
@@ -177,8 +231,11 @@
Object value2 = right.evaluate(exchange);
return ObjectHelper.contains(value1, value2);
}
+
+ @Override
+ public String toString() {
+ return left + ".contains(" + right + ")";
+ }
};
}
-
-
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Wed Mar 21 05:33:32 2007
@@ -25,9 +25,11 @@
import java.util.concurrent.atomic.AtomicBoolean;
/**
+ * A default endpoint useful for implementation inheritance
+ *
* @version $Revision$
*/
-public abstract class DefaultEndpoint<E> implements Endpoint<E> {
+public abstract class DefaultEndpoint<E extends Exchange> implements Endpoint<E> {
private String endpointUri;
private CamelContext context;
private Processor<E> inboundProcessor;
@@ -92,6 +94,13 @@
activated.set(false);
doDeactivate();
}
+ }
+
+
+ public E createExchange(E exchange) {
+ E answer = createExchange();
+ answer.copyFrom(exchange);
+ return answer;
}
/**
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java Wed Mar 21 05:33:32 2007
@@ -41,17 +41,18 @@
public Exchange copy() {
Exchange exchange = newInstance();
- if (exchange instanceof DefaultExchange) {
- DefaultExchange defaultExchange = (DefaultExchange) exchange;
- defaultExchange.setHeaders(getHeaders().copy());
- defaultExchange.setIn(getIn().copy());
- defaultExchange.setOut(getOut().copy());
- defaultExchange.setFault(getFault().copy());
- defaultExchange.setException(getException());
- }
+ exchange.copyFrom(this);
return exchange;
}
+ public void copyFrom(Exchange exchange) {
+ setHeaders(exchange.getHeaders().copy());
+ setIn(exchange.getIn().copy());
+ setOut(exchange.getOut().copy());
+ setFault(exchange.getFault().copy());
+ setException(exchange.getException());
+ }
+
public Exchange newInstance() {
return new DefaultExchange(context);
}
@@ -124,5 +125,4 @@
protected Message createOutMessage() {
return new DefaultMessage();
}
-
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Wed Mar 21 05:33:32 2007
@@ -48,6 +48,11 @@
this.redeliveryPolicy = redeliveryPolicy;
}
+ @Override
+ public String toString() {
+ return "DeadLetterChannel[" + output + ", " + deadLetter + ", " + redeliveryPolicy + "]";
+ }
+
public void onExchange(E exchange) {
int redeliveryCounter = 0;
long redeliveryDelay = 0;
@@ -77,6 +82,21 @@
// Properties
//-------------------------------------------------------------------------
+
+ /**
+ * Returns the output processor
+ */
+ public Processor<E> getOutput() {
+ return output;
+ }
+
+ /**
+ * Returns the dead letter that message exchanges will be sent to if the redelivery attempts fail
+ */
+ public Processor<E> getDeadLetter() {
+ return deadLetter;
+ }
+
public RedeliveryPolicy getRedeliveryPolicy() {
return redeliveryPolicy;
}
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?view=auto&rev=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Wed Mar 21 05:33:32 2007
@@ -0,0 +1,68 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+import java.util.Collection;
+
+/**
+ * Implements the Multicast pattern to send a message exchange to a number of endpoints, each endpoint receiving a copy of
+ * the message exchange.
+ *
+ * @version $Revision$
+ */
+public class MulticastProcessor<E extends Exchange> implements Processor<E> {
+ // the endpoints to send to; the collection passed to the constructor is stored as-is, not copied
+ private Collection<Endpoint<E>> endpoints;
+
+ /**
+ * Creates a multicast which sends to the given endpoints
+ *
+ * @param endpoints the endpoints to multicast to; the reference is kept rather than copied
+ */
+ public MulticastProcessor(Collection<Endpoint<E>> endpoints) {
+ this.endpoints = endpoints;
+ }
+
+ @Override
+ public String toString() {
+ return "Multicast" + endpoints;
+ }
+
+ /**
+ * Sends to each endpoint, in the iteration order of the endpoints collection, the exchange returned by
+ * {@link #copyExchangeStrategy} for that endpoint
+ *
+ * @param exchange the message exchange to multicast
+ */
+ public void onExchange(E exchange) {
+ for (Endpoint<E> endpoint : endpoints) {
+ E copy = copyExchangeStrategy(endpoint, exchange);
+ endpoint.onExchange(copy);
+ }
+ }
+
+ /**
+ * Returns the endpoints to multicast to
+ */
+ public Collection<Endpoint<E>> getEndpoints() {
+ return endpoints;
+ }
+
+ /**
+ * Strategy method to create the exchange which is sent to an endpoint. This implementation delegates to
+ * the endpoint's createExchange(exchange) method. Subclasses can override this to change how, or whether,
+ * the exchange is copied.
+ *
+ * @param endpoint the endpoint that the exchange will be sent to
+ * @param exchange the exchange being multicast
+ * @return the exchange to send to the endpoint; here whatever the endpoint's createExchange(exchange)
+ * returns (presumably a new copy of the exchange - TODO confirm against the Endpoint implementation)
+ */
+ protected E copyExchangeStrategy(Endpoint<E> endpoint, E exchange) {
+ return endpoint.createExchange(exchange);
+ }
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?view=auto&rev=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Wed Mar 21 05:33:32 2007
@@ -0,0 +1,86 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+
+import java.util.Collection;
+
+/**
+ * Creates a Pipeline pattern where the output of the previous step is sent as input to the next step when working
+ * with request/response message exchanges.
+ *
+ * @version $Revision$
+ */
+public class Pipeline<E extends Exchange> implements Processor<E> {
+ // the steps of the pipeline; the collection passed to the constructor is stored as-is, not copied
+ private Collection<Endpoint<E>> endpoints;
+
+ /**
+ * Creates a pipeline over the given endpoints
+ *
+ * @param endpoints the endpoints to invoke, in the iteration order of the collection
+ */
+ public Pipeline(Collection<Endpoint<E>> endpoints) {
+ this.endpoints = endpoints;
+ }
+
+ /**
+ * Sends the given exchange to the first endpoint; every later endpoint receives a new exchange built by
+ * {@link #createNextExchange} from the exchange that was sent to the step before it.
+ *
+ * NOTE(review): nothing in this method writes the exchange of the last step back onto the exchange passed
+ * in - confirm whether callers expect to see the final output there.
+ *
+ * @param exchange the exchange which is sent unchanged to the first endpoint
+ */
+ public void onExchange(E exchange) {
+ E nextExchange = exchange;
+ boolean first = true;
+ for (Endpoint<E> endpoint : endpoints) {
+ if (first) {
+ // the first step works on the caller's exchange directly
+ first = false;
+ }
+ else {
+ nextExchange = createNextExchange(endpoint, nextExchange);
+ }
+ endpoint.onExchange(nextExchange);
+ }
+ }
+
+ /**
+ * Strategy method to create the next exchange from the previous exchange. The new exchange is created by
+ * the endpoint's createExchange(previousExchange) method and, if the body of the previous exchange's out
+ * message is not null, that body is set as the body of the new exchange's in message.
+ *
+ * @param endpoint the endpoint the exchange will be sent to
+ * @param previousExchange the previous exchange
+ * @return a new exchange
+ */
+ protected E createNextExchange(Endpoint<E> endpoint, E previousExchange) {
+ E answer = endpoint.createExchange(previousExchange);
+
+ // now lets set the input of the next exchange to the output of the previous message if it is not null
+ Object output = previousExchange.getOut().getBody();
+ if (output != null) {
+ answer.getIn().setBody(output);
+ }
+ return answer;
+ }
+
+ /**
+ * Strategy method to copy an exchange by calling its copy() method.
+ *
+ * NOTE(review): nothing in this class calls this method; it looks like a leftover from
+ * MulticastProcessor - confirm whether it is still needed.
+ *
+ * @param exchange the exchange to copy
+ * @return the result of exchange.copy(), cast to E
+ */
+ protected E copyExchangeStrategy(E exchange) {
+ // NOTE(review): the cast assumes copy() returns an exchange of the same type E - confirm
+ return (E) exchange.copy();
+ }
+
+ @Override
+ public String toString() {
+ return "Pipeline" + endpoints;
+ }
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Wed Mar 21 05:33:32 2007
@@ -18,6 +18,7 @@
package org.apache.camel.processor;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ExchangeHelper;
import static org.apache.camel.util.ObjectHelper.notNull;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -56,19 +57,7 @@
}
}
- @SuppressWarnings({"unchecked"})
protected Endpoint<E> resolveEndpoint(E exchange, Object recipient) {
- Endpoint<E> endpoint;
- if (recipient instanceof Endpoint) {
- endpoint = (Endpoint<E>) recipient;
- }
- else {
- String uri = recipient.toString();
- endpoint = (Endpoint<E>) exchange.getContext().resolveEndpoint(uri);
- if (endpoint == null) {
- throw new NoSuchEndpointException(uri);
- }
- }
- return endpoint;
+ return ExchangeHelper.resolveEndpoint(exchange, recipient);
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java Wed Mar 21 05:33:32 2007
@@ -40,6 +40,11 @@
public RedeliveryPolicy() {
}
+ @Override
+ public String toString() {
+ return "RedeliveryPolicy[maximumRedeliveries=" + maximumRedeliveries + "]";
+ }
+
public RedeliveryPolicy copy() {
try {
return (RedeliveryPolicy) clone();
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java Wed Mar 21 05:33:32 2007
@@ -17,8 +17,9 @@
*/
package org.apache.camel.util;
+import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
-import org.apache.camel.Expression;
+import org.apache.camel.NoSuchEndpointException;
/**
* Some helper methods for working with {@link Exchange} objects
@@ -26,4 +27,30 @@
* @version $Revision$
*/
public class ExchangeHelper {
+
+ /**
+ * Attempts to resolve the endpoint for the given value
+ *
+ * @param exchange the message exchange being processed; its context is used to resolve endpoint URIs
+ * @param value the value which can be an {@link Endpoint}, which is returned as-is, or an object which
+ * provides a String representation of an endpoint URI via {@link Object#toString()}.
+ * A null value is not handled and fails with a NullPointerException.
+ *
+ * @return the endpoint
+ * @throws NoSuchEndpointException if the context returns null when resolving the URI
+ */
+ @SuppressWarnings({"unchecked"})
+ public static <E extends Exchange> Endpoint<E> resolveEndpoint(E exchange, Object value) throws NoSuchEndpointException {
+ Endpoint<E> endpoint;
+ if (value instanceof Endpoint) {
+ // unchecked: the exchange type of the supplied endpoint is not verified
+ endpoint = (Endpoint<E>) value;
+ }
+ else {
+ // anything else is treated as an endpoint URI
+ String uri = value.toString();
+ endpoint = (Endpoint<E>) exchange.getContext().resolveEndpoint(uri);
+ if (endpoint == null) {
+ throw new NoSuchEndpointException(uri);
+ }
+ }
+ return endpoint;
+ }
}
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/RouteBuilderTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/RouteBuilderTest.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/RouteBuilderTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/RouteBuilderTest.java Wed Mar 21 05:33:32 2007
@@ -19,12 +19,13 @@
import junit.framework.TestCase;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.ChoiceProcessor;
-import org.apache.camel.processor.CompositeProcessor;
import org.apache.camel.processor.SendProcessor;
import org.apache.camel.processor.FilterProcessor;
import org.apache.camel.processor.InterceptorProcessor;
import org.apache.camel.processor.RecipientList;
import org.apache.camel.processor.Splitter;
+import org.apache.camel.processor.DeadLetterChannel;
+import org.apache.camel.processor.MulticastProcessor;
import java.util.ArrayList;
import java.util.List;
@@ -68,7 +69,7 @@
for (Map.Entry<Endpoint<Exchange>, Processor<Exchange>> route : routes) {
Endpoint<Exchange> key = route.getKey();
assertEquals("From endpoint", "queue:a", key.getEndpointUri());
- Processor processor = route.getValue();
+ Processor processor = getProcessorWithoutErrorHandler(route);
assertTrue("Processor should be a SendProcessor but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof SendProcessor);
SendProcessor sendProcessor = (SendProcessor) processor;
@@ -76,7 +77,7 @@
}
}
- protected RouteBuilder<Exchange> buildSimpleRouteWithHeaderPredicate() {
+ protected RouteBuilder<Exchange> buildSimpleRouteWithHeaderPredicate() {
// START SNIPPET: e2
RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
public void configure() {
@@ -98,17 +99,18 @@
for (Map.Entry<Endpoint<Exchange>, Processor<Exchange>> route : routes) {
Endpoint<Exchange> key = route.getKey();
assertEquals("From endpoint", "queue:a", key.getEndpointUri());
- Processor processor = route.getValue();
+ Processor processor = getProcessorWithoutErrorHandler(route);
assertTrue("Processor should be a FilterProcessor but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof FilterProcessor);
FilterProcessor filterProcessor = (FilterProcessor) processor;
- SendProcessor sendProcessor = (SendProcessor) filterProcessor.getProcessor();
+ SendProcessor sendProcessor = (SendProcessor) unwrapErrorHandler(filterProcessor.getProcessor());
assertEquals("Endpoint URI", "queue:b", sendProcessor.getDestination().getEndpointUri());
}
}
- protected RouteBuilder<Exchange> buildSimpleRouteWithChoice() {
+
+ protected RouteBuilder<Exchange> buildSimpleRouteWithChoice() {
// START SNIPPET: e3
RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
public void configure() {
@@ -133,7 +135,7 @@
for (Map.Entry<Endpoint<Exchange>, Processor<Exchange>> route : routes) {
Endpoint<Exchange> key = route.getKey();
assertEquals("From endpoint", "queue:a", key.getEndpointUri());
- Processor processor = route.getValue();
+ Processor processor = getProcessorWithoutErrorHandler(route);
assertTrue("Processor should be a ChoiceProcessor but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof ChoiceProcessor);
ChoiceProcessor<Exchange> choiceProcessor = (ChoiceProcessor<Exchange>) processor;
@@ -178,7 +180,7 @@
for (Map.Entry<Endpoint<Exchange>, Processor<Exchange>> route : routes) {
Endpoint<Exchange> key = route.getKey();
assertEquals("From endpoint", "queue:a", key.getEndpointUri());
- Processor processor = route.getValue();
+ Processor processor = getProcessorWithoutErrorHandler(route);
assertEquals("Should be called with my processor", myProcessor, processor);
}
@@ -207,11 +209,11 @@
for (Map.Entry<Endpoint<Exchange>, Processor<Exchange>> route : routes) {
Endpoint<Exchange> key = route.getKey();
assertEquals("From endpoint", "queue:a", key.getEndpointUri());
- Processor processor = route.getValue();
+ Processor processor = getProcessorWithoutErrorHandler(route);
assertTrue("Processor should be a FilterProcessor but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof FilterProcessor);
FilterProcessor filterProcessor = (FilterProcessor) processor;
- assertEquals("Should be called with my processor", myProcessor, filterProcessor.getProcessor());
+ assertEquals("Should be called with my processor", myProcessor, unwrapErrorHandler(filterProcessor.getProcessor()));
}
}
@@ -238,18 +240,19 @@
for (Map.Entry<Endpoint<Exchange>, Processor<Exchange>> route : routes) {
Endpoint<Exchange> key = route.getKey();
assertEquals("From endpoint", "queue:a", key.getEndpointUri());
- Processor processor = route.getValue();
+ Processor processor = getProcessorWithoutErrorHandler(route);
+
+ assertTrue("Processor should be a MulticastProcessor but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof MulticastProcessor);
+ MulticastProcessor<Exchange> multicastProcessor = (MulticastProcessor<Exchange>) processor;
- assertTrue("Processor should be a CompositeProcessor but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof CompositeProcessor);
- CompositeProcessor<Exchange> compositeProcessor = (CompositeProcessor<Exchange>) processor;
- List<Processor<Exchange>> processors = new ArrayList<Processor<Exchange>>(compositeProcessor.getProcessors());
- assertEquals("Should have 2 processors", 2, processors.size());
+ List<Endpoint<Exchange>> endpoints = new ArrayList<Endpoint<Exchange>>(multicastProcessor.getEndpoints());
+ assertEquals("Should have 2 endpoints", 2, endpoints.size());
- assertSendTo(processors.get(0), "queue:tap");
- assertSendTo(processors.get(1), "queue:b");
+ assertEndpointUri(endpoints.get(0), "queue:tap");
+ assertEndpointUri(endpoints.get(1), "queue:b");
}
}
-
+
protected RouteBuilder<Exchange> buildRouteWithInterceptor() {
interceptor1 = new InterceptorProcessor<Exchange>() {
};
@@ -288,7 +291,7 @@
for (Map.Entry<Endpoint<Exchange>, Processor<Exchange>> route : routes) {
Endpoint<Exchange> key = route.getKey();
assertEquals("From endpoint", "queue:a", key.getEndpointUri());
- Processor processor = route.getValue();
+ Processor processor = getProcessorWithoutErrorHandler(route);
assertTrue("Processor should be a interceptor1 but was: " + processor + " with type: " + processor.getClass().getName(), processor==interceptor1);
InterceptorProcessor<Exchange> p1 = (InterceptorProcessor<Exchange>) processor;
@@ -311,7 +314,6 @@
};
// END SNIPPET: e7
-
Map<Endpoint<Exchange>, Processor<Exchange>> routeMap = builder.getRouteMap();
System.out.println("Created map: " + routeMap);
@@ -320,7 +322,7 @@
for (Map.Entry<Endpoint<Exchange>, Processor<Exchange>> route : routes) {
Endpoint<Exchange> key = route.getKey();
assertEquals("From endpoint", "queue:a", key.getEndpointUri());
- Processor processor = route.getValue();
+ Processor processor = getProcessorWithoutErrorHandler(route);
System.out.println("processor: " + processor);
/* TODO
@@ -367,7 +369,7 @@
for (Map.Entry<Endpoint<Exchange>, Processor<Exchange>> route : routes) {
Endpoint<Exchange> key = route.getKey();
assertEquals("From endpoint", "queue:a", key.getEndpointUri());
- Processor processor = route.getValue();
+ Processor processor = getProcessorWithoutErrorHandler(route);
assertTrue("Processor should be a RecipientList but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof RecipientList);
RecipientList<Exchange> p1 = (RecipientList<Exchange>) processor;
@@ -396,7 +398,7 @@
for (Map.Entry<Endpoint<Exchange>, Processor<Exchange>> route : routes) {
Endpoint<Exchange> key = route.getKey();
assertEquals("From endpoint", "queue:a", key.getEndpointUri());
- Processor processor = route.getValue();
+ Processor processor = getProcessorWithoutErrorHandler(route);
assertTrue("Processor should be a Splitter but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof Splitter);
Splitter<Exchange> p1 = (Splitter<Exchange>) processor;
@@ -404,9 +406,29 @@
}
protected void assertSendTo(Processor processor, String uri) {
+ processor = unwrapErrorHandler(processor);
+
assertTrue("Processor should be a SendProcessor but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof SendProcessor);
SendProcessor sendProcessor = (SendProcessor) processor;
assertEquals("Endpoint URI", uri, sendProcessor.getDestination().getEndpointUri());
+ }
+
+ /**
+ * By default routes should be wrapped in the {@link DeadLetterChannel} so lets unwrap that and return the actual processor
+ */
+ protected Processor<Exchange> getProcessorWithoutErrorHandler(Map.Entry<Endpoint<Exchange>, Processor<Exchange>> route) {
+ Processor<Exchange> processor = route.getValue();
+ return unwrapErrorHandler(processor);
+ }
+
+ protected Processor<Exchange> unwrapErrorHandler(Processor<Exchange> processor) {
+ assertTrue("Processor should be a DeadLetterChannel but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof DeadLetterChannel);
+ DeadLetterChannel deadLetter = (DeadLetterChannel) processor;
+ return deadLetter.getOutput();
+ }
+
+ protected void assertEndpointUri(Endpoint<Exchange> endpoint, String uri) {
+ assertEquals("Endoint uri for: " + endpoint, uri, endpoint.getEndpointUri());
}
}