You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ad...@apache.org on 2022/08/05 07:55:54 UTC

[ratis] branch master updated: RATIS-1657. Intermittent failure in TestLogAppenderWithGrpc#testPendingLimits due to ConcurrentModificationException at SlidingWindow$Client.trySendDelayed. (#703)

This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new d9aa480c5 RATIS-1657. Intermittent failure in TestLogAppenderWithGrpc#testPendingLimits due to ConcurrentModificationException at SlidingWindow$Client.trySendDelayed. (#703)
d9aa480c5 is described below

commit d9aa480c53a69141aeb5bb7f3279ed843c12f322
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Aug 5 00:55:48 2022 -0700

    RATIS-1657. Intermittent failure in TestLogAppenderWithGrpc#testPendingLimits due to ConcurrentModificationException at SlidingWindow$Client.trySendDelayed. (#703)
---
 .../org/apache/ratis/util/CollectionUtils.java     | 14 +++++-
 .../java/org/apache/ratis/util/SlidingWindow.java  | 58 ++++++++++++++++------
 2 files changed, 55 insertions(+), 17 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
index cdfd9635b..db0c6fd93 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
@@ -20,6 +20,7 @@ package org.apache.ratis.util;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -103,8 +104,17 @@ public interface CollectionUtils {
   }
 
   static <K, V> V putNew(K key, V value, Map<K, V> map, Supplier<Object> name) {
-    final V returned = map.put(key, value);
-    Preconditions.assertTrue(returned == null,
+    return putNew(key, value, map::put, name);
+  }
+
+  /** For the case that key and value are the same object. */
+  static <K> void putNew(K key, Function<K, K> putMethod, Supplier<Object> name) {
+    putNew(key, key, (k, v) -> putMethod.apply(k), name);
+  }
+
+  static <K, V> V putNew(K key, V value, BiFunction<K, V, V> putMethod, Supplier<Object> name) {
+    final V returned = putMethod.apply(key, value);
+    Preconditions.assertNull(returned,
         () -> "Entry already exists for key " + key + " in map " + name.get());
     return value;
   }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
index 43b1efcdb..316604db0 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -17,13 +17,14 @@
  */
 package org.apache.ratis.util;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -63,14 +64,13 @@ public interface SlidingWindow {
 
   /** A seqNum-to-request map, sorted by seqNum. */
   class RequestMap<REQUEST extends Request<REPLY>, REPLY> implements Iterable<REQUEST> {
-    private static boolean logRepeatedly = false;
     private final Object name;
     /** Request map: seqNum -> request */
     private final SortedMap<Long, REQUEST> requests = new ConcurrentSkipListMap<>();
 
     RequestMap(Object name) {
       this.name = name;
-      if (logRepeatedly && LOG.isDebugEnabled()) {
+      if (LOG.isDebugEnabled()) {
         JavaUtils.runRepeatedly(this::log, 5, 10, TimeUnit.SECONDS);
       }
     }
@@ -185,6 +185,33 @@ public interface SlidingWindow {
     }
   }
 
+  class DelayedRequests {
+    private final SortedMap<Long, Long> sorted = new TreeMap<>();
+
+    synchronized Long put(Long seqNum) {
+      return sorted.put(seqNum, seqNum);
+    }
+
+    synchronized boolean containsKey(long seqNum) {
+      return sorted.containsKey(seqNum);
+    }
+
+    synchronized List<Long> getAllAndClear() {
+      final List<Long> keys = new ArrayList<>(sorted.keySet());
+      sorted.clear();
+      return keys;
+    }
+
+    synchronized Long remove(long seqNum) {
+      return sorted.remove(seqNum);
+    }
+
+    @Override
+    public synchronized String toString() {
+      return "" + sorted.keySet();
+    }
+  }
+
   /**
    * Client side sliding window.
    * A client may
@@ -200,7 +227,7 @@ public interface SlidingWindow {
     /** The requests in the sliding window. */
     private final RequestMap<REQUEST, REPLY> requests;
     /** Delayed requests. */
-    private final SortedMap<Long, Long> delayedRequests = new TreeMap<>();
+    private final DelayedRequests delayedRequests = new DelayedRequests();
 
     /** The seqNum for the next new request. */
     private long nextSeqNum = 1;
@@ -214,9 +241,14 @@ public interface SlidingWindow {
     public Client(Object name) {
       this.requests = new RequestMap<REQUEST, REPLY>(getName(getClass(), name)) {
         @Override
-        @SuppressFBWarnings("IA_AMBIGUOUS_INVOCATION_OF_INHERITED_OR_OUTER_METHOD")
-        synchronized void log() {
-          LOG.debug(toString());
+        void log() {
+          if (LOG.isDebugEnabled()) {
+            logDebug();
+          }
+        }
+
+        synchronized void logDebug() {
+          LOG.debug(super.toString());
           for (REQUEST r : requests) {
             LOG.debug("  {}: {}", r.getSeqNum(), r.hasReply() ? "replied"
                 : delayedRequests.containsKey(r.getSeqNum()) ? "delayed" : "submitted");
@@ -229,7 +261,7 @@ public interface SlidingWindow {
     public synchronized String toString() {
       return requests + ", nextSeqNum=" + nextSeqNum
           + ", firstSubmitted=" + firstSeqNum + ", replied? " + firstReplied
-          + ", delayed=" + delayedRequests.keySet();
+          + ", delayed=" + delayedRequests;
     }
 
     /**
@@ -282,7 +314,7 @@ public interface SlidingWindow {
       }
 
       // delay other requests
-      CollectionUtils.putNew(seqNum, seqNum, delayedRequests, () -> requests.getName() + ":delayedRequests");
+      CollectionUtils.putNew(seqNum, delayedRequests::put, () -> requests.getName() + ":delayedRequests");
       return false;
     }
 
@@ -326,12 +358,8 @@ public interface SlidingWindow {
     private void trySendDelayed(Consumer<REQUEST> sendMethod) {
       if (firstReplied) {
         // after first received, all other requests can be submitted (out-of-order)
-        if (!delayedRequests.isEmpty()) {
-          for (Long seqNum : delayedRequests.keySet()) {
-            sendMethod.accept(requests.getNonRepliedRequest(seqNum, "trySendDelayed"));
-          }
-          delayedRequests.clear();
-        }
+        delayedRequests.getAllAndClear().forEach(
+            seqNum -> sendMethod.accept(requests.getNonRepliedRequest(seqNum, "trySendDelayed")));
       } else {
         // Otherwise, submit the first only if it is a delayed request
         final Iterator<REQUEST> i = requests.iterator();