You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/04/12 12:42:15 UTC

[camel] branch CAMEL-17948/race-condition-in-mockendpoint created (now bd5a729c3b4)

This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a change to branch CAMEL-17948/race-condition-in-mockendpoint
in repository https://gitbox.apache.org/repos/asf/camel.git


      at bd5a729c3b4 CAMEL-17948: Force waiting before checking the asserts of the mock

This branch includes the following new commits:

     new bd5a729c3b4 CAMEL-17948: Force waiting before checking the asserts of the mock

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel] 01/01: CAMEL-17948: Force waiting before checking the asserts of the mock

Posted by nf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch CAMEL-17948/race-condition-in-mockendpoint
in repository https://gitbox.apache.org/repos/asf/camel.git

commit bd5a729c3b4c2a3d54166cd0c0c7a474d902e351
Author: Nicolas Filotto <nf...@talend.com>
AuthorDate: Tue Apr 12 14:41:46 2022 +0200

    CAMEL-17948: Force waiting before checking the asserts of the mock
---
 .../apache/camel/component/mock/MockEndpoint.java  | 76 +++++++++++++++++-----
 1 file changed, 61 insertions(+), 15 deletions(-)

diff --git a/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java b/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
index 7eb7873ce0c..f7e0ede24f7 100644
--- a/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
+++ b/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
@@ -117,7 +118,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
     private volatile Map<String, Object> actualHeaderValues;
     private volatile Map<String, Object> expectedPropertyValues;
 
-    private volatile int counter;
+    private final AtomicInteger counter = new AtomicInteger();
 
     @UriPath(description = "Name of mock endpoint")
     @Metadata(required = true)
@@ -311,7 +312,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
 
     public void reset() {
         expectedCount = -1;
-        counter = 0;
+        counter.set(0);
         defaultProcessor = null;
         processors = new HashMap<>();
         receivedExchanges = new CopyOnWriteArrayList<>();
@@ -441,13 +442,15 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
             }
             assertEquals("Received message count", expectedCount, getReceivedCounter());
         } else if (expectedCount > 0) {
-            if (expectedCount != getReceivedCounter()) {
-                waitForCompleteLatch();
-            }
+            // Always wait whatever the value of the received counter to ensure that all expected messages are
+            // fully processed (until the latch countDown)
+            waitForCompleteLatch();
             if (failFastAssertionError == null) {
                 assertEquals("Received message count", expectedCount, getReceivedCounter());
             }
-        } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) {
+        } else if (expectedMinimumCount > 0) {
+            // Always wait whatever the value of the received counter to ensure that all expected messages are
+            // fully processed (until the latch countDown)
             waitForCompleteLatch();
         }
 
@@ -583,10 +586,10 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
             if (factory != null) {
                 expectedHeaderValues = factory.newMap();
             } else {
-                // should not really happen but some tests dont start camel context
+                // should not really happen but some tests don't start camel context
                 expectedHeaderValues = new HashMap<>();
             }
-            // we just wants to expects to be called once
+            // we just wants to expect to be called once
             expects(new AssertionTask() {
                 @Override
                 public void assertOnIndex(int i) {
@@ -619,6 +622,29 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
         expectedHeaderValues.put(name, value);
     }
 
+    /**
+     * Sets an expectation that no header is expected in the messages received by this endpoint
+     */
+    public void expectedNoHeaderReceived() {
+        if (expectedMinimumCount == -1 && expectedCount <= 0) {
+            expectedMinimumMessageCount(1);
+        }
+        // we just wants to expect to be called once
+        expects(new AssertionTask() {
+            @Override
+            public void assertOnIndex(int i) {
+                Exchange exchange = getReceivedExchange(i);
+                assertFalse("Exchange " + i + " has headers", exchange.getIn().hasHeaders());
+            }
+
+            public void run() {
+                for (int i = 0; i < getReceivedExchanges().size(); i++) {
+                    assertOnIndex(i);
+                }
+            }
+        });
+    }
+
     /**
      * Adds an expectation that the given header values are received by this endpoint in any order.
      * <p/>
@@ -1344,7 +1370,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
     }
 
     public int getReceivedCounter() {
-        return counter;
+        return counter.get();
     }
 
     public List<Exchange> getReceivedExchanges() {
@@ -1574,7 +1600,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
         try {
             if (log) {
                 String line = getComponent().getExchangeFormatter().format(exchange);
-                LOG.info("mock:{} received #{} -> {}", getName(), counter + 1, line);
+                LOG.info("mock:{} received #{} -> {}", getName(), counter.get() + 1, line);
             }
 
             if (reporter != null) {
@@ -1678,10 +1704,10 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
         // add a copy of the received exchange
         addReceivedExchange(copy);
         // and then increment counter after adding received exchange
-        ++counter;
+        final int receivedCounter = counter.incrementAndGet();
 
-        Processor processor = processors.get(getReceivedCounter()) != null
-                ? processors.get(getReceivedCounter()) : defaultProcessor;
+        Processor processor = processors.get(receivedCounter) != null
+                ? processors.get(receivedCounter) : defaultProcessor;
 
         if (processor != null) {
             try {
@@ -1708,8 +1734,8 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
             receivedExchanges.add(copy);
         } else {
             // okay there is some sort of limitations, so figure out what to retain
-            if (retainFirst > 0 && counter < retainFirst) {
-                // store a copy as its within the retain first limitation
+            if (retainFirst > 0 && counter.get() < retainFirst) {
+                // store a copy as it is within the retain first limitation
                 receivedExchanges.add(copy);
             } else if (retainLast > 0) {
                 // remove the oldest from the last retained boundary,
@@ -1760,12 +1786,32 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
         }
     }
 
+    /**
+     * Asserts that the given {@code predicate} is {@code true}, if not an {@code AssertionError} is raised with the
+     * give message.
+     *
+     * @param message   the message to use in case of a failure.
+     * @param predicate the predicate allowing to determinate if it is a failure or not.
+     */
     protected void assertTrue(String message, boolean predicate) {
         if (!predicate) {
             fail(message);
         }
     }
 
+    /**
+     * Asserts that the given {@code predicate} is {@code false}, if not an {@code AssertionError} is raised with the
+     * give message.
+     *
+     * @param message   the message to use in case of a failure.
+     * @param predicate the predicate allowing to determinate if it is a failure or not.
+     */
+    protected void assertFalse(String message, boolean predicate) {
+        if (predicate) {
+            fail(message);
+        }
+    }
+
     protected void fail(Object message) {
         if (LOG.isDebugEnabled()) {
             List<Exchange> list = getReceivedExchanges();