You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/06/11 09:38:48 UTC
svn commit: r783663 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/ main/java/org/apache/camel/processor/
main/java/org/apache/camel/processor/aggregate/
test/java/org/apache/camel/processor/
Author: davsclaus
Date: Thu Jun 11 07:38:47 2009
New Revision: 783663
URL: http://svn.apache.org/viewvc?rev=783663&view=rev
Log:
CAMEL-1691: Filter EIP now marks exchanges as filtered that split, multicast, enrich now uses to skip filtered exchanges when doing aggregation.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EnrichShouldSkipFilteredExchanges.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastShouldSkipFilteredExchanges.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitShouldSkipFilteredExchanges.java (with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=783663&r1=783662&r2=783663&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Thu Jun 11 07:38:47 2009
@@ -59,6 +59,7 @@
String FILE_NAME_PRODUCED = "CamelFileNameProduced";
String FILE_PATH = "CamelFilePath";
String FILE_PARENT = "CamelFileParent";
+ String FILTERED = "CamelFiltered";
String HTTP_CHARACTER_ENCODING = "CamelHttpCharacterEncoding";
String HTTP_METHOD = "CamelHttpMethod";
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java?rev=783663&r1=783662&r2=783663&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java Thu Jun 11 07:38:47 2009
@@ -23,6 +23,8 @@
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
@@ -35,8 +37,8 @@
*/
public class Enricher extends ServiceSupport implements Processor {
+ private static final transient Log LOG = LogFactory.getLog(Enricher.class);
private AggregationStrategy aggregationStrategy;
-
private Producer producer;
/**
@@ -99,10 +101,19 @@
copyResultsPreservePattern(exchange, resourceExchange);
} else {
prepareResult(exchange);
+
// aggregate original exchange and resource exchange
- Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
- // copy aggregation result onto original exchange (preserving pattern)
- copyResultsPreservePattern(exchange, aggregatedExchange);
+ // but do not aggregate if the resource exchange was filtered
+ Boolean filtered = resourceExchange.getProperty(Exchange.FILTERED, Boolean.class);
+ if (filtered == null || !filtered) {
+ Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
+ // copy aggregation result onto original exchange (preserving pattern)
+ copyResultsPreservePattern(exchange, aggregatedExchange);
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Cannot aggregate exchange as its filtered: " + resourceExchange);
+ }
+ }
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java?rev=783663&r1=783662&r2=783663&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java Thu Jun 11 07:38:47 2009
@@ -19,6 +19,8 @@
import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* The processor which implements the
@@ -27,6 +29,7 @@
* @version $Revision$
*/
public class FilterProcessor extends DelegateProcessor {
+ private static final Log LOG = LogFactory.getLog(FilterProcessor.class);
private final Predicate predicate;
public FilterProcessor(Predicate predicate, Processor processor) {
@@ -37,6 +40,12 @@
public void process(Exchange exchange) throws Exception {
if (predicate.matches(exchange)) {
super.process(exchange);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Marking exchange as filtered: " + exchange);
+ }
+ // mark this exchange as filtered
+ exchange.setProperty(Exchange.FILTERED, Boolean.TRUE);
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=783663&r1=783662&r2=783663&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Thu Jun 11 07:38:47 2009
@@ -92,6 +92,7 @@
public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ExecutorService executorService, boolean streaming) {
notNull(processors, "processors");
+ // TODO: end() does not work correctly with Splitter
this.processors = processors;
this.aggregationStrategy = aggregationStrategy;
this.isParallelProcessing = parallelProcessing;
@@ -204,8 +205,14 @@
* @param exchange the exchange to be added to the result
*/
protected synchronized void doAggregate(AtomicExchange result, Exchange exchange) {
- if (aggregationStrategy != null) {
+ // only aggregate if the exchange is not filtered (eg by the FilterProcessor)
+ Boolean filtered = exchange.getProperty(Exchange.FILTERED, Boolean.class);
+ if (aggregationStrategy != null && (filtered == null || !filtered)) {
result.set(aggregationStrategy.aggregate(result.get(), exchange));
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Cannot aggregate exchange as its filtered: " + exchange);
+ }
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java?rev=783663&r1=783662&r2=783663&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java Thu Jun 11 07:38:47 2009
@@ -57,11 +57,22 @@
@Override
public boolean add(Exchange exchange) {
+ // do not add exchange if it was filtered
+ Boolean filtered = exchange.getProperty(Exchange.FILTERED, Boolean.class);
+ if (filtered != null && filtered) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Cannot aggregate exchange as its filtered: " + exchange);
+ }
+ return false;
+ }
+
Object correlationKey = correlationExpression.evaluate(exchange, Object.class);
if (LOG.isTraceEnabled()) {
- LOG.trace("Evaluated expression: " + correlationExpression + " as CorrelationKey: " + correlationKey);
+ LOG.trace("Evaluated expression: " + correlationExpression + " as correlation key: " + correlationKey);
}
+ // TODO: correlationKey evalutated to null should be skipped by default
+
Exchange oldExchange = aggregated.get(correlationKey);
Exchange newExchange = exchange;
@@ -80,7 +91,7 @@
// the strategy may just update the old exchange and return it
if (!newExchange.equals(oldExchange)) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Put exchange:" + newExchange + " with coorelation key:" + correlationKey);
+ LOG.trace("Put exchange:" + newExchange + " with correlation key:" + correlationKey);
}
aggregated.put(correlationKey, newExchange);
}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EnrichShouldSkipFilteredExchanges.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EnrichShouldSkipFilteredExchanges.java?rev=783663&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EnrichShouldSkipFilteredExchanges.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EnrichShouldSkipFilteredExchanges.java Thu Jun 11 07:38:47 2009
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * Unit test to verify that Enrich aggregator does not included filtered exchanges.
+ *
+ * @version $Revision$
+ */
+public class EnrichShouldSkipFilteredExchanges extends ContextTestSupport {
+
+ public void testEnrichWithFilterNotFiltered() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World,Hello World");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testEnrichWithFilterFiltered() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hi there");
+
+ template.sendBody("direct:start", "Hi there");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .enrich("direct:enrich", new MyAggregationStrategy())
+ .to("mock:result");
+
+ Predicate goodWord = body().contains("World");
+ from("direct:enrich")
+ .filter(goodWord)
+ .to("mock:filtered");
+ }
+ };
+ }
+
+ private class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ String newBody = newExchange.getIn().getBody(String.class);
+ assertTrue("Should have been filtered: " + newBody, newBody.contains("World"));
+
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
+ String body = oldExchange.getIn().getBody(String.class);
+ body = body + "," + newBody;
+ oldExchange.getIn().setBody(body);
+ return oldExchange;
+ }
+
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EnrichShouldSkipFilteredExchanges.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EnrichShouldSkipFilteredExchanges.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastShouldSkipFilteredExchanges.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastShouldSkipFilteredExchanges.java?rev=783663&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastShouldSkipFilteredExchanges.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastShouldSkipFilteredExchanges.java Thu Jun 11 07:38:47 2009
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * Unit test to verify that Multicast aggregator does not included filtered exchanges.
+ *
+ * @version $Revision$
+ */
+public class MulticastShouldSkipFilteredExchanges extends ContextTestSupport {
+
+ public void testMulticastWithFilterNotFiltered() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World,Hello World");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testMulticastWithFilterFiltered() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hi there");
+
+ template.sendBody("direct:start", "Hi there");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .to("direct:multicast")
+ .to("mock:result");
+
+ from("direct:multicast")
+ .multicast(new MyAggregationStrategy())
+ .to("direct:a")
+ .to("direct:b");
+
+ Predicate goodWord = body().contains("World");
+ from("direct:a", "direct:b")
+ .filter(goodWord)
+ .to("mock:filtered");
+ }
+ };
+ }
+
+ private class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ String newBody = newExchange.getIn().getBody(String.class);
+ assertTrue("Should have been filtered: " + newBody, newBody.contains("World"));
+
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
+ String body = oldExchange.getIn().getBody(String.class);
+ body = body + "," + newBody;
+ oldExchange.getIn().setBody(body);
+ return oldExchange;
+ }
+
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastShouldSkipFilteredExchanges.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastShouldSkipFilteredExchanges.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitShouldSkipFilteredExchanges.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitShouldSkipFilteredExchanges.java?rev=783663&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitShouldSkipFilteredExchanges.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitShouldSkipFilteredExchanges.java Thu Jun 11 07:38:47 2009
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * Unit test to verify that Splitter aggregator does not included filtered exchanges.
+ *
+ * @version $Revision$
+ */
+public class SplitShouldSkipFilteredExchanges extends ContextTestSupport {
+
+ public void testSplitWithFilter() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World,Bye World");
+
+ MockEndpoint filtered = getMockEndpoint("mock:filtered");
+ filtered.expectedBodiesReceived("Hello World", "Bye World");
+
+ List<String> body = new ArrayList<String>();
+ body.add("Hello World");
+ body.add("Hi there");
+ body.add("Bye World");
+ body.add("How do you do?");
+
+ template.sendBody("direct:start", body);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .to("direct:split")
+ .to("mock:result");
+
+ Predicate goodWord = body().contains("World");
+ from("direct:split")
+ .split(body(List.class), new MyAggregationStrategy())
+ .filter(goodWord)
+ .to("mock:filtered");
+ }
+ };
+ }
+
+ private class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ String newBody = newExchange.getIn().getBody(String.class);
+ assertTrue("Should have been filtered: " + newBody, newBody.contains("World"));
+
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
+ String body = oldExchange.getIn().getBody(String.class);
+ body = body + "," + newBody;
+ oldExchange.getIn().setBody(body);
+ return oldExchange;
+ }
+
+ }
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitShouldSkipFilteredExchanges.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitShouldSkipFilteredExchanges.java
------------------------------------------------------------------------------
svn:keywords = Rev Date