Posted to commits@bookkeeper.apache.org by "hangc0276 (via GitHub)" <gi...@apache.org> on 2023/03/26 04:35:35 UTC

[GitHub] [bookkeeper] hangc0276 opened a new pull request, #3886: [Improve][Client] Group writing for per bookie client

hangc0276 opened a new pull request, #3886:
URL: https://github.com/apache/bookkeeper/pull/3886

   ### Motivation
   When the BookKeeper client writes an entry to the BookKeeper server, it runs with the following steps:
   - Step 1: Initialize a PendingAddOp object.
   - Step 2: For each replica, select a bookie client channel according to the ledgerId.
   - Step 3: Write the entry to the bookie client channel and flush it.
   - Step 4: The entry is added to Netty's pending queue and processed by the configured Netty pipeline, such as `bookieProtoEncoder`, `lengthbasedframedecoder`, and `consolidation`.
   - Step 5: Wait for the write response (a minimal client-side sketch of this path follows the list).
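
   For context, a minimal client-side sketch (using the public `BookKeeper`/`LedgerHandle` API; the ZooKeeper address, ledger settings, and class name are placeholder values) whose `addEntry` calls end up on this path:

   ```java
   import org.apache.bookkeeper.client.BookKeeper;
   import org.apache.bookkeeper.client.LedgerHandle;

   public class SmallEntryWriteExample {
       public static void main(String[] args) throws Exception {
           // "zk1:2181" is a placeholder ZooKeeper address.
           BookKeeper bk = new BookKeeper("zk1:2181");
           LedgerHandle lh = bk.createLedger(1, 1, 1,
                   BookKeeper.DigestType.CRC32, "password".getBytes());
           // Every addEntry below goes through Steps 1-5 in the bookie client.
           for (int i = 0; i < 1_000; i++) {
               lh.addEntry(("entry-" + i).getBytes());
           }
           lh.close();
           bk.close();
       }
   }
   ```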
   
   If the bookie client writes small entries at a high operation rate, Netty's pending queue fills up and the Netty thread stays busy processing entries and flushing them into the socket channel. The CPU switches between user mode and kernel mode at high frequency.
   
   #3383 introduced consolidation of Netty channel flushes to mitigate syscall overhead, but it cannot reduce the overhead on the Netty threads.
   
   We can improve `Step 3` by grouping small entries into one ByteBuf and flushing it into the Netty pending queue once the flush conditions are met.
   
   ### Design
   When a new entry arrives at the bookie client channel, we append it to a shared ByteBuf and check whether the ByteBuf exceeds the maximum threshold (1 MB by default).
   
   To avoid an entry sitting in the bookie client channel's ByteBuf for too long and causing high write latency, we schedule a timer task that flushes the ByteBuf every 1 ms.
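
   A simplified, self-contained sketch of this design (illustrative only; `BatchingWriter` and its constants are placeholder names, not the actual PR code):

   ```java
   import io.netty.buffer.ByteBuf;
   import io.netty.channel.Channel;
   import io.netty.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;

   // Illustrative batching writer: small requests accumulate in one buffer and are
   // flushed either when the buffer passes a size threshold or by a 1 ms timer.
   class BatchingWriter {
       private static final int MAX_PENDING_BYTES = 1024 * 1024; // 1 MB threshold

       private final Channel channel;
       private ByteBuf pending;                     // accumulated, not yet flushed
       private ScheduledFuture<?> scheduledFlush;   // 1 ms timer, created lazily

       BatchingWriter(Channel channel) {
           this.channel = channel;
       }

       // The caller hands over ownership of `request`.
       synchronized void write(ByteBuf request) {
           if (pending == null) {
               pending = channel.alloc().directBuffer(1024);
           }
           pending.writeBytes(request);   // the buffer grows as needed
           request.release();
           if (pending.readableBytes() >= MAX_PENDING_BYTES) {
               flush();                   // size-triggered flush
           } else if (scheduledFlush == null) {
               // time-triggered flush: bounds the extra latency to roughly 1 ms
               scheduledFlush = channel.eventLoop().scheduleWithFixedDelay(
                       this::flush, 1, 1, TimeUnit.MILLISECONDS);
           }
       }

       synchronized void flush() {
           if (pending != null && pending.readableBytes() > 0) {
               channel.writeAndFlush(pending);   // Netty releases the buffer after writing
               pending = null;
           }
       }

       synchronized void close() {
           if (scheduledFlush != null) {
               scheduledFlush.cancel(false);
               scheduledFlush = null;
           }
           flush();
       }
   }
   ```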
   
   ### Performance
   We tested the write performance on a laptop with the following command:
   ```bash
   bin/benchmark writes -ensemble 1 -quorum 1 -ackQuorum 1 -ledgers 100 -throttle 300000 -entrysize 60 -useV2 -warmupMessages 1000000
   ```
   The performance results:
   | Writer ledgers | batched write ops/s | non-batched write ops/s | improvement |
   | --- | --- | --- | --- | 
   | 1 | 333238 | 335970 | 0% |
   | 50 | 261605 | 153011 |  71% |
   | 100 | 260650 | 126331 |  100% |
   | 500 | 265628 | 164393 | 62%  |




[GitHub] [bookkeeper] dlg99 commented on a diff in pull request #3886: [Improve][Client] Group writing into the channel in PerChannelBookieClient

Posted by "dlg99 (via GitHub)" <gi...@apache.org>.
dlg99 commented on code in PR #3886:
URL: https://github.com/apache/bookkeeper/pull/3886#discussion_r1152545199


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java:
##########
@@ -215,6 +215,18 @@ private static byte[] readMasterKey(ByteBuf packet) {
 
             return masterKey;
         }
+
+        public static void serializeAddRequests(Object request, ByteBuf buf) {
+            if (request instanceof ByteBuf) {
+                ByteBuf r = (ByteBuf) request;
+                buf.writeBytes(r);

Review Comment:
   What if `buf` does not have enough space to write the bytes?
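
   For reference, a buffer allocated with only an initial capacity grows automatically on `writeBytes`, but a capacity-bounded buffer needs an explicit guard; a small hedged sketch (not the PR's code):

   ```java
   import io.netty.buffer.ByteBuf;

   final class ByteBufAppend {
       // Refuse the append (or flush first) when the destination buffer cannot
       // grow enough to take the request.
       static void appendChecked(ByteBuf src, ByteBuf dst) {
           int needed = src.readableBytes();
           // maxWritableBytes() = maxCapacity - writerIndex, i.e. the most this
           // buffer can still accept even after growing.
           if (dst.maxWritableBytes() < needed) {
               throw new IllegalStateException(
                       "pending buffer cannot hold " + needed + " more bytes");
           }
           dst.writeBytes(src);   // grows dst up to maxCapacity if required
       }

       private ByteBufAppend() {}
   }
   ```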



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -1154,26 +1162,80 @@ private void writeAndFlush(final Channel channel,
         }
 
         try {
-            final long startTime = MathUtils.nowInNano();
+            if (request instanceof ByteBuf || request instanceof ByteBufList) {
+                if (prepareSendRequests(channel, request, key)) {
+                    flushPendingRequests();
+                }
 
-            ChannelPromise promise = channel.newPromise().addListener(future -> {
-                if (future.isSuccess()) {
-                    nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
-                    CompletionValue completion = completionObjects.get(key);
-                    if (completion != null) {
-                        completion.setOutstanding();
-                    }
-                } else {
-                    nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                if (nextScheduledFlush == null) {
+                    nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,
+                        1, 1, TimeUnit.MILLISECONDS);
                 }
-            });
-            channel.writeAndFlush(request, promise);
+            } else {
+                final long startTime = MathUtils.nowInNano();
+
+                ChannelPromise promise = channel.newPromise().addListener(future -> {
+                    if (future.isSuccess()) {
+                        nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                        CompletionValue completion = completionObjects.get(key);
+                        if (completion != null) {
+                            completion.setOutstanding();
+                        }
+                    } else {
+                        nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                    }
+                });
+                channel.writeAndFlush(request, promise);
+            }
         } catch (Throwable e) {
             LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
             errorOut(key);
         }
     }
 
+    public synchronized boolean prepareSendRequests(Channel channel, Object request, CompletionKey key) {
+        if (pendingSendRequests == null) {
+            pendingSendRequests = channel.alloc().directBuffer(maxPendingRequestsSize);
+        }
+        BookieProtoEncoding.RequestEnDeCoderPreV3.serializeAddRequests(request, pendingSendRequests);

Review Comment:
   What if the request's size is larger than maxPendingRequestsSize?
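
   If a single request can exceed the batch limit, one hedged option (a fragment sketched against the fields and methods shown in this diff, not the PR's actual fix) is to flush the current batch and send the oversized request on its own, which keeps ordering intact:

   ```java
   // Fragment only: assumes the surrounding class's MAX_PENDING_REQUEST_SIZE,
   // prepareSendRequests and flushPendingRequests from the diff above.
   private void writeBatchedOrDirect(Channel channel, ByteBuf request, CompletionKey key) {
       if (request.readableBytes() > MAX_PENDING_REQUEST_SIZE) {
           flushPendingRequests();            // drain whatever is already batched
           channel.writeAndFlush(request);    // oversized request bypasses batching
           return;
       }
       if (prepareSendRequests(channel, request, key)) {
           flushPendingRequests();
       }
   }
   ```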



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -173,6 +174,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                         BKException.Code.DuplicateEntryIdException,
                         BKException.Code.WriteOnReadOnlyBookieException));
     private static final int DEFAULT_HIGH_PRIORITY_VALUE = 100; // We may add finer grained priority later.
+    private static final int DEFAULT_PENDING_REQUEST_SIZE = 1024;
+
+    private static final int MAX_PENDING_REQUEST_SIZE = 1024 * 1024;

Review Comment:
   this cannot be larger than nettyMaxFrameSizeBytes
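
   For illustration, the threshold could be clamped at construction time (a sketch assuming the existing `getNettyMaxFrameSizeBytes()` getter on the client configuration):

   ```java
   import org.apache.bookkeeper.conf.ClientConfiguration;

   final class BatchThreshold {
       // Cap the batch size by the configured Netty frame size so a flushed
       // batch always fits into one frame.
       static int compute(ClientConfiguration conf) {
           return Math.min(1024 * 1024, conf.getNettyMaxFrameSizeBytes());
       }

       private BatchThreshold() {}
   }
   ```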



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java:
##########
@@ -215,6 +215,18 @@ private static byte[] readMasterKey(ByteBuf packet) {
 
             return masterKey;
         }
+
+        public static void serializeAddRequests(Object request, ByteBuf buf) {
+            if (request instanceof ByteBuf) {
+                ByteBuf r = (ByteBuf) request;
+                buf.writeBytes(r);

Review Comment:
   Do we really need to copy the data into another ByteBuf, or can we use a CompositeByteBuf?
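
   A hedged sketch of the zero-copy alternative (illustrative names, not the PR's code):

   ```java
   import io.netty.buffer.ByteBuf;
   import io.netty.buffer.ByteBufAllocator;
   import io.netty.buffer.CompositeByteBuf;

   final class ZeroCopyBatch {
       // Gather the request buffers as components instead of copying their
       // bytes into a single flat buffer.
       static CompositeByteBuf append(ByteBufAllocator alloc,
                                      CompositeByteBuf batch, ByteBuf request) {
           if (batch == null) {
               batch = alloc.compositeDirectBuffer();
           }
           // addComponent(true, ...) takes ownership of `request` and advances
           // the writer index, so no byte copy happens here.
           batch.addComponent(true, request);
           return batch;
       }

       private ZeroCopyBatch() {}
   }
   ```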



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -1154,26 +1162,80 @@ private void writeAndFlush(final Channel channel,
         }
 
         try {
-            final long startTime = MathUtils.nowInNano();
+            if (request instanceof ByteBuf || request instanceof ByteBufList) {
+                if (prepareSendRequests(channel, request, key)) {
+                    flushPendingRequests();
+                }
 
-            ChannelPromise promise = channel.newPromise().addListener(future -> {
-                if (future.isSuccess()) {
-                    nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
-                    CompletionValue completion = completionObjects.get(key);
-                    if (completion != null) {
-                        completion.setOutstanding();
-                    }
-                } else {
-                    nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                if (nextScheduledFlush == null) {
+                    nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,

Review Comment:
   `nextScheduledFlush` is volatile, so we assume access from different threads.
   It can be null in the `if` check and already non-null by the time of the assignment (a check-then-act race).
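
   One hedged way to make the "schedule once" step race-free (a sketch with placeholder names, not the PR's code) is a compareAndSet instead of a volatile check-then-act:

   ```java
   import io.netty.channel.Channel;
   import io.netty.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicReference;

   class FlushScheduler {
       private final AtomicReference<ScheduledFuture<?>> scheduledFlush = new AtomicReference<>();

       void ensureScheduled(Channel channel, Runnable flushTask) {
           if (scheduledFlush.get() != null) {
               return;
           }
           ScheduledFuture<?> future = channel.eventLoop()
                   .scheduleWithFixedDelay(flushTask, 1, 1, TimeUnit.MILLISECONDS);
           // If another thread won the race, cancel the duplicate schedule.
           if (!scheduledFlush.compareAndSet(null, future)) {
               future.cancel(false);
           }
       }

       void cancel() {
           ScheduledFuture<?> future = scheduledFlush.getAndSet(null);
           if (future != null) {
               future.cancel(false);
           }
       }
   }
   ```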





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3886: [Improve][Client] Group writing into the channel in PerChannelBookieClient

Posted by "horizonzy (via GitHub)" <gi...@apache.org>.
horizonzy commented on code in PR #3886:
URL: https://github.com/apache/bookkeeper/pull/3886#discussion_r1150070097


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -1154,26 +1162,80 @@ private void writeAndFlush(final Channel channel,
         }
 
         try {
-            final long startTime = MathUtils.nowInNano();
+            if (request instanceof ByteBuf || request instanceof ByteBufList) {
+                if (prepareSendRequests(channel, request, key)) {
+                    flushPendingRequests();
+                }
 
-            ChannelPromise promise = channel.newPromise().addListener(future -> {
-                if (future.isSuccess()) {
-                    nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
-                    CompletionValue completion = completionObjects.get(key);
-                    if (completion != null) {
-                        completion.setOutstanding();
-                    }
-                } else {
-                    nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                if (nextScheduledFlush == null) {
+                    nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,
+                        1, 1, TimeUnit.MILLISECONDS);
                 }
-            });
-            channel.writeAndFlush(request, promise);
+            } else {
+                final long startTime = MathUtils.nowInNano();
+
+                ChannelPromise promise = channel.newPromise().addListener(future -> {
+                    if (future.isSuccess()) {
+                        nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                        CompletionValue completion = completionObjects.get(key);
+                        if (completion != null) {
+                            completion.setOutstanding();
+                        }
+                    } else {
+                        nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                    }
+                });
+                channel.writeAndFlush(request, promise);
+            }
         } catch (Throwable e) {
             LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
             errorOut(key);
         }
     }
 
+    public synchronized boolean prepareSendRequests(Channel channel, Object request, CompletionKey key) {
+        if (pendingSendRequests == null) {
+            pendingSendRequests = channel.alloc().directBuffer(maxPendingRequestsSize);
+        }
+        BookieProtoEncoding.RequestEnDeCoderPreV3.serializeAddRequests(request, pendingSendRequests);
+        pendingSendKeys.add(key);
+        return pendingSendRequests.readableBytes() > MAX_PENDING_REQUEST_SIZE;
+    }
+
+    public synchronized void flushPendingRequests() {

Review Comment:
   Here we can refer to the write cache / flush cache pattern to make flushPendingRequests lock-free.





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3886: [Improve][Client] Group writing into the channel in PerChannelBookieClient

Posted by "horizonzy (via GitHub)" <gi...@apache.org>.
horizonzy commented on code in PR #3886:
URL: https://github.com/apache/bookkeeper/pull/3886#discussion_r1149553758


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -1154,26 +1162,80 @@ private void writeAndFlush(final Channel channel,
         }
 
         try {
-            final long startTime = MathUtils.nowInNano();
+            if (request instanceof ByteBuf || request instanceof ByteBufList) {
+                if (prepareSendRequests(channel, request, key)) {
+                    flushPendingRequests();
+                }
 
-            ChannelPromise promise = channel.newPromise().addListener(future -> {
-                if (future.isSuccess()) {
-                    nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
-                    CompletionValue completion = completionObjects.get(key);
-                    if (completion != null) {
-                        completion.setOutstanding();
-                    }
-                } else {
-                    nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                if (nextScheduledFlush == null) {
+                    nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,
+                        1, 1, TimeUnit.MILLISECONDS);
                 }
-            });
-            channel.writeAndFlush(request, promise);
+            } else {
+                final long startTime = MathUtils.nowInNano();
+
+                ChannelPromise promise = channel.newPromise().addListener(future -> {
+                    if (future.isSuccess()) {
+                        nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                        CompletionValue completion = completionObjects.get(key);
+                        if (completion != null) {
+                            completion.setOutstanding();
+                        }
+                    } else {
+                        nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                    }
+                });
+                channel.writeAndFlush(request, promise);
+            }
         } catch (Throwable e) {
             LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
             errorOut(key);
         }
     }
 
+    public synchronized boolean prepareSendRequests(Channel channel, Object request, CompletionKey key) {
+        if (pendingSendRequests == null) {
+            pendingSendRequests = channel.alloc().directBuffer(maxPendingRequestsSize);
+        }
+        BookieProtoEncoding.RequestEnDeCoderPreV3.serializeAddRequests(request, pendingSendRequests);
+        pendingSendKeys.add(key);
+        return pendingSendRequests.readableBytes() > MAX_PENDING_REQUEST_SIZE;
+    }
+
+    public synchronized void flushPendingRequests() {

Review Comment:
   The flushPendingRequests task triggers every millisecond. It may exacerbate the lock contention between `prepareSendRequests` and `flushPendingRequests`.
   





[GitHub] [bookkeeper] zymap commented on a diff in pull request #3886: [Improve][Client] Group writing into the channel in PerChannelBookieClient

Posted by "zymap (via GitHub)" <gi...@apache.org>.
zymap commented on code in PR #3886:
URL: https://github.com/apache/bookkeeper/pull/3886#discussion_r1149139881


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -349,6 +353,10 @@ enum ConnectionState {
     private final SecurityHandlerFactory shFactory;
     private volatile boolean isWritable = true;
     private long lastBookieUnavailableLogTimestamp = 0;
+    private ByteBuf pendingSendRequests = null;
+    private final Set<CompletionKey> pendingSendKeys = new HashSet<>();
+    private int maxPendingRequestsSize = DEFAULT_PENDING_REQUEST_SIZE;

Review Comment:
   Do we need to make it configurable?



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -1154,26 +1162,80 @@ private void writeAndFlush(final Channel channel,
         }
 
         try {
-            final long startTime = MathUtils.nowInNano();
+            if (request instanceof ByteBuf || request instanceof ByteBufList) {
+                if (prepareSendRequests(channel, request, key)) {
+                    flushPendingRequests();
+                }
 
-            ChannelPromise promise = channel.newPromise().addListener(future -> {
-                if (future.isSuccess()) {
-                    nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
-                    CompletionValue completion = completionObjects.get(key);
-                    if (completion != null) {
-                        completion.setOutstanding();
-                    }
-                } else {
-                    nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                if (nextScheduledFlush == null) {
+                    nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,

Review Comment:
   Do we need to make the delay time configurable?



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -1154,26 +1162,80 @@ private void writeAndFlush(final Channel channel,
         }
 
         try {
-            final long startTime = MathUtils.nowInNano();
+            if (request instanceof ByteBuf || request instanceof ByteBufList) {
+                if (prepareSendRequests(channel, request, key)) {
+                    flushPendingRequests();
+                }
 
-            ChannelPromise promise = channel.newPromise().addListener(future -> {
-                if (future.isSuccess()) {
-                    nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
-                    CompletionValue completion = completionObjects.get(key);
-                    if (completion != null) {
-                        completion.setOutstanding();
-                    }
-                } else {
-                    nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                if (nextScheduledFlush == null) {
+                    nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,
+                        1, 1, TimeUnit.MILLISECONDS);
                 }
-            });
-            channel.writeAndFlush(request, promise);
+            } else {
+                final long startTime = MathUtils.nowInNano();
+
+                ChannelPromise promise = channel.newPromise().addListener(future -> {
+                    if (future.isSuccess()) {
+                        nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                        CompletionValue completion = completionObjects.get(key);
+                        if (completion != null) {
+                            completion.setOutstanding();
+                        }
+                    } else {
+                        nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                    }
+                });
+                channel.writeAndFlush(request, promise);
+            }
         } catch (Throwable e) {
             LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
             errorOut(key);
         }
     }
 
+    public synchronized boolean prepareSendRequests(Channel channel, Object request, CompletionKey key) {
+        if (pendingSendRequests == null) {
+            pendingSendRequests = channel.alloc().directBuffer(maxPendingRequestsSize);
+        }
+        BookieProtoEncoding.RequestEnDeCoderPreV3.serializeAddRequests(request, pendingSendRequests);
+        pendingSendKeys.add(key);
+        return pendingSendRequests.readableBytes() > MAX_PENDING_REQUEST_SIZE;
+    }
+
+    public synchronized void flushPendingRequests() {
+        final long startTime = MathUtils.nowInNano();
+        Set<CompletionKey> keys = new HashSet<>(pendingSendKeys);

Review Comment:
   Do we need to check that pendingSendRequests is not null before registering the listener, to avoid registering unnecessary listeners?





[GitHub] [bookkeeper] dlg99 commented on pull request #3886: [Improve][Client] Group writing into the channel in PerChannelBookieClient

Posted by "dlg99 (via GitHub)" <gi...@apache.org>.
dlg99 commented on PR #3886:
URL: https://github.com/apache/bookkeeper/pull/3886#issuecomment-1489403928

   I think this should be configurable. 
   It improves throughput for some workloads; more latency-sensitive workloads may want to disable it.
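
   For example, a toggle could look like this (the property name `clientBatchWriteEnabled` is purely hypothetical, not an existing setting):

   ```java
   import org.apache.bookkeeper.conf.ClientConfiguration;

   final class BatchWriteToggleExample {
       static ClientConfiguration latencySensitiveClientConf() {
           ClientConfiguration conf = new ClientConfiguration();
           // Hypothetical property; a real change would add a typed getter/setter.
           conf.setProperty("clientBatchWriteEnabled", false);
           return conf;
       }

       private BatchWriteToggleExample() {}
   }
   ```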




[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3886: [Improve][Client] Group writing into the channel in PerChannelBookieClient

Posted by "horizonzy (via GitHub)" <gi...@apache.org>.
horizonzy commented on code in PR #3886:
URL: https://github.com/apache/bookkeeper/pull/3886#discussion_r1149502642


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -1154,26 +1162,80 @@ private void writeAndFlush(final Channel channel,
         }
 
         try {
-            final long startTime = MathUtils.nowInNano();
+            if (request instanceof ByteBuf || request instanceof ByteBufList) {
+                if (prepareSendRequests(channel, request, key)) {
+                    flushPendingRequests();
+                }
 
-            ChannelPromise promise = channel.newPromise().addListener(future -> {
-                if (future.isSuccess()) {
-                    nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
-                    CompletionValue completion = completionObjects.get(key);
-                    if (completion != null) {
-                        completion.setOutstanding();
-                    }
-                } else {
-                    nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                if (nextScheduledFlush == null) {
+                    nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,
+                        1, 1, TimeUnit.MILLISECONDS);
                 }
-            });
-            channel.writeAndFlush(request, promise);
+            } else {
+                final long startTime = MathUtils.nowInNano();
+
+                ChannelPromise promise = channel.newPromise().addListener(future -> {
+                    if (future.isSuccess()) {
+                        nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                        CompletionValue completion = completionObjects.get(key);
+                        if (completion != null) {
+                            completion.setOutstanding();
+                        }
+                    } else {
+                        nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                    }
+                });
+                channel.writeAndFlush(request, promise);
+            }
         } catch (Throwable e) {
             LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
             errorOut(key);
         }
     }
 
+    public synchronized boolean prepareSendRequests(Channel channel, Object request, CompletionKey key) {
+        if (pendingSendRequests == null) {
+            pendingSendRequests = channel.alloc().directBuffer(maxPendingRequestsSize);
+        }
+        BookieProtoEncoding.RequestEnDeCoderPreV3.serializeAddRequests(request, pendingSendRequests);
+        pendingSendKeys.add(key);
+        return pendingSendRequests.readableBytes() > MAX_PENDING_REQUEST_SIZE;
+    }
+
+    public synchronized void flushPendingRequests() {
+        final long startTime = MathUtils.nowInNano();
+        Set<CompletionKey> keys = new HashSet<>(pendingSendKeys);

Review Comment:
   +1. 





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3886: [Improve][Client] Group writing into the channel in PerChannelBookieClient

Posted by "horizonzy (via GitHub)" <gi...@apache.org>.
horizonzy commented on code in PR #3886:
URL: https://github.com/apache/bookkeeper/pull/3886#discussion_r1150070097


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -1154,26 +1162,80 @@ private void writeAndFlush(final Channel channel,
         }
 
         try {
-            final long startTime = MathUtils.nowInNano();
+            if (request instanceof ByteBuf || request instanceof ByteBufList) {
+                if (prepareSendRequests(channel, request, key)) {
+                    flushPendingRequests();
+                }
 
-            ChannelPromise promise = channel.newPromise().addListener(future -> {
-                if (future.isSuccess()) {
-                    nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
-                    CompletionValue completion = completionObjects.get(key);
-                    if (completion != null) {
-                        completion.setOutstanding();
-                    }
-                } else {
-                    nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                if (nextScheduledFlush == null) {
+                    nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,
+                        1, 1, TimeUnit.MILLISECONDS);
                 }
-            });
-            channel.writeAndFlush(request, promise);
+            } else {
+                final long startTime = MathUtils.nowInNano();
+
+                ChannelPromise promise = channel.newPromise().addListener(future -> {
+                    if (future.isSuccess()) {
+                        nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                        CompletionValue completion = completionObjects.get(key);
+                        if (completion != null) {
+                            completion.setOutstanding();
+                        }
+                    } else {
+                        nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                    }
+                });
+                channel.writeAndFlush(request, promise);
+            }
         } catch (Throwable e) {
             LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
             errorOut(key);
         }
     }
 
+    public synchronized boolean prepareSendRequests(Channel channel, Object request, CompletionKey key) {
+        if (pendingSendRequests == null) {
+            pendingSendRequests = channel.alloc().directBuffer(maxPendingRequestsSize);
+        }
+        BookieProtoEncoding.RequestEnDeCoderPreV3.serializeAddRequests(request, pendingSendRequests);
+        pendingSendKeys.add(key);
+        return pendingSendRequests.readableBytes() > MAX_PENDING_REQUEST_SIZE;
+    }
+
+    public synchronized void flushPendingRequests() {

Review Comment:
   Here we can refer to the implementation of `writeCache` and `writeCacheBeingFlushed` in `SingleDirectoryDbLedgerStorage` to make flushPendingRequests lock-free.
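
   A hedged, simplified sketch of that idea (placeholder names, not the SingleDirectoryDbLedgerStorage code): keep the critical section to a pointer swap so the channel write happens outside the append lock.

   ```java
   import io.netty.buffer.ByteBuf;
   import io.netty.channel.Channel;

   // Writers append to `active` under a short lock; the flush task atomically
   // detaches the batch and writes it out without holding the append lock.
   class SwapOnFlushBatch {
       private final Channel channel;
       private ByteBuf active;

       SwapOnFlushBatch(Channel channel) {
           this.channel = channel;
       }

       // The caller hands over ownership of `request`.
       synchronized void append(ByteBuf request) {
           if (active == null) {
               active = channel.alloc().directBuffer(1024);
           }
           active.writeBytes(request);
           request.release();
       }

       void flush() {
           ByteBuf toFlush;
           synchronized (this) {
               toFlush = active;   // detach the batch; writers start a fresh buffer
               active = null;
           }
           if (toFlush == null) {
               return;
           }
           if (toFlush.readableBytes() > 0) {
               // The expensive part (the channel write) runs outside the append lock.
               channel.writeAndFlush(toFlush);
           } else {
               toFlush.release();
           }
       }
   }
   ```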





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3886: [Improve][Client] Group writing into the channel in PerChannelBookieClient

Posted by "horizonzy (via GitHub)" <gi...@apache.org>.
horizonzy commented on code in PR #3886:
URL: https://github.com/apache/bookkeeper/pull/3886#discussion_r1150934885


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -1154,26 +1162,80 @@ private void writeAndFlush(final Channel channel,
         }
 
         try {
-            final long startTime = MathUtils.nowInNano();
+            if (request instanceof ByteBuf || request instanceof ByteBufList) {
+                if (prepareSendRequests(channel, request, key)) {
+                    flushPendingRequests();
+                }
 
-            ChannelPromise promise = channel.newPromise().addListener(future -> {
-                if (future.isSuccess()) {
-                    nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
-                    CompletionValue completion = completionObjects.get(key);
-                    if (completion != null) {
-                        completion.setOutstanding();
-                    }
-                } else {
-                    nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                if (nextScheduledFlush == null) {
+                    nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,

Review Comment:
   > Do we need to make the delay time configurable?
   
   We should make sure the flush happens as soon as possible, otherwise it may lead to addEntry operation timeouts. So if we make it configurable, the user may configure it with a wrong value.





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3886: [Improve][Client] Group writing into the channel in PerChannelBookieClient

Posted by "horizonzy (via GitHub)" <gi...@apache.org>.
horizonzy commented on code in PR #3886:
URL: https://github.com/apache/bookkeeper/pull/3886#discussion_r1149428342


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -349,6 +353,10 @@ enum ConnectionState {
     private final SecurityHandlerFactory shFactory;
     private volatile boolean isWritable = true;
     private long lastBookieUnavailableLogTimestamp = 0;
+    private ByteBuf pendingSendRequests = null;
+    private final Set<CompletionKey> pendingSendKeys = new HashSet<>();

Review Comment:
   Here we can also use ObjectHashSet



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -1154,26 +1162,80 @@ private void writeAndFlush(final Channel channel,
         }
 
         try {
-            final long startTime = MathUtils.nowInNano();
+            if (request instanceof ByteBuf || request instanceof ByteBufList) {
+                if (prepareSendRequests(channel, request, key)) {
+                    flushPendingRequests();
+                }
 
-            ChannelPromise promise = channel.newPromise().addListener(future -> {
-                if (future.isSuccess()) {
-                    nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
-                    CompletionValue completion = completionObjects.get(key);
-                    if (completion != null) {
-                        completion.setOutstanding();
-                    }
-                } else {
-                    nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                if (nextScheduledFlush == null) {
+                    nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,
+                        1, 1, TimeUnit.MILLISECONDS);
                 }
-            });
-            channel.writeAndFlush(request, promise);
+            } else {
+                final long startTime = MathUtils.nowInNano();
+
+                ChannelPromise promise = channel.newPromise().addListener(future -> {
+                    if (future.isSuccess()) {
+                        nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                        CompletionValue completion = completionObjects.get(key);
+                        if (completion != null) {
+                            completion.setOutstanding();
+                        }
+                    } else {
+                        nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                    }
+                });
+                channel.writeAndFlush(request, promise);
+            }
         } catch (Throwable e) {
             LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
             errorOut(key);
         }
     }
 
+    public synchronized boolean prepareSendRequests(Channel channel, Object request, CompletionKey key) {
+        if (pendingSendRequests == null) {
+            pendingSendRequests = channel.alloc().directBuffer(maxPendingRequestsSize);
+        }
+        BookieProtoEncoding.RequestEnDeCoderPreV3.serializeAddRequests(request, pendingSendRequests);
+        pendingSendKeys.add(key);
+        return pendingSendRequests.readableBytes() > MAX_PENDING_REQUEST_SIZE;
+    }
+
+    public synchronized void flushPendingRequests() {
+        final long startTime = MathUtils.nowInNano();
+        Set<CompletionKey> keys = new HashSet<>(pendingSendKeys);

Review Comment:
   +1. 



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -1154,26 +1162,80 @@ private void writeAndFlush(final Channel channel,
         }
 
         try {
-            final long startTime = MathUtils.nowInNano();
+            if (request instanceof ByteBuf || request instanceof ByteBufList) {
+                if (prepareSendRequests(channel, request, key)) {
+                    flushPendingRequests();
+                }
 
-            ChannelPromise promise = channel.newPromise().addListener(future -> {
-                if (future.isSuccess()) {
-                    nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
-                    CompletionValue completion = completionObjects.get(key);
-                    if (completion != null) {
-                        completion.setOutstanding();
-                    }
-                } else {
-                    nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                if (nextScheduledFlush == null) {
+                    nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,
+                        1, 1, TimeUnit.MILLISECONDS);
                 }
-            });
-            channel.writeAndFlush(request, promise);
+            } else {
+                final long startTime = MathUtils.nowInNano();
+
+                ChannelPromise promise = channel.newPromise().addListener(future -> {
+                    if (future.isSuccess()) {
+                        nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                        CompletionValue completion = completionObjects.get(key);
+                        if (completion != null) {
+                            completion.setOutstanding();
+                        }
+                    } else {
+                        nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                    }
+                });
+                channel.writeAndFlush(request, promise);
+            }
         } catch (Throwable e) {
             LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
             errorOut(key);

Review Comment:
   The catch block is not suitable for the new logic: it only handles the single-key case and does not handle the pending keys case.
   Also, in the try block it looks like no exception will be thrown, so I think we can remove the try/catch block.
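
   Sketched against the fields shown in this diff (`pendingSendKeys`, `errorOut`), the failure path would need something roughly like this fragment:

   ```java
   // Fragment only: assumes the surrounding class's pendingSendKeys set and
   // errorOut(CompletionKey) from the diff above.
   private synchronized void errorOutPendingBatch() {
       for (CompletionKey pendingKey : pendingSendKeys) {
           errorOut(pendingKey);
       }
       pendingSendKeys.clear();
   }
   ```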
   
   





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3886: [Improve][Client] Group writing into the channel in PerChannelBookieClient

Posted by "horizonzy (via GitHub)" <gi...@apache.org>.
horizonzy commented on code in PR #3886:
URL: https://github.com/apache/bookkeeper/pull/3886#discussion_r1149516903


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java:
##########
@@ -1154,26 +1162,80 @@ private void writeAndFlush(final Channel channel,
         }
 
         try {
-            final long startTime = MathUtils.nowInNano();
+            if (request instanceof ByteBuf || request instanceof ByteBufList) {
+                if (prepareSendRequests(channel, request, key)) {
+                    flushPendingRequests();
+                }
 
-            ChannelPromise promise = channel.newPromise().addListener(future -> {
-                if (future.isSuccess()) {
-                    nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
-                    CompletionValue completion = completionObjects.get(key);
-                    if (completion != null) {
-                        completion.setOutstanding();
-                    }
-                } else {
-                    nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+                if (nextScheduledFlush == null) {
+                    nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,

Review Comment:
   We should cancel the `nextScheduledFlush` in the close method.
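
   For instance, a fragment along these lines (assuming the `nextScheduledFlush` field from the diff above) could be invoked from the close path:

   ```java
   // Fragment only: assumes the volatile nextScheduledFlush field from the diff above.
   private void cancelScheduledFlush() {
       ScheduledFuture<?> scheduled = nextScheduledFlush;
       if (scheduled != null) {
           scheduled.cancel(false);
           nextScheduledFlush = null;
       }
   }
   ```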





Re: [PR] [Improve][Client] Group writing into the channel in PerChannelBookieClient [bookkeeper]

Posted by "eolivelli (via GitHub)" <gi...@apache.org>.
eolivelli commented on code in PR #3886:
URL: https://github.com/apache/bookkeeper/pull/3886#discussion_r1588877880


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java:
##########
@@ -215,6 +215,14 @@ private static byte[] readMasterKey(ByteBuf packet) {
 
             return masterKey;
         }
+
+        public static void serializeAddRequests(Object request, ByteBufList buf) {
+            if (request instanceof ByteBuf) {
+                buf.add((ByteBuf) request);
+            } else if (request instanceof ByteBufList) {
+                buf.add((ByteBufList) request);
+            }

Review Comment:
   else throw IllegalStateException ?
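
   Spelled out, the suggestion applied to the method in this diff would look roughly like:

   ```java
   public static void serializeAddRequests(Object request, ByteBufList buf) {
       if (request instanceof ByteBuf) {
           buf.add((ByteBuf) request);
       } else if (request instanceof ByteBufList) {
           buf.add((ByteBufList) request);
       } else {
           // Fail fast on unexpected request types instead of silently dropping them.
           throw new IllegalStateException(
                   "Unexpected add request type: " + request.getClass().getName());
       }
   }
   ```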


