You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/04/28 07:38:36 UTC

[1/3] git commit: CAMEL-7271: GroupExchange in agg strategy should create a new exchange instread of copy, as then exceptions would be propagated which it shouldnt. Thanks to Luke Hamaty for unit test reproducing the issue.

Repository: camel
Updated Branches:
  refs/heads/camel-2.12.x a59809d6a -> da561c8b4
  refs/heads/camel-2.13.x bcd7db1ad -> b913f14fa
  refs/heads/master e801ea44f -> a5a2f750f


CAMEL-7271: GroupExchange in agg strategy should create a new exchange instread of copy, as then exceptions would be propagated which it shouldnt. Thanks to Luke Hamaty for unit test reproducing the issue.

Conflicts:
	camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b913f14f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b913f14f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b913f14f

Branch: refs/heads/camel-2.13.x
Commit: b913f14fa31fdc444f9752d4efcc1413bc7d556d
Parents: bcd7db1
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Apr 27 15:00:01 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Apr 27 15:01:16 2014 +0200

----------------------------------------------------------------------
 .../GroupedExchangeAggregationStrategy.java     | 10 +-
 .../MulticastGroupedExchangeExceptionTest.java  | 98 ++++++++++++++++++++
 2 files changed, 103 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b913f14f/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
index 587746c..c574f6a 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
@@ -19,6 +19,7 @@ package org.apache.camel.processor.aggregate;
 import java.util.List;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
 
 /**
  * Aggregate all exchanges into a single combined Exchange holding all the aggregated exchanges
@@ -43,13 +44,12 @@ public class GroupedExchangeAggregationStrategy extends AbstractListAggregationS
 
     @Override
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
-        Exchange answer = super.aggregate(oldExchange, newExchange);
         if (oldExchange == null) {
-            // for the first time we must do a copy as the answer, so the outgoing
-            // exchange is not one of the grouped exchanges, as that causes a endless circular reference
-            answer = answer.copy();
+            // for the first time we must create a new empty exchange as the holder, as the outgoing exchange
+            // must not be one of the grouped exchanges, as that causes a endless circular reference
+            oldExchange = new DefaultExchange(newExchange);
         }
-        return answer;
+        return super.aggregate(oldExchange, newExchange);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/b913f14f/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java
new file mode 100644
index 0000000..a0a242c
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class MulticastGroupedExchangeExceptionTest extends ContextTestSupport {
+
+    public void testBothGood() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+
+        template.sendBody("direct:start", "dummy");
+
+        assertMockEndpointsSatisfied();
+
+        Exchange received = result.getReceivedExchanges().get(0);
+        assertThat("no exception", received.isFailed(), is(false));
+    }
+
+    public void testBFail() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+
+        MockEndpoint endpointB = getMockEndpoint("mock:endpointB");
+        endpointB.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                throw new IllegalArgumentException("Fake exception");
+            }
+        });
+
+        template.sendBody("direct:start", "dummy");
+
+        assertMockEndpointsSatisfied();
+
+        Exchange received = result.getReceivedExchanges().get(0);
+        assertThat("no exception", received.isFailed(), is(false));
+    }
+
+    public void testAFail() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+
+        MockEndpoint endpointA = getMockEndpoint("mock:endpointA");
+        endpointA.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                throw new IllegalArgumentException("Fake exception");
+            }
+        });
+
+        template.sendBody("direct:start", "dummy");
+
+        assertMockEndpointsSatisfied();
+
+        Exchange received = result.getReceivedExchanges().get(0);
+        assertThat("no exception", received.isFailed(), is(false));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .multicast(new GroupedExchangeAggregationStrategy())
+                        .to("mock:endpointA", "mock:endpointB")
+                    .end()
+                    .to("mock:result");
+
+            }
+        };
+    }
+
+}


[2/3] git commit: CAMEL-7271: GroupExchange in agg strategy should create a new exchange instread of copy, as then exceptions would be propagated which it shouldnt. Thanks to Luke Hamaty for unit test reproducing the issue.

Posted by da...@apache.org.
CAMEL-7271: GroupExchange in agg strategy should create a new exchange instread of copy, as then exceptions would be propagated which it shouldnt. Thanks to Luke Hamaty for unit test reproducing the issue.

Conflicts:
	camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java

Conflicts:
	camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/da561c8b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/da561c8b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/da561c8b

Branch: refs/heads/camel-2.12.x
Commit: da561c8b47b510ee2f12dbaa16b65a60c2ba70f3
Parents: a59809d
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Apr 27 15:00:01 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Apr 28 07:38:03 2014 +0200

----------------------------------------------------------------------
 .../GroupedExchangeAggregationStrategy.java     | 11 +++
 .../MulticastGroupedExchangeExceptionTest.java  | 98 ++++++++++++++++++++
 2 files changed, 109 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/da561c8b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
index 8a748b5..c9a965d 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
@@ -17,6 +17,7 @@
 package org.apache.camel.processor.aggregate;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
 
 /**
  * Aggregate all exchanges into a single combined Exchange holding all the aggregated exchanges
@@ -34,6 +35,16 @@ public class GroupedExchangeAggregationStrategy extends AbstractListAggregationS
     }
 
     @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        if (oldExchange == null) {
+            // for the first time we must create a new empty exchange as the holder, as the outgoing exchange
+            // must not be one of the grouped exchanges, as that causes a endless circular reference
+            oldExchange = new DefaultExchange(newExchange);
+        }
+        return super.aggregate(oldExchange, newExchange);
+    }
+
+    @Override
     public Exchange getValue(Exchange exchange) {
         return exchange;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/da561c8b/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java
new file mode 100644
index 0000000..a0a242c
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class MulticastGroupedExchangeExceptionTest extends ContextTestSupport {
+
+    public void testBothGood() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+
+        template.sendBody("direct:start", "dummy");
+
+        assertMockEndpointsSatisfied();
+
+        Exchange received = result.getReceivedExchanges().get(0);
+        assertThat("no exception", received.isFailed(), is(false));
+    }
+
+    public void testBFail() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+
+        MockEndpoint endpointB = getMockEndpoint("mock:endpointB");
+        endpointB.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                throw new IllegalArgumentException("Fake exception");
+            }
+        });
+
+        template.sendBody("direct:start", "dummy");
+
+        assertMockEndpointsSatisfied();
+
+        Exchange received = result.getReceivedExchanges().get(0);
+        assertThat("no exception", received.isFailed(), is(false));
+    }
+
+    public void testAFail() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+
+        MockEndpoint endpointA = getMockEndpoint("mock:endpointA");
+        endpointA.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                throw new IllegalArgumentException("Fake exception");
+            }
+        });
+
+        template.sendBody("direct:start", "dummy");
+
+        assertMockEndpointsSatisfied();
+
+        Exchange received = result.getReceivedExchanges().get(0);
+        assertThat("no exception", received.isFailed(), is(false));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .multicast(new GroupedExchangeAggregationStrategy())
+                        .to("mock:endpointA", "mock:endpointB")
+                    .end()
+                    .to("mock:result");
+
+            }
+        };
+    }
+
+}


[3/3] git commit: CAMEL-7271: GroupExchange in agg strategy should create a new exchange instread of copy, as then exceptions would be propagated which it shouldnt. Thanks to Luke Hamaty for unit test reproducing the issue.

Posted by da...@apache.org.
CAMEL-7271: GroupExchange in agg strategy should create a new exchange instread of copy, as then exceptions would be propagated which it shouldnt. Thanks to Luke Hamaty for unit test reproducing the issue.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a5a2f750
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a5a2f750
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a5a2f750

Branch: refs/heads/master
Commit: a5a2f750fecdd88ca96a1d743632372b901bbb67
Parents: e801ea4
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Apr 27 15:00:01 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Apr 28 07:38:24 2014 +0200

----------------------------------------------------------------------
 .../GroupedExchangeAggregationStrategy.java     | 11 +--
 .../MulticastGroupedExchangeExceptionTest.java  | 98 ++++++++++++++++++++
 2 files changed, 103 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a5a2f750/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
index 84b375d..2906270 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
@@ -19,6 +19,7 @@ package org.apache.camel.processor.aggregate;
 import java.util.List;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
 
 /**
  * Aggregate all exchanges into a single combined Exchange holding all the aggregated exchanges
@@ -29,7 +30,6 @@ import org.apache.camel.Exchange;
 public class GroupedExchangeAggregationStrategy extends AbstractListAggregationStrategy<Exchange> {
 
     @Override
-   
     public void onCompletion(Exchange exchange) {
         if (isStoreAsBodyOnCompletion()) {
             // lets be backwards compatible
@@ -43,13 +43,12 @@ public class GroupedExchangeAggregationStrategy extends AbstractListAggregationS
 
     @Override
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
-        Exchange answer = super.aggregate(oldExchange, newExchange);
         if (oldExchange == null) {
-            // for the first time we must do a copy as the answer, so the outgoing
-            // exchange is not one of the grouped exchanges, as that causes a endless circular reference
-            answer = answer.copy();
+            // for the first time we must create a new empty exchange as the holder, as the outgoing exchange
+            // must not be one of the grouped exchanges, as that causes a endless circular reference
+            oldExchange = new DefaultExchange(newExchange);
         }
-        return answer;
+        return super.aggregate(oldExchange, newExchange);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/a5a2f750/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java
new file mode 100644
index 0000000..a0a242c
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class MulticastGroupedExchangeExceptionTest extends ContextTestSupport {
+
+    public void testBothGood() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+
+        template.sendBody("direct:start", "dummy");
+
+        assertMockEndpointsSatisfied();
+
+        Exchange received = result.getReceivedExchanges().get(0);
+        assertThat("no exception", received.isFailed(), is(false));
+    }
+
+    public void testBFail() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+
+        MockEndpoint endpointB = getMockEndpoint("mock:endpointB");
+        endpointB.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                throw new IllegalArgumentException("Fake exception");
+            }
+        });
+
+        template.sendBody("direct:start", "dummy");
+
+        assertMockEndpointsSatisfied();
+
+        Exchange received = result.getReceivedExchanges().get(0);
+        assertThat("no exception", received.isFailed(), is(false));
+    }
+
+    public void testAFail() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+
+        MockEndpoint endpointA = getMockEndpoint("mock:endpointA");
+        endpointA.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                throw new IllegalArgumentException("Fake exception");
+            }
+        });
+
+        template.sendBody("direct:start", "dummy");
+
+        assertMockEndpointsSatisfied();
+
+        Exchange received = result.getReceivedExchanges().get(0);
+        assertThat("no exception", received.isFailed(), is(false));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .multicast(new GroupedExchangeAggregationStrategy())
+                        .to("mock:endpointA", "mock:endpointB")
+                    .end()
+                    .to("mock:result");
+
+            }
+        };
+    }
+
+}