Posted to notifications@ignite.apache.org by "vldpyatkov (via GitHub)" <gi...@apache.org> on 2023/05/08 14:23:29 UTC

[GitHub] [ignite-3] vldpyatkov commented on a diff in pull request #2029: IGNITE-18859 PartitionReplicaListener refactoring

vldpyatkov commented on code in PR #2029:
URL: https://github.com/apache/ignite-3/pull/2029#discussion_r1187499121


##########
modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java:
##########
@@ -102,6 +109,17 @@ public void update(T newValue) {
         }
     }
 
+    /**
+     * Updates the internal state, if it is lower than {@code newValue} and completes all futures waiting for {@code newValue}
+     * that had been created for corresponding values that are lower than the given one.
+     *
+     * @param newValue New value.
+     * @throws TrackerClosedException if the tracker is closed.
+     */
+    public void update(T newValue) {

Review Comment:
   This method looks risky, because the exact type of the value is not known here.



##########
modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java:
##########
@@ -161,8 +171,38 @@ public void close() {
 
         TrackerClosedException trackerClosedException = new TrackerClosedException();
 
-        valueFutures.values().forEach(future -> future.completeExceptionally(trackerClosedException));
+        completeWaitersOnClose(trackerClosedException);
 
         valueFutures.clear();
     }
+
+    protected void completeWaitersOnUpdate(T newValue, @Nullable R futureResult) {
+        ConcurrentNavigableMap<T, CompletableFuture<R>> smallerFutures = valueFutures.headMap(newValue, true);
+
+        smallerFutures.forEach((k, f) -> f.complete(futureResult));
+
+        smallerFutures.clear();

Review Comment:
   Why is this clear() call required?
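
Context for the question above: `headMap(toKey, inclusive)` on a `ConcurrentSkipListMap` returns a live view backed by the original map, so clearing the view removes those entries from the underlying map. The standalone sketch below shows only that JDK behavior. The class name `HeadMapViewDemo`, the method `completeUpTo`, and the `Integer`/`String` type arguments are hypothetical and are not taken from the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical demo class; it only mirrors the shape of the quoted diff lines.
class HeadMapViewDemo {
    /** Completes every future keyed at or below newValue, drops them, and returns how many entries remain. */
    static int completeUpTo(ConcurrentSkipListMap<Integer, CompletableFuture<String>> futures, int newValue, String result) {
        // headMap returns a view of the backing map, not a copy.
        ConcurrentNavigableMap<Integer, CompletableFuture<String>> smaller = futures.headMap(newValue, true);

        smaller.forEach((k, f) -> f.complete(result));

        // Clearing the view removes the same entries from the backing map.
        smaller.clear();

        return futures.size();
    }

    public static void main(String[] args) {
        ConcurrentSkipListMap<Integer, CompletableFuture<String>> futures = new ConcurrentSkipListMap<>();
        futures.put(1, new CompletableFuture<>());
        futures.put(2, new CompletableFuture<>());
        futures.put(5, new CompletableFuture<>());

        int remaining = completeUpTo(futures, 2, "done");

        System.out.println("remaining=" + remaining + ", keys=" + futures.keySet());
    }
}
```

Without the `clear()` call, the completed futures would stay in the map until something else removed them. Whether that matters for the tracker depends on code that is not part of the quoted hunk.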



##########
modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java:
##########
@@ -22,34 +22,39 @@
 
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.VarHandle;
+import java.util.Comparator;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Tracker that stores comparable value internally, this value can grow when {@link #update(Comparable)} method is called. The tracker gives
  * ability to wait for certain value, see {@link #waitFor(Comparable)}.
  */
-public class PendingComparableValuesTracker<T extends Comparable<T>> implements ManuallyCloseable {
+public class PendingComparableValuesTracker<T extends Comparable<T>, R> implements ManuallyCloseable {
     private static final VarHandle CURRENT;
 
     private static final VarHandle CLOSE_GUARD;
 
     static {
         try {
-            CURRENT = MethodHandles.lookup().findVarHandle(PendingComparableValuesTracker.class, "current", Comparable.class);
+            CURRENT = MethodHandles.lookup().findVarHandle(PendingComparableValuesTracker.class, "current", Map.Entry.class);
             CLOSE_GUARD = MethodHandles.lookup().findVarHandle(PendingComparableValuesTracker.class, "closeGuard", boolean.class);
         } catch (ReflectiveOperationException e) {
             throw new ExceptionInInitializerError(e);
         }
     }
 
     /** Map of comparable values to corresponding futures. */
-    private final ConcurrentSkipListMap<T, CompletableFuture<Void>> valueFutures = new ConcurrentSkipListMap<>();
+    private final ConcurrentSkipListMap<T, CompletableFuture<R>> valueFutures = new ConcurrentSkipListMap<>();
 
-    /** Current value. */
-    private volatile T current;
+    /** Current value along with associated result. */
+    @SuppressWarnings("FieldMayBeFinal") // Changed through CURRENT VarHandle.
+    private volatile Map.Entry<T, @Nullable R> current;

Review Comment:
   As I understand it, only the current value is ever used: if you request a future for a previous T, you receive the value of this variable.
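
The concern above can be illustrated with a deliberately simplified sketch. It assumes that `waitFor` returns an already completed future carrying the current entry's result whenever the requested value is at or below the current one. The real `waitFor` is not shown in the quoted diff, so that behavior is an assumption. `MiniTracker` and its two-argument `update` are hypothetical names, and the class is not thread-safe.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical, simplified stand-in for the tracker; not the PR's implementation and not thread-safe.
class MiniTracker<T extends Comparable<T>, R> {
    private final ConcurrentSkipListMap<T, CompletableFuture<R>> valueFutures = new ConcurrentSkipListMap<>();

    /** Current value together with the result passed to the latest successful update. */
    private Map.Entry<T, R> current;

    MiniTracker(T initialValue) {
        // SimpleImmutableEntry permits a null result, unlike Map.entry.
        current = new SimpleImmutableEntry<>(initialValue, null);
    }

    /** Raises the current value if newValue is greater and completes waiters at or below it. */
    void update(T newValue, R result) {
        if (newValue.compareTo(current.getKey()) <= 0) {
            return; // Stale update: the current entry is kept.
        }

        current = new SimpleImmutableEntry<>(newValue, result);

        ConcurrentNavigableMap<T, CompletableFuture<R>> smaller = valueFutures.headMap(newValue, true);
        smaller.forEach((k, f) -> f.complete(result));
        smaller.clear();
    }

    /** Returns a future for the given value; values already reached resolve with the current result. */
    CompletableFuture<R> waitFor(T value) {
        Map.Entry<T, R> snapshot = current;

        if (snapshot.getKey().compareTo(value) >= 0) {
            return CompletableFuture.completedFuture(snapshot.getValue());
        }

        return valueFutures.computeIfAbsent(value, k -> new CompletableFuture<>());
    }

    public static void main(String[] args) {
        MiniTracker<Integer, String> tracker = new MiniTracker<>(0);

        tracker.update(5, "result-5");
        tracker.update(10, "result-10");

        // Value 5 was reached earlier, but only the latest result is observable.
        System.out.println(tracker.waitFor(5).join()); // prints "result-10"
    }
}
```

Under these assumptions the result passed with an earlier update cannot be observed once a later update lands, which matches the point raised in the comment.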


