You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/09/23 12:24:17 UTC

[3/3] git commit: CAMEL-6775: Optimized aggregate eip to send out aggregated exchange outside lock.

CAMEL-6775: Optimized aggregate eip to send out aggregated exchange outside lock.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ca90341e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ca90341e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ca90341e

Branch: refs/heads/master
Commit: ca90341e9eb825d7c06ded69e1b4438dc470bb97
Parents: 32a8947
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Sep 23 11:07:55 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Sep 23 11:07:55 2013 +0200

----------------------------------------------------------------------
 .../processor/aggregate/AggregateProcessor.java | 26 ++++---
 .../aggregator/AggregatorLockingTest.java       | 72 ++++++++++++++++++++
 2 files changed, 89 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ca90341e/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 86094bc..1f0336c 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -212,7 +212,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
 
         // when optimist locking is enabled we keep trying until we succeed
         if (optimisticLocking) {
-            Exchange aggregated = null;
+            List<Exchange> aggregated = null;
             boolean exhaustedRetries = true;
             int attempt = 0;
             do {
@@ -236,7 +236,9 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
                         new OptimisticLockingAggregationRepository.OptimisticLockingException());
             } else if (aggregated != null) {
                 // we are completed so submit to completion
-                onSubmitCompletion(key, aggregated);
+                for (Exchange agg : aggregated) {
+                    onSubmitCompletion(key, agg);
+                }
             }
         } else {
             // copy exchange, and do not share the unit of work
@@ -246,7 +248,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             // when memory based then its fast using synchronized, but if the aggregation repository is IO
             // bound such as JPA etc then concurrent aggregation per correlation key could
             // improve performance as we can run aggregation repository get/add in parallel
-            Exchange aggregated = null;
+            List<Exchange> aggregated = null;
             lock.lock();
             try {
                 aggregated = doAggregation(key, copy);
@@ -256,7 +258,9 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
 
             // we are completed so do that work outside the lock
             if (aggregated != null) {
-                onSubmitCompletion(key, aggregated);
+                for (Exchange agg : aggregated) {
+                    onSubmitCompletion(key, agg);
+                }
             }
         }
 
@@ -278,10 +282,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
      *
      * @param key      the correlation key
      * @param newExchange the exchange
-     * @return the aggregated exchange which is complete, or <tt>null</tt> if not yet complete
+     * @return the completed aggregated exchange(s) as a list, which is empty if none is complete yet
      * @throws org.apache.camel.CamelExchangeException is thrown if error aggregating
      */
-    private Exchange doAggregation(String key, Exchange newExchange) throws CamelExchangeException {
+    private List<Exchange> doAggregation(String key, Exchange newExchange) throws CamelExchangeException {
         LOG.trace("onAggregation +++ start +++ with correlation key: {}", key);
 
         Exchange answer;
@@ -329,6 +333,8 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             complete = isCompleted(key, answer);
         }
 
+        List<Exchange> list = new ArrayList<Exchange>();
+
         // only need to update aggregation repository if we are not complete
         if (complete == null) {
             doAggregationRepositoryAdd(newExchange.getContext(), key, originalExchange, answer);
@@ -349,7 +355,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
                     if (batchAnswer != null) {
                         batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
                         onCompletion(batchKey, originalExchange, batchAnswer, false);
-                        onSubmitCompletion(key, batchAnswer);
+                        list.add(batchAnswer);
                     }
                 }
                 batchConsumerCorrelationKeys.clear();
@@ -363,8 +369,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         }
 
         LOG.trace("onAggregation +++  end  +++ with correlation key: {}", key);
-
-        return answer;
+        if (answer != null) {
+            list.add(answer);
+        }
+        return list;
     }
 
     protected void doAggregationRepositoryAdd(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) {

http://git-wip-us.apache.org/repos/asf/camel/blob/ca90341e/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorLockingTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorLockingTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorLockingTest.java
new file mode 100644
index 0000000..d988c9b
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorLockingTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+
+public class AggregatorLockingTest extends ContextTestSupport {
+
+    private final CountDownLatch latch = new CountDownLatch(2);
+
+    public void testAggregationWithoutParallelNorOptimisticShouldNotLockDownstreamProcessors() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceivedInAnyOrder("a","b");
+
+        template.sendBodyAndHeader("seda:a", "a", "myId", 1);
+        template.sendBodyAndHeader("seda:a", "b", "myId", 2);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:a?concurrentConsumers=2")
+                    .aggregate(header("myId"), new UseLatestAggregationStrategy())
+                    .completionSize(1)
+                    // N.B. *no* parallelProcessing() nor optimisticLocking() !
+                    // each of the 2 seda consumer threads counts the latch down once and then awaits it.
+                    // if both threads run these downstream processors in parallel, the latch reaches zero
+                    // and both proceed at once. If they are prevented from running simultaneously due to the
+                    // lock in AggregateProcessor.doProcess() then the first thread sits in await() until its
+                    // 5 second timeout expires. NOTE(review): await's result is ignored; confirm this fails the test.
+                    .log("Before await with thread: ${threadName} and body: ${body}")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            latch.countDown();
+                            // wait up to 5 seconds for the other thread to count down as well
+                            latch.await(5, TimeUnit.SECONDS);
+                        }
+                    })
+                    .log("After await with thread: ${threadName} and body: ${body}")
+                    .to("mock:result");
+            }
+        };
+    }
+
+}