You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/04/12 12:42:16 UTC
[camel] 01/01: CAMEL-17948: Force waiting before checking the asserts of the mock
This is an automated email from the ASF dual-hosted git repository.
nfilotto pushed a commit to branch CAMEL-17948/race-condition-in-mockendpoint
in repository https://gitbox.apache.org/repos/asf/camel.git
commit bd5a729c3b4c2a3d54166cd0c0c7a474d902e351
Author: Nicolas Filotto <nf...@talend.com>
AuthorDate: Tue Apr 12 14:41:46 2022 +0200
CAMEL-17948: Force waiting before checking the asserts of the mock
---
.../apache/camel/component/mock/MockEndpoint.java | 76 +++++++++++++++++-----
1 file changed, 61 insertions(+), 15 deletions(-)
diff --git a/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java b/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
index 7eb7873ce0c..f7e0ede24f7 100644
--- a/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
+++ b/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
@@ -117,7 +118,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
private volatile Map<String, Object> actualHeaderValues;
private volatile Map<String, Object> expectedPropertyValues;
- private volatile int counter;
+ private final AtomicInteger counter = new AtomicInteger();
@UriPath(description = "Name of mock endpoint")
@Metadata(required = true)
@@ -311,7 +312,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
public void reset() {
expectedCount = -1;
- counter = 0;
+ counter.set(0);
defaultProcessor = null;
processors = new HashMap<>();
receivedExchanges = new CopyOnWriteArrayList<>();
@@ -441,13 +442,15 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
}
assertEquals("Received message count", expectedCount, getReceivedCounter());
} else if (expectedCount > 0) {
- if (expectedCount != getReceivedCounter()) {
- waitForCompleteLatch();
- }
+ // Always wait, whatever the value of the received counter, to ensure that all expected messages are
+ // fully processed (i.e. until the latch has been counted down)
+ waitForCompleteLatch();
if (failFastAssertionError == null) {
assertEquals("Received message count", expectedCount, getReceivedCounter());
}
- } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) {
+ } else if (expectedMinimumCount > 0) {
+ // Always wait, whatever the value of the received counter, to ensure that all expected messages are
+ // fully processed (i.e. until the latch has been counted down)
waitForCompleteLatch();
}
@@ -583,10 +586,10 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
if (factory != null) {
expectedHeaderValues = factory.newMap();
} else {
- // should not really happen but some tests dont start camel context
+ // should not really happen but some tests don't start camel context
expectedHeaderValues = new HashMap<>();
}
- // we just wants to expects to be called once
+ // we just want to expect to be called once
expects(new AssertionTask() {
@Override
public void assertOnIndex(int i) {
@@ -619,6 +622,29 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
expectedHeaderValues.put(name, value);
}
+ /**
+ * Sets an expectation that the messages received by this endpoint have no headers
+ */
+ public void expectedNoHeaderReceived() {
+ if (expectedMinimumCount == -1 && expectedCount <= 0) {
+ expectedMinimumMessageCount(1);
+ }
+ // we just want to expect to be called once
+ expects(new AssertionTask() {
+ @Override
+ public void assertOnIndex(int i) {
+ Exchange exchange = getReceivedExchange(i);
+ assertFalse("Exchange " + i + " has headers", exchange.getIn().hasHeaders());
+ }
+
+ public void run() {
+ for (int i = 0; i < getReceivedExchanges().size(); i++) {
+ assertOnIndex(i);
+ }
+ }
+ });
+ }
+
/**
* Adds an expectation that the given header values are received by this endpoint in any order.
* <p/>
@@ -1344,7 +1370,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
}
public int getReceivedCounter() {
- return counter;
+ return counter.get();
}
public List<Exchange> getReceivedExchanges() {
@@ -1574,7 +1600,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
try {
if (log) {
String line = getComponent().getExchangeFormatter().format(exchange);
- LOG.info("mock:{} received #{} -> {}", getName(), counter + 1, line);
+ LOG.info("mock:{} received #{} -> {}", getName(), counter.get() + 1, line);
}
if (reporter != null) {
@@ -1678,10 +1704,10 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
// add a copy of the received exchange
addReceivedExchange(copy);
// and then increment counter after adding received exchange
- ++counter;
+ final int receivedCounter = counter.incrementAndGet();
- Processor processor = processors.get(getReceivedCounter()) != null
- ? processors.get(getReceivedCounter()) : defaultProcessor;
+ Processor processor = processors.get(receivedCounter) != null
+ ? processors.get(receivedCounter) : defaultProcessor;
if (processor != null) {
try {
@@ -1708,8 +1734,8 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
receivedExchanges.add(copy);
} else {
// okay there is some sort of limitations, so figure out what to retain
- if (retainFirst > 0 && counter < retainFirst) {
- // store a copy as its within the retain first limitation
+ if (retainFirst > 0 && counter.get() < retainFirst) {
+ // store a copy as it is within the retain first limitation
receivedExchanges.add(copy);
} else if (retainLast > 0) {
// remove the oldest from the last retained boundary,
@@ -1760,12 +1786,32 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
}
}
+ /**
+ * Asserts that the given {@code predicate} is {@code true}; if not, an {@code AssertionError} is raised with the
+ * given message.
+ *
+ * @param message the message to use in case of a failure.
+ * @param predicate the predicate used to determine whether it is a failure or not.
+ */
protected void assertTrue(String message, boolean predicate) {
if (!predicate) {
fail(message);
}
}
+ /**
+ * Asserts that the given {@code predicate} is {@code false}; if not, an {@code AssertionError} is raised with the
+ * given message.
+ *
+ * @param message the message to use in case of a failure.
+ * @param predicate the predicate used to determine whether it is a failure or not.
+ */
+ protected void assertFalse(String message, boolean predicate) {
+ if (predicate) {
+ fail(message);
+ }
+ }
+
protected void fail(Object message) {
if (LOG.isDebugEnabled()) {
List<Exchange> list = getReceivedExchanges();