You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/04/12 12:42:16 UTC

[camel] 01/01: CAMEL-17948: Force waiting before checking the asserts of the mock

This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch CAMEL-17948/race-condition-in-mockendpoint
in repository https://gitbox.apache.org/repos/asf/camel.git

commit bd5a729c3b4c2a3d54166cd0c0c7a474d902e351
Author: Nicolas Filotto <nf...@talend.com>
AuthorDate: Tue Apr 12 14:41:46 2022 +0200

    CAMEL-17948: Force waiting before checking the asserts of the mock
---
 .../apache/camel/component/mock/MockEndpoint.java  | 76 +++++++++++++++++-----
 1 file changed, 61 insertions(+), 15 deletions(-)

diff --git a/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java b/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
index 7eb7873ce0c..f7e0ede24f7 100644
--- a/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
+++ b/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
@@ -117,7 +118,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
     private volatile Map<String, Object> actualHeaderValues;
     private volatile Map<String, Object> expectedPropertyValues;
 
-    private volatile int counter;
+    private final AtomicInteger counter = new AtomicInteger();
 
     @UriPath(description = "Name of mock endpoint")
     @Metadata(required = true)
@@ -311,7 +312,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
 
     public void reset() {
         expectedCount = -1;
-        counter = 0;
+        counter.set(0);
         defaultProcessor = null;
         processors = new HashMap<>();
         receivedExchanges = new CopyOnWriteArrayList<>();
@@ -441,13 +442,15 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
             }
             assertEquals("Received message count", expectedCount, getReceivedCounter());
         } else if (expectedCount > 0) {
-            if (expectedCount != getReceivedCounter()) {
-                waitForCompleteLatch();
-            }
+            // Always wait, regardless of the value of the received counter, to ensure that all expected messages
+            // are fully processed (until the latch is counted down)
+            waitForCompleteLatch();
             if (failFastAssertionError == null) {
                 assertEquals("Received message count", expectedCount, getReceivedCounter());
             }
-        } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) {
+        } else if (expectedMinimumCount > 0) {
+            // Always wait, regardless of the value of the received counter, to ensure that all expected messages
+            // are fully processed (until the latch is counted down)
             waitForCompleteLatch();
         }
 
@@ -583,10 +586,10 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
             if (factory != null) {
                 expectedHeaderValues = factory.newMap();
             } else {
-                // should not really happen but some tests dont start camel context
+                // should not really happen but some tests don't start camel context
                 expectedHeaderValues = new HashMap<>();
             }
-            // we just wants to expects to be called once
+            // we just want to expect to be called once
             expects(new AssertionTask() {
                 @Override
                 public void assertOnIndex(int i) {
@@ -619,6 +622,29 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
         expectedHeaderValues.put(name, value);
     }
 
+    /**
+     * Sets an expectation that the messages received by this endpoint have no headers
+     */
+    public void expectedNoHeaderReceived() {
+        if (expectedMinimumCount == -1 && expectedCount <= 0) {
+            expectedMinimumMessageCount(1);
+        }
+        // we just want to expect to be called once
+        expects(new AssertionTask() {
+            @Override
+            public void assertOnIndex(int i) {
+                Exchange exchange = getReceivedExchange(i);
+                assertFalse("Exchange " + i + " has headers", exchange.getIn().hasHeaders());
+            }
+
+            public void run() {
+                for (int i = 0; i < getReceivedExchanges().size(); i++) {
+                    assertOnIndex(i);
+                }
+            }
+        });
+    }
+
     /**
      * Adds an expectation that the given header values are received by this endpoint in any order.
      * <p/>
@@ -1344,7 +1370,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
     }
 
     public int getReceivedCounter() {
-        return counter;
+        return counter.get();
     }
 
     public List<Exchange> getReceivedExchanges() {
@@ -1574,7 +1600,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
         try {
             if (log) {
                 String line = getComponent().getExchangeFormatter().format(exchange);
-                LOG.info("mock:{} received #{} -> {}", getName(), counter + 1, line);
+                LOG.info("mock:{} received #{} -> {}", getName(), counter.get() + 1, line);
             }
 
             if (reporter != null) {
@@ -1678,10 +1704,10 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
         // add a copy of the received exchange
         addReceivedExchange(copy);
         // and then increment counter after adding received exchange
-        ++counter;
+        final int receivedCounter = counter.incrementAndGet();
 
-        Processor processor = processors.get(getReceivedCounter()) != null
-                ? processors.get(getReceivedCounter()) : defaultProcessor;
+        Processor processor = processors.get(receivedCounter) != null
+                ? processors.get(receivedCounter) : defaultProcessor;
 
         if (processor != null) {
             try {
@@ -1708,8 +1734,8 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
             receivedExchanges.add(copy);
         } else {
             // okay there is some sort of limitations, so figure out what to retain
-            if (retainFirst > 0 && counter < retainFirst) {
-                // store a copy as its within the retain first limitation
+            if (retainFirst > 0 && counter.get() < retainFirst) {
+                // store a copy as it is within the retain first limitation
                 receivedExchanges.add(copy);
             } else if (retainLast > 0) {
                 // remove the oldest from the last retained boundary,
@@ -1760,12 +1786,32 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
         }
     }
 
+    /**
+     * Asserts that the given {@code predicate} is {@code true}; if not, an {@code AssertionError} is raised with the
+     * given message.
+     *
+     * @param message   the message to use in case of a failure.
+     * @param predicate the condition that determines whether it is a failure or not.
+     */
     protected void assertTrue(String message, boolean predicate) {
         if (!predicate) {
             fail(message);
         }
     }
 
+    /**
+     * Asserts that the given {@code predicate} is {@code false}; if not, an {@code AssertionError} is raised with the
+     * given message.
+     *
+     * @param message   the message to use in case of a failure.
+     * @param predicate the condition that determines whether it is a failure or not.
+     */
+    protected void assertFalse(String message, boolean predicate) {
+        if (predicate) {
+            fail(message);
+        }
+    }
+
     protected void fail(Object message) {
         if (LOG.isDebugEnabled()) {
             List<Exchange> list = getReceivedExchanges();