You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/04/28 07:38:36 UTC
[1/3] git commit: CAMEL-7271: GroupExchange in agg strategy should
create a new exchange instread of copy,
as then exceptions would be propagated which it shouldnt. Thanks to Luke
Hamaty for unit test reproducing the issue.
Repository: camel
Updated Branches:
refs/heads/camel-2.12.x a59809d6a -> da561c8b4
refs/heads/camel-2.13.x bcd7db1ad -> b913f14fa
refs/heads/master e801ea44f -> a5a2f750f
CAMEL-7271: GroupExchange in agg strategy should create a new exchange instread of copy, as then exceptions would be propagated which it shouldnt. Thanks to Luke Hamaty for unit test reproducing the issue.
Conflicts:
camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b913f14f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b913f14f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b913f14f
Branch: refs/heads/camel-2.13.x
Commit: b913f14fa31fdc444f9752d4efcc1413bc7d556d
Parents: bcd7db1
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Apr 27 15:00:01 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Apr 27 15:01:16 2014 +0200
----------------------------------------------------------------------
.../GroupedExchangeAggregationStrategy.java | 10 +-
.../MulticastGroupedExchangeExceptionTest.java | 98 ++++++++++++++++++++
2 files changed, 103 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/b913f14f/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
index 587746c..c574f6a 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
@@ -19,6 +19,7 @@ package org.apache.camel.processor.aggregate;
import java.util.List;
import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
/**
* Aggregate all exchanges into a single combined Exchange holding all the aggregated exchanges
@@ -43,13 +44,12 @@ public class GroupedExchangeAggregationStrategy extends AbstractListAggregationS
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
- Exchange answer = super.aggregate(oldExchange, newExchange);
if (oldExchange == null) {
- // for the first time we must do a copy as the answer, so the outgoing
- // exchange is not one of the grouped exchanges, as that causes a endless circular reference
- answer = answer.copy();
+ // for the first time we must create a new empty exchange as the holder, as the outgoing exchange
+ // must not be one of the grouped exchanges, as that causes a endless circular reference
+ oldExchange = new DefaultExchange(newExchange);
}
- return answer;
+ return super.aggregate(oldExchange, newExchange);
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/b913f14f/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java
new file mode 100644
index 0000000..a0a242c
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class MulticastGroupedExchangeExceptionTest extends ContextTestSupport {
+
+ public void testBothGood() throws Exception {
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMessageCount(1);
+
+ template.sendBody("direct:start", "dummy");
+
+ assertMockEndpointsSatisfied();
+
+ Exchange received = result.getReceivedExchanges().get(0);
+ assertThat("no exception", received.isFailed(), is(false));
+ }
+
+ public void testBFail() throws Exception {
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMessageCount(1);
+
+ MockEndpoint endpointB = getMockEndpoint("mock:endpointB");
+ endpointB.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ throw new IllegalArgumentException("Fake exception");
+ }
+ });
+
+ template.sendBody("direct:start", "dummy");
+
+ assertMockEndpointsSatisfied();
+
+ Exchange received = result.getReceivedExchanges().get(0);
+ assertThat("no exception", received.isFailed(), is(false));
+ }
+
+ public void testAFail() throws Exception {
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMessageCount(1);
+
+ MockEndpoint endpointA = getMockEndpoint("mock:endpointA");
+ endpointA.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ throw new IllegalArgumentException("Fake exception");
+ }
+ });
+
+ template.sendBody("direct:start", "dummy");
+
+ assertMockEndpointsSatisfied();
+
+ Exchange received = result.getReceivedExchanges().get(0);
+ assertThat("no exception", received.isFailed(), is(false));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .multicast(new GroupedExchangeAggregationStrategy())
+ .to("mock:endpointA", "mock:endpointB")
+ .end()
+ .to("mock:result");
+
+ }
+ };
+ }
+
+}
[2/3] git commit: CAMEL-7271: GroupExchange in agg strategy should
create a new exchange instread of copy,
as then exceptions would be propagated which it shouldnt. Thanks to Luke
Hamaty for unit test reproducing the issue.
Posted by da...@apache.org.
CAMEL-7271: GroupExchange in agg strategy should create a new exchange instread of copy, as then exceptions would be propagated which it shouldnt. Thanks to Luke Hamaty for unit test reproducing the issue.
Conflicts:
camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
Conflicts:
camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/da561c8b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/da561c8b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/da561c8b
Branch: refs/heads/camel-2.12.x
Commit: da561c8b47b510ee2f12dbaa16b65a60c2ba70f3
Parents: a59809d
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Apr 27 15:00:01 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Apr 28 07:38:03 2014 +0200
----------------------------------------------------------------------
.../GroupedExchangeAggregationStrategy.java | 11 +++
.../MulticastGroupedExchangeExceptionTest.java | 98 ++++++++++++++++++++
2 files changed, 109 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/da561c8b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
index 8a748b5..c9a965d 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
@@ -17,6 +17,7 @@
package org.apache.camel.processor.aggregate;
import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
/**
* Aggregate all exchanges into a single combined Exchange holding all the aggregated exchanges
@@ -34,6 +35,16 @@ public class GroupedExchangeAggregationStrategy extends AbstractListAggregationS
}
@Override
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ // for the first time we must create a new empty exchange as the holder, as the outgoing exchange
+ // must not be one of the grouped exchanges, as that causes a endless circular reference
+ oldExchange = new DefaultExchange(newExchange);
+ }
+ return super.aggregate(oldExchange, newExchange);
+ }
+
+ @Override
public Exchange getValue(Exchange exchange) {
return exchange;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/da561c8b/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java
new file mode 100644
index 0000000..a0a242c
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class MulticastGroupedExchangeExceptionTest extends ContextTestSupport {
+
+ public void testBothGood() throws Exception {
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMessageCount(1);
+
+ template.sendBody("direct:start", "dummy");
+
+ assertMockEndpointsSatisfied();
+
+ Exchange received = result.getReceivedExchanges().get(0);
+ assertThat("no exception", received.isFailed(), is(false));
+ }
+
+ public void testBFail() throws Exception {
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMessageCount(1);
+
+ MockEndpoint endpointB = getMockEndpoint("mock:endpointB");
+ endpointB.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ throw new IllegalArgumentException("Fake exception");
+ }
+ });
+
+ template.sendBody("direct:start", "dummy");
+
+ assertMockEndpointsSatisfied();
+
+ Exchange received = result.getReceivedExchanges().get(0);
+ assertThat("no exception", received.isFailed(), is(false));
+ }
+
+ public void testAFail() throws Exception {
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMessageCount(1);
+
+ MockEndpoint endpointA = getMockEndpoint("mock:endpointA");
+ endpointA.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ throw new IllegalArgumentException("Fake exception");
+ }
+ });
+
+ template.sendBody("direct:start", "dummy");
+
+ assertMockEndpointsSatisfied();
+
+ Exchange received = result.getReceivedExchanges().get(0);
+ assertThat("no exception", received.isFailed(), is(false));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .multicast(new GroupedExchangeAggregationStrategy())
+ .to("mock:endpointA", "mock:endpointB")
+ .end()
+ .to("mock:result");
+
+ }
+ };
+ }
+
+}
[3/3] git commit: CAMEL-7271: GroupExchange in agg strategy should
create a new exchange instread of copy,
as then exceptions would be propagated which it shouldnt. Thanks to Luke
Hamaty for unit test reproducing the issue.
Posted by da...@apache.org.
CAMEL-7271: GroupExchange in agg strategy should create a new exchange instread of copy, as then exceptions would be propagated which it shouldnt. Thanks to Luke Hamaty for unit test reproducing the issue.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a5a2f750
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a5a2f750
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a5a2f750
Branch: refs/heads/master
Commit: a5a2f750fecdd88ca96a1d743632372b901bbb67
Parents: e801ea4
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Apr 27 15:00:01 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Apr 28 07:38:24 2014 +0200
----------------------------------------------------------------------
.../GroupedExchangeAggregationStrategy.java | 11 +--
.../MulticastGroupedExchangeExceptionTest.java | 98 ++++++++++++++++++++
2 files changed, 103 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/a5a2f750/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
index 84b375d..2906270 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
@@ -19,6 +19,7 @@ package org.apache.camel.processor.aggregate;
import java.util.List;
import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
/**
* Aggregate all exchanges into a single combined Exchange holding all the aggregated exchanges
@@ -29,7 +30,6 @@ import org.apache.camel.Exchange;
public class GroupedExchangeAggregationStrategy extends AbstractListAggregationStrategy<Exchange> {
@Override
-
public void onCompletion(Exchange exchange) {
if (isStoreAsBodyOnCompletion()) {
// lets be backwards compatible
@@ -43,13 +43,12 @@ public class GroupedExchangeAggregationStrategy extends AbstractListAggregationS
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
- Exchange answer = super.aggregate(oldExchange, newExchange);
if (oldExchange == null) {
- // for the first time we must do a copy as the answer, so the outgoing
- // exchange is not one of the grouped exchanges, as that causes a endless circular reference
- answer = answer.copy();
+ // for the first time we must create a new empty exchange as the holder, as the outgoing exchange
+ // must not be one of the grouped exchanges, as that causes a endless circular reference
+ oldExchange = new DefaultExchange(newExchange);
}
- return answer;
+ return super.aggregate(oldExchange, newExchange);
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/a5a2f750/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java
new file mode 100644
index 0000000..a0a242c
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastGroupedExchangeExceptionTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class MulticastGroupedExchangeExceptionTest extends ContextTestSupport {
+
+ public void testBothGood() throws Exception {
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMessageCount(1);
+
+ template.sendBody("direct:start", "dummy");
+
+ assertMockEndpointsSatisfied();
+
+ Exchange received = result.getReceivedExchanges().get(0);
+ assertThat("no exception", received.isFailed(), is(false));
+ }
+
+ public void testBFail() throws Exception {
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMessageCount(1);
+
+ MockEndpoint endpointB = getMockEndpoint("mock:endpointB");
+ endpointB.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ throw new IllegalArgumentException("Fake exception");
+ }
+ });
+
+ template.sendBody("direct:start", "dummy");
+
+ assertMockEndpointsSatisfied();
+
+ Exchange received = result.getReceivedExchanges().get(0);
+ assertThat("no exception", received.isFailed(), is(false));
+ }
+
+ public void testAFail() throws Exception {
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMessageCount(1);
+
+ MockEndpoint endpointA = getMockEndpoint("mock:endpointA");
+ endpointA.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ throw new IllegalArgumentException("Fake exception");
+ }
+ });
+
+ template.sendBody("direct:start", "dummy");
+
+ assertMockEndpointsSatisfied();
+
+ Exchange received = result.getReceivedExchanges().get(0);
+ assertThat("no exception", received.isFailed(), is(false));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .multicast(new GroupedExchangeAggregationStrategy())
+ .to("mock:endpointA", "mock:endpointB")
+ .end()
+ .to("mock:result");
+
+ }
+ };
+ }
+
+}