You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/04/12 16:07:10 UTC

[camel] branch main updated: CAMEL-17948: Force waiting before checking the asserts of the mock (#7420)

This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new b7fcb139914 CAMEL-17948: Force waiting before checking the asserts of the mock (#7420)
b7fcb139914 is described below

commit b7fcb13991422c7aabca9130f3df07c8c1d4637a
Author: Nicolas Filotto <es...@users.noreply.github.com>
AuthorDate: Tue Apr 12 18:07:02 2022 +0200

    CAMEL-17948: Force waiting before checking the asserts of the mock (#7420)
    
    ## Motivation
    
    While working on [CAMEL-17945](https://issues.apache.org/jira/browse/CAMEL-17945), I realized that tests with the PER_CLASS scope that rely on the MockEndpoint to define their assertions can fail randomly because of a race condition. Indeed, with the current code, a message from a previous test method can trigger a call to the method countDown on the latch of the following test method, causing unexpected behavior.
    
    ## Modifications:
    
    * When at least one message is expected, always wait until the latch is released to prevent conflicts of messages of different test methods
    * Convert the counter to an `AtomicInteger` to guarantee that each increment is atomic and returns a consistent value
    * Use a fixed value of the counter when getting the `Processor` to avoid potential race condition issues between 2 successive accesses
    * Add the new method `expectedNoHeaderReceived` to check if the received message has no headers (not related to the initial issue but needed for CAMEL-17945)
    * Add the new method `assertFalse` to check if a given predicate is `false` (not related to the initial issue but needed for CAMEL-17945)
---
 .../apache/camel/component/mock/MockEndpoint.java  | 78 +++++++++++++++++-----
 1 file changed, 60 insertions(+), 18 deletions(-)

diff --git a/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java b/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
index 7eb7873ce0c..2e33b9bda26 100644
--- a/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
+++ b/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
@@ -117,7 +118,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
     private volatile Map<String, Object> actualHeaderValues;
     private volatile Map<String, Object> expectedPropertyValues;
 
-    private volatile int counter;
+    private final AtomicInteger counter = new AtomicInteger();
 
     @UriPath(description = "Name of mock endpoint")
     @Metadata(required = true)
@@ -311,7 +312,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
 
     public void reset() {
         expectedCount = -1;
-        counter = 0;
+        counter.set(0);
         defaultProcessor = null;
         processors = new HashMap<>();
         receivedExchanges = new CopyOnWriteArrayList<>();
@@ -440,15 +441,13 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
                 Thread.sleep(timeoutForEmptyEndpoints);
             }
             assertEquals("Received message count", expectedCount, getReceivedCounter());
-        } else if (expectedCount > 0) {
-            if (expectedCount != getReceivedCounter()) {
-                waitForCompleteLatch();
-            }
-            if (failFastAssertionError == null) {
+        } else if (expectedCount > 0 || expectedMinimumCount > 0) {
+            // Always wait, regardless of the value of the received counter, to ensure that all expected
+            // messages are fully processed (i.e. until the latch has been counted down)
+            waitForCompleteLatch();
+            if (expectedCount > 0 && failFastAssertionError == null) {
                 assertEquals("Received message count", expectedCount, getReceivedCounter());
             }
-        } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) {
-            waitForCompleteLatch();
         }
 
         if (failFastAssertionError != null) {
@@ -583,10 +582,10 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
             if (factory != null) {
                 expectedHeaderValues = factory.newMap();
             } else {
-                // should not really happen but some tests dont start camel context
+                // should not really happen but some tests don't start camel context
                 expectedHeaderValues = new HashMap<>();
             }
-            // we just wants to expects to be called once
+            // we just want to expect to be called once
             expects(new AssertionTask() {
                 @Override
                 public void assertOnIndex(int i) {
@@ -619,6 +618,29 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
         expectedHeaderValues.put(name, value);
     }
 
+    /**
+     * Sets an expectation that the messages received by this endpoint have no headers
+     */
+    public void expectedNoHeaderReceived() {
+        if (expectedMinimumCount == -1 && expectedCount <= 0) {
+            expectedMinimumMessageCount(1);
+        }
+        // we just want to expect to be called once
+        expects(new AssertionTask() {
+            @Override
+            public void assertOnIndex(int i) {
+                Exchange exchange = getReceivedExchange(i);
+                assertFalse("Exchange " + i + " has headers", exchange.getIn().hasHeaders());
+            }
+
+            public void run() {
+                for (int i = 0; i < getReceivedExchanges().size(); i++) {
+                    assertOnIndex(i);
+                }
+            }
+        });
+    }
+
     /**
      * Adds an expectation that the given header values are received by this endpoint in any order.
      * <p/>
@@ -1344,7 +1366,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
     }
 
     public int getReceivedCounter() {
-        return counter;
+        return counter.get();
     }
 
     public List<Exchange> getReceivedExchanges() {
@@ -1574,7 +1596,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
         try {
             if (log) {
                 String line = getComponent().getExchangeFormatter().format(exchange);
-                LOG.info("mock:{} received #{} -> {}", getName(), counter + 1, line);
+                LOG.info("mock:{} received #{} -> {}", getName(), counter.get() + 1, line);
             }
 
             if (reporter != null) {
@@ -1678,10 +1700,10 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
         // add a copy of the received exchange
         addReceivedExchange(copy);
         // and then increment counter after adding received exchange
-        ++counter;
+        final int receivedCounter = counter.incrementAndGet();
 
-        Processor processor = processors.get(getReceivedCounter()) != null
-                ? processors.get(getReceivedCounter()) : defaultProcessor;
+        Processor processor = processors.get(receivedCounter) != null
+                ? processors.get(receivedCounter) : defaultProcessor;
 
         if (processor != null) {
             try {
@@ -1708,8 +1730,8 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
             receivedExchanges.add(copy);
         } else {
             // okay there is some sort of limitations, so figure out what to retain
-            if (retainFirst > 0 && counter < retainFirst) {
-                // store a copy as its within the retain first limitation
+            if (retainFirst > 0 && counter.get() < retainFirst) {
+                // store a copy as it is within the retain first limitation
                 receivedExchanges.add(copy);
             } else if (retainLast > 0) {
                 // remove the oldest from the last retained boundary,
@@ -1760,12 +1782,32 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
         }
     }
 
+    /**
+     * Asserts that the given {@code predicate} is {@code true}; if not, an {@code AssertionError} is raised with the
+     * given message.
+     *
+     * @param message   the message to use in case of a failure.
+     * @param predicate the condition to check; the assertion fails when it is {@code false}.
+     */
     protected void assertTrue(String message, boolean predicate) {
         if (!predicate) {
             fail(message);
         }
     }
 
+    /**
+     * Asserts that the given {@code predicate} is {@code false}; if not, an {@code AssertionError} is raised with the
+     * given message.
+     *
+     * @param message   the message to use in case of a failure.
+     * @param predicate the condition to check; the assertion fails when it is {@code true}.
+     */
+    protected void assertFalse(String message, boolean predicate) {
+        if (predicate) {
+            fail(message);
+        }
+    }
+
     protected void fail(Object message) {
         if (LOG.isDebugEnabled()) {
             List<Exchange> list = getReceivedExchanges();