You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ozone.apache.org by "szetszwo (via GitHub)" <gi...@apache.org> on 2024/01/01 21:30:59 UTC

[PR] HDDS-10039. Remove the flushLatches set from FlushNotifier. [ozone]

szetszwo opened a new pull request, #5900:
URL: https://github.com/apache/ozone/pull/5900

   ## What changes were proposed in this pull request?
   
   In `FlushNotifier`, it add a new latch to the `flushLatches` set for each `await()` call.
   
   We may reduce the number of objects by using `CompletableFuture`. Then, the `await()` calls waiting for the same flush will use for the same future.
   
   ## What is the link to the Apache JIRA
   
   HDDS-10039
   
   ## How was this patch tested?
   
   By updating existing tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


Re: [PR] HDDS-10039. Remove the flushLatches set from FlushNotifier. [ozone]

Posted by "smengcl (via GitHub)" <gi...@apache.org>.
smengcl commented on code in PR #5900:
URL: https://github.com/apache/ozone/pull/5900#discussion_r1446337159


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -649,29 +649,57 @@ void resume() {
     isRunning.set(true);
   }
 
+  CompletableFuture<Integer> awaitFlushAsync() {
+    return flushNotifier.await();
+  }
+
   public void awaitFlush() throws InterruptedException {
-    flushNotifier.await();
+    try {
+      awaitFlushAsync().get();
+    } catch (ExecutionException e) {
+      // the future will never be completed exceptionally.
+      throw new IllegalStateException(e);
+    }
   }
 
   static class FlushNotifier {
-    private final Set<CountDownLatch> flushLatches =
-        ConcurrentHashMap.newKeySet();
+    static class Entry {
+      private final CompletableFuture<Integer> future = new CompletableFuture<>();
+      private int count;
 
-    void await() throws InterruptedException {
+      private CompletableFuture<Integer> await() {
+        count++;
+        return future;
+      }
 
-      // Wait until both the current and ready buffers are flushed.
-      CountDownLatch latch = new CountDownLatch(2);
-      flushLatches.add(latch);
-      latch.await();
-      flushLatches.remove(latch);
+      private int complete() {
+        Preconditions.checkState(future.complete(count));
+        return future.join();
+      }
     }
 
-    int notifyFlush() {
-      int retval = flushLatches.size();
-      for (CountDownLatch l : flushLatches) {
-        l.countDown();
-      }
-      return retval;
+    /** The size of the map is at most two since it uses {@link #flushCount} + 2 in {@link #await()} .*/
+    private final Map<Integer, Entry> flushFutures = new TreeMap<>();
+    private int awaitCount;
+    private int flushCount;
+
+    synchronized CompletableFuture<Integer> await() {
+      awaitCount++;
+      final int flush = flushCount + 2;
+      LOG.debug("await flush {}", flush);
+      final Entry entry = flushFutures.computeIfAbsent(flush, key -> new Entry());
+      Preconditions.checkState(flushFutures.size() <= 2);
+      return entry.await();
+    }
+
+    synchronized int notifyFlush() {
+      final int await = awaitCount;
+      final int flush = ++flushCount;
+      awaitCount -= Optional.ofNullable(flushFutures.remove(flush))
+          .map(Entry::complete)
+          .orElse(0);

Review Comment:
   Use of `Optional` does make the expression elegant here. But do note `Optional.ofNullable()` creates new object inside whenever `value` is not null every time:
   
   ```java
   public static <T> Optional<T> of(T value) {
       return new Optional<>(value);
   }
   ```
   
   Do we want to avoid the object creation here as well?



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -649,29 +649,57 @@ void resume() {
     isRunning.set(true);
   }
 
+  CompletableFuture<Integer> awaitFlushAsync() {
+    return flushNotifier.await();
+  }
+
   public void awaitFlush() throws InterruptedException {
-    flushNotifier.await();
+    try {
+      awaitFlushAsync().get();
+    } catch (ExecutionException e) {
+      // the future will never be completed exceptionally.
+      throw new IllegalStateException(e);
+    }
   }
 
   static class FlushNotifier {
-    private final Set<CountDownLatch> flushLatches =
-        ConcurrentHashMap.newKeySet();
+    static class Entry {
+      private final CompletableFuture<Integer> future = new CompletableFuture<>();
+      private int count;
 
-    void await() throws InterruptedException {
+      private CompletableFuture<Integer> await() {
+        count++;
+        return future;
+      }
 
-      // Wait until both the current and ready buffers are flushed.
-      CountDownLatch latch = new CountDownLatch(2);
-      flushLatches.add(latch);
-      latch.await();
-      flushLatches.remove(latch);
+      private int complete() {
+        Preconditions.checkState(future.complete(count));
+        return future.join();
+      }
     }
 
-    int notifyFlush() {
-      int retval = flushLatches.size();
-      for (CountDownLatch l : flushLatches) {
-        l.countDown();
-      }
-      return retval;
+    /** The size of the map is at most two since it uses {@link #flushCount} + 2 in {@link #await()} .*/
+    private final Map<Integer, Entry> flushFutures = new TreeMap<>();

Review Comment:
   Shall we use a `ConcurrentHashMap` here instead?
   
   `TreeMap` doesn't seem to guarantee thread-safety for `remove()` or `computeIfAbsent()`.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -649,29 +649,57 @@ void resume() {
     isRunning.set(true);
   }
 
+  CompletableFuture<Integer> awaitFlushAsync() {
+    return flushNotifier.await();
+  }
+
   public void awaitFlush() throws InterruptedException {
-    flushNotifier.await();
+    try {
+      awaitFlushAsync().get();
+    } catch (ExecutionException e) {
+      // the future will never be completed exceptionally.
+      throw new IllegalStateException(e);
+    }
   }
 
   static class FlushNotifier {
-    private final Set<CountDownLatch> flushLatches =
-        ConcurrentHashMap.newKeySet();
+    static class Entry {
+      private final CompletableFuture<Integer> future = new CompletableFuture<>();
+      private int count;
 
-    void await() throws InterruptedException {
+      private CompletableFuture<Integer> await() {
+        count++;
+        return future;
+      }
 
-      // Wait until both the current and ready buffers are flushed.
-      CountDownLatch latch = new CountDownLatch(2);
-      flushLatches.add(latch);
-      latch.await();
-      flushLatches.remove(latch);
+      private int complete() {
+        Preconditions.checkState(future.complete(count));

Review Comment:
   Just curious, in which case would `future.complete(count)` ever return `false` here?



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -649,29 +649,57 @@ void resume() {
     isRunning.set(true);
   }
 
+  CompletableFuture<Integer> awaitFlushAsync() {
+    return flushNotifier.await();
+  }
+
   public void awaitFlush() throws InterruptedException {
-    flushNotifier.await();
+    try {
+      awaitFlushAsync().get();
+    } catch (ExecutionException e) {
+      // the future will never be completed exceptionally.
+      throw new IllegalStateException(e);
+    }
   }
 
   static class FlushNotifier {
-    private final Set<CountDownLatch> flushLatches =
-        ConcurrentHashMap.newKeySet();
+    static class Entry {
+      private final CompletableFuture<Integer> future = new CompletableFuture<>();
+      private int count;
 
-    void await() throws InterruptedException {
+      private CompletableFuture<Integer> await() {
+        count++;
+        return future;
+      }
 
-      // Wait until both the current and ready buffers are flushed.
-      CountDownLatch latch = new CountDownLatch(2);
-      flushLatches.add(latch);
-      latch.await();
-      flushLatches.remove(latch);
+      private int complete() {
+        Preconditions.checkState(future.complete(count));
+        return future.join();
+      }
     }
 
-    int notifyFlush() {
-      int retval = flushLatches.size();
-      for (CountDownLatch l : flushLatches) {
-        l.countDown();
-      }
-      return retval;
+    /** The size of the map is at most two since it uses {@link #flushCount} + 2 in {@link #await()} .*/
+    private final Map<Integer, Entry> flushFutures = new TreeMap<>();
+    private int awaitCount;
+    private int flushCount;

Review Comment:
   We might need `volatile` keyword for both? Otherwise `++flushCount` in one thread may not be visible to others?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


Re: [PR] HDDS-10039. Remove the flushLatches set from FlushNotifier. [ozone]

Posted by "szetszwo (via GitHub)" <gi...@apache.org>.
szetszwo commented on code in PR #5900:
URL: https://github.com/apache/ozone/pull/5900#discussion_r1446398548


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -649,29 +649,57 @@ void resume() {
     isRunning.set(true);
   }
 
+  CompletableFuture<Integer> awaitFlushAsync() {
+    return flushNotifier.await();
+  }
+
   public void awaitFlush() throws InterruptedException {
-    flushNotifier.await();
+    try {
+      awaitFlushAsync().get();
+    } catch (ExecutionException e) {
+      // the future will never be completed exceptionally.
+      throw new IllegalStateException(e);
+    }
   }
 
   static class FlushNotifier {
-    private final Set<CountDownLatch> flushLatches =
-        ConcurrentHashMap.newKeySet();
+    static class Entry {
+      private final CompletableFuture<Integer> future = new CompletableFuture<>();
+      private int count;
 
-    void await() throws InterruptedException {
+      private CompletableFuture<Integer> await() {
+        count++;
+        return future;
+      }
 
-      // Wait until both the current and ready buffers are flushed.
-      CountDownLatch latch = new CountDownLatch(2);
-      flushLatches.add(latch);
-      latch.await();
-      flushLatches.remove(latch);
+      private int complete() {
+        Preconditions.checkState(future.complete(count));
+        return future.join();
+      }
     }
 
-    int notifyFlush() {
-      int retval = flushLatches.size();
-      for (CountDownLatch l : flushLatches) {
-        l.countDown();
-      }
-      return retval;
+    /** The size of the map is at most two since it uses {@link #flushCount} + 2 in {@link #await()} .*/
+    private final Map<Integer, Entry> flushFutures = new TreeMap<>();

Review Comment:
   After this change, all methods are `synchronized`.  Since the methods are simple, `synchronized` is better than `ConcurrentHashMap`/`volatile`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


Re: [PR] HDDS-10039. Remove the flushLatches set from FlushNotifier. [ozone]

Posted by "szetszwo (via GitHub)" <gi...@apache.org>.
szetszwo commented on code in PR #5900:
URL: https://github.com/apache/ozone/pull/5900#discussion_r1446396007


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -649,29 +649,57 @@ void resume() {
     isRunning.set(true);
   }
 
+  CompletableFuture<Integer> awaitFlushAsync() {
+    return flushNotifier.await();
+  }
+
   public void awaitFlush() throws InterruptedException {
-    flushNotifier.await();
+    try {
+      awaitFlushAsync().get();
+    } catch (ExecutionException e) {
+      // the future will never be completed exceptionally.
+      throw new IllegalStateException(e);
+    }
   }
 
   static class FlushNotifier {
-    private final Set<CountDownLatch> flushLatches =
-        ConcurrentHashMap.newKeySet();
+    static class Entry {
+      private final CompletableFuture<Integer> future = new CompletableFuture<>();
+      private int count;
 
-    void await() throws InterruptedException {
+      private CompletableFuture<Integer> await() {
+        count++;
+        return future;
+      }
 
-      // Wait until both the current and ready buffers are flushed.
-      CountDownLatch latch = new CountDownLatch(2);
-      flushLatches.add(latch);
-      latch.await();
-      flushLatches.remove(latch);
+      private int complete() {
+        Preconditions.checkState(future.complete(count));

Review Comment:
   When the future is already in completed (normally or exceptionally), it will return false.  It should never happen here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


Re: [PR] HDDS-10039. Remove the flushLatches set from FlushNotifier. [ozone]

Posted by "GeorgeJahad (via GitHub)" <gi...@apache.org>.
GeorgeJahad commented on PR #5900:
URL: https://github.com/apache/ozone/pull/5900#issuecomment-1879147834

   >  smengcl requested a review from GeorgeJahad
   
   @smengcl I'm on vacation and won't have time to look at this till I get back, (in 2 weeks.)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


Re: [PR] HDDS-10039. Remove the flushLatches set from FlushNotifier. [ozone]

Posted by "smengcl (via GitHub)" <gi...@apache.org>.
smengcl commented on code in PR #5900:
URL: https://github.com/apache/ozone/pull/5900#discussion_r1446991823


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -649,29 +649,57 @@ void resume() {
     isRunning.set(true);
   }
 
+  CompletableFuture<Integer> awaitFlushAsync() {
+    return flushNotifier.await();
+  }
+
   public void awaitFlush() throws InterruptedException {
-    flushNotifier.await();
+    try {
+      awaitFlushAsync().get();
+    } catch (ExecutionException e) {
+      // the future will never be completed exceptionally.
+      throw new IllegalStateException(e);
+    }
   }
 
   static class FlushNotifier {
-    private final Set<CountDownLatch> flushLatches =
-        ConcurrentHashMap.newKeySet();
+    static class Entry {
+      private final CompletableFuture<Integer> future = new CompletableFuture<>();
+      private int count;
 
-    void await() throws InterruptedException {
+      private CompletableFuture<Integer> await() {
+        count++;
+        return future;
+      }
 
-      // Wait until both the current and ready buffers are flushed.
-      CountDownLatch latch = new CountDownLatch(2);
-      flushLatches.add(latch);
-      latch.await();
-      flushLatches.remove(latch);
+      private int complete() {
+        Preconditions.checkState(future.complete(count));

Review Comment:
   Thanks @szetszwo !



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


Re: [PR] HDDS-10039. Remove the flushLatches set from FlushNotifier. [ozone]

Posted by "adoroszlai (via GitHub)" <gi...@apache.org>.
adoroszlai commented on PR #5900:
URL: https://github.com/apache/ozone/pull/5900#issuecomment-1884418505

   Thanks @szetszwo for the patch, @smengcl for the review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


Re: [PR] HDDS-10039. Remove the flushLatches set from FlushNotifier. [ozone]

Posted by "szetszwo (via GitHub)" <gi...@apache.org>.
szetszwo commented on PR #5900:
URL: https://github.com/apache/ozone/pull/5900#issuecomment-1885216848

   @smengcl , thanks for reviewing this!
   
   @adoroszlai , thanks for merging this!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


Re: [PR] HDDS-10039. Remove the flushLatches set from FlushNotifier. [ozone]

Posted by "szetszwo (via GitHub)" <gi...@apache.org>.
szetszwo commented on code in PR #5900:
URL: https://github.com/apache/ozone/pull/5900#discussion_r1446400577


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -649,29 +649,57 @@ void resume() {
     isRunning.set(true);
   }
 
+  CompletableFuture<Integer> awaitFlushAsync() {
+    return flushNotifier.await();
+  }
+
   public void awaitFlush() throws InterruptedException {
-    flushNotifier.await();
+    try {
+      awaitFlushAsync().get();
+    } catch (ExecutionException e) {
+      // the future will never be completed exceptionally.
+      throw new IllegalStateException(e);
+    }
   }
 
   static class FlushNotifier {
-    private final Set<CountDownLatch> flushLatches =
-        ConcurrentHashMap.newKeySet();
+    static class Entry {
+      private final CompletableFuture<Integer> future = new CompletableFuture<>();
+      private int count;
 
-    void await() throws InterruptedException {
+      private CompletableFuture<Integer> await() {
+        count++;
+        return future;
+      }
 
-      // Wait until both the current and ready buffers are flushed.
-      CountDownLatch latch = new CountDownLatch(2);
-      flushLatches.add(latch);
-      latch.await();
-      flushLatches.remove(latch);
+      private int complete() {
+        Preconditions.checkState(future.complete(count));
+        return future.join();
+      }
     }
 
-    int notifyFlush() {
-      int retval = flushLatches.size();
-      for (CountDownLatch l : flushLatches) {
-        l.countDown();
-      }
-      return retval;
+    /** The size of the map is at most two since it uses {@link #flushCount} + 2 in {@link #await()} .*/
+    private final Map<Integer, Entry> flushFutures = new TreeMap<>();
+    private int awaitCount;
+    private int flushCount;

Review Comment:
   All methods are `synchronized`; see the other comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


Re: [PR] HDDS-10039. Remove the flushLatches set from FlushNotifier. [ozone]

Posted by "smengcl (via GitHub)" <gi...@apache.org>.
smengcl commented on code in PR #5900:
URL: https://github.com/apache/ozone/pull/5900#discussion_r1446990969


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -649,29 +649,57 @@ void resume() {
     isRunning.set(true);
   }
 
+  CompletableFuture<Integer> awaitFlushAsync() {
+    return flushNotifier.await();
+  }
+
   public void awaitFlush() throws InterruptedException {
-    flushNotifier.await();
+    try {
+      awaitFlushAsync().get();
+    } catch (ExecutionException e) {
+      // the future will never be completed exceptionally.
+      throw new IllegalStateException(e);
+    }
   }
 
   static class FlushNotifier {
-    private final Set<CountDownLatch> flushLatches =
-        ConcurrentHashMap.newKeySet();
+    static class Entry {
+      private final CompletableFuture<Integer> future = new CompletableFuture<>();
+      private int count;
 
-    void await() throws InterruptedException {
+      private CompletableFuture<Integer> await() {
+        count++;
+        return future;
+      }
 
-      // Wait until both the current and ready buffers are flushed.
-      CountDownLatch latch = new CountDownLatch(2);
-      flushLatches.add(latch);
-      latch.await();
-      flushLatches.remove(latch);
+      private int complete() {
+        Preconditions.checkState(future.complete(count));
+        return future.join();
+      }
     }
 
-    int notifyFlush() {
-      int retval = flushLatches.size();
-      for (CountDownLatch l : flushLatches) {
-        l.countDown();
-      }
-      return retval;
+    /** The size of the map is at most two since it uses {@link #flushCount} + 2 in {@link #await()} .*/
+    private final Map<Integer, Entry> flushFutures = new TreeMap<>();

Review Comment:
   Good point. That should do it. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


Re: [PR] HDDS-10039. Remove the flushLatches set from FlushNotifier. [ozone]

Posted by "szetszwo (via GitHub)" <gi...@apache.org>.
szetszwo commented on code in PR #5900:
URL: https://github.com/apache/ozone/pull/5900#discussion_r1446400827


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -649,29 +649,57 @@ void resume() {
     isRunning.set(true);
   }
 
+  CompletableFuture<Integer> awaitFlushAsync() {
+    return flushNotifier.await();
+  }
+
   public void awaitFlush() throws InterruptedException {
-    flushNotifier.await();
+    try {
+      awaitFlushAsync().get();
+    } catch (ExecutionException e) {
+      // the future will never be completed exceptionally.
+      throw new IllegalStateException(e);
+    }
   }
 
   static class FlushNotifier {
-    private final Set<CountDownLatch> flushLatches =
-        ConcurrentHashMap.newKeySet();
+    static class Entry {
+      private final CompletableFuture<Integer> future = new CompletableFuture<>();
+      private int count;
 
-    void await() throws InterruptedException {
+      private CompletableFuture<Integer> await() {
+        count++;
+        return future;
+      }
 
-      // Wait until both the current and ready buffers are flushed.
-      CountDownLatch latch = new CountDownLatch(2);
-      flushLatches.add(latch);
-      latch.await();
-      flushLatches.remove(latch);
+      private int complete() {
+        Preconditions.checkState(future.complete(count));
+        return future.join();
+      }
     }
 
-    int notifyFlush() {
-      int retval = flushLatches.size();
-      for (CountDownLatch l : flushLatches) {
-        l.countDown();
-      }
-      return retval;
+    /** The size of the map is at most two since it uses {@link #flushCount} + 2 in {@link #await()} .*/
+    private final Map<Integer, Entry> flushFutures = new TreeMap<>();
+    private int awaitCount;
+    private int flushCount;
+
+    synchronized CompletableFuture<Integer> await() {
+      awaitCount++;
+      final int flush = flushCount + 2;
+      LOG.debug("await flush {}", flush);
+      final Entry entry = flushFutures.computeIfAbsent(flush, key -> new Entry());
+      Preconditions.checkState(flushFutures.size() <= 2);
+      return entry.await();
+    }
+
+    synchronized int notifyFlush() {
+      final int await = awaitCount;
+      final int flush = ++flushCount;
+      awaitCount -= Optional.ofNullable(flushFutures.remove(flush))
+          .map(Entry::complete)
+          .orElse(0);

Review Comment:
   Sure, let me update this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


Re: [PR] HDDS-10039. Remove the flushLatches set from FlushNotifier. [ozone]

Posted by "szetszwo (via GitHub)" <gi...@apache.org>.
szetszwo commented on PR #5900:
URL: https://github.com/apache/ozone/pull/5900#issuecomment-1881694100

   @smengcl , could you review this then?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


Re: [PR] HDDS-10039. Remove the flushLatches set from FlushNotifier. [ozone]

Posted by "smengcl (via GitHub)" <gi...@apache.org>.
smengcl commented on PR #5900:
URL: https://github.com/apache/ozone/pull/5900#issuecomment-1883414958

   > > smengcl requested a review from GeorgeJahad
   > 
   > @smengcl I'm on vacation and won't have time to look at this till I get back, (in 2 weeks.)
   
   Ah, got it. Enjoy you vacation!
   
   > @smengcl , could you review this then?
   
   Yup, on it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


Re: [PR] HDDS-10039. Remove the flushLatches set from FlushNotifier. [ozone]

Posted by "szetszwo (via GitHub)" <gi...@apache.org>.
szetszwo commented on PR #5900:
URL: https://github.com/apache/ozone/pull/5900#issuecomment-1883498749

   @smengcl , thanks a lot for reviewing this!  Just have pushed a commit addressing your comment.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


Re: [PR] HDDS-10039. Remove the flushLatches set from FlushNotifier. [ozone]

Posted by "adoroszlai (via GitHub)" <gi...@apache.org>.
adoroszlai merged PR #5900:
URL: https://github.com/apache/ozone/pull/5900


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org