You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ad...@apache.org on 2022/08/05 07:55:54 UTC
[ratis] branch master updated: RATIS-1657. Intermittent failure in TestLogAppenderWithGrpc#testPendingLimits due to ConcurrentModificationException at SlidingWindow$Client.trySendDelayed. (#703)
This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new d9aa480c5 RATIS-1657. Intermittent failure in TestLogAppenderWithGrpc#testPendingLimits due to ConcurrentModificationException at SlidingWindow$Client.trySendDelayed. (#703)
d9aa480c5 is described below
commit d9aa480c53a69141aeb5bb7f3279ed843c12f322
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Aug 5 00:55:48 2022 -0700
RATIS-1657. Intermittent failure in TestLogAppenderWithGrpc#testPendingLimits due to ConcurrentModificationException at SlidingWindow$Client.trySendDelayed. (#703)
---
.../org/apache/ratis/util/CollectionUtils.java | 14 +++++-
.../java/org/apache/ratis/util/SlidingWindow.java | 58 ++++++++++++++++------
2 files changed, 55 insertions(+), 17 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
index cdfd9635b..db0c6fd93 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
@@ -20,6 +20,7 @@ package org.apache.ratis.util;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -103,8 +104,17 @@ public interface CollectionUtils {
}
static <K, V> V putNew(K key, V value, Map<K, V> map, Supplier<Object> name) {
- final V returned = map.put(key, value);
- Preconditions.assertTrue(returned == null,
+ return putNew(key, value, map::put, name);
+ }
+
+ /** For the case that key and value are the same object. */
+ static <K> void putNew(K key, Function<K, K> putMethod, Supplier<Object> name) {
+ putNew(key, key, (k, v) -> putMethod.apply(k), name);
+ }
+
+ static <K, V> V putNew(K key, V value, BiFunction<K, V, V> putMethod, Supplier<Object> name) {
+ final V returned = putMethod.apply(key, value);
+ Preconditions.assertNull(returned,
() -> "Entry already exists for key " + key + " in map " + name.get());
return value;
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
index 43b1efcdb..316604db0 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -17,13 +17,14 @@
*/
package org.apache.ratis.util;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -63,14 +64,13 @@ public interface SlidingWindow {
/** A seqNum-to-request map, sorted by seqNum. */
class RequestMap<REQUEST extends Request<REPLY>, REPLY> implements Iterable<REQUEST> {
- private static boolean logRepeatedly = false;
private final Object name;
/** Request map: seqNum -> request */
private final SortedMap<Long, REQUEST> requests = new ConcurrentSkipListMap<>();
RequestMap(Object name) {
this.name = name;
- if (logRepeatedly && LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
JavaUtils.runRepeatedly(this::log, 5, 10, TimeUnit.SECONDS);
}
}
@@ -185,6 +185,33 @@ public interface SlidingWindow {
}
}
+ class DelayedRequests {
+ private final SortedMap<Long, Long> sorted = new TreeMap<>();
+
+ synchronized Long put(Long seqNum) {
+ return sorted.put(seqNum, seqNum);
+ }
+
+ synchronized boolean containsKey(long seqNum) {
+ return sorted.containsKey(seqNum);
+ }
+
+ synchronized List<Long> getAllAndClear() {
+ final List<Long> keys = new ArrayList<>(sorted.keySet());
+ sorted.clear();
+ return keys;
+ }
+
+ synchronized Long remove(long seqNum) {
+ return sorted.remove(seqNum);
+ }
+
+ @Override
+ public synchronized String toString() {
+ return "" + sorted.keySet();
+ }
+ }
+
/**
* Client side sliding window.
* A client may
@@ -200,7 +227,7 @@ public interface SlidingWindow {
/** The requests in the sliding window. */
private final RequestMap<REQUEST, REPLY> requests;
/** Delayed requests. */
- private final SortedMap<Long, Long> delayedRequests = new TreeMap<>();
+ private final DelayedRequests delayedRequests = new DelayedRequests();
/** The seqNum for the next new request. */
private long nextSeqNum = 1;
@@ -214,9 +241,14 @@ public interface SlidingWindow {
public Client(Object name) {
this.requests = new RequestMap<REQUEST, REPLY>(getName(getClass(), name)) {
@Override
- @SuppressFBWarnings("IA_AMBIGUOUS_INVOCATION_OF_INHERITED_OR_OUTER_METHOD")
- synchronized void log() {
- LOG.debug(toString());
+ void log() {
+ if (LOG.isDebugEnabled()) {
+ logDebug();
+ }
+ }
+
+ synchronized void logDebug() {
+ LOG.debug(super.toString());
for (REQUEST r : requests) {
LOG.debug(" {}: {}", r.getSeqNum(), r.hasReply() ? "replied"
: delayedRequests.containsKey(r.getSeqNum()) ? "delayed" : "submitted");
@@ -229,7 +261,7 @@ public interface SlidingWindow {
public synchronized String toString() {
return requests + ", nextSeqNum=" + nextSeqNum
+ ", firstSubmitted=" + firstSeqNum + ", replied? " + firstReplied
- + ", delayed=" + delayedRequests.keySet();
+ + ", delayed=" + delayedRequests;
}
/**
@@ -282,7 +314,7 @@ public interface SlidingWindow {
}
// delay other requests
- CollectionUtils.putNew(seqNum, seqNum, delayedRequests, () -> requests.getName() + ":delayedRequests");
+ CollectionUtils.putNew(seqNum, delayedRequests::put, () -> requests.getName() + ":delayedRequests");
return false;
}
@@ -326,12 +358,8 @@ public interface SlidingWindow {
private void trySendDelayed(Consumer<REQUEST> sendMethod) {
if (firstReplied) {
// after first received, all other requests can be submitted (out-of-order)
- if (!delayedRequests.isEmpty()) {
- for (Long seqNum : delayedRequests.keySet()) {
- sendMethod.accept(requests.getNonRepliedRequest(seqNum, "trySendDelayed"));
- }
- delayedRequests.clear();
- }
+ delayedRequests.getAllAndClear().forEach(
+ seqNum -> sendMethod.accept(requests.getNonRepliedRequest(seqNum, "trySendDelayed")));
} else {
// Otherwise, submit the first only if it is a delayed request
final Iterator<REQUEST> i = requests.iterator();