Posted to commits@bookkeeper.apache.org by "horizonzy (via GitHub)" <gi...@apache.org> on 2023/03/27 16:32:46 UTC

[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3886: [Improve][Client] Group writing into the channel in PerChannelBookieClient

horizonzy commented on code in PR #3886:
URL: https://github.com/apache/bookkeeper/pull/3886#discussion_r1149428342


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -349,6 +353,10 @@ enum ConnectionState {
     private final SecurityHandlerFactory shFactory;
     private volatile boolean isWritable = true;
     private long lastBookieUnavailableLogTimestamp = 0;
+    private ByteBuf pendingSendRequests = null;
+    private final Set<CompletionKey> pendingSendKeys = new HashSet<>();

Review Comment:
   We can also use ObjectHashSet here.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -1154,26 +1162,80 @@ private void writeAndFlush(final Channel channel,
         }
 
         try {
-            final long startTime = MathUtils.nowInNano();
+            if (request instanceof ByteBuf || request instanceof ByteBufList) {
+                if (prepareSendRequests(channel, request, key)) {
+                    flushPendingRequests();
+                }
 
-            ChannelPromise promise = channel.newPromise().addListener(future -> {
-                if (future.isSuccess()) {
-                    nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
-                    CompletionValue completion = completionObjects.get(key);
-                    if (completion != null) {
-                        completion.setOutstanding();
-                    }
-                } else {
-                    nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                if (nextScheduledFlush == null) {
+                    nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,
+                        1, 1, TimeUnit.MILLISECONDS);
                 }
-            });
-            channel.writeAndFlush(request, promise);
+            } else {
+                final long startTime = MathUtils.nowInNano();
+
+                ChannelPromise promise = channel.newPromise().addListener(future -> {
+                    if (future.isSuccess()) {
+                        nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                        CompletionValue completion = completionObjects.get(key);
+                        if (completion != null) {
+                            completion.setOutstanding();
+                        }
+                    } else {
+                        nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                    }
+                });
+                channel.writeAndFlush(request, promise);
+            }
         } catch (Throwable e) {
             LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
             errorOut(key);
         }
     }
 
+    public synchronized boolean prepareSendRequests(Channel channel, Object request, CompletionKey key) {
+        if (pendingSendRequests == null) {
+            pendingSendRequests = channel.alloc().directBuffer(maxPendingRequestsSize);
+        }
+        BookieProtoEncoding.RequestEnDeCoderPreV3.serializeAddRequests(request, pendingSendRequests);
+        pendingSendKeys.add(key);
+        return pendingSendRequests.readableBytes() > MAX_PENDING_REQUEST_SIZE;
+    }
+
+    public synchronized void flushPendingRequests() {
+        final long startTime = MathUtils.nowInNano();
+        Set<CompletionKey> keys = new HashSet<>(pendingSendKeys);

Review Comment:
   +1. 



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -1154,26 +1162,80 @@ private void writeAndFlush(final Channel channel,
         }
 
         try {
-            final long startTime = MathUtils.nowInNano();
+            if (request instanceof ByteBuf || request instanceof ByteBufList) {
+                if (prepareSendRequests(channel, request, key)) {
+                    flushPendingRequests();
+                }
 
-            ChannelPromise promise = channel.newPromise().addListener(future -> {
-                if (future.isSuccess()) {
-                    nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
-                    CompletionValue completion = completionObjects.get(key);
-                    if (completion != null) {
-                        completion.setOutstanding();
-                    }
-                } else {
-                    nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                if (nextScheduledFlush == null) {
+                    nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,
+                        1, 1, TimeUnit.MILLISECONDS);
                 }
-            });
-            channel.writeAndFlush(request, promise);
+            } else {
+                final long startTime = MathUtils.nowInNano();
+
+                ChannelPromise promise = channel.newPromise().addListener(future -> {
+                    if (future.isSuccess()) {
+                        nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                        CompletionValue completion = completionObjects.get(key);
+                        if (completion != null) {
+                            completion.setOutstanding();
+                        }
+                    } else {
+                        nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                    }
+                });
+                channel.writeAndFlush(request, promise);
+            }
         } catch (Throwable e) {
             LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
             errorOut(key);

Review Comment:
   The catch block is not suitable for the new logic. It only handles the single-key case and does not handle the pending-keys case.
   Also, it looks like nothing in the try block will throw an exception, so I think we can remove the try/catch block.
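
To illustrate the pending-keys case: if error handling is kept for the batched path, it would presumably need to fail every key that has been buffered, not only the key of the current request. The following is a minimal, self-contained sketch of that idea. `PendingBatch`, `failAll`, and the `Consumer` callback (standing in for `errorOut(key)`) are hypothetical names used only for illustration and are not part of the PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical stand-in for the batching state kept per channel. */
final class PendingBatch<K> {
    // Keys of requests that were buffered but not yet flushed.
    private final Set<K> pendingKeys = new LinkedHashSet<>();
    // Assumed callback that plays the role of errorOut(key).
    private final Consumer<K> errorOut;

    PendingBatch(Consumer<K> errorOut) {
        this.errorOut = errorOut;
    }

    synchronized void add(K key) {
        pendingKeys.add(key);
    }

    /** Fails every buffered key and returns how many were failed. */
    int failAll() {
        List<K> snapshot;
        synchronized (this) {
            // Copy and clear under the lock so no key is failed twice.
            snapshot = new ArrayList<>(pendingKeys);
            pendingKeys.clear();
        }
        // Run the callbacks outside the lock.
        snapshot.forEach(errorOut);
        return snapshot.size();
    }
}
```

With something like this, a failure while flushing could call `failAll()` instead of erroring out a single key. Whether that is preferable to dropping the try/catch altogether depends on whether the batched path can actually throw.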
   
   


