You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ni...@apache.org on 2023/05/04 07:00:28 UTC

[bookkeeper] branch master updated: Use ChannelVoidPromise to avoid useless promise objects creation (#3733)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new bbea67b095 Use ChannelVoidPromise to avoid useless promise objects creation (#3733)
bbea67b095 is described below

commit bbea67b09566002b9393a8b8f256988ea9755cc1
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Thu May 4 09:00:22 2023 +0200

    Use ChannelVoidPromise to avoid useless promise objects creation (#3733)
---
 .../org/apache/bookkeeper/proto/AuthHandler.java   | 42 +++++++------
 .../bookkeeper/proto/BookieRequestProcessor.java   | 40 +++++++-----
 .../bookkeeper/proto/PacketProcessorBase.java      |  4 +-
 .../apache/bookkeeper/util/NettyChannelUtil.java   | 62 ++++++++++++++++++
 .../bookkeeper/util/NettyChannelUtilTest.java      | 73 ++++++++++++++++++++++
 5 files changed, 188 insertions(+), 33 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
index f923b61ad5..720e7279f5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -41,6 +41,7 @@ import org.apache.bookkeeper.auth.ClientAuthProvider;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
 import org.apache.bookkeeper.util.ByteBufList;
+import org.apache.bookkeeper.util.NettyChannelUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,15 +103,15 @@ class AuthHandler {
             } else if (msg instanceof BookieProtocol.Request) {
                 BookieProtocol.Request req = (BookieProtocol.Request) msg;
                 if (req.getOpCode() == BookieProtocol.ADDENTRY) {
-                    ctx.channel().writeAndFlush(
-                            BookieProtocol.AddResponse.create(
-                                    req.getProtocolVersion(), BookieProtocol.EUA,
-                                    req.getLedgerId(), req.getEntryId()));
+                    final BookieProtocol.AddResponse response = BookieProtocol.AddResponse.create(
+                            req.getProtocolVersion(), BookieProtocol.EUA,
+                            req.getLedgerId(), req.getEntryId());
+                    NettyChannelUtil.writeAndFlushWithVoidPromise(ctx.channel(), response);
                 } else if (req.getOpCode() == BookieProtocol.READENTRY) {
-                    ctx.channel().writeAndFlush(
-                            new BookieProtocol.ReadResponse(
-                                    req.getProtocolVersion(), BookieProtocol.EUA,
-                                    req.getLedgerId(), req.getEntryId()));
+                    final BookieProtocol.ReadResponse response = new BookieProtocol.ReadResponse(
+                            req.getProtocolVersion(), BookieProtocol.EUA,
+                            req.getLedgerId(), req.getEntryId());
+                    NettyChannelUtil.writeAndFlushWithVoidPromise(ctx.channel(), response);
                 } else {
                     ctx.channel().close();
                 }
@@ -133,7 +134,7 @@ class AuthHandler {
                         .setHeader(req.getHeader())
                         .setStatus(BookkeeperProtocol.StatusCode.EUA);
 
-                    ctx.channel().writeAndFlush(builder.build());
+                    NettyChannelUtil.writeAndFlushWithVoidPromise(ctx.channel(), builder.build());
                 }
             } else {
                 // close the channel, junk coming over it
@@ -172,7 +173,9 @@ class AuthHandler {
                 }
                 AuthMessage message = AuthMessage.newBuilder().setAuthPluginName(req.authMessage.getAuthPluginName())
                         .setPayload(ByteString.copyFrom(newam.getData())).build();
-                channel.writeAndFlush(new BookieProtocol.AuthResponse(req.getProtocolVersion(), message));
+                final BookieProtocol.AuthResponse response =
+                        new BookieProtocol.AuthResponse(req.getProtocolVersion(), message);
+                NettyChannelUtil.writeAndFlushWithVoidPromise(channel, response);
             }
         }
 
@@ -196,14 +199,17 @@ class AuthHandler {
                     LOG.error("Error processing auth message, closing connection");
 
                     builder.setStatus(BookkeeperProtocol.StatusCode.EUA);
-                    channel.writeAndFlush(builder.build());
-                    channel.close();
+                    NettyChannelUtil.writeAndFlushWithClosePromise(
+                            channel, builder.build()
+                    );
                     return;
                 } else {
                     AuthMessage message = AuthMessage.newBuilder().setAuthPluginName(pluginName)
                             .setPayload(ByteString.copyFrom(newam.getData())).build();
                     builder.setStatus(BookkeeperProtocol.StatusCode.EOK).setAuthResponse(message);
-                    channel.writeAndFlush(builder.build());
+                    NettyChannelUtil.writeAndFlushWithVoidPromise(
+                            channel, builder.build()
+                    );
                 }
             }
         }
@@ -399,9 +405,9 @@ class AuthHandler {
                         .setPayload(ByteString.copyFrom(newam.getData())).build();
 
                 if (isUsingV2Protocol) {
-                    channel.writeAndFlush(
-                            new BookieProtocol.AuthRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, message),
-                            channel.voidPromise());
+                    final BookieProtocol.AuthRequest msg =
+                            new BookieProtocol.AuthRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, message);
+                    NettyChannelUtil.writeAndFlushWithVoidPromise(channel, msg);
                 } else {
                     // V3 protocol
                     BookkeeperProtocol.BKPacketHeader header = BookkeeperProtocol.BKPacketHeader.newBuilder()
@@ -410,7 +416,7 @@ class AuthHandler {
                     BookkeeperProtocol.Request.Builder builder = BookkeeperProtocol.Request.newBuilder()
                             .setHeader(header)
                             .setAuthRequest(message);
-                    channel.writeAndFlush(builder.build());
+                    NettyChannelUtil.writeAndFlushWithVoidPromise(channel, builder.build());
                 }
             }
         }
@@ -429,7 +435,7 @@ class AuthHandler {
                         authenticated = true;
                         Object msg = waitingForAuth.poll();
                         while (msg != null) {
-                            ctx.writeAndFlush(msg);
+                            NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, msg);
                             msg = waitingForAuth.poll();
                         }
                     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 6e7f5abcbb..7ebd8c90a7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -54,6 +54,7 @@ import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.tls.SecurityException;
 import org.apache.bookkeeper.tls.SecurityHandlerFactory;
 import org.apache.bookkeeper.tls.SecurityHandlerFactory.NodeType;
+import org.apache.bookkeeper.util.NettyChannelUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -330,11 +331,12 @@ public class BookieRequestProcessor implements RequestProcessor {
                                 .setAuthPluginName(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME)
                                 .setPayload(ByteString.copyFrom(AuthToken.NULL.getData()))
                                 .build();
-                        BookkeeperProtocol.Response.Builder authResponse = BookkeeperProtocol.Response
+                        final BookkeeperProtocol.Response authResponse = BookkeeperProtocol.Response
                                 .newBuilder().setHeader(r.getHeader())
                                 .setStatus(BookkeeperProtocol.StatusCode.EOK)
-                                .setAuthResponse(message);
-                        channel.writeAndFlush(authResponse.build());
+                                .setAuthResponse(message)
+                                .build();
+                        writeAndFlush(channel, authResponse);
                         break;
                     case WRITE_LAC:
                         processWriteLacRequestV3(r, requestHandler);
@@ -353,10 +355,11 @@ public class BookieRequestProcessor implements RequestProcessor {
                         break;
                     default:
                         LOG.info("Unknown operation type {}", header.getOperation());
-                        BookkeeperProtocol.Response.Builder response =
+                        final BookkeeperProtocol.Response response =
                                 BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader())
-                                        .setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
-                        channel.writeAndFlush(response.build());
+                                        .setStatus(BookkeeperProtocol.StatusCode.EBADREQ)
+                                        .build();
+                        writeAndFlush(channel, response);
                         if (statsEnabled) {
                             bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
                         }
@@ -386,12 +389,15 @@ public class BookieRequestProcessor implements RequestProcessor {
                             .setPayload(ByteString.copyFrom(AuthToken.NULL.getData()))
                             .build();
 
-                    channel.writeAndFlush(new BookieProtocol.AuthResponse(
-                            BookieProtocol.CURRENT_PROTOCOL_VERSION, message));
+                    final BookieProtocol.AuthResponse response = new BookieProtocol.AuthResponse(
+                            BookieProtocol.CURRENT_PROTOCOL_VERSION, message);
+                    writeAndFlush(channel, response);
                     break;
                 default:
                     LOG.error("Unknown op type {}, sending error", r.getOpCode());
-                    channel.writeAndFlush(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r));
+                    final BookieProtocol.Response errResponse = ResponseBuilder
+                            .buildErrorResponse(BookieProtocol.EBADREQ, r);
+                    writeAndFlush(channel, errResponse);
                     if (statsEnabled) {
                         bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
                     }
@@ -568,7 +574,7 @@ public class BookieRequestProcessor implements RequestProcessor {
         if (shFactory == null) {
             LOG.error("Got StartTLS request but TLS not configured");
             response.setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
-            c.writeAndFlush(response.build());
+            writeAndFlush(c, response.build());
         } else {
             // there is no need to execute in a different thread as this operation is light
             SslHandler sslHandler = shFactory.newTLSHandler();
@@ -597,16 +603,18 @@ public class BookieRequestProcessor implements RequestProcessor {
                         } else {
                             LOG.error("TLS Handshake failure: ", future.cause());
                         }
-                        BookkeeperProtocol.Response.Builder errResponse = BookkeeperProtocol.Response.newBuilder()
-                                .setHeader(r.getHeader()).setStatus(BookkeeperProtocol.StatusCode.EIO);
-                        c.writeAndFlush(errResponse.build());
+                        final BookkeeperProtocol.Response errResponse = BookkeeperProtocol.Response.newBuilder()
+                                .setHeader(r.getHeader())
+                                .setStatus(BookkeeperProtocol.StatusCode.EIO)
+                                .build();
+                        writeAndFlush(c, errResponse);
                         if (statsEnabled) {
                             bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
                         }
                     }
                 }
             });
-            c.writeAndFlush(response.build());
+            writeAndFlush(c, response.build());
         }
     }
 
@@ -722,4 +730,8 @@ public class BookieRequestProcessor implements RequestProcessor {
     public void handleNonWritableChannel(Channel channel) {
         onResponseTimeout.accept(channel);
     }
+
+    private static void writeAndFlush(Channel channel, Object msg) {
+        NettyChannelUtil.writeAndFlushWithVoidPromise(channel, msg);
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
index c9798156c2..889d3790d2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
@@ -122,13 +122,15 @@ abstract class PacketProcessorBase<T extends Request> implements Runnable {
         }
 
         if (channel.isActive()) {
-            ChannelPromise promise = channel.voidPromise();
+            final ChannelPromise promise;
             if (logger.isDebugEnabled()) {
                 promise = channel.newPromise().addListener(future -> {
                     if (!future.isSuccess()) {
                         logger.debug("Netty channel write exception. ", future.cause());
                     }
                 });
+            } else {
+                promise = channel.voidPromise();
             }
             channel.writeAndFlush(response, promise);
         } else {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NettyChannelUtil.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NettyChannelUtil.java
new file mode 100644
index 0000000000..7add100e74
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NettyChannelUtil.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.util;
+
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOutboundInvoker;
+import io.netty.channel.VoidChannelPromise;
+
+/**
+ * Contains utility methods for working with Netty Channels.
+ */
+public final class NettyChannelUtil {
+
+    private NettyChannelUtil() {
+    }
+
+    /**
+     * Write and flush the message to the channel.
+     *
+     * The promise is an instance of {@link VoidChannelPromise} that properly propagates exceptions up to the pipeline.
+     * Netty has many ad-hoc optimization if the promise is an instance of {@link VoidChannelPromise}.
+     * Lastly, it reduces pollution of useless {@link io.netty.channel.ChannelPromise} objects created
+     * by the default write and flush method {@link ChannelOutboundInvoker#writeAndFlush(Object)}.
+     * See https://stackoverflow.com/q/54169262 and https://stackoverflow.com/a/9030420 for more details.
+     *
+     * @param ctx channel's context
+     * @param msg buffer to write in the channel
+     */
+    public static void writeAndFlushWithVoidPromise(ChannelOutboundInvoker ctx, Object msg) {
+        ctx.writeAndFlush(msg, ctx.voidPromise());
+    }
+
+    /**
+     * Write and flush the message to the channel and the close the channel.
+     *
+     * This method is particularly helpful when the connection is in an invalid state
+     * and therefore a new connection must be created to continue.
+     *
+     * @param ctx channel's context
+     * @param msg buffer to write in the channel
+     */
+    public static void writeAndFlushWithClosePromise(ChannelOutboundInvoker ctx, Object msg) {
+        ctx.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/NettyChannelUtilTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/NettyChannelUtilTest.java
new file mode 100644
index 0000000000..ea47d83ec7
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/NettyChannelUtilTest.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util;
+
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOutboundInvoker;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.VoidChannelPromise;
+import java.nio.charset.StandardCharsets;
+import org.junit.Test;
+
+public class NettyChannelUtilTest {
+
+    @Test
+    public void testWriteAndFlushWithVoidPromise() {
+        final ChannelOutboundInvoker ctx = mock(ChannelOutboundInvoker.class);
+        final VoidChannelPromise voidChannelPromise = new VoidChannelPromise(mock(Channel.class), true);
+        when(ctx.voidPromise()).thenReturn(voidChannelPromise);
+        final byte[] data = "test".getBytes(StandardCharsets.UTF_8);
+        final ByteBuf byteBuf = Unpooled.wrappedBuffer(data, 0, data.length);
+        try {
+            NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, byteBuf);
+            verify(ctx).writeAndFlush(same(byteBuf), same(voidChannelPromise));
+            verify(ctx).voidPromise();
+        } finally {
+            byteBuf.release();
+        }
+    }
+
+    @Test
+    public void testWriteAndFlushWithClosePromise() {
+        final ChannelOutboundInvoker ctx = mock(ChannelOutboundInvoker.class);
+        final ChannelPromise promise = mock(ChannelPromise.class);
+
+        final byte[] data = "test".getBytes(StandardCharsets.UTF_8);
+        final ByteBuf byteBuf = Unpooled.wrappedBuffer(data, 0, data.length);
+        when(ctx.writeAndFlush(same(byteBuf))).thenReturn(promise);
+        try {
+            NettyChannelUtil.writeAndFlushWithClosePromise(ctx, byteBuf);
+            verify(ctx).writeAndFlush(same(byteBuf));
+            verify(promise).addListener(same(ChannelFutureListener.CLOSE));
+        } finally {
+            byteBuf.release();
+        }
+    }
+
+}
\ No newline at end of file