Posted to commits@kafka.apache.org by js...@apache.org on 2022/08/12 16:06:39 UTC

[kafka] branch trunk updated: KAFKA-13959: Controller should unfence Broker with busy metadata log (#12274)

This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 50e5b32a6d5 KAFKA-13959: Controller should unfence Broker with busy metadata log (#12274)
50e5b32a6d5 is described below

commit 50e5b32a6d5f7940e932c6acb20a452535c0bf60
Author: dengziming <de...@gmail.com>
AuthorDate: Sat Aug 13 00:06:24 2022 +0800

    KAFKA-13959: Controller should unfence Broker with busy metadata log (#12274)
    
    The cause of KAFKA-13959 is a little complex; the two keys to the problem are:
    
    KafkaRaftClient.MAX_FETCH_WAIT_MS == MetadataMaxIdleIntervalMs == 500ms. We rely on the fetchPurgatory to complete a FetchRequest; in detail, if FetchRequest.fetchOffset >= log.endOffset, the leader waits 500ms before sending a FetchResponse, and the follower then needs to send one more FetchRequest to learn the new HW.
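
    To make this concrete, here is a minimal sketch of the purgatory pattern
    described above (simplified, hypothetical names; this is not the actual
    KafkaRaftClient fetch path, though FuturePurgatory is the real interface
    patched at the end of this diff):

        // Sketch only: a fetch at the log end offset is parked in the fetch
        // purgatory for up to MAX_FETCH_WAIT_MS before a response is sent.
        CompletableFuture<Long> handleFetch(
                long fetchOffset,
                long logEndOffset,
                FuturePurgatory<Long> fetchPurgatory) {
            if (fetchOffset < logEndOffset) {
                // New records are available: answer immediately.
                return CompletableFuture.completedFuture(logEndOffset);
            }
            // The fetcher is already at the log end: wait up to 500ms
            // (MAX_FETCH_WAIT_MS). The follower only learns an advanced HW
            // from this delayed response, or from its next FetchRequest.
            return fetchPurgatory.await(fetchOffset, 500L);
        }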
    
    Here is the sequence of events (an illustrative simulation follows the list):
    
    1. When the leader (active controller) starts, LEO=m+1 (where m is the offset of the last record) and leader HW=m (because more than half of the voters must reach m+1).
    2. The follower (standby controller) and the observer (broker) send FetchRequest(fetchOffset=m).
        2.1. The leader receives the FetchRequest, sets leader HW=m, and waits 500ms before sending a FetchResponse.
        2.2. The leader sends FetchResponse(HW=m).
        2.3. The broker receives FetchResponse(HW=m) and sets metadataOffset=m.
    3. The leader appends a NoOpRecord: LEO=m+2, leader HW=m.
    4. Steps 1-4 loop.
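
    To see why the broker never reports itself caught up, here is a hedged,
    illustrative simulation of the loop above (plain Java, offsets invented,
    500ms timings elided):

        // Each round stands for one NoOpRecord interval (~500ms).
        long lastCommitted = 10;   // controller's last committed offset (m)
        long brokerOffset = 9;     // broker's currentMetadataOffset
        for (int round = 0; round < 3; round++) {
            brokerOffset = lastCommitted;   // broker catches up to the HW it saw
            lastCommitted += 1;             // ...but another NoOpRecord commits first
            System.out.printf("broker=%d committed=%d caughtUp=%b%n",
                brokerOffset, lastCommitted, brokerOffset >= lastCommitted);
        }
        // Prints caughtUp=false in every round: under the old check against
        // lastCommittedOffset, the broker is never considered caught up.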
    
    If we change MAX_FETCH_WAIT_MS to 200ms (less than half of MetadataMaxIdleIntervalMs), the problem is worked around, but only temporarily.
    
    We plan to address this problem in two ways. First, in this PR, we change the controller to unfence a broker once the broker's high watermark has reached the offset of that broker's registration record. Second, in KAFKA-14145, we will propagate the HW to the replicas as quickly as possible.
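
    In code terms, the first fix replaces a moving target with a fixed one. A
    minimal sketch of the new condition (the authoritative change is in the
    BrokerHeartbeatManager hunk below):

        // Old check: against the ever-advancing last committed offset.
        //   boolean caughtUp = currentMetadataOffset >= lastCommittedOffset;
        //
        // New check: against the stable offset of this broker's own
        // RegisterBrokerRecord, which NoOpRecords do not advance.
        boolean mayUnfence(long currentMetadataOffset, long registerBrokerRecordOffset) {
            return currentMetadataOffset >= registerBrokerRecordOffset;
        }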
    
    Reviewers: Luke Chen <sh...@gmail.com>, José Armando García Sancio <js...@users.noreply.github.com>
---
 core/src/main/scala/kafka/server/KafkaConfig.scala |  2 +-
 .../kafka/controller/BrokerHeartbeatManager.java   | 26 ++++++++---------
 .../kafka/controller/ClusterControlManager.java    | 25 ++++++++++++++--
 .../apache/kafka/controller/QuorumController.java  | 27 +++++++++++++-----
 .../controller/ReplicationControlManager.java      |  6 ++--
 .../controller/ClusterControlManagerTest.java      | 19 +++++++------
 .../controller/ProducerIdControlManagerTest.java   |  2 +-
 .../org/apache/kafka/metadata/RecordTestUtils.java | 33 +++++++++++++---------
 .../kafka/raft/internals/FuturePurgatory.java      |  4 +--
 9 files changed, 92 insertions(+), 52 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 4e253047ee6..860056f9a3e 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -81,7 +81,7 @@ object Defaults {
   val BrokerHeartbeatIntervalMs = 2000
   val BrokerSessionTimeoutMs = 9000
   val MetadataSnapshotMaxNewRecordBytes = 20 * 1024 * 1024
-  val MetadataMaxIdleIntervalMs = 5000
+  val MetadataMaxIdleIntervalMs = 500
 
   /** KRaft mode configs */
   val EmptyNodeId: Int = -1
diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
index f31df917d76..428f1c5833e 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
@@ -511,17 +511,17 @@ public class BrokerHeartbeatManager {
     /**
      * Calculate the next broker state for a broker that just sent a heartbeat request.
      *
-     * @param brokerId              The broker id.
-     * @param request               The incoming heartbeat request.
-     * @param lastCommittedOffset   The last committed offset of the quorum controller.
-     * @param hasLeaderships        A callback which evaluates to true if the broker leads
-     *                              at least one partition.
+     * @param brokerId                     The broker id.
+     * @param request                      The incoming heartbeat request.
+     * @param registerBrokerRecordOffset   The offset of the broker's {@link org.apache.kafka.common.metadata.RegisterBrokerRecord}.
+     * @param hasLeaderships               A callback which evaluates to true if the broker leads
+     *                                     at least one partition.
      *
-     * @return                      The current and next broker states.
+     * @return                             The current and next broker states.
      */
     BrokerControlStates calculateNextBrokerState(int brokerId,
                                                  BrokerHeartbeatRequestData request,
-                                                 long lastCommittedOffset,
+                                                 long registerBrokerRecordOffset,
                                                  Supplier<Boolean> hasLeaderships) {
         BrokerHeartbeatState broker = brokers.getOrDefault(brokerId,
             new BrokerHeartbeatState(brokerId));
@@ -533,17 +533,17 @@ public class BrokerHeartbeatManager {
                         "shutdown.", brokerId);
                     return new BrokerControlStates(currentState, SHUTDOWN_NOW);
                 } else if (!request.wantFence()) {
-                    if (request.currentMetadataOffset() >= lastCommittedOffset) {
+                    if (request.currentMetadataOffset() >= registerBrokerRecordOffset) {
                         log.info("The request from broker {} to unfence has been granted " +
-                                "because it has caught up with the last committed metadata " +
-                                "offset {}.", brokerId, lastCommittedOffset);
+                                "because it has caught up with the offset of its register " +
+                                "broker record {}.", brokerId, registerBrokerRecordOffset);
                         return new BrokerControlStates(currentState, UNFENCED);
                     } else {
                         if (log.isDebugEnabled()) {
                             log.debug("The request from broker {} to unfence cannot yet " +
-                                "be granted because it has not caught up with the last " +
-                                "committed metadata offset {}. It is still at offset {}.",
-                                brokerId, lastCommittedOffset, request.currentMetadataOffset());
+                                "be granted because it has not caught up with the offset of " +
+                                "its register broker record {}. It is still at offset {}.",
+                                brokerId, registerBrokerRecordOffset, request.currentMetadataOffset());
                         }
                         return new BrokerControlStates(currentState, FENCED);
                     }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 235f077cfff..d30f4324217 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -60,6 +60,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -217,6 +218,14 @@ public class ClusterControlManager {
      */
     private final TimelineHashMap<Integer, BrokerRegistration> brokerRegistrations;
 
+    /**
+     * Save the offset of each broker registration record. We will only unfence a
+     * broker when its high watermark has reached the offset of its broker
+     * registration record. This is not necessarily the exact offset of the
+     * registration record, but it should not be smaller than it.
+     */
+    private final TimelineHashMap<Integer, Long> registerBrokerRecordOffsets;
+
     /**
      * A reference to the controller's metrics registry.
      */
@@ -255,6 +264,7 @@ public class ClusterControlManager {
         this.sessionTimeoutNs = sessionTimeoutNs;
         this.replicaPlacer = replicaPlacer;
         this.brokerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.registerBrokerRecordOffsets = new TimelineHashMap<>(snapshotRegistry, 0);
         this.heartbeatManager = null;
         this.readyBrokersFuture = Optional.empty();
         this.controllerMetrics = metrics;
@@ -366,7 +376,15 @@ public class ClusterControlManager {
         return ControllerResult.atomicOf(records, new BrokerRegistrationReply(brokerEpoch));
     }
 
-    public void replay(RegisterBrokerRecord record) {
+    public OptionalLong registerBrokerRecordOffset(int brokerId) {
+        if (registerBrokerRecordOffsets.containsKey(brokerId)) {
+            return OptionalLong.of(registerBrokerRecordOffsets.get(brokerId));
+        }
+        return OptionalLong.empty();
+    }
+
+    public void replay(RegisterBrokerRecord record, long offset) {
+        registerBrokerRecordOffsets.put(record.brokerId(), offset);
         int brokerId = record.brokerId();
         List<Endpoint> listeners = new ArrayList<>();
         for (BrokerEndpoint endpoint : record.endPoints()) {
@@ -401,14 +419,15 @@ public class ClusterControlManager {
     }
 
     public void replay(UnregisterBrokerRecord record) {
+        registerBrokerRecordOffsets.remove(record.brokerId());
         int brokerId = record.brokerId();
         BrokerRegistration registration = brokerRegistrations.get(brokerId);
         if (registration == null) {
             throw new RuntimeException(String.format("Unable to replay %s: no broker " +
-                "registration found for that id", record.toString()));
+                "registration found for that id", record));
         } else if (registration.epoch() != record.brokerEpoch()) {
             throw new RuntimeException(String.format("Unable to replay %s: no broker " +
-                "registration with that epoch found", record.toString()));
+                "registration with that epoch found", record));
         } else {
             if (heartbeatManager != null) heartbeatManager.remove(brokerId);
             brokerRegistrations.remove(brokerId);
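
A hedged usage sketch of the bookkeeping added above (simplified; the real call
site is in the QuorumController hunk that follows):

    // During replay, ClusterControlManager records where each broker registered:
    //     clusterControl.replay(registerBrokerRecord, batchLastOffset);
    // When a heartbeat arrives, the controller consults that stable offset
    // instead of lastCommittedOffset:
    OptionalLong offset = clusterControl.registerBrokerRecordOffset(brokerId);
    if (!offset.isPresent()) {
        // No RegisterBrokerRecord has been replayed yet for this broker.
        throw new StaleBrokerEpochException(
            "Received a heartbeat from broker " + brokerId + " before registration");
    }
    replicationControl.processBrokerHeartbeat(request, offset.getAsLong());
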
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index ef87248f134..3fee25841ba 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.StaleBrokerEpochException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
@@ -759,7 +760,7 @@ public final class QuorumController implements Controller {
                 int i = 1;
                 for (ApiMessageAndVersion message : result.records()) {
                     try {
-                        replay(message.message(), Optional.empty());
+                        replay(message.message(), Optional.empty(), writeOffset + result.records().size());
                     } catch (Throwable e) {
                         String failureMessage = String.format("Unable to apply %s record, which was " +
                             "%d of %d record(s) in the batch following last writeOffset %d.",
@@ -883,7 +884,7 @@ public final class QuorumController implements Controller {
                             int i = 1;
                             for (ApiMessageAndVersion message : messages) {
                                 try {
-                                    replay(message.message(), Optional.empty());
+                                    replay(message.message(), Optional.empty(), offset);
                                 } catch (Throwable e) {
                                     String failureMessage = String.format("Unable to apply %s record on standby " +
                                             "controller, which was %d of %d record(s) in the batch with baseOffset %d.",
@@ -938,7 +939,7 @@ public final class QuorumController implements Controller {
                         int i = 1;
                         for (ApiMessageAndVersion message : messages) {
                             try {
-                                replay(message.message(), Optional.of(reader.snapshotId()));
+                                replay(message.message(), Optional.of(reader.snapshotId()), reader.lastContainedLogOffset());
                             } catch (Throwable e) {
                                 String failureMessage = String.format("Unable to apply %s record " +
                                         "from snapshot %s on standby controller, which was %d of " +
@@ -1305,12 +1306,19 @@ public final class QuorumController implements Controller {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId) {
+    /**
+     * Apply the metadata record to its corresponding in-memory state(s).
+     *
+     * @param message           The metadata record
+     * @param snapshotId        The snapshotId if this record is from a snapshot
+     * @param batchLastOffset   The offset of the last record in the log batch, or the lastContainedLogOffset
+     *                          if this record is from a snapshot. It is used along with RegisterBrokerRecord.
+     */
+    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long batchLastOffset) {
         MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
         switch (type) {
             case REGISTER_BROKER_RECORD:
-                clusterControl.replay((RegisterBrokerRecord) message);
+                clusterControl.replay((RegisterBrokerRecord) message, batchLastOffset);
                 break;
             case UNREGISTER_BROKER_RECORD:
                 clusterControl.replay((UnregisterBrokerRecord) message);
@@ -1874,8 +1882,13 @@ public final class QuorumController implements Controller {
 
                 @Override
                 public ControllerResult<BrokerHeartbeatReply> generateRecordsAndResult() {
+                    OptionalLong offsetForRegisterBrokerRecord = clusterControl.registerBrokerRecordOffset(brokerId);
+                    if (!offsetForRegisterBrokerRecord.isPresent()) {
+                        throw new StaleBrokerEpochException(
+                            String.format("Received a heartbeat from broker %d before registration", brokerId));
+                    }
                     ControllerResult<BrokerHeartbeatReply> result = replicationControl.
-                        processBrokerHeartbeat(request, lastCommittedOffset);
+                        processBrokerHeartbeat(request, offsetForRegisterBrokerRecord.getAsLong());
                     inControlledShutdown = result.response().inControlledShutdown();
                     rescheduleMaybeFenceStaleBrokers();
                     return result;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index bf3a679d2ce..4ffb339967c 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -1355,13 +1355,13 @@ public class ReplicationControlManager {
     }
 
     ControllerResult<BrokerHeartbeatReply> processBrokerHeartbeat(
-                BrokerHeartbeatRequestData request, long lastCommittedOffset) {
+                BrokerHeartbeatRequestData request, long registerBrokerRecordOffset) {
         int brokerId = request.brokerId();
         long brokerEpoch = request.brokerEpoch();
         clusterControl.checkBrokerEpoch(brokerId, brokerEpoch);
         BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
         BrokerControlStates states = heartbeatManager.calculateNextBrokerState(brokerId,
-            request, lastCommittedOffset, () -> brokersToIsrs.hasLeaderships(brokerId));
+            request, registerBrokerRecordOffset, () -> brokersToIsrs.hasLeaderships(brokerId));
         List<ApiMessageAndVersion> records = new ArrayList<>();
         if (states.current() != states.next()) {
             switch (states.next()) {
@@ -1382,7 +1382,7 @@ public class ReplicationControlManager {
         heartbeatManager.touch(brokerId,
             states.next().fenced(),
             request.currentMetadataOffset());
-        boolean isCaughtUp = request.currentMetadataOffset() >= lastCommittedOffset;
+        boolean isCaughtUp = request.currentMetadataOffset() >= registerBrokerRecordOffset;
         BrokerHeartbeatReply reply = new BrokerHeartbeatReply(isCaughtUp,
                 states.next().fenced(),
                 states.next().inControlledShutdown(),
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 97d6c883772..e47def81e6d 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -96,7 +96,7 @@ public class ClusterControlManagerTest {
             setPort((short) 9092).
             setName("PLAINTEXT").
             setHost("example.com"));
-        clusterControl.replay(brokerRecord);
+        clusterControl.replay(brokerRecord, 100L);
         clusterControl.checkBrokerEpoch(1, 100);
         assertThrows(StaleBrokerEpochException.class,
             () -> clusterControl.checkBrokerEpoch(1, 101));
@@ -165,19 +165,20 @@ public class ClusterControlManagerTest {
             setPort((short) 9092).
             setName("PLAINTEXT").
             setHost("example.com"));
-        clusterControl.replay(brokerRecord);
+        clusterControl.replay(brokerRecord, 100L);
 
         assertFalse(clusterControl.unfenced(0));
         assertTrue(clusterControl.inControlledShutdown(0));
 
         brokerRecord.setInControlledShutdown(false);
-        clusterControl.replay(brokerRecord);
+        clusterControl.replay(brokerRecord, 100L);
 
         assertFalse(clusterControl.unfenced(0));
         assertFalse(clusterControl.inControlledShutdown(0));
+        assertEquals(100L, clusterControl.registerBrokerRecordOffset(brokerRecord.brokerId()).getAsLong());
 
         brokerRecord.setFenced(false);
-        clusterControl.replay(brokerRecord);
+        clusterControl.replay(brokerRecord, 100L);
 
         assertTrue(clusterControl.unfenced(0));
         assertFalse(clusterControl.inControlledShutdown(0));
@@ -217,7 +218,7 @@ public class ClusterControlManagerTest {
             setPort((short) 9092).
             setName("PLAINTEXT").
             setHost("example.com"));
-        clusterControl.replay(brokerRecord);
+        clusterControl.replay(brokerRecord, 100L);
 
         assertTrue(clusterControl.unfenced(0));
         assertFalse(clusterControl.inControlledShutdown(0));
@@ -341,17 +342,19 @@ public class ClusterControlManagerTest {
             setFeatureControlManager(featureControl).
             build();
         clusterControl.activate();
-        clusterControl.replay(brokerRecord);
+        clusterControl.replay(brokerRecord, 100L);
         assertEquals(new BrokerRegistration(1, 100,
                 Uuid.fromString("fPZv1VBsRFmnlRvmGcOW9w"), Collections.singletonMap("PLAINTEXT",
                 new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "example.com", 9092)),
                 Collections.emptyMap(), Optional.of("arack"), true, false),
             clusterControl.brokerRegistrations().get(1));
+        assertEquals(100L, clusterControl.registerBrokerRecordOffset(brokerRecord.brokerId()).getAsLong());
         UnregisterBrokerRecord unregisterRecord = new UnregisterBrokerRecord().
             setBrokerId(1).
             setBrokerEpoch(100);
         clusterControl.replay(unregisterRecord);
         assertFalse(clusterControl.brokerRegistrations().containsKey(1));
+        assertFalse(clusterControl.registerBrokerRecordOffset(brokerRecord.brokerId()).isPresent());
     }
 
     @ParameterizedTest
@@ -382,7 +385,7 @@ public class ClusterControlManagerTest {
                 setPort((short) 9092).
                 setName("PLAINTEXT").
                 setHost("example.com"));
-            clusterControl.replay(brokerRecord);
+            clusterControl.replay(brokerRecord, 100L);
             UnfenceBrokerRecord unfenceRecord =
                 new UnfenceBrokerRecord().setId(i).setEpoch(100);
             clusterControl.replay(unfenceRecord);
@@ -442,7 +445,7 @@ public class ClusterControlManagerTest {
                 setPort((short) 9092 + i).
                 setName("PLAINTEXT").
                 setHost("example.com"));
-            clusterControl.replay(brokerRecord);
+            clusterControl.replay(brokerRecord, 100L);
         }
         for (int i = 0; i < 2; i++) {
             UnfenceBrokerRecord unfenceBrokerRecord =
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
index ccdd3a5b233..80c5c505ae0 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
@@ -76,7 +76,7 @@ public class ProducerIdControlManagerTest {
                     setPort((short) 9092).
                     setName("PLAINTEXT").
                     setHost(String.format("broker-%02d.example.org", i)));
-            clusterControl.replay(brokerRecord);
+            clusterControl.replay(brokerRecord, 100L);
         }
 
         this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
index f5a8da5f8a2..c21bdb54478 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
@@ -57,20 +57,25 @@ public class RecordTestUtils {
         for (ApiMessageAndVersion recordAndVersion : recordsAndVersions) {
             ApiMessage record = recordAndVersion.message();
             try {
-                Method method = target.getClass().getMethod("replay", record.getClass());
-                method.invoke(target, record);
-            } catch (NoSuchMethodException e) {
                 try {
-                    Method method = target.getClass().getMethod("replay",
-                        record.getClass(),
-                        Optional.class);
-                    method.invoke(target, record, Optional.empty());
-                } catch (NoSuchMethodException t) {
-                    // ignore
-                } catch (InvocationTargetException t) {
-                    throw new RuntimeException(t);
-                } catch (IllegalAccessException t) {
-                    throw new RuntimeException(t);
+                    Method method = target.getClass().getMethod("replay", record.getClass());
+                    method.invoke(target, record);
+                } catch (NoSuchMethodException e) {
+                    try {
+                        Method method = target.getClass().getMethod("replay",
+                            record.getClass(),
+                            Optional.class);
+                        method.invoke(target, record, Optional.empty());
+                    } catch (NoSuchMethodException t) {
+                        try {
+                            Method method = target.getClass().getMethod("replay",
+                                record.getClass(),
+                                long.class);
+                            method.invoke(target, record, 0L);
+                        } catch (NoSuchMethodException i) {
+                            // ignore
+                        }
+                    }
                 }
             } catch (InvocationTargetException e) {
                 throw new RuntimeException(e);
@@ -119,7 +124,7 @@ public class RecordTestUtils {
      * @param delta the metadata delta on which to replay the records
      * @param highestOffset highest offset from the list of record batches
      * @param highestEpoch highest epoch from the list of record batches
-     * @param recordsAndVersions list of batches of records
+     * @param batches list of batches of records
      */
     public static void replayAllBatches(
         MetadataDelta delta,
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/FuturePurgatory.java b/raft/src/main/java/org/apache/kafka/raft/internals/FuturePurgatory.java
index b37fb3a3847..e5dceeaa0c3 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/FuturePurgatory.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/FuturePurgatory.java
@@ -56,8 +56,8 @@ public interface FuturePurgatory<T extends Comparable<T>> {
     CompletableFuture<Long> await(T threshold, long maxWaitTimeMs);
 
     /**
-     * Complete awaiting futures whose associated values are larger than the given threshold value.
-     * The completion callbacks will be triggered from the calling thread.
+     * Complete awaiting futures whose threshold value from {@link FuturePurgatory#await} is smaller
+     * than the given threshold value. The completion callbacks will be triggered from the calling thread.
      *
      * @param value         the threshold value used to determine which futures can be completed
      * @param currentTimeMs the current time in milliseconds that will be passed to