Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2019/10/15 04:08:26 UTC

[GitHub] [rocketmq] RongtongJin commented on a change in pull request #1516: [ISSUE #1515] SYNC_MASTER could be change into pipeline manner

RongtongJin commented on a change in pull request #1516: [ISSUE #1515] SYNC_MASTER could be change into pipeline manner 
URL: https://github.com/apache/rocketmq/pull/1516#discussion_r334741847
 
 

 ##########
 File path: store/src/main/java/org/apache/rocketmq/store/CommitLog.java
 ##########
 @@ -1041,26 +1327,39 @@ public long getJointime() {
     public static class GroupCommitRequest {
         private final long nextOffset;
         private final CountDownLatch countDownLatch = new CountDownLatch(1);
-        private volatile boolean flushOK = false;
+        private CompletableFuture<Boolean> flushOk = new CompletableFuture<>();
+        private final long startTimestamp = System.currentTimeMillis();
+        private long timeoutMillis = Long.MAX_VALUE;
+
+        public GroupCommitRequest(long nextOffset, long timeoutMillis) {
+            this.nextOffset = nextOffset;
+            this.timeoutMillis = timeoutMillis;
+        }
 
         public GroupCommitRequest(long nextOffset) {
             this.nextOffset = nextOffset;
         }
 
+
         public long getNextOffset() {
             return nextOffset;
         }
 
         public void wakeupCustomer(final boolean flushOK) {
-            this.flushOK = flushOK;
+            long endTimestamp = System.currentTimeMillis();
+            this.flushOk.complete(flushOK && ((endTimestamp - this.startTimestamp) <= this.timeoutMillis));
             this.countDownLatch.countDown();
         }
 
+        public CompletableFuture<Boolean> future() {
+            return flushOk;
+        }
+
         public boolean waitForFlush(long timeout) {
             try {
                 this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
-                return this.flushOK;
-            } catch (InterruptedException e) {
+                return flushOk.get();
+            } catch (InterruptedException | ExecutionException e) {
                 log.error("Interrupted", e);
                 return false;
             }
 
 Review comment:
   Now that flushOk is a CompletableFuture, countDownLatch and waitForFlush seem redundant: callers can wait on future() directly.
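
   To illustrate the suggestion, here is a minimal sketch of the request class with the latch removed. It is not the PR's code: the class name GroupCommitRequestSketch is hypothetical, the startTimestamp/timeoutMillis check from the diff is left out for brevity, and waitForFlush is kept only as a thin wrapper for blocking callers. One thing the sketch also avoids: in the diff above, if countDownLatch.await(timeout, ...) times out, the following flushOk.get() has no timeout and blocks until the future is completed, so the timeout argument is not really honored.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical simplification of GroupCommitRequest: the future alone
// carries the flush result, so no CountDownLatch is needed.
class GroupCommitRequestSketch {
    private final long nextOffset;
    private final CompletableFuture<Boolean> flushOk = new CompletableFuture<>();

    GroupCommitRequestSketch(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    long getNextOffset() {
        return nextOffset;
    }

    // Called by the flush/replication thread. Completing the future
    // releases every waiter, which is what the latch did before.
    void wakeupCustomer(boolean ok) {
        flushOk.complete(ok);
    }

    // Asynchronous callers chain on this future instead of blocking.
    CompletableFuture<Boolean> future() {
        return flushOk;
    }

    // Optional convenience for blocking callers: the wait is bounded
    // on the future itself, and a timeout is reported as failure.
    boolean waitForFlush(long timeoutMillis) {
        try {
            return flushOk.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
    }
}
```

   With this shape, a caller that still needs to block can use request.waitForFlush(timeout), while pipeline-style callers can attach a callback with request.future().thenAccept(...).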
