You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by "zymap (via GitHub)" <gi...@apache.org> on 2023/03/10 09:54:08 UTC

[GitHub] [bookkeeper] zymap commented on a diff in pull request #3846: Streamline batch add request

zymap commented on code in PR #3846:
URL: https://github.com/apache/bookkeeper/pull/3846#discussion_r1132115816


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java:
##########
@@ -47,4 +49,11 @@ public interface RequestProcessor extends AutoCloseable {
      * Flush any pending response staged on all the client connections.
      */
     void flushPendingResponses();
+
+    /**
+     * Process a list of ParsedAddRequests.
+     * @param r
+     * @param channel
+     */
+    void processAddRequest(List<BookieProtocol.ParsedAddRequest> r, BookieRequestHandler channel);

Review Comment:
   Looks like `void processRequest(Object r, BookieRequestHandler channel);` is the same as the new one. Because the argument is an object, we can reuse that method



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java:
##########
@@ -1087,6 +1093,114 @@ public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Obj
         }
     }
 
+    public void addEntryList(List<ParsedAddRequest> requests, boolean ackBeforeSync,
+                         WriteCallback cb, Object ctx, RequestStats requestStats) throws InterruptedException {
+        long requestNans = MathUtils.nowInNano();
+        boolean hasFailedRequests = false;
+        Map<Pair<Long, byte[]>, LedgerDescriptor> handleMap = new HashMap<>();
+        ListIterator<ParsedAddRequest> iter = requests.listIterator();
+        while (iter.hasNext()) {
+            ParsedAddRequest request = iter.next();
+            int rc = BookieProtocol.EOK;
+            try {
+                Pair<Long, byte[]> ledgerIdMasterKey = Pair.of(request.getLedgerId(), request.getMasterKey());
+                LedgerDescriptor handle = handleMap.get(ledgerIdMasterKey);
+                if (handle == null) {
+                    handle = getLedgerForEntry(request.getData(), request.getMasterKey());
+                    handleMap.put(ledgerIdMasterKey, handle);
+                }
+
+                synchronized (handle) {
+                    if (handle.isFenced()) {
+                        throw BookieException.create(BookieException.Code.LedgerFencedException);
+                    }
+
+                    addEntryInternalWithoutJournal(handle, request.getData(), ackBeforeSync,
+                        cb, ctx, request.getMasterKey());
+                }
+            } catch (BookieException.OperationRejectedException e) {
+                requestStats.getAddEntryRejectedCounter().inc();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Operation rejected while writing {} ", request, e);
+                }
+                rc = BookieProtocol.ETOOMANYREQUESTS;
+            } catch (IOException e) {
+                LOG.error("Error writing {}", request, e);
+                rc = BookieProtocol.EIO;
+            } catch (BookieException.LedgerFencedException lfe) {
+                LOG.error("Attempt to write to fenced ledger ", lfe);
+                rc = BookieProtocol.EFENCED;
+            } catch (BookieException e) {
+                LOG.error("Unauthorized access to ledger {}", request.getLedgerId(), e);
+                rc = BookieProtocol.EUA;
+            } catch (Throwable t) {
+                LOG.error("Unexpected exception while writing {}@{} : {} ",
+                    request.getLedgerId(), request.getEntryId(), t.getMessage(), t);
+                rc = BookieProtocol.EBADREQ;
+            }
+
+            if (rc != BookieProtocol.EOK) {
+                hasFailedRequests = true;
+                requestStats.getAddEntryStats()
+                    .registerFailedEvent(MathUtils.elapsedNanos(requestNans), TimeUnit.NANOSECONDS);
+                cb.writeComplete(rc, request.getLedgerId(), request.getEntryId(), null, ctx);
+                iter.remove();
+                request.release();
+                request.recycle();
+            }
+        }
+        handleMap.clear();
+
+        if (hasFailedRequests && requestProcessor != null) {
+            requestProcessor.flushPendingResponses();
+        }
+
+        if (writeDataToJournal && !requests.isEmpty()) {
+            List<ByteBuf> entries = requests.stream()
+                .map(ParsedAddRequest::getData).collect(Collectors.toList());
+            getJournal(requests.get(0).getLedgerId()).logAddEntry(entries, ackBeforeSync, cb, ctx);
+
+            requests.forEach(t -> {
+                t.release();
+                t.recycle();
+            });
+        }
+    }
+
+    private void addEntryInternalWithoutJournal(LedgerDescriptor handle, ByteBuf entry,

Review Comment:
   We can easily add a flag at the original logic to avoid unnecessary duplicated code.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java:
##########
@@ -885,6 +888,25 @@ public void logAddEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb,
         logAddEntry(ledgerId, entryId, entry, ackBeforeSync, cb, ctx);
     }
 
+    public void logAddEntry(List<ByteBuf> entries, boolean ackBeforeSync, WriteCallback cb, Object ctx)
+        throws InterruptedException {
+        AtomicLong reserveMemory = new AtomicLong();
+        QueueEntry[] queueEntries = new QueueEntry[entries.size()];
+        for (int i = 0; i < entries.size(); ++i) {
+            ByteBuf entry = entries.get(i);
+            long ledgerId = entry.getLong(entry.readerIndex());
+            long entryId = entry.getLong(entry.readerIndex() + 8);
+            entry.retain();
+            reserveMemory.addAndGet(entry.readableBytes());
+            queueEntries[i] = QueueEntry.create(entry, ackBeforeSync, ledgerId, entryId, cb, ctx,
+                MathUtils.nowInNano(), journalStats.getJournalAddEntryStats(), callbackTime);
+        }
+
+        memoryLimitController.releaseMemory(reserveMemory.get());

Review Comment:
   reserveMemory?



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java:
##########
@@ -229,6 +230,16 @@ protected void onAddRequestFinish() {
         }
     }
 
+    protected void onAddRequestFinishWithoutUnTrack() {
+        if (addsSemaphore != null) {
+            addsSemaphore.release();
+        }
+    }
+
+    protected void onAddRequestUnTrack() {
+        requestStats.untrackAddRequest();
+    }

Review Comment:
   You need to track N add requests at onAddRequestStart, and untrack it when onAddRequestFinish.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java:
##########
@@ -87,7 +97,32 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
             ctx.fireChannelRead(msg);
             return;
         }
-        requestProcessor.processRequest(msg, this);
+
+        if (msg instanceof BookieProtocol.ParsedAddRequest
+            && ADDENTRY == ((BookieProtocol.ParsedAddRequest) msg).getOpCode()
+            && !((BookieProtocol.ParsedAddRequest) msg).isHighPriority()
+            && ((BookieProtocol.ParsedAddRequest) msg).getProtocolVersion() == BookieProtocol.CURRENT_PROTOCOL_VERSION
+            && !((BookieProtocol.ParsedAddRequest) msg).isRecoveryAdd()) {
+            msgs.put((BookieProtocol.ParsedAddRequest) msg);

Review Comment:
   Put is a blocking operation. We should check the size first and then put the message into the queue.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java:
##########
@@ -87,7 +97,32 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
             ctx.fireChannelRead(msg);
             return;
         }
-        requestProcessor.processRequest(msg, this);
+
+        if (msg instanceof BookieProtocol.ParsedAddRequest
+            && ADDENTRY == ((BookieProtocol.ParsedAddRequest) msg).getOpCode()
+            && !((BookieProtocol.ParsedAddRequest) msg).isHighPriority()
+            && ((BookieProtocol.ParsedAddRequest) msg).getProtocolVersion() == BookieProtocol.CURRENT_PROTOCOL_VERSION
+            && !((BookieProtocol.ParsedAddRequest) msg).isRecoveryAdd()) {
+            msgs.put((BookieProtocol.ParsedAddRequest) msg);
+            if (msgs.size() >= maxCapacity) {
+                int count = msgs.size();
+                List<BookieProtocol.ParsedAddRequest> c = new ArrayList<>(count);
+                msgs.drainTo(c, count);
+                requestProcessor.processAddRequest(c, this);
+            }

Review Comment:
   ```suggestion
               if (!msgs.offer(msg)) {
                   ctx.fireChannelReadComplete();
               }
               msgs.put(msg); 
   ```



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java:
##########
@@ -729,4 +740,28 @@ public boolean isBlacklisted(Channel channel) {
     public void handleNonWritableChannel(Channel channel) {
         onResponseTimeout.accept(channel);
     }
+
+    @Override
+    public void processAddRequest(List<BookieProtocol.ParsedAddRequest> msgs, BookieRequestHandler requestHandler) {
+        WriteBatchEntryProcessor write = WriteBatchEntryProcessor.create(msgs, requestHandler, this);

Review Comment:
   You can use[ the original method](https://github.com/apache/bookkeeper/pull/3846/files#diff-380ca68ed5ce21fb226c59b8f9c9bd7c7be70da37091ea2c55db89cc4125e578R318) and when the Object msg is `List<BookieProtocol.ParsedAddRequest>` then call here. They do the same thing just in different ways, so let's reuse the existing method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@bookkeeper.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org