You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2010/05/12 13:54:54 UTC
svn commit: r943458 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/model/RecipientListDefinition.java
main/java/org/apache/camel/processor/RecipientList.java
test/java/org/apache/camel/processor/RecipientListIgnoreInvalidEndpointsTest.java
Author: ningjiang
Date: Wed May 12 11:54:53 2010
New Revision: 943458
URL: http://svn.apache.org/viewvc?rev=943458&view=rev
Log:
CAMEL-2710 Skip invalid endpoints without throwing an exception
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListIgnoreInvalidEndpointsTest.java (with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=943458&r1=943457&r2=943458&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java Wed May 12 11:54:53 2010
@@ -57,6 +57,8 @@ public class RecipientListDefinition<Typ
private String executorServiceRef;
@XmlAttribute(required = false)
private Boolean stopOnException;
+ @XmlAttribute(required = false)
+ private Boolean ignoreInvalidEndpoints;
public RecipientListDefinition() {
}
@@ -94,6 +96,9 @@ public class RecipientListDefinition<Typ
if (stopOnException != null) {
answer.setStopOnException(isStopOnException());
}
+ if (ignoreInvalidEndpoints != null) {
+ answer.setIgnoreInvalidEndpoints(ignoreInvalidEndpoints);
+ }
executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, "RecipientList", this);
if (isParallelProcessing() && executorService == null) {
@@ -159,6 +164,16 @@ public class RecipientListDefinition<Typ
setStrategyRef(aggregationStrategyRef);
return this;
}
+
+ /**
+ * Ignore the invalid endpoint exception when trying to create a producer with that endpoint
+ *
+ * @return the builder
+ */
+ public RecipientListDefinition<Type> ignoreInvalidEndpoints() {
+ setIgnoreInvalidEndpoints(true);
+ return this;
+ }
/**
* Doing the recipient list work in parallel
@@ -227,6 +242,14 @@ public class RecipientListDefinition<Typ
public void setExecutorServiceRef(String executorServiceRef) {
this.executorServiceRef = executorServiceRef;
}
+
+ public Boolean isIgnoreInvalidEndpoints() {
+ return ignoreInvalidEndpoints;
+ }
+
+ public void setIgnoreInvalidEndpoints(Boolean ignoreInvalidEndpoints) {
+ this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
+ }
public Boolean isStopOnException() {
return stopOnException;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?rev=943458&r1=943457&r2=943458&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Wed May 12 11:54:53 2010
@@ -36,6 +36,8 @@ import org.apache.camel.processor.aggreg
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import static org.apache.camel.util.ObjectHelper.notNull;
@@ -48,12 +50,14 @@ import static org.apache.camel.util.Obje
* @version $Revision$
*/
public class RecipientList extends ServiceSupport implements Processor {
+ private static final transient Log LOG = LogFactory.getLog(RecipientList.class);
private final CamelContext camelContext;
private ProducerCache producerCache;
private Expression expression;
private final String delimiter;
private boolean parallelProcessing;
private boolean stopOnException;
+ private boolean ignoreInvalidEndpoints;
private ExecutorService executorService;
private AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy();
@@ -110,11 +114,19 @@ public class RecipientList extends Servi
List<Processor> processors = new ArrayList<Processor>();
while (iter.hasNext()) {
Object recipient = iter.next();
- Endpoint endpoint = resolveEndpoint(exchange, recipient);
- // acquire producer which we then release later
- Producer producer = producerCache.acquireProducer(endpoint);
- processors.add(producer);
- producers.put(endpoint, producer);
+ try {
+ Endpoint endpoint = resolveEndpoint(exchange, recipient);
+ // acquire producer which we then release later
+ Producer producer = producerCache.acquireProducer(endpoint);
+ processors.add(producer);
+ producers.put(endpoint, producer);
+ } catch (Exception ex) {
+ if (isIgnoreInvalidEndpoints()) {
+ LOG.warn("Get a invalid endpoint with " + recipient , ex);
+ } else {
+ throw ex;
+ }
+ }
}
MulticastProcessor mp = new MulticastProcessor(exchange.getContext(), processors, getAggregationStrategy(),
@@ -150,6 +162,14 @@ public class RecipientList extends Servi
protected void doStop() throws Exception {
ServiceHelper.stopService(producerCache);
}
+
+ public boolean isIgnoreInvalidEndpoints() {
+ return ignoreInvalidEndpoints;
+ }
+
+ public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) {
+ this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
+ }
public boolean isParallelProcessing() {
return parallelProcessing;
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListIgnoreInvalidEndpointsTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListIgnoreInvalidEndpointsTest.java?rev=943458&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListIgnoreInvalidEndpointsTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListIgnoreInvalidEndpointsTest.java Wed May 12 11:54:53 2010
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ResolveEndpointFailedException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+import static org.apache.camel.language.simple.SimpleLanguage.simple;
+
+public class RecipientListIgnoreInvalidEndpointsTest extends ContextTestSupport {
+
+ public void testRecipientListWithIgnoreInvalidEndpointsOption() throws Exception {
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedBodiesReceived("Hello World");
+
+ MockEndpoint endpointA = getMockEndpoint("mock:endpointA");
+ endpointA.expectedBodiesReceived("Hello a");
+
+ template.requestBody("direct:startA", "Hello World", String.class);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testRecipientListWithoutIgnoreInvalidEndpointsOption() throws Exception {
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMessageCount(0);
+
+ MockEndpoint endpointA = getMockEndpoint("mock:endpointA");
+ endpointA.expectedMessageCount(0);
+
+ try {
+ template.requestBody("direct:startB", "Hello World", String.class);
+ fail("Expect the exception here.");
+ } catch (Exception ex) {
+ assertTrue("Get a wrong cause of the exception", ex.getCause() instanceof ResolveEndpointFailedException);
+ }
+
+ assertMockEndpointsSatisfied();
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:startA").recipientList(simple("mock:result,fail:endpoint,direct:a")).ignoreInvalidEndpoints();
+
+ from("direct:startB").recipientList(simple("mock:result,fail:endpoint,direct:a"));
+
+ from("direct:a").transform(constant("Hello a")).to("mock:endpointA");
+
+ }
+ };
+ }
+
+
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListIgnoreInvalidEndpointsTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListIgnoreInvalidEndpointsTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date