You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/11/04 00:28:40 UTC
[dubbo] branch 3.2 updated: support encode in user thread, optimize decode in user thread (#10854)
This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 5016f550be support encode in user thread, optimize decode in user thread (#10854)
5016f550be is described below
commit 5016f550be52f14a232399409a3c97fa6d6db321
Author: 一个不知名的Java靓仔 <cl...@gmail.com>
AuthorDate: Fri Nov 4 08:28:23 2022 +0800
support encode in user thread, optimize decode in user thread (#10854)
---
.../dubbo/common/constants/CommonConstants.java | 4 ++
.../dubbo/common/utils/CacheableSupplier.java | 43 ++++++++++++
.../remoting/transport/netty4/NettyChannel.java | 79 +++++++++++++++++++++-
.../transport/netty4/NettyClientHandler.java | 46 -------------
.../transport/netty4/NettyCodecAdapter.java | 26 +++++--
.../transport/netty4/NettyChannelTest.java | 15 +++-
.../transport/netty4/NettyClientHandlerTest.java | 10 +--
.../protocol/dubbo/DecodeableRpcInvocation.java | 8 ++-
8 files changed, 170 insertions(+), 61 deletions(-)
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index b8709881cd..01c1de33a4 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -600,4 +600,8 @@ public interface CommonConstants {
*
*/
String DUBBO_JSTACK_MAXLINE = "dubbo.jstack-dump.max-line";
+
+
+ /**
+ * URL parameter key read by the netty4 NettyChannel to decide where an outbound message is encoded.
+ * When the value is false, NettyChannel#send runs the codec itself and hands the resulting
+ * ByteBuf to the write queue; when it is true, the message is enqueued as-is.
+ */
+ String ENCODE_IN_IO_THREAD_KEY = "encode.in.io";
+ /**
+ * Value used when {@link #ENCODE_IN_IO_THREAD_KEY} is absent from the URL: encode inside send().
+ */
+ boolean DEFAULT_ENCODE_IN_IO_THREAD = false;
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/CacheableSupplier.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/CacheableSupplier.java
new file mode 100644
index 0000000000..cc81d34d05
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/CacheableSupplier.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.common.utils;
+
+import java.util.function.Supplier;
+
+/**
+ * A {@link Supplier} that computes its value lazily through a delegate and remembers the first
+ * non-null result.
+ *
+ * <p>The delegate is invoked at most once for a non-null result, even when several threads call
+ * {@link #get()} concurrently; every caller then observes the same instance. A {@code null}
+ * result is not cached, so the delegate is asked again on the next call.
+ *
+ * @param <T> type of the supplied value
+ */
+public class CacheableSupplier<T> implements Supplier<T> {
+
+    /** Cached value; {@code null} means "not computed yet". Volatile for safe publication. */
+    private volatile T object;
+
+    /** Delegate that produces the value on first use. */
+    private final Supplier<T> supplier;
+
+    /** Private monitor guarding the first computation, so callers cannot interfere by locking {@code this}. */
+    private final Object lock = new Object();
+
+    /**
+     * @param supplier delegate that produces the value when {@link #get()} is first called
+     */
+    public CacheableSupplier(Supplier<T> supplier) {
+        this.supplier = supplier;
+    }
+
+    /**
+     * Static factory equivalent to {@code new CacheableSupplier<>(supplier)}.
+     *
+     * @param supplier delegate that produces the value
+     * @param <T>      type of the supplied value
+     * @return a caching wrapper around {@code supplier}
+     */
+    public static <T> CacheableSupplier<T> newSupplier(Supplier<T> supplier) {
+        return new CacheableSupplier<>(supplier);
+    }
+
+    /**
+     * Returns the cached value, computing it through the delegate if it is not available yet.
+     *
+     * @return the cached value, or {@code null} if the delegate returned {@code null}
+     */
+    @Override
+    public T get() {
+        // Fast path: a single volatile read once the value has been published.
+        T cached = this.object;
+        if (cached == null) {
+            synchronized (lock) {
+                // Re-check under the lock: another thread may have computed the value meanwhile.
+                cached = this.object;
+                if (cached == null) {
+                    cached = supplier.get();
+                    this.object = cached;
+                }
+            }
+        }
+        return cached;
+    }
+}
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java
index 24e3a8057f..9006e713c7 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java
@@ -16,16 +16,27 @@
*/
package org.apache.dubbo.remoting.transport.netty4;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFutureListener;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.Codec;
+import org.apache.dubbo.remoting.Codec2;
+import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+import org.apache.dubbo.remoting.exchange.Request;
+import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.transport.AbstractChannel;
+import org.apache.dubbo.remoting.transport.codec.CodecAdapter;
import org.apache.dubbo.remoting.utils.PayloadDropper;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import org.apache.dubbo.rpc.model.FrameworkModel;
import java.net.InetSocketAddress;
import java.util.Map;
@@ -33,8 +44,11 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_ENCODE_IN_IO_THREAD;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static org.apache.dubbo.common.constants.CommonConstants.ENCODE_IN_IO_THREAD_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+import static org.apache.dubbo.rpc.model.ScopeModelUtil.getFrameworkModel;
/**
* NettyChannel maintains the cache of channel.
@@ -57,6 +71,10 @@ final class NettyChannel extends AbstractChannel {
private final Netty4BatchWriteQueue writeQueue;
+ private final Codec2 codec;
+
+ private final boolean encodeInIOThread;
+
/**
* The constructor of NettyChannel.
* It is private so NettyChannel usually create by {@link NettyChannel#getOrAddChannel(Channel, URL, ChannelHandler)}
@@ -72,6 +90,8 @@ final class NettyChannel extends AbstractChannel {
}
this.channel = channel;
this.writeQueue = Netty4BatchWriteQueue.createWriteQueue(channel);
+ this.codec = getChannelCodec(url);
+ this.encodeInIOThread = getUrl().getParameter(ENCODE_IN_IO_THREAD_KEY, DEFAULT_ENCODE_IN_IO_THREAD);
}
/**
@@ -164,7 +184,33 @@ final class NettyChannel extends AbstractChannel {
boolean success = true;
int timeout = 0;
try {
- ChannelFuture future = writeQueue.enqueue(message);
+ Object outputMessage = message;
+ if (!encodeInIOThread) {
+ ByteBuf buf = channel.alloc().buffer();
+ ChannelBuffer buffer = new NettyBackedChannelBuffer(buf);
+ codec.encode(this, buffer, message);
+ outputMessage = buf;
+ }
+ ChannelFuture future = writeQueue.enqueue(outputMessage).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!(message instanceof Request)) {
+ return;
+ }
+ ChannelHandler handler = getChannelHandler();
+ if (future.isSuccess()) {
+ handler.sent(NettyChannel.this, message);
+ } else {
+ Throwable t = future.cause();
+ if (t == null) {
+ return;
+ }
+ Response response = buildErrorResponse((Request) message, t);
+ handler.received(NettyChannel.this, response);
+ }
+ }
+ });
+
if (sent) {
// wait timeout ms
timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
@@ -287,4 +333,35 @@ final class NettyChannel extends AbstractChannel {
public Channel getNioChannel() {
return channel;
}
+
+    /**
+     * Creates the {@link Response} that is fed back to the handler when a request could not be
+     * written out.
+     *
+     * @param request the request whose id and version the response mirrors
+     * @param t       the failure cause; its string form becomes the error message
+     * @return a response with status {@link Response#BAD_REQUEST}
+     */
+    private static Response buildErrorResponse(Request request, Throwable t) {
+        final Response badRequest = new Response(request.getId(), request.getVersion());
+        badRequest.setStatus(Response.BAD_REQUEST);
+        final String reason = StringUtils.toString(t);
+        badRequest.setErrorMessage(reason);
+        return badRequest;
+    }
+
+    /**
+     * Resolves the codec used to encode messages for the given URL.
+     *
+     * <p>The codec name is the URL's {@code Constants.CODEC_KEY} parameter, or the protocol name
+     * when that parameter is empty. A {@link Codec2} extension with that name is preferred, then
+     * a legacy {@link Codec} extension wrapped in a {@link CodecAdapter}, and finally the
+     * {@code "default"} {@link Codec2} extension.
+     *
+     * @param url the channel URL
+     * @return the codec to use for this channel
+     */
+    private static Codec2 getChannelCodec(URL url) {
+        String name = url.getParameter(Constants.CODEC_KEY);
+        if (StringUtils.isEmpty(name)) {
+            // No codec configured: the codec extension is named after the protocol.
+            name = url.getProtocol();
+        }
+        FrameworkModel model = getFrameworkModel(url.getScopeModel());
+        if (model.getExtensionLoader(Codec2.class).hasExtension(name)) {
+            return model.getExtensionLoader(Codec2.class).getExtension(name);
+        }
+        if (model.getExtensionLoader(Codec.class).hasExtension(name)) {
+            Codec legacyCodec = model.getExtensionLoader(Codec.class).getExtension(name);
+            return new CodecAdapter(legacyCodec);
+        }
+        return model.getExtensionLoader(Codec2.class).getExtension("default");
+    }
}
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java
index 63690a8759..0ab1c129fd 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java
@@ -20,16 +20,13 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.exchange.Request;
-import org.apache.dubbo.remoting.exchange.Response;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.timeout.IdleStateEvent;
-import org.apache.dubbo.remoting.exchange.support.MultiMessage;
import static org.apache.dubbo.common.constants.CommonConstants.HEARTBEAT_EVENT;
@@ -87,36 +84,6 @@ public class NettyClientHandler extends ChannelDuplexHandler {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
super.write(ctx, msg, promise);
- final NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
- final boolean isRequest = msg instanceof Request;
- final boolean isMultiMessage = msg instanceof MultiMessage;
-
- // We add listeners to make sure our out bound event is correct.
- // If our out bound event has an error (in most cases the encoder fails),
- // we need to have the request return directly instead of blocking the invoke process.
- promise.addListener(future -> {
- if (future.isSuccess()) {
- // if our future is success, mark the future to sent.
- handler.sent(channel, msg);
- return;
- }
-
- Throwable t = future.cause();
- if (t != null && isRequest) {
- Request request = (Request) msg;
- Response response = buildErrorResponse(request, t);
- handler.received(channel, response);
- } else if (t != null && isMultiMessage) {
- MultiMessage multiMessage = (MultiMessage) msg;
- for (Object originMessage : multiMessage) {
- if (originMessage instanceof Request) {
- Request request = (Request) originMessage;
- Response response = buildErrorResponse(request, t);
- handler.received(channel, response);
- }
- }
- }
- });
}
@Override
@@ -152,17 +119,4 @@ public class NettyClientHandler extends ChannelDuplexHandler {
}
}
- /**
- * build a bad request's response
- *
- * @param request the request
- * @param t the throwable. In most cases, serialization fails.
- * @return the response
- */
- private static Response buildErrorResponse(Request request, Throwable t) {
- Response response = new Response(request.getId(), request.getVersion());
- response.setStatus(Response.BAD_REQUEST);
- response.setErrorMessage(StringUtils.toString(t));
- return response;
- }
}
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java
index beed485b3a..bf029272ab 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.dubbo.remoting.exchange.support.MultiMessage;
import java.io.IOException;
import java.util.List;
@@ -63,10 +64,27 @@ final public class NettyCodecAdapter {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
- ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
- Channel ch = ctx.channel();
- NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
- codec.encode(channel, buffer, msg);
+ boolean encoded = false;
+ if (msg instanceof ByteBuf) {
+ out.writeBytes(((ByteBuf) msg));
+ encoded = true;
+ } else if (msg instanceof MultiMessage) {
+ for (Object singleMessage : ((MultiMessage) msg)) {
+ if (singleMessage instanceof ByteBuf) {
+ ByteBuf buf = (ByteBuf) singleMessage;
+ out.writeBytes(buf);
+ encoded = true;
+ buf.release();
+ }
+ }
+ }
+
+ if (!encoded) {
+ ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
+ Channel ch = ctx.channel();
+ NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
+ codec.encode(channel, buffer, msg);
+ }
}
}
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyChannelTest.java b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyChannelTest.java
index 93add5737c..9a79ee2f2f 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyChannelTest.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyChannelTest.java
@@ -16,17 +16,19 @@
*/
package org.apache.dubbo.remoting.transport.netty4;
+import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
+import io.netty.util.concurrent.GenericFutureListener;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.url.component.ServiceConfigURL;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.RemotingException;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.net.InetSocketAddress;
@@ -78,12 +80,14 @@ public class NettyChannelTest {
@Test
public void testSend() throws Exception {
Mockito.when(channel.eventLoop()).thenReturn(Mockito.mock(EventLoop.class));
+ Mockito.when(channel.alloc()).thenReturn(PooledByteBufAllocator.DEFAULT);
NettyChannel nettyChannel = NettyChannel.getOrAddChannel(channel, url, channelHandler);
ChannelPromise future = Mockito.mock(ChannelPromise.class);
Mockito.when(future.await(1000)).thenReturn(true);
Mockito.when(future.cause()).thenReturn(null);
Mockito.when(channel.writeAndFlush(Mockito.any())).thenReturn(future);
Mockito.when(channel.newPromise()).thenReturn(future);
+ Mockito.when(future.addListener(Mockito.any())).thenReturn(future);
nettyChannel.send("msg", true);
NettyChannel finalNettyChannel = nettyChannel;
@@ -99,6 +103,15 @@ public class NettyChannelTest {
Assertions.assertThrows(RemotingException.class, () -> {
finalNettyChannel.send("msg", true);
}, "in timeout(1000ms) limit");
+
+ ChannelPromise channelPromise = Mockito.mock(ChannelPromise.class);
+ Mockito.when(channel.newPromise()).thenReturn(channelPromise);
+ Mockito.when(channelPromise.await(1000)).thenReturn(true);
+ Mockito.when(channelPromise.cause()).thenReturn(null);
+ Mockito.when(channelPromise.addListener(Mockito.any())).thenReturn(channelPromise);
+ finalNettyChannel.send("msg", true);
+ ArgumentCaptor<GenericFutureListener> listenerArgumentCaptor = ArgumentCaptor.forClass(GenericFutureListener.class);
+ Mockito.verify(channelPromise, Mockito.times(1)).addListener(listenerArgumentCaptor.capture());
}
@Test
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandlerTest.java b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandlerTest.java
index ddf2853082..7d5690c8a6 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandlerTest.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandlerTest.java
@@ -16,12 +16,12 @@
*/
package org.apache.dubbo.remoting.transport.netty4;
+import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.timeout.IdleStateEvent;
-import io.netty.util.concurrent.GenericFutureListener;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.url.component.ServiceConfigURL;
import org.apache.dubbo.remoting.ChannelHandler;
@@ -45,11 +45,13 @@ public class NettyClientHandlerTest {
Mockito.when(ctx.channel()).thenReturn(channel);
Mockito.when(channel.isActive()).thenReturn(true);
Mockito.when(channel.eventLoop()).thenReturn(new NioEventLoopGroup().next());
+ Mockito.when(channel.alloc()).thenReturn(PooledByteBufAllocator.DEFAULT);
ChannelPromise future = mock(ChannelPromise.class);
when(channel.writeAndFlush(any())).thenReturn(future);
when(future.cause()).thenReturn(null);
when(channel.newPromise()).thenReturn(future);
+ when(future.addListener(Mockito.any())).thenReturn(future);
NettyClientHandler nettyClientHandler = new NettyClientHandler(url, handler);
@@ -78,11 +80,5 @@ public class NettyClientHandlerTest {
Mockito.verify(channel, Mockito.times(1)).writeAndFlush(requestArgumentCaptor.capture());
- Request request = new Request();
- ChannelPromise promise = Mockito.mock(ChannelPromise.class);
- nettyClientHandler.write(ctx,request,promise);
- ArgumentCaptor<GenericFutureListener> listenerArgumentCaptor = ArgumentCaptor.forClass(GenericFutureListener.class);
- Mockito.verify(promise, Mockito.times(1)).addListener(listenerArgumentCaptor.capture());
-
}
}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
index 700687e881..14b7ebb903 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.rpc.protocol.dubbo;
+import org.apache.dubbo.common.utils.CacheableSupplier;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.serialize.Cleanable;
@@ -45,6 +46,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
+import java.util.function.Supplier;
import static org.apache.dubbo.common.BaseServiceMetadata.keyWithoutGroup;
import static org.apache.dubbo.common.URL.buildKey;
@@ -71,7 +73,7 @@ public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Dec
protected final FrameworkModel frameworkModel;
- private CallbackServiceCodec callbackServiceCodec;
+ private Supplier<CallbackServiceCodec> callbackServiceCodecFactory;
public DecodeableRpcInvocation(FrameworkModel frameworkModel, Channel channel, Request request, InputStream is, byte id) {
this.frameworkModel = frameworkModel;
@@ -82,7 +84,8 @@ public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Dec
this.request = request;
this.inputStream = is;
this.serializationType = id;
- callbackServiceCodec = new CallbackServiceCodec(frameworkModel);
+ this.callbackServiceCodecFactory = CacheableSupplier.newSupplier(()->
+ new CallbackServiceCodec(frameworkModel));
}
@Override
@@ -220,6 +223,7 @@ public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Dec
}
//decode argument ,may be callback
+ CallbackServiceCodec callbackServiceCodec = callbackServiceCodecFactory.get();
for (int i = 0; i < args.length; i++) {
args[i] = callbackServiceCodec.decodeInvocationArgument(channel, this, pts, i, args[i]);
}