You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/02/05 15:46:45 UTC
[2/2] git commit: CAMEL-7146: Fixed aggregator when completion size
is 1, eg when completed asap. Should not call remove as we did not add.
CAMEL-7146: Fixed aggregator when completion size is 1, eg when completed asap. Should not call remove as we did not add.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/090cd028
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/090cd028
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/090cd028
Branch: refs/heads/camel-2.12.x
Commit: 090cd028f7189738c6aa4dc82b781496e66b7026
Parents: eed81aa
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Feb 5 15:39:11 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Feb 5 15:47:35 2014 +0100
----------------------------------------------------------------------
.../processor/aggregate/AggregateProcessor.java | 8 +-
.../apache/camel/spi/AggregationRepository.java | 10 ++
.../AggregateCompletionOnlyOneTest.java | 117 ++++++++++++++++++
.../AggregateCompletionOnlyTwoTest.java | 118 +++++++++++++++++++
4 files changed, 251 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/090cd028/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index ca39061..16950e0 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -455,8 +455,12 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
}
aggregated.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key);
- // remove from repository as its completed, we do this first as to trigger any OptimisticLockingException's
- aggregationRepository.remove(aggregated.getContext(), key, original);
+ // only remove if we have previously added (as we could potentially complete with only 1 exchange)
+ // (if we have previously added then we have that as the original exchange)
+ if (original != null) {
+ // remove from repository as its completed, we do this first as to trigger any OptimisticLockingException's
+ aggregationRepository.remove(aggregated.getContext(), key, original);
+ }
if (!fromTimeout && timeoutMap != null) {
// cleanup timeout map if it was a incoming exchange which triggered the timeout (and not the timeout checker)
http://git-wip-us.apache.org/repos/asf/camel/blob/090cd028/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java b/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java
index 669a956..ba1de6a 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java
@@ -32,6 +32,9 @@ public interface AggregationRepository {
* Add the given {@link Exchange} under the correlation key.
* <p/>
* Will replace any existing exchange.
+ * <p/>
+ * <b>Important:</b> This method is <b>not</b> invoked when the aggregation completes with only one exchange,
+ * because that exchange is completed immediately and therefore does not need to be added to a repository.
*
* @param camelContext the current CamelContext
* @param key the correlation key
@@ -42,6 +45,8 @@ public interface AggregationRepository {
/**
* Gets the given exchange with the correlation key
+ * <p/>
+ * This method is always invoked for any incoming exchange in the aggregator.
*
* @param camelContext the current CamelContext
* @param key the correlation key
@@ -52,6 +57,9 @@ public interface AggregationRepository {
/**
* Removes the exchange with the given correlation key, which should happen
* when an {@link Exchange} is completed
+ * <p/>
+ * <b>Important:</b> This method is <b>not</b> invoked when the aggregation completes with only one exchange,
+ * because that exchange is completed immediately and therefore was never added to a repository.
*
* @param camelContext the current CamelContext
* @param key the correlation key
@@ -61,6 +69,8 @@ public interface AggregationRepository {
/**
* Confirms the completion of the {@link Exchange}.
+ * <p/>
+ * This method is always invoked.
*
* @param camelContext the current CamelContext
* @param exchangeId exchange id to confirm
http://git-wip-us.apache.org/repos/asf/camel/blob/090cd028/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionOnlyOneTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionOnlyOneTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionOnlyOneTest.java
new file mode 100644
index 0000000..76e0cff
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionOnlyOneTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.Set;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
+import org.apache.camel.spi.AggregationRepository;
+
+/**
+ * Verifies that with {@code completionSize(1)} the repository sees get and confirm calls but never add or remove.
+ */
+public class AggregateCompletionOnlyOneTest extends ContextTestSupport {
+
+ private MyRepo repo = new MyRepo(); // counting repository plugged into the route below
+
+ public void testOnlyOne() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:aggregated");
+ mock.expectedBodiesReceived("A", "B", "C", "END"); // every body is expected on its own, uncombined
+
+ template.sendBodyAndHeader("direct:start", "A", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "B", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "C", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "END", "id", "foo");
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals(4, repo.getGet()); // expects one lookup for each of the four exchanges sent above
+ // add and remove are not in use as each exchange is completed immediately
+ assertEquals(0, repo.getAdd());
+ assertEquals(0, repo.getRemove());
+ assertEquals(4, repo.getConfirm()); // expects one confirm for each of the four exchanges sent above
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new BodyInAggregatingStrategy()).aggregationRepository(repo)
+ .completionSize(1) // a group is complete as soon as it holds a single exchange
+ .to("mock:aggregated");
+ }
+ };
+ }
+
+ private class MyRepo implements AggregationRepository { // stub that only counts how often each method is called
+
+ private int add;
+ private int get;
+ private int remove;
+ private int confirm;
+
+ @Override
+ public Exchange add(CamelContext camelContext, String key, Exchange exchange) {
+ add++;
+ return null; // the exchange is not stored anywhere
+ }
+
+ @Override
+ public Exchange get(CamelContext camelContext, String key) {
+ get++;
+ return null; // this stub never holds an exchange for any key
+ }
+
+ @Override
+ public void remove(CamelContext camelContext, String key, Exchange exchange) {
+ remove++;
+ }
+
+ @Override
+ public void confirm(CamelContext camelContext, String exchangeId) {
+ confirm++;
+ }
+
+ @Override
+ public Set<String> getKeys() {
+ return null; // no keys are tracked by this stub
+ }
+
+ public int getAdd() {
+ return add;
+ }
+
+ public int getGet() {
+ return get;
+ }
+
+ public int getRemove() {
+ return remove;
+ }
+
+ public int getConfirm() {
+ return confirm;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/090cd028/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionOnlyTwoTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionOnlyTwoTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionOnlyTwoTest.java
new file mode 100644
index 0000000..27871c1
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionOnlyTwoTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.Set;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
+import org.apache.camel.processor.aggregate.MemoryAggregationRepository;
+
+/**
+ * Verifies the repository call counts (get, add, remove, confirm) when the aggregator uses {@code completionSize(2)}.
+ */
+public class AggregateCompletionOnlyTwoTest extends ContextTestSupport {
+
+ private MyRepo repo = new MyRepo(); // counting repository plugged into the route below
+
+ public void testOnlyTwo() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:aggregated");
+ mock.expectedBodiesReceived("A+B", "C+END"); // the four bodies are expected as two combined messages
+
+ template.sendBodyAndHeader("direct:start", "A", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "B", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "C", "id", "foo");
+ template.sendBodyAndHeader("direct:start", "END", "id", "foo");
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals(4, repo.getGet()); // expects one lookup for each of the four exchanges sent above
+ assertEquals(2, repo.getAdd()); // expects one add per expected combined message
+ assertEquals(2, repo.getRemove()); // expects one remove per expected combined message
+ assertEquals(2, repo.getConfirm()); // expects one confirm per expected combined message
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new BodyInAggregatingStrategy()).aggregationRepository(repo)
+ .completionSize(2) // a group is complete once it holds two exchanges
+ .to("mock:aggregated");
+ }
+ };
+ }
+
+ private class MyRepo extends MemoryAggregationRepository { // counts each call, then delegates to the superclass
+
+ private int add;
+ private int get;
+ private int remove;
+ private int confirm;
+
+ @Override
+ public Exchange add(CamelContext camelContext, String key, Exchange exchange) {
+ add++;
+ return super.add(camelContext, key, exchange);
+ }
+
+ @Override
+ public Exchange get(CamelContext camelContext, String key) {
+ get++;
+ return super.get(camelContext, key);
+ }
+
+ @Override
+ public void remove(CamelContext camelContext, String key, Exchange exchange) {
+ remove++;
+ super.remove(camelContext, key, exchange);
+ }
+
+ @Override
+ public void confirm(CamelContext camelContext, String exchangeId) {
+ confirm++;
+ super.confirm(camelContext, exchangeId);
+ }
+
+ @Override
+ public Set<String> getKeys() {
+ return super.getKeys(); // pure pass-through, not counted
+ }
+
+ public int getAdd() {
+ return add;
+ }
+
+ public int getGet() {
+ return get;
+ }
+
+ public int getRemove() {
+ return remove;
+ }
+
+ public int getConfirm() {
+ return confirm;
+ }
+ }
+}