You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/02/12 17:55:00 UTC

[3/4] camel git commit: CAMEL-9444: Fix using shareUnitOfWork with multicast and using onException with unhandled=false.

CAMEL-9444: Fix using shareUnitOfWork with multicast and using onException with unhandled=false.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/baece126
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/baece126
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/baece126

Branch: refs/heads/master
Commit: baece126edb7dd9ca9507534c522e9996e724d87
Parents: fffafeb
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Feb 12 17:09:29 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Feb 12 17:54:51 2016 +0100

----------------------------------------------------------------------
 .../apache/camel/model/MulticastDefinition.java |  20 ++--
 .../apache/camel/model/ProcessorDefinition.java |  11 +-
 .../camel/model/RecipientListDefinition.java    |   9 +-
 .../org/apache/camel/model/SplitDefinition.java |  12 +++
 .../apache/camel/processor/RecipientList.java   |  10 +-
 .../org/apache/camel/processor/Splitter.java    |   6 +-
 .../ShareUnitOfWorkAggregationStrategy.java     |  77 ++++++++++++++
 ...tOfWorkOnExceptionHandledFalseIssueTest.java |   2 +-
 .../MulticastCopyOfSplitSubUnitOfWorkTest.java  | 102 +++++++++++++++++++
 .../camel/processor/SplitSubUnitOfWorkTest.java |   1 +
 10 files changed, 222 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
index 55f6ad0..42b3e59 100644
--- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
@@ -31,6 +31,7 @@ import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.MulticastProcessor;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
+import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.RouteContext;
@@ -287,11 +288,7 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
     }
 
     protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) throws Exception {
-        AggregationStrategy strategy = createAggregationStrategy(routeContext);
-        if (strategy == null) {
-            // default to use latest aggregation strategy
-            strategy = new UseLatestAggregationStrategy();
-        }
+        final AggregationStrategy strategy = createAggregationStrategy(routeContext);
 
         boolean isParallelProcessing = getParallelProcessing() != null && getParallelProcessing();
         boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork();
@@ -333,14 +330,23 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
             }
         }
 
-        if (strategy != null && strategy instanceof CamelContextAware) {
+        if (strategy == null) {
+            // default to use latest aggregation strategy
+            strategy = new UseLatestAggregationStrategy();
+        }
+
+        if (strategy instanceof CamelContextAware) {
             ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext());
         }
 
+        if (shareUnitOfWork != null && shareUnitOfWork) {
+            // wrap strategy in share unit of work
+            strategy = new ShareUnitOfWorkAggregationStrategy(strategy);
+        }
+
         return strategy;
     }
 
-
     public AggregationStrategy getAggregationStrategy() {
         return aggregationStrategy;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 0705d69..eacb304 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -54,7 +54,6 @@ import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.model.language.LanguageExpression;
 import org.apache.camel.model.language.SimpleExpression;
 import org.apache.camel.model.rest.RestDefinition;
-import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.InterceptEndpointProcessor;
 import org.apache.camel.processor.Pipeline;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
@@ -535,16 +534,10 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
             processor = createProcessor(routeContext);
         }
 
-        // unwrap internal processor so we can set id on the actual processor
-        Processor idProcessor = processor;
-        if (processor instanceof CamelInternalProcessor) {
-            idProcessor = ((CamelInternalProcessor) processor).getProcessor();
-        }
-
         // inject id
-        if (idProcessor instanceof IdAware) {
+        if (processor instanceof IdAware) {
             String id = this.idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
-            ((IdAware) idProcessor).setId(id);
+            ((IdAware) processor).setId(id);
         }
 
         if (processor == null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
index 49d75f9..0d02a48 100644
--- a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
@@ -34,6 +34,7 @@ import org.apache.camel.processor.Pipeline;
 import org.apache.camel.processor.RecipientList;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
+import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.RouteContext;
@@ -192,8 +193,9 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
                 throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef);
             }
         }
+
         if (strategy == null) {
-            // fallback to use latest
+            // default to use latest aggregation strategy
             strategy = new UseLatestAggregationStrategy();
         }
 
@@ -201,6 +203,11 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
             ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext());
         }
 
+        if (shareUnitOfWork != null && shareUnitOfWork) {
+            // wrap strategy in share unit of work
+            strategy = new ShareUnitOfWorkAggregationStrategy(strategy);
+        }
+
         return strategy;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
index ccfd045..5e49de2 100644
--- a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
@@ -31,6 +31,7 @@ import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.Splitter;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
+import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.CamelContextHelper;
@@ -119,6 +120,12 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw
         Splitter answer = new Splitter(routeContext.getCamelContext(), exp, childProcessor, aggregationStrategy,
                             isParallelProcessing, threadPool, shutdownThreadPool, isStreaming, isStopOnException(),
                             timeout, onPrepare, isShareUnitOfWork, isParallelAggregate);
+//        if (isShareUnitOfWork) {
+            // wrap answer in a sub unit of work, since we share the unit of work
+//            CamelInternalProcessor internalProcessor = new CamelInternalProcessor(answer);
+//            internalProcessor.addAdvice(new CamelInternalProcessor.SubUnitOfWorkProcessorAdvice());
+//            return internalProcessor;
+//        }
         return answer;
     }
 
@@ -144,6 +151,11 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw
             ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext());
         }
 
+        if (strategy != null && shareUnitOfWork != null && shareUnitOfWork) {
+            // wrap strategy in share unit of work
+            strategy = new ShareUnitOfWorkAggregationStrategy(strategy);
+        }
+
         return strategy;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
index 98f8e45..ded8ca9 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -166,16 +166,8 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor, IdA
             return true;
         }
 
-        AsyncProcessor target = rlp;
-        if (isShareUnitOfWork()) {
-            // wrap answer in a sub unit of work, since we share the unit of work
-            CamelInternalProcessor internalProcessor = new CamelInternalProcessor(rlp);
-            internalProcessor.addAdvice(new CamelInternalProcessor.SubUnitOfWorkProcessorAdvice());
-            target = internalProcessor;
-        }
-
         // now let the multicast process the exchange
-        return target.process(exchange, callback);
+        return rlp.process(exchange, callback);
     }
 
     protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {

http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
index 55a9bd9..40ca426 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
@@ -36,6 +36,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.Traceable;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
 import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.ExchangeHelper;
@@ -97,7 +98,10 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
         // and propagate exceptions which is done by a per exchange specific aggregation strategy
         // to ensure it supports async routing
         if (strategy == null) {
-            UseOriginalAggregationStrategy original = new UseOriginalAggregationStrategy(exchange, true);
+            AggregationStrategy original = new UseOriginalAggregationStrategy(exchange, true);
+            if (isShareUnitOfWork()) {
+                original = new ShareUnitOfWorkAggregationStrategy(original);
+            }
             setAggregationStrategyOnExchange(exchange, original);
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
new file mode 100644
index 0000000..4a1187f
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import org.apache.camel.Exchange;
+
+import static org.apache.camel.util.ExchangeHelper.hasExceptionBeenHandledByErrorHandler;
+
+/**
+ * An {@link AggregationStrategy} which is used when the option <tt>shareUnitOfWork</tt> is enabled
+ * on EIPs such as multicast, splitter or recipientList.
+ * <p/>
+ * This strategy wraps the actual in use strategy to provide the logic needed for making shareUnitOfWork work.
+ * <p/>
+ * This strategy is <b>not</b> intended for end users to use.
+ */
+public final class ShareUnitOfWorkAggregationStrategy implements AggregationStrategy {
+
+    private final AggregationStrategy strategy;
+
+    public ShareUnitOfWorkAggregationStrategy(AggregationStrategy strategy) {
+        this.strategy = strategy;
+    }
+
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        // aggregate using the actual strategy first
+        Exchange answer = strategy.aggregate(oldExchange, newExchange);
+        // ensure any errors are propagated from the new exchange to the answer
+        propagateFailure(answer, newExchange);
+
+        return answer;
+    }
+    
+    protected void propagateFailure(Exchange answer, Exchange newExchange) {
+        // if new exchange failed then propagate all the error related properties to the answer
+        boolean exceptionHandled = hasExceptionBeenHandledByErrorHandler(newExchange);
+        if (newExchange.isFailed() || newExchange.isRollbackOnly() || exceptionHandled) {
+            if (newExchange.getException() != null) {
+                answer.setException(newExchange.getException());
+            }
+            if (newExchange.getProperty(Exchange.EXCEPTION_CAUGHT) != null) {
+                answer.setProperty(Exchange.EXCEPTION_CAUGHT, newExchange.getProperty(Exchange.EXCEPTION_CAUGHT));
+            }
+            if (newExchange.getProperty(Exchange.FAILURE_ENDPOINT) != null) {
+                answer.setProperty(Exchange.FAILURE_ENDPOINT, newExchange.getProperty(Exchange.FAILURE_ENDPOINT));
+            }
+            if (newExchange.getProperty(Exchange.FAILURE_ROUTE_ID) != null) {
+                answer.setProperty(Exchange.FAILURE_ROUTE_ID, newExchange.getProperty(Exchange.FAILURE_ROUTE_ID));
+            }
+            if (newExchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null) {
+                answer.setProperty(Exchange.ERRORHANDLER_HANDLED, newExchange.getProperty(Exchange.ERRORHANDLER_HANDLED));
+            }
+            if (newExchange.getProperty(Exchange.FAILURE_HANDLED) != null) {
+                answer.setProperty(Exchange.FAILURE_HANDLED, newExchange.getProperty(Exchange.FAILURE_HANDLED));
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "ShareUnitOfWorkAggregationStrategy";
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/test/java/org/apache/camel/issues/RecipientListShareUnitOfWorkOnExceptionHandledFalseIssueTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/issues/RecipientListShareUnitOfWorkOnExceptionHandledFalseIssueTest.java b/camel-core/src/test/java/org/apache/camel/issues/RecipientListShareUnitOfWorkOnExceptionHandledFalseIssueTest.java
index ef33c9a..55fe155 100644
--- a/camel-core/src/test/java/org/apache/camel/issues/RecipientListShareUnitOfWorkOnExceptionHandledFalseIssueTest.java
+++ b/camel-core/src/test/java/org/apache/camel/issues/RecipientListShareUnitOfWorkOnExceptionHandledFalseIssueTest.java
@@ -31,7 +31,7 @@ public class RecipientListShareUnitOfWorkOnExceptionHandledFalseIssueTest extend
             template.sendBodyAndHeader("direct:start", "Hello World", "foo", "direct:b,direct:c");
             fail("Should throw exception");
         } catch (Exception e) {
-            IllegalArgumentException cause = assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+            IllegalArgumentException cause = assertIsInstanceOf(IllegalArgumentException.class, e.getCause().getCause());
             assertEquals("Forced", cause.getMessage());
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/test/java/org/apache/camel/processor/MulticastCopyOfSplitSubUnitOfWorkTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastCopyOfSplitSubUnitOfWorkTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastCopyOfSplitSubUnitOfWorkTest.java
new file mode 100644
index 0000000..ebf4daf
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastCopyOfSplitSubUnitOfWorkTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
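+ * Tests that a multicast using shareUnitOfWork propagates a failure from a sub route so the message ends in the dead letter channel.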
+ */
+public class MulticastCopyOfSplitSubUnitOfWorkTest extends ContextTestSupport {
+
+    private static int counter;
+
+    public void testOK() throws Exception {
+        counter = 0;
+
+        getMockEndpoint("mock:dead").expectedMessageCount(0);
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+        getMockEndpoint("mock:line").expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testError() throws Exception {
+        counter = 0;
+
+        getMockEndpoint("mock:dead").expectedMessageCount(1);
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        getMockEndpoint("mock:line").expectedMessageCount(0);
+
+        template.sendBody("direct:start", "Hello Donkey");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(4, counter); // 1 first + 3 redeliveries
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                errorHandler(deadLetterChannel("mock:dead").useOriginalMessage()
+                        .maximumRedeliveries(3).redeliveryDelay(0));
+
+                from("direct:start")
+                    .to("mock:a")
+                    // share unit of work in the multicast, which tells Camel to propagate failures from
+                    // processing the multicast messages back to the result of the multicast, which allows
+                    // it to act as a combined unit of work
+                    .multicast().shareUnitOfWork()
+                        .to("mock:b")
+                        .to("direct:line")
+                    .end()
+                    .to("mock:result");
+
+                from("direct:line")
+                    .to("log:line")
+                    .process(new MyProcessor())
+                    .to("mock:line");
+                // END SNIPPET: e1
+            }
+        };
+    }
+
+    public static class MyProcessor implements Processor {
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            String body = exchange.getIn().getBody(String.class);
+            if (body.contains("Donkey")) {
+                counter++;
+                throw new IllegalArgumentException("Donkey not allowed");
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/baece126/camel-core/src/test/java/org/apache/camel/processor/SplitSubUnitOfWorkTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitSubUnitOfWorkTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitSubUnitOfWorkTest.java
index 25fe6cc..0be2fea 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/SplitSubUnitOfWorkTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/SplitSubUnitOfWorkTest.java
@@ -20,6 +20,7 @@ import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 
 /**
  *