Posted to commits@bookkeeper.apache.org by "dlg99 (via GitHub)" <gi...@apache.org> on 2023/03/29 23:05:39 UTC

[GitHub] [bookkeeper] dlg99 commented on a diff in pull request #3886: [Improve][Client] Group writing into the channel in PerChannelBookieClient

dlg99 commented on code in PR #3886:
URL: https://github.com/apache/bookkeeper/pull/3886#discussion_r1152545199


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java:
##########
@@ -215,6 +215,18 @@ private static byte[] readMasterKey(ByteBuf packet) {
 
             return masterKey;
         }
+
+        public static void serializeAddRequests(Object request, ByteBuf buf) {
+            if (request instanceof ByteBuf) {
+                ByteBuf r = (ByteBuf) request;
+                buf.writeBytes(r);

Review Comment:
   What if buf does not have enough writable space for these bytes?



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -1154,26 +1162,80 @@ private void writeAndFlush(final Channel channel,
         }
 
         try {
-            final long startTime = MathUtils.nowInNano();
+            if (request instanceof ByteBuf || request instanceof ByteBufList) {
+                if (prepareSendRequests(channel, request, key)) {
+                    flushPendingRequests();
+                }
 
-            ChannelPromise promise = channel.newPromise().addListener(future -> {
-                if (future.isSuccess()) {
-                    nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
-                    CompletionValue completion = completionObjects.get(key);
-                    if (completion != null) {
-                        completion.setOutstanding();
-                    }
-                } else {
-                    nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                if (nextScheduledFlush == null) {
+                    nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,
+                        1, 1, TimeUnit.MILLISECONDS);
                 }
-            });
-            channel.writeAndFlush(request, promise);
+            } else {
+                final long startTime = MathUtils.nowInNano();
+
+                ChannelPromise promise = channel.newPromise().addListener(future -> {
+                    if (future.isSuccess()) {
+                        nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                        CompletionValue completion = completionObjects.get(key);
+                        if (completion != null) {
+                            completion.setOutstanding();
+                        }
+                    } else {
+                        nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                    }
+                });
+                channel.writeAndFlush(request, promise);
+            }
         } catch (Throwable e) {
             LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
             errorOut(key);
         }
     }
 
+    public synchronized boolean prepareSendRequests(Channel channel, Object request, CompletionKey key) {
+        if (pendingSendRequests == null) {
+            pendingSendRequests = channel.alloc().directBuffer(maxPendingRequestsSize);
+        }
+        BookieProtoEncoding.RequestEnDeCoderPreV3.serializeAddRequests(request, pendingSendRequests);

Review Comment:
   What if the request's size is greater than maxPendingRequestsSize?
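
To make the concern concrete, here is a minimal sketch of one way a bounded batch buffer could treat a request that does not fit. It uses java.nio.ByteBuffer as a stand-in for Netty's ByteBuf, and BoundedBatcher, add, flush and flushed are hypothetical names for illustration, not code from this PR. The assumption is that pending bytes are flushed first (to keep ordering) and that a request larger than the whole buffer bypasses batching.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the batching path; not BookKeeper or Netty code.
final class BoundedBatcher {
    private final ByteBuffer pending;
    // Records every chunk handed to the "channel" so behaviour can be inspected.
    final List<byte[]> flushed = new ArrayList<>();

    BoundedBatcher(int maxPendingBytes) {
        this.pending = ByteBuffer.allocate(maxPendingBytes);
    }

    void add(byte[] request) {
        // Not enough room left: flush what is pending before going further.
        if (request.length > pending.remaining()) {
            flush();
        }
        // Larger than the whole buffer: send it on its own instead of batching.
        if (request.length > pending.capacity()) {
            flushed.add(request.clone());
            return;
        }
        pending.put(request);
    }

    void flush() {
        if (pending.position() == 0) {
            return; // nothing batched yet
        }
        pending.flip();
        byte[] out = new byte[pending.remaining()];
        pending.get(out);
        pending.clear();
        flushed.add(out);
    }
}
```

With a capacity of 8, adding requests of 5, 5 and 20 bytes produces three separate chunks in arrival order: the second request forces the first to be flushed, and the 20-byte request is sent on its own after the pending 5 bytes.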



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -173,6 +174,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                         BKException.Code.DuplicateEntryIdException,
                         BKException.Code.WriteOnReadOnlyBookieException));
     private static final int DEFAULT_HIGH_PRIORITY_VALUE = 100; // We may add finer grained priority later.
+    private static final int DEFAULT_PENDING_REQUEST_SIZE = 1024;
+
+    private static final int MAX_PENDING_REQUEST_SIZE = 1024 * 1024;

Review Comment:
   This cannot be larger than nettyMaxFrameSizeBytes.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java:
##########
@@ -215,6 +215,18 @@ private static byte[] readMasterKey(ByteBuf packet) {
 
             return masterKey;
         }
+
+        public static void serializeAddRequests(Object request, ByteBuf buf) {
+            if (request instanceof ByteBuf) {
+                ByteBuf r = (ByteBuf) request;
+                buf.writeBytes(r);

Review Comment:
   Do we really need to copy the data into another ByteBuf, or can we use a CompositeByteBuf?



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -1154,26 +1162,80 @@ private void writeAndFlush(final Channel channel,
         }
 
         try {
-            final long startTime = MathUtils.nowInNano();
+            if (request instanceof ByteBuf || request instanceof ByteBufList) {
+                if (prepareSendRequests(channel, request, key)) {
+                    flushPendingRequests();
+                }
 
-            ChannelPromise promise = channel.newPromise().addListener(future -> {
-                if (future.isSuccess()) {
-                    nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
-                    CompletionValue completion = completionObjects.get(key);
-                    if (completion != null) {
-                        completion.setOutstanding();
-                    }
-                } else {
-                    nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                if (nextScheduledFlush == null) {
+                    nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,

Review Comment:
   nextScheduledFlush is volatile, so we assume it is accessed from different threads.
   It can be null at the "if" check and non-null by the time of the assignment.
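
A minimal sketch of one way to close this check-then-act window, assuming the goal is that the periodic flush is scheduled at most once. It uses a plain ScheduledExecutorService in place of the channel's event loop; FlushScheduler, ensureScheduled, scheduleCalls and raceScheduleCalls are hypothetical names, not part of the PR. An alternative that avoids the extra flag would be to read and write the field only from a single thread or under the same lock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper; illustrates an atomic "schedule once" guard.
final class FlushScheduler {
    private final ScheduledExecutorService executor;
    private final Runnable flushTask;
    private final AtomicBoolean scheduled = new AtomicBoolean(false);
    private volatile ScheduledFuture<?> nextScheduledFlush;
    // Counts how many times the task was actually scheduled.
    final AtomicInteger scheduleCalls = new AtomicInteger();

    FlushScheduler(ScheduledExecutorService executor, Runnable flushTask) {
        this.executor = executor;
        this.flushTask = flushTask;
    }

    void ensureScheduled() {
        // compareAndSet makes the check and the claim a single atomic step,
        // so only one caller can win even when many race here.
        if (scheduled.compareAndSet(false, true)) {
            scheduleCalls.incrementAndGet();
            nextScheduledFlush = executor.scheduleWithFixedDelay(
                    flushTask, 1, 1, TimeUnit.MILLISECONDS);
        }
    }

    void cancel() {
        ScheduledFuture<?> f = nextScheduledFlush;
        if (f != null) {
            f.cancel(false);
        }
    }

    // Drives several threads through ensureScheduled() at the same time and
    // reports how many of them actually scheduled the task.
    static int raceScheduleCalls(int threads) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        FlushScheduler scheduler = new FlushScheduler(executor, () -> { });
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await();
                    scheduler.ensureScheduled();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        start.countDown();
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.cancel();
            executor.shutdownNow();
        }
        return scheduler.scheduleCalls.get();
    }
}
```

With a bare null check on a volatile field, two threads can both observe null and both schedule a task; with the guard above, FlushScheduler.raceScheduleCalls(8) reports a single scheduling call regardless of how the threads interleave.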


