You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by yo...@apache.org on 2023/06/19 07:42:25 UTC
[bookkeeper] 01/31: Use ChannelVoidPromise to avoid useless promise objects creation (#3733)
This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch branch-4.16
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit e03949af7bc2200a3e8243ca3c0f521c3270e198
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Thu May 4 09:00:22 2023 +0200
Use ChannelVoidPromise to avoid useless promise objects creation (#3733)
(cherry picked from commit bbea67b09566002b9393a8b8f256988ea9755cc1)
---
.../org/apache/bookkeeper/proto/AuthHandler.java | 42 +++++++------
.../bookkeeper/proto/BookieRequestProcessor.java | 40 +++++++-----
.../bookkeeper/proto/PacketProcessorBase.java | 4 +-
.../apache/bookkeeper/util/NettyChannelUtil.java | 62 ++++++++++++++++++
.../bookkeeper/util/NettyChannelUtilTest.java | 73 ++++++++++++++++++++++
5 files changed, 188 insertions(+), 33 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
index f923b61ad5..720e7279f5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -41,6 +41,7 @@ import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
import org.apache.bookkeeper.util.ByteBufList;
+import org.apache.bookkeeper.util.NettyChannelUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,15 +103,15 @@ class AuthHandler {
} else if (msg instanceof BookieProtocol.Request) {
BookieProtocol.Request req = (BookieProtocol.Request) msg;
if (req.getOpCode() == BookieProtocol.ADDENTRY) {
- ctx.channel().writeAndFlush(
- BookieProtocol.AddResponse.create(
- req.getProtocolVersion(), BookieProtocol.EUA,
- req.getLedgerId(), req.getEntryId()));
+ final BookieProtocol.AddResponse response = BookieProtocol.AddResponse.create(
+ req.getProtocolVersion(), BookieProtocol.EUA,
+ req.getLedgerId(), req.getEntryId());
+ NettyChannelUtil.writeAndFlushWithVoidPromise(ctx.channel(), response);
} else if (req.getOpCode() == BookieProtocol.READENTRY) {
- ctx.channel().writeAndFlush(
- new BookieProtocol.ReadResponse(
- req.getProtocolVersion(), BookieProtocol.EUA,
- req.getLedgerId(), req.getEntryId()));
+ final BookieProtocol.ReadResponse response = new BookieProtocol.ReadResponse(
+ req.getProtocolVersion(), BookieProtocol.EUA,
+ req.getLedgerId(), req.getEntryId());
+ NettyChannelUtil.writeAndFlushWithVoidPromise(ctx.channel(), response);
} else {
ctx.channel().close();
}
@@ -133,7 +134,7 @@ class AuthHandler {
.setHeader(req.getHeader())
.setStatus(BookkeeperProtocol.StatusCode.EUA);
- ctx.channel().writeAndFlush(builder.build());
+ NettyChannelUtil.writeAndFlushWithVoidPromise(ctx.channel(), builder.build());
}
} else {
// close the channel, junk coming over it
@@ -172,7 +173,9 @@ class AuthHandler {
}
AuthMessage message = AuthMessage.newBuilder().setAuthPluginName(req.authMessage.getAuthPluginName())
.setPayload(ByteString.copyFrom(newam.getData())).build();
- channel.writeAndFlush(new BookieProtocol.AuthResponse(req.getProtocolVersion(), message));
+ final BookieProtocol.AuthResponse response =
+ new BookieProtocol.AuthResponse(req.getProtocolVersion(), message);
+ NettyChannelUtil.writeAndFlushWithVoidPromise(channel, response);
}
}
@@ -196,14 +199,17 @@ class AuthHandler {
LOG.error("Error processing auth message, closing connection");
builder.setStatus(BookkeeperProtocol.StatusCode.EUA);
- channel.writeAndFlush(builder.build());
- channel.close();
+ NettyChannelUtil.writeAndFlushWithClosePromise(
+ channel, builder.build()
+ );
return;
} else {
AuthMessage message = AuthMessage.newBuilder().setAuthPluginName(pluginName)
.setPayload(ByteString.copyFrom(newam.getData())).build();
builder.setStatus(BookkeeperProtocol.StatusCode.EOK).setAuthResponse(message);
- channel.writeAndFlush(builder.build());
+ NettyChannelUtil.writeAndFlushWithVoidPromise(
+ channel, builder.build()
+ );
}
}
}
@@ -399,9 +405,9 @@ class AuthHandler {
.setPayload(ByteString.copyFrom(newam.getData())).build();
if (isUsingV2Protocol) {
- channel.writeAndFlush(
- new BookieProtocol.AuthRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, message),
- channel.voidPromise());
+ final BookieProtocol.AuthRequest msg =
+ new BookieProtocol.AuthRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, message);
+ NettyChannelUtil.writeAndFlushWithVoidPromise(channel, msg);
} else {
// V3 protocol
BookkeeperProtocol.BKPacketHeader header = BookkeeperProtocol.BKPacketHeader.newBuilder()
@@ -410,7 +416,7 @@ class AuthHandler {
BookkeeperProtocol.Request.Builder builder = BookkeeperProtocol.Request.newBuilder()
.setHeader(header)
.setAuthRequest(message);
- channel.writeAndFlush(builder.build());
+ NettyChannelUtil.writeAndFlushWithVoidPromise(channel, builder.build());
}
}
}
@@ -429,7 +435,7 @@ class AuthHandler {
authenticated = true;
Object msg = waitingForAuth.poll();
while (msg != null) {
- ctx.writeAndFlush(msg);
+ NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, msg);
msg = waitingForAuth.poll();
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 6e7f5abcbb..7ebd8c90a7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -54,6 +54,7 @@ import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.tls.SecurityException;
import org.apache.bookkeeper.tls.SecurityHandlerFactory;
import org.apache.bookkeeper.tls.SecurityHandlerFactory.NodeType;
+import org.apache.bookkeeper.util.NettyChannelUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -330,11 +331,12 @@ public class BookieRequestProcessor implements RequestProcessor {
.setAuthPluginName(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME)
.setPayload(ByteString.copyFrom(AuthToken.NULL.getData()))
.build();
- BookkeeperProtocol.Response.Builder authResponse = BookkeeperProtocol.Response
+ final BookkeeperProtocol.Response authResponse = BookkeeperProtocol.Response
.newBuilder().setHeader(r.getHeader())
.setStatus(BookkeeperProtocol.StatusCode.EOK)
- .setAuthResponse(message);
- channel.writeAndFlush(authResponse.build());
+ .setAuthResponse(message)
+ .build();
+ writeAndFlush(channel, authResponse);
break;
case WRITE_LAC:
processWriteLacRequestV3(r, requestHandler);
@@ -353,10 +355,11 @@ public class BookieRequestProcessor implements RequestProcessor {
break;
default:
LOG.info("Unknown operation type {}", header.getOperation());
- BookkeeperProtocol.Response.Builder response =
+ final BookkeeperProtocol.Response response =
BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader())
- .setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
- channel.writeAndFlush(response.build());
+ .setStatus(BookkeeperProtocol.StatusCode.EBADREQ)
+ .build();
+ writeAndFlush(channel, response);
if (statsEnabled) {
bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
}
@@ -386,12 +389,15 @@ public class BookieRequestProcessor implements RequestProcessor {
.setPayload(ByteString.copyFrom(AuthToken.NULL.getData()))
.build();
- channel.writeAndFlush(new BookieProtocol.AuthResponse(
- BookieProtocol.CURRENT_PROTOCOL_VERSION, message));
+ final BookieProtocol.AuthResponse response = new BookieProtocol.AuthResponse(
+ BookieProtocol.CURRENT_PROTOCOL_VERSION, message);
+ writeAndFlush(channel, response);
break;
default:
LOG.error("Unknown op type {}, sending error", r.getOpCode());
- channel.writeAndFlush(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r));
+ final BookieProtocol.Response errResponse = ResponseBuilder
+ .buildErrorResponse(BookieProtocol.EBADREQ, r);
+ writeAndFlush(channel, errResponse);
if (statsEnabled) {
bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
}
@@ -568,7 +574,7 @@ public class BookieRequestProcessor implements RequestProcessor {
if (shFactory == null) {
LOG.error("Got StartTLS request but TLS not configured");
response.setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
- c.writeAndFlush(response.build());
+ writeAndFlush(c, response.build());
} else {
// there is no need to execute in a different thread as this operation is light
SslHandler sslHandler = shFactory.newTLSHandler();
@@ -597,16 +603,18 @@ public class BookieRequestProcessor implements RequestProcessor {
} else {
LOG.error("TLS Handshake failure: ", future.cause());
}
- BookkeeperProtocol.Response.Builder errResponse = BookkeeperProtocol.Response.newBuilder()
- .setHeader(r.getHeader()).setStatus(BookkeeperProtocol.StatusCode.EIO);
- c.writeAndFlush(errResponse.build());
+ final BookkeeperProtocol.Response errResponse = BookkeeperProtocol.Response.newBuilder()
+ .setHeader(r.getHeader())
+ .setStatus(BookkeeperProtocol.StatusCode.EIO)
+ .build();
+ writeAndFlush(c, errResponse);
if (statsEnabled) {
bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
}
}
}
});
- c.writeAndFlush(response.build());
+ writeAndFlush(c, response.build());
}
}
@@ -722,4 +730,8 @@ public class BookieRequestProcessor implements RequestProcessor {
public void handleNonWritableChannel(Channel channel) {
onResponseTimeout.accept(channel);
}
+
+ private static void writeAndFlush(Channel channel, Object msg) {
+ NettyChannelUtil.writeAndFlushWithVoidPromise(channel, msg);
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
index c9798156c2..889d3790d2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
@@ -122,13 +122,15 @@ abstract class PacketProcessorBase<T extends Request> implements Runnable {
}
if (channel.isActive()) {
- ChannelPromise promise = channel.voidPromise();
+ final ChannelPromise promise;
if (logger.isDebugEnabled()) {
promise = channel.newPromise().addListener(future -> {
if (!future.isSuccess()) {
logger.debug("Netty channel write exception. ", future.cause());
}
});
+ } else {
+ promise = channel.voidPromise();
}
channel.writeAndFlush(response, promise);
} else {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NettyChannelUtil.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NettyChannelUtil.java
new file mode 100644
index 0000000000..7add100e74
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NettyChannelUtil.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.util;
+
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOutboundInvoker;
+import io.netty.channel.VoidChannelPromise;
+
+/**
+ * Contains utility methods for working with Netty Channels.
+ */
+public final class NettyChannelUtil {
+
+ private NettyChannelUtil() {
+ }
+
+ /**
+ * Write and flush the message to the channel.
+ *
+ * The promise is an instance of {@link VoidChannelPromise} that properly propagates exceptions up to the pipeline.
+ * Netty has many ad-hoc optimizations when the promise is an instance of {@link VoidChannelPromise}.
+ * Lastly, it reduces pollution of useless {@link io.netty.channel.ChannelPromise} objects created
+ * by the default write and flush method {@link ChannelOutboundInvoker#writeAndFlush(Object)}.
+ * See https://stackoverflow.com/q/54169262 and https://stackoverflow.com/a/9030420 for more details.
+ *
+ * @param ctx channel's context
+ * @param msg buffer to write in the channel
+ */
+ public static void writeAndFlushWithVoidPromise(ChannelOutboundInvoker ctx, Object msg) {
+ ctx.writeAndFlush(msg, ctx.voidPromise());
+ }
+
+ /**
+ * Write and flush the message to the channel and then close the channel.
+ *
+ * This method is particularly helpful when the connection is in an invalid state
+ * and therefore a new connection must be created to continue.
+ *
+ * @param ctx channel's context
+ * @param msg buffer to write in the channel
+ */
+ public static void writeAndFlushWithClosePromise(ChannelOutboundInvoker ctx, Object msg) {
+ ctx.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
+ }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/NettyChannelUtilTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/NettyChannelUtilTest.java
new file mode 100644
index 0000000000..ea47d83ec7
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/NettyChannelUtilTest.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util;
+
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOutboundInvoker;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.VoidChannelPromise;
+import java.nio.charset.StandardCharsets;
+import org.junit.Test;
+
+public class NettyChannelUtilTest {
+
+ @Test
+ public void testWriteAndFlushWithVoidPromise() {
+ final ChannelOutboundInvoker ctx = mock(ChannelOutboundInvoker.class);
+ final VoidChannelPromise voidChannelPromise = new VoidChannelPromise(mock(Channel.class), true);
+ when(ctx.voidPromise()).thenReturn(voidChannelPromise);
+ final byte[] data = "test".getBytes(StandardCharsets.UTF_8);
+ final ByteBuf byteBuf = Unpooled.wrappedBuffer(data, 0, data.length);
+ try {
+ NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, byteBuf);
+ verify(ctx).writeAndFlush(same(byteBuf), same(voidChannelPromise));
+ verify(ctx).voidPromise();
+ } finally {
+ byteBuf.release();
+ }
+ }
+
+ @Test
+ public void testWriteAndFlushWithClosePromise() {
+ final ChannelOutboundInvoker ctx = mock(ChannelOutboundInvoker.class);
+ final ChannelPromise promise = mock(ChannelPromise.class);
+
+ final byte[] data = "test".getBytes(StandardCharsets.UTF_8);
+ final ByteBuf byteBuf = Unpooled.wrappedBuffer(data, 0, data.length);
+ when(ctx.writeAndFlush(same(byteBuf))).thenReturn(promise);
+ try {
+ NettyChannelUtil.writeAndFlushWithClosePromise(ctx, byteBuf);
+ verify(ctx).writeAndFlush(same(byteBuf));
+ verify(promise).addListener(same(ChannelFutureListener.CLOSE));
+ } finally {
+ byteBuf.release();
+ }
+ }
+
+}
\ No newline at end of file