You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/24 15:49:43 UTC
[2/2] camel git commit: CAMEL-9887: Fixed so using shareUnitOfWork
would now also call specialized AggregationStrategy for onTimeone,
onCompletion etc.
CAMEL-9887: Fixed so using shareUnitOfWork would now also call specialized AggregationStrategy for onTimeone, onCompletion etc.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1b47ca4b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1b47ca4b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1b47ca4b
Branch: refs/heads/camel-2.17.x
Commit: 1b47ca4b2db39384ee28154c0a4e88357ce5397b
Parents: fc95695
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Apr 24 15:48:47 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Apr 24 15:49:31 2016 +0200
----------------------------------------------------------------------
.../camel/processor/MulticastProcessor.java | 13 +++-
.../processor/aggregate/AggregateProcessor.java | 42 ++++++++----
.../aggregate/DelegateAggregationStrategy.java | 28 ++++++++
.../ShareUnitOfWorkAggregationStrategy.java | 8 ++-
...itterShareUnitOfWorkCompletionAwareTest.java | 69 ++++++++++++++++++++
5 files changed, 144 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/1b47ca4b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index fc7da80..f30e900 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -49,6 +49,7 @@ import org.apache.camel.StreamCache;
import org.apache.camel.Traceable;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy;
+import org.apache.camel.processor.aggregate.DelegateAggregationStrategy;
import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.RouteContext;
@@ -78,7 +79,7 @@ import static org.apache.camel.util.ObjectHelper.notNull;
* Implements the Multicast pattern to send a message exchange to a number of
* endpoints, each endpoint receiving a copy of the message exchange.
*
- * @version
+ * @version
* @see Pipeline
*/
public class MulticastProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, IdAware {
@@ -563,6 +564,9 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
@Override
public void run() {
AggregationStrategy strategy = getAggregationStrategy(null);
+ if (strategy instanceof DelegateAggregationStrategy) {
+ strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
+ }
if (strategy instanceof TimeoutAwareAggregationStrategy) {
// notify the strategy we timed out
Exchange oldExchange = result.get();
@@ -639,7 +643,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
} else {
doAggregate(getAggregationStrategy(subExchange), result, subExchange);
}
-
+
total.incrementAndGet();
}
@@ -861,6 +865,9 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
}
AggregationStrategy strategy = getAggregationStrategy(subExchange);
+ if (strategy instanceof DelegateAggregationStrategy) {
+ strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
+ }
// invoke the on completion callback
if (strategy instanceof CompletionAwareAggregationStrategy) {
((CompletionAwareAggregationStrategy) strategy).onCompletion(subExchange);
@@ -970,7 +977,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
// because streams can only be read once
StreamCache copiedStreamCache = streamCache.copy(copy);
if (copiedStreamCache != null) {
- copy.getIn().setBody(copiedStreamCache);
+ copy.getIn().setBody(copiedStreamCache);
}
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/1b47ca4b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 822e831..d094249 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -523,10 +523,14 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
}
protected void onOptimisticLockingFailure(Exchange oldExchange, Exchange newExchange) {
- if (aggregationStrategy instanceof OptimisticLockingAwareAggregationStrategy) {
+ AggregationStrategy strategy = aggregationStrategy;
+ if (strategy instanceof DelegateAggregationStrategy) {
+ strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
+ }
+ if (strategy instanceof OptimisticLockingAwareAggregationStrategy) {
LOG.trace("onOptimisticLockFailure with AggregationStrategy: {}, oldExchange: {}, newExchange: {}",
- new Object[]{aggregationStrategy, oldExchange, newExchange});
- ((OptimisticLockingAwareAggregationStrategy)aggregationStrategy).onOptimisticLockFailure(oldExchange, newExchange);
+ new Object[]{strategy, oldExchange, newExchange});
+ ((OptimisticLockingAwareAggregationStrategy)strategy).onOptimisticLockFailure(oldExchange, newExchange);
}
}
@@ -540,8 +544,12 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
*/
protected String isPreCompleted(String key, Exchange oldExchange, Exchange newExchange) {
boolean complete = false;
- if (aggregationStrategy instanceof PreCompletionAwareAggregationStrategy) {
- complete = ((PreCompletionAwareAggregationStrategy) aggregationStrategy).preComplete(oldExchange, newExchange);
+ AggregationStrategy strategy = aggregationStrategy;
+ if (strategy instanceof DelegateAggregationStrategy) {
+ strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
+ }
+ if (strategy instanceof PreCompletionAwareAggregationStrategy) {
+ complete = ((PreCompletionAwareAggregationStrategy) strategy).preComplete(oldExchange, newExchange);
}
return complete ? "strategy" : null;
}
@@ -630,8 +638,12 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
}
protected boolean onPreCompletionAggregation(Exchange oldExchange, Exchange newExchange) {
- if (aggregationStrategy instanceof PreCompletionAwareAggregationStrategy) {
- return ((PreCompletionAwareAggregationStrategy) aggregationStrategy).preComplete(oldExchange, newExchange);
+ AggregationStrategy strategy = aggregationStrategy;
+ if (strategy instanceof DelegateAggregationStrategy) {
+ strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
+ }
+ if (strategy instanceof PreCompletionAwareAggregationStrategy) {
+ return ((PreCompletionAwareAggregationStrategy) strategy).preComplete(oldExchange, newExchange);
}
return false;
}
@@ -664,9 +676,13 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
if (fromTimeout) {
// invoke timeout if its timeout aware aggregation strategy,
// to allow any custom processing before discarding the exchange
- if (aggregationStrategy instanceof TimeoutAwareAggregationStrategy) {
+ AggregationStrategy strategy = aggregationStrategy;
+ if (strategy instanceof DelegateAggregationStrategy) {
+ strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
+ }
+ if (strategy instanceof TimeoutAwareAggregationStrategy) {
long timeout = getCompletionTimeout() > 0 ? getCompletionTimeout() : -1;
- ((TimeoutAwareAggregationStrategy) aggregationStrategy).timeout(aggregated, -1, -1, timeout);
+ ((TimeoutAwareAggregationStrategy) strategy).timeout(aggregated, -1, -1, timeout);
}
}
@@ -695,8 +711,12 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
inProgressCompleteExchanges.add(exchange.getExchangeId());
// invoke the on completion callback
- if (aggregationStrategy instanceof CompletionAwareAggregationStrategy) {
- ((CompletionAwareAggregationStrategy) aggregationStrategy).onCompletion(exchange);
+ AggregationStrategy target = aggregationStrategy;
+ if (target instanceof DelegateAggregationStrategy) {
+ target = ((DelegateAggregationStrategy) target).getDelegate();
+ }
+ if (target instanceof CompletionAwareAggregationStrategy) {
+ ((CompletionAwareAggregationStrategy) target).onCompletion(exchange);
}
if (getStatistics().isStatisticsEnabled()) {
http://git-wip-us.apache.org/repos/asf/camel/blob/1b47ca4b/camel-core/src/main/java/org/apache/camel/processor/aggregate/DelegateAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/DelegateAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/DelegateAggregationStrategy.java
new file mode 100644
index 0000000..0f92038
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/DelegateAggregationStrategy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+/**
+ * Interface to be used for {@link AggregationStrategy} that delegate to the real {@link AggregationStrategy}.
+ */
+public interface DelegateAggregationStrategy {
+
+ /**
+ * Gets the delegated {@link AggregationStrategy}
+ */
+ AggregationStrategy getDelegate();
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1b47ca4b/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
index 4a1187f..dc912b1 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
@@ -28,7 +28,7 @@ import static org.apache.camel.util.ExchangeHelper.hasExceptionBeenHandledByErro
* <p/>
* This strategy is <b>not</b> intended for end users to use.
*/
-public final class ShareUnitOfWorkAggregationStrategy implements AggregationStrategy {
+public final class ShareUnitOfWorkAggregationStrategy implements AggregationStrategy, DelegateAggregationStrategy {
private final AggregationStrategy strategy;
@@ -36,6 +36,10 @@ public final class ShareUnitOfWorkAggregationStrategy implements AggregationStra
this.strategy = strategy;
}
+ public AggregationStrategy getDelegate() {
+ return strategy;
+ }
+
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
// aggreagate using the actual strategy first
Exchange answer = strategy.aggregate(oldExchange, newExchange);
@@ -44,7 +48,7 @@ public final class ShareUnitOfWorkAggregationStrategy implements AggregationStra
return answer;
}
-
+
protected void propagateFailure(Exchange answer, Exchange newExchange) {
// if new exchange failed then propagate all the error related properties to the answer
boolean exceptionHandled = hasExceptionBeenHandledByErrorHandler(newExchange);
http://git-wip-us.apache.org/repos/asf/camel/blob/1b47ca4b/camel-core/src/test/java/org/apache/camel/processor/SplitterShareUnitOfWorkCompletionAwareTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitterShareUnitOfWorkCompletionAwareTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitterShareUnitOfWorkCompletionAwareTest.java
new file mode 100644
index 0000000..a52827d
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/SplitterShareUnitOfWorkCompletionAwareTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy;
+
+/**
+ * @version
+ */
+public class SplitterShareUnitOfWorkCompletionAwareTest extends ContextTestSupport {
+
+ public void testCompletionAware() throws Exception {
+ getMockEndpoint("mock:line").expectedBodiesReceived("A", "B", "C");
+ getMockEndpoint("mock:result").expectedBodiesReceived("A+B+C");
+ getMockEndpoint("mock:result").expectedHeaderReceived("foo", "bar");
+
+ template.sendBody("direct:start", "A,B,C");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:start")
+ .split(body(), new MyStrategy()).shareUnitOfWork()
+ .to("mock:line")
+ .end()
+ .to("mock:result");
+ }
+ };
+ }
+
+ private class MyStrategy implements CompletionAwareAggregationStrategy {
+
+ @Override
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+ String body = oldExchange.getIn().getBody() + "+" + newExchange.getIn().getBody();
+ oldExchange.getIn().setBody(body);
+ return oldExchange;
+ }
+
+ @Override
+ public void onCompletion(Exchange exchange) {
+ exchange.getIn().setHeader("foo", "bar");
+ }
+ }
+
+}
\ No newline at end of file