You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by "eolivelli (via GitHub)" <gi...@apache.org> on 2023/03/15 07:59:47 UTC

[GitHub] [bookkeeper] eolivelli commented on a diff in pull request #3846: Streamline batch add request

eolivelli commented on code in PR #3846:
URL: https://github.com/apache/bookkeeper/pull/3846#discussion_r1136657665


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java:
##########
@@ -1087,6 +1096,80 @@ public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Obj
         }
     }
 
+    public void addEntryList(List<ParsedAddRequest> requests, boolean ackBeforeSync,
+                         WriteCallback cb, Object ctx, RequestStats requestStats) throws InterruptedException {
+        long requestNans = MathUtils.nowInNano();
+        boolean hasFailedRequests = false;
+        Map<Pair<Long, byte[]>, LedgerDescriptor> handleMap = new HashMap<>();
+        ListIterator<ParsedAddRequest> iter = requests.listIterator();
+        while (iter.hasNext()) {
+            ParsedAddRequest request = iter.next();
+            int rc = BookieProtocol.EOK;
+            try {
+                Pair<Long, byte[]> ledgerIdMasterKey = Pair.of(request.getLedgerId(), request.getMasterKey());
+                LedgerDescriptor handle = handleMap.get(ledgerIdMasterKey);
+                if (handle == null) {
+                    handle = getLedgerForEntry(request.getData(), request.getMasterKey());
+                    handleMap.put(ledgerIdMasterKey, handle);
+                }
+
+                synchronized (handle) {
+                    if (handle.isFenced()) {
+                        throw BookieException.create(BookieException.Code.LedgerFencedException);
+                    }
+
+                    addEntryInternal(handle, request.getData(), ackBeforeSync,
+                        cb, ctx, request.getMasterKey(), false);
+                }
+            } catch (BookieException.OperationRejectedException e) {
+                requestStats.getAddEntryRejectedCounter().inc();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Operation rejected while writing {} ", request, e);
+                }
+                rc = BookieProtocol.ETOOMANYREQUESTS;
+            } catch (IOException e) {
+                LOG.error("Error writing {}", request, e);
+                rc = BookieProtocol.EIO;
+            } catch (BookieException.LedgerFencedException lfe) {
+                LOG.error("Attempt to write to fenced ledger ", lfe);
+                rc = BookieProtocol.EFENCED;
+            } catch (BookieException e) {
+                LOG.error("Unauthorized access to ledger {}", request.getLedgerId(), e);
+                rc = BookieProtocol.EUA;
+            } catch (Throwable t) {
+                LOG.error("Unexpected exception while writing {}@{} : {} ",
+                    request.getLedgerId(), request.getEntryId(), t.getMessage(), t);
+                rc = BookieProtocol.EBADREQ;
+            }
+
+            if (rc != BookieProtocol.EOK) {
+                hasFailedRequests = true;
+                requestStats.getAddEntryStats()
+                    .registerFailedEvent(MathUtils.elapsedNanos(requestNans), TimeUnit.NANOSECONDS);
+                cb.writeComplete(rc, request.getLedgerId(), request.getEntryId(), null, ctx);
+                iter.remove();
+                request.release();
+                request.recycle();
+            }
+        }
+        handleMap.clear();
+
+        if (hasFailedRequests && requestProcessor != null) {
+            requestProcessor.flushPendingResponses();
+        }
+
+        if (writeDataToJournal && !requests.isEmpty()) {
+            List<ByteBuf> entries = requests.stream()
+                .map(ParsedAddRequest::getData).collect(Collectors.toList());
+            getJournal(requests.get(0).getLedgerId()).logAddEntry(entries, ackBeforeSync, cb, ctx);
+        }
+
+        requests.forEach(t -> {

Review Comment:
   Should we put this into a "finally" block?



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java:
##########
@@ -885,6 +887,26 @@ public void logAddEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb,
         logAddEntry(ledgerId, entryId, entry, ackBeforeSync, cb, ctx);
     }
 
+    public void logAddEntry(List<ByteBuf> entries, boolean ackBeforeSync, WriteCallback cb, Object ctx)

Review Comment:
   nit: since this overload takes a list of entries, consider renaming it to `logAddEntries`.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java:
##########
@@ -1087,6 +1096,80 @@ public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Obj
         }
     }
 
+    public void addEntryList(List<ParsedAddRequest> requests, boolean ackBeforeSync,
+                         WriteCallback cb, Object ctx, RequestStats requestStats) throws InterruptedException {
+        long requestNans = MathUtils.nowInNano();
+        boolean hasFailedRequests = false;
+        Map<Pair<Long, byte[]>, LedgerDescriptor> handleMap = new HashMap<>();
+        ListIterator<ParsedAddRequest> iter = requests.listIterator();
+        while (iter.hasNext()) {
+            ParsedAddRequest request = iter.next();
+            int rc = BookieProtocol.EOK;
+            try {
+                Pair<Long, byte[]> ledgerIdMasterKey = Pair.of(request.getLedgerId(), request.getMasterKey());
+                LedgerDescriptor handle = handleMap.get(ledgerIdMasterKey);
+                if (handle == null) {
+                    handle = getLedgerForEntry(request.getData(), request.getMasterKey());
+                    handleMap.put(ledgerIdMasterKey, handle);
+                }
+
+                synchronized (handle) {
+                    if (handle.isFenced()) {
+                        throw BookieException.create(BookieException.Code.LedgerFencedException);
+                    }
+
+                    addEntryInternal(handle, request.getData(), ackBeforeSync,
+                        cb, ctx, request.getMasterKey(), false);
+                }
+            } catch (BookieException.OperationRejectedException e) {
+                requestStats.getAddEntryRejectedCounter().inc();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Operation rejected while writing {} ", request, e);
+                }
+                rc = BookieProtocol.ETOOMANYREQUESTS;
+            } catch (IOException e) {
+                LOG.error("Error writing {}", request, e);
+                rc = BookieProtocol.EIO;
+            } catch (BookieException.LedgerFencedException lfe) {
+                LOG.error("Attempt to write to fenced ledger ", lfe);
+                rc = BookieProtocol.EFENCED;
+            } catch (BookieException e) {
+                LOG.error("Unauthorized access to ledger {}", request.getLedgerId(), e);
+                rc = BookieProtocol.EUA;
+            } catch (Throwable t) {
+                LOG.error("Unexpected exception while writing {}@{} : {} ",
+                    request.getLedgerId(), request.getEntryId(), t.getMessage(), t);
+                rc = BookieProtocol.EBADREQ;
+            }
+
+            if (rc != BookieProtocol.EOK) {
+                hasFailedRequests = true;
+                requestStats.getAddEntryStats()
+                    .registerFailedEvent(MathUtils.elapsedNanos(requestNans), TimeUnit.NANOSECONDS);
+                cb.writeComplete(rc, request.getLedgerId(), request.getEntryId(), null, ctx);
+                iter.remove();
+                request.release();
+                request.recycle();
+            }
+        }
+        handleMap.clear();
+
+        if (hasFailedRequests && requestProcessor != null) {
+            requestProcessor.flushPendingResponses();
+        }
+
+        if (writeDataToJournal && !requests.isEmpty()) {
+            List<ByteBuf> entries = requests.stream()
+                .map(ParsedAddRequest::getData).collect(Collectors.toList());
+            getJournal(requests.get(0).getLedgerId()).logAddEntry(entries, ackBeforeSync, cb, ctx);

Review Comment:
   Here we are assuming that all the entries are for the same ledger: the journal is
   selected using only the ledger id of the first request.
   Can we add some assertions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@bookkeeper.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org