You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by "zymap (via GitHub)" <gi...@apache.org> on 2023/03/21 04:15:38 UTC

[GitHub] [bookkeeper] zymap commented on a diff in pull request #3843: Use BatchedArrayBlockingQueue in Journal

zymap commented on code in PR #3843:
URL: https://github.com/apache/bookkeeper/pull/3843#discussion_r1142873585


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java:
##########
@@ -1197,7 +1177,13 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(),
 
                 toFlush.add(qe);
                 numEntriesToFlush++;
-                qe = null;
+
+                if (localQueueEntriesIdx < localQueueEntriesLen) {
+                    qe = localQueueEntries[localQueueEntriesIdx++];
+                } else {
+                    Arrays.fill(localQueueEntries, 0, localQueueEntriesLen, null);

Review Comment:
   Same question with above. At least we can do it in the same loop when you take the queue from the array.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java:
##########
@@ -512,16 +507,8 @@ public void run() {
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     LOG.info("ForceWrite thread interrupted");
-                    // close is idempotent
-                    if (!localRequests.isEmpty()) {
-                        ForceWriteRequest req = localRequests.get(localRequests.size() - 1);
-                        req.shouldClose = true;
-                        req.closeFileIfNecessary();
-                    }

Review Comment:
   Won't this cause the journal to discard one flush?



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java:
##########
@@ -472,35 +472,30 @@ public void run() {
                 }
             }
 
-            final List<ForceWriteRequest> localRequests = new ArrayList<>();
+            final ForceWriteRequest[] localRequests = new ForceWriteRequest[conf.getJournalQueueSize()];
 
             while (running) {
                 try {
-                    int numReqInLastForceWrite = 0;
+                    int numEntriesInLastForceWrite = 0;
 
-                    int requestsCount = forceWriteRequests.drainTo(localRequests);
-                    if (requestsCount == 0) {
-                        ForceWriteRequest fwr = forceWriteRequests.take();
-                        localRequests.add(fwr);
-                        requestsCount = 1;
-                    }
+                    int requestsCount = forceWriteRequests.takeAll(localRequests);
 
                     journalStats.getForceWriteQueueSize().addCount(-requestsCount);
 
                     // Sync and mark the journal up to the position of the last entry in the batch
-                    ForceWriteRequest lastRequest = localRequests.get(requestsCount - 1);
+                    ForceWriteRequest lastRequest = localRequests[requestsCount - 1];
                     syncJournal(lastRequest);
 
                     // All the requests in the batch are now fully-synced. We can trigger sending the
                     // responses
                     for (int i = 0; i < requestsCount; i++) {
-                        ForceWriteRequest req = localRequests.get(i);
-                        numReqInLastForceWrite += req.process();
+                        ForceWriteRequest req = localRequests[i];
+                        numEntriesInLastForceWrite += req.process();
                         req.recycle();
                     }
-
+                    Arrays.fill(localRequests, 0, requestsCount, null);

Review Comment:
   At least we can do it in the line 491 loop to avoid introducing another loop



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java:
##########
@@ -472,35 +472,30 @@ public void run() {
                 }
             }
 
-            final List<ForceWriteRequest> localRequests = new ArrayList<>();
+            final ForceWriteRequest[] localRequests = new ForceWriteRequest[conf.getJournalQueueSize()];
 
             while (running) {
                 try {
-                    int numReqInLastForceWrite = 0;
+                    int numEntriesInLastForceWrite = 0;
 
-                    int requestsCount = forceWriteRequests.drainTo(localRequests);
-                    if (requestsCount == 0) {
-                        ForceWriteRequest fwr = forceWriteRequests.take();
-                        localRequests.add(fwr);
-                        requestsCount = 1;
-                    }
+                    int requestsCount = forceWriteRequests.takeAll(localRequests);
 
                     journalStats.getForceWriteQueueSize().addCount(-requestsCount);
 
                     // Sync and mark the journal up to the position of the last entry in the batch
-                    ForceWriteRequest lastRequest = localRequests.get(requestsCount - 1);
+                    ForceWriteRequest lastRequest = localRequests[requestsCount - 1];
                     syncJournal(lastRequest);
 
                     // All the requests in the batch are now fully-synced. We can trigger sending the
                     // responses
                     for (int i = 0; i < requestsCount; i++) {
-                        ForceWriteRequest req = localRequests.get(i);
-                        numReqInLastForceWrite += req.process();
+                        ForceWriteRequest req = localRequests[i];
+                        numEntriesInLastForceWrite += req.process();
                         req.recycle();
                     }
-
+                    Arrays.fill(localRequests, 0, requestsCount, null);

Review Comment:
   Why do we need to fill the array with null? It will overwrite by the next, no?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@bookkeeper.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org