You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/13 11:21:24 UTC

[2/2] camel git commit: CAMEL-9863: Fixed bug in doWhileLoop when calling async component. Thanks to Sanigo for unit test.

CAMEL-9863: Fixed bug in doWhileLoop when calling async component. Thanks to Sanigo for unit test.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4d6da3b7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4d6da3b7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4d6da3b7

Branch: refs/heads/camel-2.17.x
Commit: 4d6da3b7b0f293271ef4297683c854c08f9c82f8
Parents: 4847075
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Apr 13 11:05:45 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Apr 13 11:21:12 2016 +0200

----------------------------------------------------------------------
 .../apache/camel/processor/LoopProcessor.java   | 71 ++++++++------------
 .../async/AsyncEndpointDoWhileLoopTest.java     | 54 +++++++++++++++
 .../ahc/AhcProducePostDoWhileTest.java          | 55 +++++++++++++++
 3 files changed, 137 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4d6da3b7/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
index 2fb244d..07bb255 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
@@ -86,16 +86,18 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
         }
 
         // loop synchronously
-        while ((predicate != null && doWhile.get())  || (index.get() < count.get())) {
+        while ((predicate != null && doWhile.get()) || (index.get() < count.get())) {
 
             // and prepare for next iteration
             // if (!copy) target = exchange; else copy of original
             target = prepareExchange(exchange, index.get(), original);
+            // the following process method will in the done method re-evaluate the predicate
+            // so we do not need to do it here as well
             boolean sync = process(target, callback, index, count, doWhile, original);
 
             if (!sync) {
                 LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId());
-                // the remainder of the routing slip will be completed async
+                // the remainder of the loop will be completed async
                 // so we break out now, then the callback will be invoked which then continue routing from where we left here
                 return false;
             }
@@ -106,21 +108,6 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
             if (!continueProcessing(target, "so breaking out of loop", LOG)) {
                 break;
             }
-
-            // increment counter before next loop
-            index.getAndIncrement();
-
-            // evaluate predicate
-            if (predicate != null) {
-                try {
-                    boolean result = predicate.matches(exchange);
-                    doWhile.set(result);
-                } catch (Exception e) {
-                    // break out looping due that exception
-                    exchange.setException(e);
-                    doWhile.set(false);
-                }
-            }
         }
 
         // we are done so prepare the result
@@ -137,21 +124,39 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
         // set current index as property
         LOG.debug("LoopProcessor: iteration #{}", index.get());
         exchange.setProperty(Exchange.LOOP_INDEX, index.get());
-        
+
         boolean sync = processor.process(exchange, new AsyncCallback() {
             public void done(boolean doneSync) {
-                // we only have to handle async completion of the routing slip
+                // increment counter after done
+                index.getAndIncrement();
+
+                // evaluate predicate for next loop
+                if (predicate != null && index.get() > 0) {
+                    try {
+                        boolean result = predicate.matches(exchange);
+                        doWhile.set(result);
+                    } catch (Exception e) {
+                        // break out looping due that exception
+                        exchange.setException(e);
+                        doWhile.set(false);
+                    }
+                }
+
+                // we only have to handle async completion of the loop
+                // (as the sync is done in the outer processor)
                 if (doneSync) {
                     return;
                 }
 
                 Exchange target = exchange;
 
-                // increment index as we have just processed once
-                index.getAndIncrement();
-
                 // continue looping asynchronously
-                while ((predicate != null && doWhile.get())  || (index.get() < count.get())) {
+                while ((predicate != null && doWhile.get()) || (index.get() < count.get())) {
+
+                    // check for error if so we should break out
+                    if (!continueProcessing(target, "so breaking out of loop", LOG)) {
+                        break;
+                    }
 
                     // and prepare for next iteration
                     target = prepareExchange(exchange, index.get(), original);
@@ -164,26 +169,6 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
                         // so we break out now, then the callback will be invoked which then continue routing from where we left here
                         return;
                     }
-
-                    // check for error if so we should break out
-                    if (!continueProcessing(target, "so breaking out of loop", LOG)) {
-                        break;
-                    }
-
-                    // increment counter before next loop
-                    index.getAndIncrement();
-
-                    // evaluate predicate
-                    if (predicate != null) {
-                        try {
-                            boolean result = predicate.matches(exchange);
-                            doWhile.set(result);
-                        } catch (Exception e) {
-                            // break out looping due that exception
-                            exchange.setException(e);
-                            doWhile.set(false);
-                        }
-                    }
                 }
 
                 // we are done so prepare the result

http://git-wip-us.apache.org/repos/asf/camel/blob/4d6da3b7/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDoWhileLoopTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDoWhileLoopTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDoWhileLoopTest.java
new file mode 100644
index 0000000..a551727
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDoWhileLoopTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+
+public class AsyncEndpointDoWhileLoopTest extends ContextTestSupport {
+
+    public void testAsyncEndpoint() throws Exception {
+        getMockEndpoint("mock:line").expectedBodiesReceived("Bye Camel", "Bye Camel", "Bye Camel", "Bye Camel");
+        getMockEndpoint("mock:result").expectedBodiesReceived("done");
+
+        template.requestBody("direct:start", "World", String.class);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.addComponent("async", new MyAsyncComponent());
+
+                from("direct:start")
+                    .loopDoWhile(body().isNotEqualTo("done"))
+                        .to("async:bye:camel")
+                        .to("mock:line")
+                        .filter(exchangeProperty(Exchange.LOOP_INDEX).isEqualTo(3))
+                            .setBody().constant("done")
+                        .end()
+                    .end()
+                    .to("mock:result");
+            }
+        };
+    }
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/4d6da3b7/components/camel-ahc/src/test/java/org/apache/camel/component/ahc/AhcProducePostDoWhileTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ahc/src/test/java/org/apache/camel/component/ahc/AhcProducePostDoWhileTest.java b/components/camel-ahc/src/test/java/org/apache/camel/component/ahc/AhcProducePostDoWhileTest.java
new file mode 100644
index 0000000..b7193a2
--- /dev/null
+++ b/components/camel-ahc/src/test/java/org/apache/camel/component/ahc/AhcProducePostDoWhileTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ahc;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class AhcProducePostDoWhileTest extends BaseAhcTest {
+
+    @Test
+    public void testAhcDoWhile() throws Exception {
+        getMockEndpoint("mock:line").expectedBodiesReceived("Bye World", "Bye Bye World", "Bye Bye Bye World", "Bye Bye Bye Bye World");
+        getMockEndpoint("mock:result").expectedBodiesReceived("done");
+
+        template.requestBody("direct:start", "World", String.class);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").streamCaching()
+                    .loopDoWhile(body().isNotEqualTo("done"))
+                        .to(getAhcEndpointUri())
+                        .to("mock:line")
+                        .filter(exchangeProperty(Exchange.LOOP_INDEX).isEqualTo(3))
+                            .setBody().constant("done")
+                        .end()
+                    .end()
+                    .to("mock:result");
+
+                from(getTestServerEndpointUri())
+                        .transform(simple("Bye ${body}"));
+            }
+        };
+    }
+}