Posted to commits@bookkeeper.apache.org by "merlimat (via GitHub)" <gi...@apache.org> on 2023/03/07 04:13:36 UTC

[GitHub] [bookkeeper] merlimat opened a new pull request, #3843: Use BatchedArrayBlockingQueue in Journal

merlimat opened a new pull request, #3843:
URL: https://github.com/apache/bookkeeper/pull/3843

   ### Motivation
   
   Introduced `BatchedArrayBlockingQueue` for Journal and ForceWriteThread. 
   
   Refactored the code to take advantage of the `takeAll()` and `pollAll()` methods, which simplifies the logic that previously combined `drainTo()`, `take()` and `poll()`.
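
To illustrate the simplification, here is a minimal sketch that contrasts the old `drainTo()` plus `take()` pattern with a single blocking batch call. It uses the JDK's `ArrayBlockingQueue`; the `takeAll` helper is a hypothetical stand-in written for this example and only emulates the behaviour described above. It is not the `BatchedArrayBlockingQueue` implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BatchDrainSketch {

    // Old pattern: try a non-blocking drain first, then fall back to a blocking take().
    static <T> int drainOrTake(BlockingQueue<T> queue, List<T> local) throws InterruptedException {
        int count = queue.drainTo(local);
        if (count == 0) {
            local.add(queue.take()); // blocks until one element is available
            count = 1;
        }
        return count;
    }

    // Hypothetical stand-in for a batched takeAll(): block for the first element,
    // then copy whatever else is already queued, up to the array's capacity.
    static <T> int takeAll(BlockingQueue<T> queue, T[] local) throws InterruptedException {
        local[0] = queue.take();
        int count = 1;
        T next;
        while (count < local.length && (next = queue.poll()) != null) {
            local[count++] = next;
        }
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(8);
        queue.add("a");
        queue.add("b");
        queue.add("c");

        List<String> list = new ArrayList<>();
        int drained = drainOrTake(queue, list);
        System.out.println("old pattern drained " + drained + " items: " + list);

        queue.add("d");
        queue.add("e");
        String[] batch = new String[8];
        int taken = takeAll(queue, batch);
        System.out.println("batched call took " + taken + " items");
    }
}
```

With a batch call of this shape the caller no longer needs the `requestsCount == 0` fallback branch, because the call itself blocks until at least one element is available.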


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@bookkeeper.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3843: Use BatchedArrayBlockingQueue in Journal

Posted by "horizonzy (via GitHub)" <gi...@apache.org>.
horizonzy commented on code in PR #3843:
URL: https://github.com/apache/bookkeeper/pull/3843#discussion_r1143212297


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java:
##########
@@ -512,16 +507,8 @@ public void run() {
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     LOG.info("ForceWrite thread interrupted");
-                    // close is idempotent
-                    if (!localRequests.isEmpty()) {
-                        ForceWriteRequest req = localRequests.get(localRequests.size() - 1);
-                        req.shouldClose = true;
-                        req.closeFileIfNecessary();
-                    }

Review Comment:
   Maybe not: only `forceWriteRequests.takeAll(localRequests)` throws the `InterruptedException`. If it throws, we have no data to flush.





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3843: Use BatchedArrayBlockingQueue in Journal

Posted by "horizonzy (via GitHub)" <gi...@apache.org>.
horizonzy commented on code in PR #3843:
URL: https://github.com/apache/bookkeeper/pull/3843#discussion_r1142882891


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java:
##########
@@ -512,16 +507,8 @@ public void run() {
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     LOG.info("ForceWrite thread interrupted");
-                    // close is idempotent
-                    if (!localRequests.isEmpty()) {
-                        ForceWriteRequest req = localRequests.get(localRequests.size() - 1);
-                        req.shouldClose = true;
-                        req.closeFileIfNecessary();
-                    }

Review Comment:
   Here we catch `InterruptedException`, which is thrown by `forceWriteRequests.takeAll(localRequests)`.
   If it is thrown, we haven't read any data from the queue, so there is nothing to handle.
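
The reasoning above can be checked against the JDK's own blocking queue. This is a minimal sketch using `ArrayBlockingQueue.take()`; it assumes, without confirming it from the source, that `takeAll()` behaves the same way when interrupted. The class and method names are made up for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class InterruptedTakeSketch {

    // Returns the queue size observed after a take() that was interrupted.
    static int sizeAfterInterruptedTake(BlockingQueue<String> queue) {
        Thread.currentThread().interrupt(); // simulate an interrupt arriving before take()
        try {
            queue.take();
            throw new IllegalStateException("take() was expected to be interrupted");
        } catch (InterruptedException e) {
            // The interrupt status is cleared when the exception is thrown,
            // and nothing has been removed from the queue.
            return queue.size();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(4);
        queue.add("pending-request");
        System.out.println("size after interrupted take: " + sizeAfterInterruptedTake(queue));
    }
}
```

The element stays in the queue, so the consumer holds no local data when the exception is caught. Whether requests still sitting in the queue need a final flush is a separate question.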







[GitHub] [bookkeeper] merlimat commented on pull request #3843: Use BatchedArrayBlockingQueue in Journal

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on PR #3843:
URL: https://github.com/apache/bookkeeper/pull/3843#issuecomment-1476533985

   @horizonzy Yes, please push the fix, thanks!
   




[GitHub] [bookkeeper] zymap commented on a diff in pull request #3843: Use BatchedArrayBlockingQueue in Journal

Posted by "zymap (via GitHub)" <gi...@apache.org>.
zymap commented on code in PR #3843:
URL: https://github.com/apache/bookkeeper/pull/3843#discussion_r1143053408


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java:
##########
@@ -512,16 +507,8 @@ public void run() {
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     LOG.info("ForceWrite thread interrupted");
-                    // close is idempotent
-                    if (!localRequests.isEmpty()) {
-                        ForceWriteRequest req = localRequests.get(localRequests.size() - 1);
-                        req.shouldClose = true;
-                        req.closeFileIfNecessary();
-                    }

Review Comment:
   But the force write thread has still been interrupted. We need to ensure the journal data is flushed to disk, right?





[GitHub] [bookkeeper] horizonzy commented on pull request #3843: Use BatchedArrayBlockingQueue in Journal

Posted by "horizonzy (via GitHub)" <gi...@apache.org>.
horizonzy commented on PR #3843:
URL: https://github.com/apache/bookkeeper/pull/3843#issuecomment-1476529340

   @merlimat I fixed the failing test in https://github.com/horizonzy/bookkeeper/pull/2. Please take a look. If you have no objections, I will push the fix to your branch, thanks.






[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3843: Use BatchedArrayBlockingQueue in Journal

Posted by "horizonzy (via GitHub)" <gi...@apache.org>.
horizonzy commented on code in PR #3843:
URL: https://github.com/apache/bookkeeper/pull/3843#discussion_r1142985585


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java:
##########
@@ -512,16 +507,8 @@ public void run() {
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     LOG.info("ForceWrite thread interrupted");
-                    // close is idempotent
-                    if (!localRequests.isEmpty()) {
-                        ForceWriteRequest req = localRequests.get(localRequests.size() - 1);
-                        req.shouldClose = true;
-                        req.closeFileIfNecessary();
-                    }

Review Comment:
   @merlimat, cc





[GitHub] [bookkeeper] merlimat merged pull request #3843: Use BatchedArrayBlockingQueue in Journal

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat merged PR #3843:
URL: https://github.com/apache/bookkeeper/pull/3843




[GitHub] [bookkeeper] horizonzy commented on pull request #3843: Use BatchedArrayBlockingQueue in Journal

Posted by "horizonzy (via GitHub)" <gi...@apache.org>.
horizonzy commented on PR #3843:
URL: https://github.com/apache/bookkeeper/pull/3843#issuecomment-1476546151

   https://github.com/merlimat/bookkeeper/pull/2




[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3843: Use BatchedArrayBlockingQueue in Journal

Posted by "horizonzy (via GitHub)" <gi...@apache.org>.
horizonzy commented on code in PR #3843:
URL: https://github.com/apache/bookkeeper/pull/3843#discussion_r1142881335


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java:
##########
@@ -472,35 +472,30 @@ public void run() {
                 }
             }
 
-            final List<ForceWriteRequest> localRequests = new ArrayList<>();
+            final ForceWriteRequest[] localRequests = new ForceWriteRequest[conf.getJournalQueueSize()];
 
             while (running) {
                 try {
-                    int numReqInLastForceWrite = 0;
+                    int numEntriesInLastForceWrite = 0;
 
-                    int requestsCount = forceWriteRequests.drainTo(localRequests);
-                    if (requestsCount == 0) {
-                        ForceWriteRequest fwr = forceWriteRequests.take();
-                        localRequests.add(fwr);
-                        requestsCount = 1;
-                    }
+                    int requestsCount = forceWriteRequests.takeAll(localRequests);
 
                     journalStats.getForceWriteQueueSize().addCount(-requestsCount);
 
                     // Sync and mark the journal up to the position of the last entry in the batch
-                    ForceWriteRequest lastRequest = localRequests.get(requestsCount - 1);
+                    ForceWriteRequest lastRequest = localRequests[requestsCount - 1];
                     syncJournal(lastRequest);
 
                     // All the requests in the batch are now fully-synced. We can trigger sending the
                     // responses
                     for (int i = 0; i < requestsCount; i++) {
-                        ForceWriteRequest req = localRequests.get(i);
-                        numReqInLastForceWrite += req.process();
+                        ForceWriteRequest req = localRequests[i];
+                        numEntriesInLastForceWrite += req.process();
                         req.recycle();
                     }
-
+                    Arrays.fill(localRequests, 0, requestsCount, null);

Review Comment:
   > Why do we need to fill the array with null? It will be overwritten by the next batch, no?
   
   Suppose one iteration reads 100 elements from the queue: slots 0-99 of the `localRequests` array are filled.
   If the next iteration finds only 50 elements in the queue, slots 0-49 are overwritten, but slots 50-99 still hold the elements from the previous iteration.
   
   Those are strong references, which is bad for GC.
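
The effect can be reproduced with a plain reusable array. This is a small sketch under the assumption that the buffer is refilled from index 0 on every batch, as in the diff; `staleSlots` and the other names are hypothetical and exist only for this illustration.

```java
import java.util.Arrays;

class StaleSlotSketch {

    // Fills the first `count` slots with fresh objects, as a batched read would.
    static int fill(Object[] buffer, int count) {
        for (int i = 0; i < count; i++) {
            buffer[i] = new Object();
        }
        return count;
    }

    // Counts the slots that still hold a reference.
    static int nonNullSlots(Object[] buffer) {
        int count = 0;
        for (Object o : buffer) {
            if (o != null) {
                count++;
            }
        }
        return count;
    }

    // Reads two batches into one reusable buffer and returns how many slots
    // still reference first-batch objects after the second batch.
    static int staleSlots(int firstBatch, int secondBatch, boolean clearAfterBatch) {
        Object[] buffer = new Object[Math.max(firstBatch, secondBatch)];
        int n = fill(buffer, firstBatch);
        if (clearAfterBatch) {
            Arrays.fill(buffer, 0, n, null); // release the processed batch
        }
        n = fill(buffer, secondBatch);
        return nonNullSlots(buffer) - n;
    }

    public static void main(String[] args) {
        System.out.println("stale slots without clearing: " + staleSlots(100, 50, false));
        System.out.println("stale slots with clearing:    " + staleSlots(100, 50, true));
    }
}
```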





[GitHub] [bookkeeper] merlimat commented on a diff in pull request #3843: Use BatchedArrayBlockingQueue in Journal

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #3843:
URL: https://github.com/apache/bookkeeper/pull/3843#discussion_r1142884988


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java:
##########
@@ -472,35 +472,30 @@ public void run() {
                 }
             }
 
-            final List<ForceWriteRequest> localRequests = new ArrayList<>();
+            final ForceWriteRequest[] localRequests = new ForceWriteRequest[conf.getJournalQueueSize()];
 
             while (running) {
                 try {
-                    int numReqInLastForceWrite = 0;
+                    int numEntriesInLastForceWrite = 0;
 
-                    int requestsCount = forceWriteRequests.drainTo(localRequests);
-                    if (requestsCount == 0) {
-                        ForceWriteRequest fwr = forceWriteRequests.take();
-                        localRequests.add(fwr);
-                        requestsCount = 1;
-                    }
+                    int requestsCount = forceWriteRequests.takeAll(localRequests);
 
                     journalStats.getForceWriteQueueSize().addCount(-requestsCount);
 
                     // Sync and mark the journal up to the position of the last entry in the batch
-                    ForceWriteRequest lastRequest = localRequests.get(requestsCount - 1);
+                    ForceWriteRequest lastRequest = localRequests[requestsCount - 1];
                     syncJournal(lastRequest);
 
                     // All the requests in the batch are now fully-synced. We can trigger sending the
                     // responses
                     for (int i = 0; i < requestsCount; i++) {
-                        ForceWriteRequest req = localRequests.get(i);
-                        numReqInLastForceWrite += req.process();
+                        ForceWriteRequest req = localRequests[i];
+                        numEntriesInLastForceWrite += req.process();
                         req.recycle();
                     }
-
+                    Arrays.fill(localRequests, 0, requestsCount, null);

Review Comment:
   > Why do we need to fill the array with null? It will be overwritten by the next batch, no?
   
   While not strictly necessary, it is mostly to make sure the object can be GCed 





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3843: Use BatchedArrayBlockingQueue in Journal

Posted by "horizonzy (via GitHub)" <gi...@apache.org>.
horizonzy commented on code in PR #3843:
URL: https://github.com/apache/bookkeeper/pull/3843#discussion_r1142881561


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java:
##########
@@ -472,35 +472,30 @@ public void run() {
                 }
             }
 
-            final List<ForceWriteRequest> localRequests = new ArrayList<>();
+            final ForceWriteRequest[] localRequests = new ForceWriteRequest[conf.getJournalQueueSize()];
 
             while (running) {
                 try {
-                    int numReqInLastForceWrite = 0;
+                    int numEntriesInLastForceWrite = 0;
 
-                    int requestsCount = forceWriteRequests.drainTo(localRequests);
-                    if (requestsCount == 0) {
-                        ForceWriteRequest fwr = forceWriteRequests.take();
-                        localRequests.add(fwr);
-                        requestsCount = 1;
-                    }
+                    int requestsCount = forceWriteRequests.takeAll(localRequests);
 
                     journalStats.getForceWriteQueueSize().addCount(-requestsCount);
 
                     // Sync and mark the journal up to the position of the last entry in the batch
-                    ForceWriteRequest lastRequest = localRequests.get(requestsCount - 1);
+                    ForceWriteRequest lastRequest = localRequests[requestsCount - 1];
                     syncJournal(lastRequest);
 
                     // All the requests in the batch are now fully-synced. We can trigger sending the
                     // responses
                     for (int i = 0; i < requestsCount; i++) {
-                        ForceWriteRequest req = localRequests.get(i);
-                        numReqInLastForceWrite += req.process();
+                        ForceWriteRequest req = localRequests[i];
+                        numEntriesInLastForceWrite += req.process();
                         req.recycle();
                     }
-
+                    Arrays.fill(localRequests, 0, requestsCount, null);

Review Comment:
   > At least we can do it in the line 491 loop to avoid introducing another loop
   
   Makes sense.





[GitHub] [bookkeeper] zymap commented on a diff in pull request #3843: Use BatchedArrayBlockingQueue in Journal

Posted by "zymap (via GitHub)" <gi...@apache.org>.
zymap commented on code in PR #3843:
URL: https://github.com/apache/bookkeeper/pull/3843#discussion_r1142873585


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java:
##########
@@ -1197,7 +1177,13 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(),
 
                 toFlush.add(qe);
                 numEntriesToFlush++;
-                qe = null;
+
+                if (localQueueEntriesIdx < localQueueEntriesLen) {
+                    qe = localQueueEntries[localQueueEntriesIdx++];
+                } else {
+                    Arrays.fill(localQueueEntries, 0, localQueueEntriesLen, null);

Review Comment:
   Same question as above. At least we could clear each slot in the same loop that takes the entries from the array.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java:
##########
@@ -512,16 +507,8 @@ public void run() {
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     LOG.info("ForceWrite thread interrupted");
-                    // close is idempotent
-                    if (!localRequests.isEmpty()) {
-                        ForceWriteRequest req = localRequests.get(localRequests.size() - 1);
-                        req.shouldClose = true;
-                        req.closeFileIfNecessary();
-                    }

Review Comment:
   Won't this cause the journal to discard one flush?



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java:
##########
@@ -472,35 +472,30 @@ public void run() {
                 }
             }
 
-            final List<ForceWriteRequest> localRequests = new ArrayList<>();
+            final ForceWriteRequest[] localRequests = new ForceWriteRequest[conf.getJournalQueueSize()];
 
             while (running) {
                 try {
-                    int numReqInLastForceWrite = 0;
+                    int numEntriesInLastForceWrite = 0;
 
-                    int requestsCount = forceWriteRequests.drainTo(localRequests);
-                    if (requestsCount == 0) {
-                        ForceWriteRequest fwr = forceWriteRequests.take();
-                        localRequests.add(fwr);
-                        requestsCount = 1;
-                    }
+                    int requestsCount = forceWriteRequests.takeAll(localRequests);
 
                     journalStats.getForceWriteQueueSize().addCount(-requestsCount);
 
                     // Sync and mark the journal up to the position of the last entry in the batch
-                    ForceWriteRequest lastRequest = localRequests.get(requestsCount - 1);
+                    ForceWriteRequest lastRequest = localRequests[requestsCount - 1];
                     syncJournal(lastRequest);
 
                     // All the requests in the batch are now fully-synced. We can trigger sending the
                     // responses
                     for (int i = 0; i < requestsCount; i++) {
-                        ForceWriteRequest req = localRequests.get(i);
-                        numReqInLastForceWrite += req.process();
+                        ForceWriteRequest req = localRequests[i];
+                        numEntriesInLastForceWrite += req.process();
                         req.recycle();
                     }
-
+                    Arrays.fill(localRequests, 0, requestsCount, null);

Review Comment:
   At least we can do it in the line 491 loop to avoid introducing another loop
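
A sketch of this suggestion: each slot is released inside the existing processing loop instead of in a separate `Arrays.fill` pass. `Request` and its `process()` method are hypothetical placeholders, not the real `ForceWriteRequest`.

```java
class ClearInLoopSketch {

    // Hypothetical placeholder for a queued request.
    static final class Request {
        final int entries;

        Request(int entries) {
            this.entries = entries;
        }

        int process() {
            return entries;
        }
    }

    // Processes the first `count` slots and drops each reference in the same pass.
    static int processAndClear(Request[] local, int count) {
        int total = 0;
        for (int i = 0; i < count; i++) {
            total += local[i].process();
            local[i] = null; // release the slot as soon as the request is handled
        }
        return total;
    }

    public static void main(String[] args) {
        Request[] local = {new Request(2), new Request(3), null, null};
        System.out.println("entries processed: " + processAndClear(local, 2));
        System.out.println("slot 0 cleared: " + (local[0] == null));
    }
}
```

Both variants leave the buffer in the same state. The sketch makes no claim about which one is faster.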



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java:
##########
@@ -472,35 +472,30 @@ public void run() {
                 }
             }
 
-            final List<ForceWriteRequest> localRequests = new ArrayList<>();
+            final ForceWriteRequest[] localRequests = new ForceWriteRequest[conf.getJournalQueueSize()];
 
             while (running) {
                 try {
-                    int numReqInLastForceWrite = 0;
+                    int numEntriesInLastForceWrite = 0;
 
-                    int requestsCount = forceWriteRequests.drainTo(localRequests);
-                    if (requestsCount == 0) {
-                        ForceWriteRequest fwr = forceWriteRequests.take();
-                        localRequests.add(fwr);
-                        requestsCount = 1;
-                    }
+                    int requestsCount = forceWriteRequests.takeAll(localRequests);
 
                     journalStats.getForceWriteQueueSize().addCount(-requestsCount);
 
                     // Sync and mark the journal up to the position of the last entry in the batch
-                    ForceWriteRequest lastRequest = localRequests.get(requestsCount - 1);
+                    ForceWriteRequest lastRequest = localRequests[requestsCount - 1];
                     syncJournal(lastRequest);
 
                     // All the requests in the batch are now fully-synced. We can trigger sending the
                     // responses
                     for (int i = 0; i < requestsCount; i++) {
-                        ForceWriteRequest req = localRequests.get(i);
-                        numReqInLastForceWrite += req.process();
+                        ForceWriteRequest req = localRequests[i];
+                        numEntriesInLastForceWrite += req.process();
                         req.recycle();
                     }
-
+                    Arrays.fill(localRequests, 0, requestsCount, null);

Review Comment:
   Why do we need to fill the array with null? It will be overwritten by the next batch, no?





[GitHub] [bookkeeper] zymap commented on a diff in pull request #3843: Use BatchedArrayBlockingQueue in Journal

Posted by "zymap (via GitHub)" <gi...@apache.org>.
zymap commented on code in PR #3843:
URL: https://github.com/apache/bookkeeper/pull/3843#discussion_r1142948451


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java:
##########
@@ -472,35 +472,30 @@ public void run() {
                 }
             }
 
-            final List<ForceWriteRequest> localRequests = new ArrayList<>();
+            final ForceWriteRequest[] localRequests = new ForceWriteRequest[conf.getJournalQueueSize()];
 
             while (running) {
                 try {
-                    int numReqInLastForceWrite = 0;
+                    int numEntriesInLastForceWrite = 0;
 
-                    int requestsCount = forceWriteRequests.drainTo(localRequests);
-                    if (requestsCount == 0) {
-                        ForceWriteRequest fwr = forceWriteRequests.take();
-                        localRequests.add(fwr);
-                        requestsCount = 1;
-                    }
+                    int requestsCount = forceWriteRequests.takeAll(localRequests);
 
                     journalStats.getForceWriteQueueSize().addCount(-requestsCount);
 
                     // Sync and mark the journal up to the position of the last entry in the batch
-                    ForceWriteRequest lastRequest = localRequests.get(requestsCount - 1);
+                    ForceWriteRequest lastRequest = localRequests[requestsCount - 1];
                     syncJournal(lastRequest);
 
                     // All the requests in the batch are now fully-synced. We can trigger sending the
                     // responses
                     for (int i = 0; i < requestsCount; i++) {
-                        ForceWriteRequest req = localRequests.get(i);
-                        numReqInLastForceWrite += req.process();
+                        ForceWriteRequest req = localRequests[i];
+                        numEntriesInLastForceWrite += req.process();
                         req.recycle();
                     }
-
+                    Arrays.fill(localRequests, 0, requestsCount, null);

Review Comment:
   I see.


