You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/04/12 16:07:10 UTC
[camel] branch main updated: CAMEL-17948: Force waiting before checking the asserts of the mock (#7420)
This is an automated email from the ASF dual-hosted git repository.
nfilotto pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new b7fcb139914 CAMEL-17948: Force waiting before checking the asserts of the mock (#7420)
b7fcb139914 is described below
commit b7fcb13991422c7aabca9130f3df07c8c1d4637a
Author: Nicolas Filotto <es...@users.noreply.github.com>
AuthorDate: Tue Apr 12 18:07:02 2022 +0200
CAMEL-17948: Force waiting before checking the asserts of the mock (#7420)
## Motivation
While working on [CAMEL-17945](https://issues.apache.org/jira/browse/CAMEL-17945), I realized that the tests with the scope PER_CLASS leveraging the MockEndpoint to define their assertions can fail randomly because of a race condition issue. Indeed, with the current code, a message of a previous test method can potentially call the method countDown on the latch of the following test method causing unexpected behavior.
## Modifications:
* When at least one message is expected, always wait until the latch is released to prevent conflicts of messages of different test methods
* Convert the counter to an `AtomicInteger` to guarantee the rest of an increment
* Use a fixed value of the counter when getting the `Processor` to avoid potential race condition issues between 2 successive accesses
* Add the new method `expectedNoHeaderReceived` to check if the received message has no headers (not related to the initial issue but needed for CAMEL-17945)
* Add the new method `assertFalse` to check if a given predicate is `false` (not related to the initial issue but needed for CAMEL-17945)
---
.../apache/camel/component/mock/MockEndpoint.java | 78 +++++++++++++++++-----
1 file changed, 60 insertions(+), 18 deletions(-)
diff --git a/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java b/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
index 7eb7873ce0c..2e33b9bda26 100644
--- a/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
+++ b/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
@@ -117,7 +118,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
private volatile Map<String, Object> actualHeaderValues;
private volatile Map<String, Object> expectedPropertyValues;
- private volatile int counter;
+ private final AtomicInteger counter = new AtomicInteger();
@UriPath(description = "Name of mock endpoint")
@Metadata(required = true)
@@ -311,7 +312,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
public void reset() {
expectedCount = -1;
- counter = 0;
+ counter.set(0);
defaultProcessor = null;
processors = new HashMap<>();
receivedExchanges = new CopyOnWriteArrayList<>();
@@ -440,15 +441,13 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
Thread.sleep(timeoutForEmptyEndpoints);
}
assertEquals("Received message count", expectedCount, getReceivedCounter());
- } else if (expectedCount > 0) {
- if (expectedCount != getReceivedCounter()) {
- waitForCompleteLatch();
- }
- if (failFastAssertionError == null) {
+ } else if (expectedCount > 0 || expectedMinimumCount > 0) {
+ // Always wait whatever the value of the received counter to ensure that all expected messages are
+ // fully processed (until the latch countDown)
+ waitForCompleteLatch();
+ if (expectedCount > 0 && failFastAssertionError == null) {
assertEquals("Received message count", expectedCount, getReceivedCounter());
}
- } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) {
- waitForCompleteLatch();
}
if (failFastAssertionError != null) {
@@ -583,10 +582,10 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
if (factory != null) {
expectedHeaderValues = factory.newMap();
} else {
- // should not really happen but some tests dont start camel context
+ // should not really happen but some tests don't start camel context
expectedHeaderValues = new HashMap<>();
}
- // we just wants to expects to be called once
+ // we just wants to expect to be called once
expects(new AssertionTask() {
@Override
public void assertOnIndex(int i) {
@@ -619,6 +618,29 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
expectedHeaderValues.put(name, value);
}
+ /**
+ * Sets an expectation that the messages received by this endpoint have no header
+ */
+ public void expectedNoHeaderReceived() {
+ if (expectedMinimumCount == -1 && expectedCount <= 0) {
+ expectedMinimumMessageCount(1);
+ }
+ // we just wants to expect to be called once
+ expects(new AssertionTask() {
+ @Override
+ public void assertOnIndex(int i) {
+ Exchange exchange = getReceivedExchange(i);
+ assertFalse("Exchange " + i + " has headers", exchange.getIn().hasHeaders());
+ }
+
+ public void run() {
+ for (int i = 0; i < getReceivedExchanges().size(); i++) {
+ assertOnIndex(i);
+ }
+ }
+ });
+ }
+
/**
* Adds an expectation that the given header values are received by this endpoint in any order.
* <p/>
@@ -1344,7 +1366,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
}
public int getReceivedCounter() {
- return counter;
+ return counter.get();
}
public List<Exchange> getReceivedExchanges() {
@@ -1574,7 +1596,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
try {
if (log) {
String line = getComponent().getExchangeFormatter().format(exchange);
- LOG.info("mock:{} received #{} -> {}", getName(), counter + 1, line);
+ LOG.info("mock:{} received #{} -> {}", getName(), counter.get() + 1, line);
}
if (reporter != null) {
@@ -1678,10 +1700,10 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
// add a copy of the received exchange
addReceivedExchange(copy);
// and then increment counter after adding received exchange
- ++counter;
+ final int receivedCounter = counter.incrementAndGet();
- Processor processor = processors.get(getReceivedCounter()) != null
- ? processors.get(getReceivedCounter()) : defaultProcessor;
+ Processor processor = processors.get(receivedCounter) != null
+ ? processors.get(receivedCounter) : defaultProcessor;
if (processor != null) {
try {
@@ -1708,8 +1730,8 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
receivedExchanges.add(copy);
} else {
// okay there is some sort of limitations, so figure out what to retain
- if (retainFirst > 0 && counter < retainFirst) {
- // store a copy as its within the retain first limitation
+ if (retainFirst > 0 && counter.get() < retainFirst) {
+ // store a copy as it is within the retain first limitation
receivedExchanges.add(copy);
} else if (retainLast > 0) {
// remove the oldest from the last retained boundary,
@@ -1760,12 +1782,32 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
}
}
+ /**
+ * Asserts that the given {@code predicate} is {@code true}, if not an {@code AssertionError} is raised with the
+ * give message.
+ *
+ * @param message the message to use in case of a failure.
+ * @param predicate the predicate allowing to determinate if it is a failure or not.
+ */
protected void assertTrue(String message, boolean predicate) {
if (!predicate) {
fail(message);
}
}
+ /**
+ * Asserts that the given {@code predicate} is {@code false}, if not an {@code AssertionError} is raised with the
+ * give message.
+ *
+ * @param message the message to use in case of a failure.
+ * @param predicate the predicate allowing to determinate if it is a failure or not.
+ */
+ protected void assertFalse(String message, boolean predicate) {
+ if (predicate) {
+ fail(message);
+ }
+ }
+
protected void fail(Object message) {
if (LOG.isDebugEnabled()) {
List<Exchange> list = getReceivedExchanges();