Posted to commits@bookkeeper.apache.org by "zymap (via GitHub)" <gi...@apache.org> on 2023/03/27 11:00:55 UTC

[GitHub] [bookkeeper] zymap commented on a diff in pull request #3886: [Improve][Client] Group writing into the channel in PerChannelBookieClient

zymap commented on code in PR #3886:
URL: https://github.com/apache/bookkeeper/pull/3886#discussion_r1149139881


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -349,6 +353,10 @@ enum ConnectionState {
     private final SecurityHandlerFactory shFactory;
     private volatile boolean isWritable = true;
     private long lastBookieUnavailableLogTimestamp = 0;
+    private ByteBuf pendingSendRequests = null;
+    private final Set<CompletionKey> pendingSendKeys = new HashSet<>();
+    private int maxPendingRequestsSize = DEFAULT_PENDING_REQUEST_SIZE;

Review Comment:
   Do we need to make it configurable?



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -1154,26 +1162,80 @@ private void writeAndFlush(final Channel channel,
         }
 
         try {
-            final long startTime = MathUtils.nowInNano();
+            if (request instanceof ByteBuf || request instanceof ByteBufList) {
+                if (prepareSendRequests(channel, request, key)) {
+                    flushPendingRequests();
+                }
 
-            ChannelPromise promise = channel.newPromise().addListener(future -> {
-                if (future.isSuccess()) {
-                    nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
-                    CompletionValue completion = completionObjects.get(key);
-                    if (completion != null) {
-                        completion.setOutstanding();
-                    }
-                } else {
-                    nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                if (nextScheduledFlush == null) {
+                    nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,

Review Comment:
   Do we need to make the delay time configurable?
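
   One possible shape for the configurability asked about in the comments on `maxPendingRequestsSize` and on the flush delay is sketched below. It is a minimal illustration, not code from the PR: the class name, both property keys, and the 64 KiB default are hypothetical (the diff does not show the value of `DEFAULT_PENDING_REQUEST_SIZE`). Only the 1 ms default delay is taken from the `scheduleWithFixedDelay(..., 1, 1, TimeUnit.MILLISECONDS)` call in the diff.

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/** Hypothetical holder for the two batching knobs; not part of BookKeeper. */
public class BatchingSettingsSketch {
    // Hypothetical property keys, invented for this sketch.
    static final String MAX_PENDING_BYTES_KEY = "clientPendingRequestsMaxBytes";
    static final String FLUSH_DELAY_MICROS_KEY = "clientPendingRequestsFlushDelayMicros";

    // Placeholder default: the real DEFAULT_PENDING_REQUEST_SIZE is not shown in the diff.
    static final int DEFAULT_MAX_PENDING_BYTES = 64 * 1024;
    // Matches the hard-coded 1 ms delay in the diff.
    static final long DEFAULT_FLUSH_DELAY_MICROS = TimeUnit.MILLISECONDS.toMicros(1);

    final int maxPendingBytes;
    final long flushDelayMicros;

    BatchingSettingsSketch(Properties props) {
        // Fall back to the defaults when a key is absent.
        this.maxPendingBytes = Integer.parseInt(
                props.getProperty(MAX_PENDING_BYTES_KEY, String.valueOf(DEFAULT_MAX_PENDING_BYTES)));
        this.flushDelayMicros = Long.parseLong(
                props.getProperty(FLUSH_DELAY_MICROS_KEY, String.valueOf(DEFAULT_FLUSH_DELAY_MICROS)));
        // Reject values that would disable batching or make the scheduler spin.
        if (maxPendingBytes <= 0 || flushDelayMicros <= 0) {
            throw new IllegalArgumentException("batching settings must be positive");
        }
    }
}
```

   With settings like these, the client could pass `maxPendingBytes` to the buffer allocation and `flushDelayMicros` with `TimeUnit.MICROSECONDS` to the scheduling call, instead of the constants.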



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -1154,26 +1162,80 @@ private void writeAndFlush(final Channel channel,
         }
 
         try {
-            final long startTime = MathUtils.nowInNano();
+            if (request instanceof ByteBuf || request instanceof ByteBufList) {
+                if (prepareSendRequests(channel, request, key)) {
+                    flushPendingRequests();
+                }
 
-            ChannelPromise promise = channel.newPromise().addListener(future -> {
-                if (future.isSuccess()) {
-                    nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
-                    CompletionValue completion = completionObjects.get(key);
-                    if (completion != null) {
-                        completion.setOutstanding();
-                    }
-                } else {
-                    nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                if (nextScheduledFlush == null) {
+                    nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,
+                        1, 1, TimeUnit.MILLISECONDS);
                 }
-            });
-            channel.writeAndFlush(request, promise);
+            } else {
+                final long startTime = MathUtils.nowInNano();
+
+                ChannelPromise promise = channel.newPromise().addListener(future -> {
+                    if (future.isSuccess()) {
+                        nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                        CompletionValue completion = completionObjects.get(key);
+                        if (completion != null) {
+                            completion.setOutstanding();
+                        }
+                    } else {
+                        nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                    }
+                });
+                channel.writeAndFlush(request, promise);
+            }
         } catch (Throwable e) {
             LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
             errorOut(key);
         }
     }
 
+    public synchronized boolean prepareSendRequests(Channel channel, Object request, CompletionKey key) {
+        if (pendingSendRequests == null) {
+            pendingSendRequests = channel.alloc().directBuffer(maxPendingRequestsSize);
+        }
+        BookieProtoEncoding.RequestEnDeCoderPreV3.serializeAddRequests(request, pendingSendRequests);
+        pendingSendKeys.add(key);
+        return pendingSendRequests.readableBytes() > MAX_PENDING_REQUEST_SIZE;
+    }
+
+    public synchronized void flushPendingRequests() {
+        final long startTime = MathUtils.nowInNano();
+        Set<CompletionKey> keys = new HashSet<>(pendingSendKeys);

Review Comment:
   Do we need to check that pendingSendRequests is not null before adding the listener? That would avoid registering unnecessary listeners.
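
   The guard suggested here could look roughly like the sketch below. It is a stand-alone illustration using only the JDK, not the PR's code: a `StringBuilder` stands in for the `ByteBuf`, `Long` keys stand in for `CompletionKey`, and the counter marks the point where the real method would create a promise and attach its listener. Since the scheduled task in the diff fires every millisecond, a flush with nothing buffered is likely the common case, which is where an early return would save work.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for the batching state in PerChannelBookieClient. */
public class PendingFlushSketch {
    private StringBuilder pendingSendRequests = null;          // stands in for the ByteBuf
    private final Set<Long> pendingSendKeys = new HashSet<>(); // stands in for Set<CompletionKey>
    private int listenersRegistered = 0;
    private int lastFlushedKeyCount = 0;

    public synchronized void prepareSendRequest(long key, String payload) {
        if (pendingSendRequests == null) {
            pendingSendRequests = new StringBuilder();
        }
        pendingSendRequests.append(payload);
        pendingSendKeys.add(key);
    }

    public synchronized void flushPendingRequests() {
        // Guard: nothing is buffered, so skip the key snapshot and the listener.
        if (pendingSendRequests == null) {
            return;
        }
        Set<Long> keys = new HashSet<>(pendingSendKeys);
        listenersRegistered++;            // real code: create promise and add listener here
        lastFlushedKeyCount = keys.size();
        pendingSendRequests = null;       // real code: hand the buffer to the channel
        pendingSendKeys.clear();
    }

    public synchronized int listenersRegistered() {
        return listenersRegistered;
    }

    public synchronized int lastFlushedKeyCount() {
        return lastFlushedKeyCount;
    }
}
```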



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@bookkeeper.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org