You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/24 15:49:43 UTC

[2/2] camel git commit: CAMEL-9887: Fixed so using shareUnitOfWork would now also call specialized AggregationStrategy for onTimeone, onCompletion etc.

CAMEL-9887: Fixed so using shareUnitOfWork would now also call specialized AggregationStrategy for onTimeone, onCompletion etc.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1b47ca4b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1b47ca4b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1b47ca4b

Branch: refs/heads/camel-2.17.x
Commit: 1b47ca4b2db39384ee28154c0a4e88357ce5397b
Parents: fc95695
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Apr 24 15:48:47 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Apr 24 15:49:31 2016 +0200

----------------------------------------------------------------------
 .../camel/processor/MulticastProcessor.java     | 13 +++-
 .../processor/aggregate/AggregateProcessor.java | 42 ++++++++----
 .../aggregate/DelegateAggregationStrategy.java  | 28 ++++++++
 .../ShareUnitOfWorkAggregationStrategy.java     |  8 ++-
 ...itterShareUnitOfWorkCompletionAwareTest.java | 69 ++++++++++++++++++++
 5 files changed, 144 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1b47ca4b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index fc7da80..f30e900 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -49,6 +49,7 @@ import org.apache.camel.StreamCache;
 import org.apache.camel.Traceable;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy;
+import org.apache.camel.processor.aggregate.DelegateAggregationStrategy;
 import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.RouteContext;
@@ -78,7 +79,7 @@ import static org.apache.camel.util.ObjectHelper.notNull;
  * Implements the Multicast pattern to send a message exchange to a number of
  * endpoints, each endpoint receiving a copy of the message exchange.
  *
- * @version 
+ * @version
  * @see Pipeline
  */
 public class MulticastProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, IdAware {
@@ -563,6 +564,9 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
         @Override
         public void run() {
             AggregationStrategy strategy = getAggregationStrategy(null);
+            if (strategy instanceof DelegateAggregationStrategy) {
+                strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
+            }
             if (strategy instanceof TimeoutAwareAggregationStrategy) {
                 // notify the strategy we timed out
                 Exchange oldExchange = result.get();
@@ -639,7 +643,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
             } else {
                 doAggregate(getAggregationStrategy(subExchange), result, subExchange);
             }
-            
+
             total.incrementAndGet();
         }
 
@@ -861,6 +865,9 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
         }
 
         AggregationStrategy strategy = getAggregationStrategy(subExchange);
+        if (strategy instanceof DelegateAggregationStrategy) {
+            strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
+        }
         // invoke the on completion callback
         if (strategy instanceof CompletionAwareAggregationStrategy) {
             ((CompletionAwareAggregationStrategy) strategy).onCompletion(subExchange);
@@ -970,7 +977,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
                     // because streams can only be read once
                     StreamCache copiedStreamCache = streamCache.copy(copy);
                     if (copiedStreamCache != null) {
-                        copy.getIn().setBody(copiedStreamCache);  
+                        copy.getIn().setBody(copiedStreamCache);
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/1b47ca4b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 822e831..d094249 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -523,10 +523,14 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
     }
 
     protected void onOptimisticLockingFailure(Exchange oldExchange, Exchange newExchange) {
-        if (aggregationStrategy instanceof OptimisticLockingAwareAggregationStrategy) {
+        AggregationStrategy strategy = aggregationStrategy;
+        if (strategy instanceof DelegateAggregationStrategy) {
+            strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
+        }
+        if (strategy instanceof OptimisticLockingAwareAggregationStrategy) {
             LOG.trace("onOptimisticLockFailure with AggregationStrategy: {}, oldExchange: {}, newExchange: {}",
-                      new Object[]{aggregationStrategy, oldExchange, newExchange});
-            ((OptimisticLockingAwareAggregationStrategy)aggregationStrategy).onOptimisticLockFailure(oldExchange, newExchange);
+                      new Object[]{strategy, oldExchange, newExchange});
+            ((OptimisticLockingAwareAggregationStrategy)strategy).onOptimisticLockFailure(oldExchange, newExchange);
         }
     }
 
@@ -540,8 +544,12 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
      */
     protected String isPreCompleted(String key, Exchange oldExchange, Exchange newExchange) {
         boolean complete = false;
-        if (aggregationStrategy instanceof PreCompletionAwareAggregationStrategy) {
-            complete = ((PreCompletionAwareAggregationStrategy) aggregationStrategy).preComplete(oldExchange, newExchange);
+        AggregationStrategy strategy = aggregationStrategy;
+        if (strategy instanceof DelegateAggregationStrategy) {
+            strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
+        }
+        if (strategy instanceof PreCompletionAwareAggregationStrategy) {
+            complete = ((PreCompletionAwareAggregationStrategy) strategy).preComplete(oldExchange, newExchange);
         }
         return complete ? "strategy" : null;
     }
@@ -630,8 +638,12 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
     }
 
     protected boolean onPreCompletionAggregation(Exchange oldExchange, Exchange newExchange) {
-        if (aggregationStrategy instanceof PreCompletionAwareAggregationStrategy) {
-            return ((PreCompletionAwareAggregationStrategy) aggregationStrategy).preComplete(oldExchange, newExchange);
+        AggregationStrategy strategy = aggregationStrategy;
+        if (strategy instanceof DelegateAggregationStrategy) {
+            strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
+        }
+        if (strategy instanceof PreCompletionAwareAggregationStrategy) {
+            return ((PreCompletionAwareAggregationStrategy) strategy).preComplete(oldExchange, newExchange);
         }
         return false;
     }
@@ -664,9 +676,13 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         if (fromTimeout) {
             // invoke timeout if its timeout aware aggregation strategy,
             // to allow any custom processing before discarding the exchange
-            if (aggregationStrategy instanceof TimeoutAwareAggregationStrategy) {
+            AggregationStrategy strategy = aggregationStrategy;
+            if (strategy instanceof DelegateAggregationStrategy) {
+                strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
+            }
+            if (strategy instanceof TimeoutAwareAggregationStrategy) {
                 long timeout = getCompletionTimeout() > 0 ? getCompletionTimeout() : -1;
-                ((TimeoutAwareAggregationStrategy) aggregationStrategy).timeout(aggregated, -1, -1, timeout);
+                ((TimeoutAwareAggregationStrategy) strategy).timeout(aggregated, -1, -1, timeout);
             }
         }
 
@@ -695,8 +711,12 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         inProgressCompleteExchanges.add(exchange.getExchangeId());
 
         // invoke the on completion callback
-        if (aggregationStrategy instanceof CompletionAwareAggregationStrategy) {
-            ((CompletionAwareAggregationStrategy) aggregationStrategy).onCompletion(exchange);
+        AggregationStrategy target = aggregationStrategy;
+        if (target instanceof DelegateAggregationStrategy) {
+            target = ((DelegateAggregationStrategy) target).getDelegate();
+        }
+        if (target instanceof CompletionAwareAggregationStrategy) {
+            ((CompletionAwareAggregationStrategy) target).onCompletion(exchange);
         }
 
         if (getStatistics().isStatisticsEnabled()) {

http://git-wip-us.apache.org/repos/asf/camel/blob/1b47ca4b/camel-core/src/main/java/org/apache/camel/processor/aggregate/DelegateAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/DelegateAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/DelegateAggregationStrategy.java
new file mode 100644
index 0000000..0f92038
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/DelegateAggregationStrategy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+/**
+ * Interface to be used for {@link AggregationStrategy} that delegate to the real {@link AggregationStrategy}.
+ */
+public interface DelegateAggregationStrategy {
+
+    /**
+     * Gets the delegated {@link AggregationStrategy}
+     */
+    AggregationStrategy getDelegate();
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1b47ca4b/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
index 4a1187f..dc912b1 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
@@ -28,7 +28,7 @@ import static org.apache.camel.util.ExchangeHelper.hasExceptionBeenHandledByErro
  * <p/>
  * This strategy is <b>not</b> intended for end users to use.
  */
-public final class ShareUnitOfWorkAggregationStrategy implements AggregationStrategy {
+public final class ShareUnitOfWorkAggregationStrategy implements AggregationStrategy, DelegateAggregationStrategy {
 
     private final AggregationStrategy strategy;
 
@@ -36,6 +36,10 @@ public final class ShareUnitOfWorkAggregationStrategy implements AggregationStra
         this.strategy = strategy;
     }
 
+    public AggregationStrategy getDelegate() {
+        return strategy;
+    }
+
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
         // aggreagate using the actual strategy first
         Exchange answer = strategy.aggregate(oldExchange, newExchange);
@@ -44,7 +48,7 @@ public final class ShareUnitOfWorkAggregationStrategy implements AggregationStra
 
         return answer;
     }
-    
+
     protected void propagateFailure(Exchange answer, Exchange newExchange) {
         // if new exchange failed then propagate all the error related properties to the answer
         boolean exceptionHandled = hasExceptionBeenHandledByErrorHandler(newExchange);

http://git-wip-us.apache.org/repos/asf/camel/blob/1b47ca4b/camel-core/src/test/java/org/apache/camel/processor/SplitterShareUnitOfWorkCompletionAwareTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitterShareUnitOfWorkCompletionAwareTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitterShareUnitOfWorkCompletionAwareTest.java
new file mode 100644
index 0000000..a52827d
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/SplitterShareUnitOfWorkCompletionAwareTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy;
+
+/**
+ * @version 
+ */
+public class SplitterShareUnitOfWorkCompletionAwareTest extends ContextTestSupport {
+
+    public void testCompletionAware() throws Exception {
+        getMockEndpoint("mock:line").expectedBodiesReceived("A", "B", "C");
+        getMockEndpoint("mock:result").expectedBodiesReceived("A+B+C");
+        getMockEndpoint("mock:result").expectedHeaderReceived("foo", "bar");
+
+        template.sendBody("direct:start", "A,B,C");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start")
+                    .split(body(), new MyStrategy()).shareUnitOfWork()
+                        .to("mock:line")
+                    .end()
+                    .to("mock:result");
+            }
+        };
+    }
+
+    private class MyStrategy implements CompletionAwareAggregationStrategy {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            }
+            String body = oldExchange.getIn().getBody() + "+" + newExchange.getIn().getBody();
+            oldExchange.getIn().setBody(body);
+            return oldExchange;
+        }
+
+        @Override
+        public void onCompletion(Exchange exchange) {
+            exchange.getIn().setHeader("foo", "bar");
+        }
+    }
+
+}
\ No newline at end of file