You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2021/10/29 02:54:20 UTC
[dubbo] branch 3.0 updated: Add unit test for dubbo-rpc-dubbo
module (#9135)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 9bb2ab8 Add unit test for dubbo-rpc-dubbo module (#9135)
9bb2ab8 is described below
commit 9bb2ab88e2bdaeef95df04ed4425e1b99760e562
Author: 灼华 <43...@users.noreply.github.com>
AuthorDate: Fri Oct 29 10:54:11 2021 +0800
Add unit test for dubbo-rpc-dubbo module (#9135)
---
.../rpc/protocol/dubbo/CallbackServiceCodec.java | 2 +-
.../protocol/dubbo/DecodeableRpcInvocation.java | 4 +-
.../rpc/protocol/dubbo/DecodeableRpcResult.java | 2 +-
.../dubbo/rpc/protocol/dubbo/DubboInvoker.java | 2 +-
.../dubbo/rpc/protocol/dubbo/DubboProtocol.java | 10 +-
.../rpc/protocol/dubbo/filter/TraceFilter.java | 9 +-
.../dubbo/status/ThreadPoolStatusChecker.java | 4 +-
.../dubbo/DecodeableRpcInvocationTest.java | 114 +++++++++++++++++
.../protocol/dubbo/DecodeableRpcResultTest.java | 139 +++++++++++++++++++++
.../rpc/protocol/dubbo/DubboCountCodecTest.java | 83 ++++++++++++
.../dubbo/status/ServerStatusCheckerTest.java | 67 ++++++++++
.../dubbo/status/ThreadPoolStatusCheckerTest.java | 55 ++++++++
12 files changed, 475 insertions(+), 16 deletions(-)
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java
index 922b181..c001173 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java
@@ -61,7 +61,7 @@ import static org.apache.dubbo.rpc.protocol.dubbo.Constants.IS_CALLBACK_SERVICE;
/**
* callback service helper
*/
-class CallbackServiceCodec {
+public class CallbackServiceCodec {
private static final Logger logger = LoggerFactory.getLogger(CallbackServiceCodec.class);
private static final byte CALLBACK_NONE = 0x0;
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
index 3f7c90a..045abba 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
@@ -222,7 +222,7 @@ public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Dec
setParameterTypes(pts);
Map<String, Object> map = in.readAttachments();
- if (map != null && map.size() > 0) {
+ if (CollectionUtils.isNotEmptyMap(map)) {
Map<String, Object> attachment = getObjectAttachments();
if (attachment == null) {
attachment = new HashMap<>();
@@ -237,7 +237,7 @@ public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Dec
}
setArguments(args);
- String targetServiceName = buildKey((String) getAttachment(PATH_KEY),
+ String targetServiceName = buildKey(getAttachment(PATH_KEY),
getAttachment(GROUP_KEY),
getAttachment(VERSION_KEY));
setTargetServiceUniqueName(targetServiceName);
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
index 6d1a9d0..3c33fca 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
@@ -156,7 +156,7 @@ public class DecodeableRpcResult extends AppResponse implements Codec, Decodeabl
} else {
returnTypes = RpcUtils.getReturnTypes(invocation);
}
- Object value = null;
+ Object value;
if (ArrayUtils.isEmpty(returnTypes)) {
// This almost never happens?
value = in.readObject();
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
index 67e3f11..eb09f5e 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
@@ -186,7 +186,7 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
private int calculateTimeout(Invocation invocation, String methodName) {
Object countdown = RpcContext.getClientAttachment().getObjectAttachment(TIME_COUNTDOWN_KEY);
- int timeout = DEFAULT_TIMEOUT;
+ int timeout;
if (countdown == null) {
timeout = (int) RpcUtils.getTimeout(getUrl(), methodName, RpcContext.getClientAttachment(), DEFAULT_TIMEOUT);
if (getUrl().getParameter(ENABLE_TIMEOUT_COUNTDOWN_KEY, false)) {
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
index 3e12951..50110d7 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
@@ -296,7 +296,7 @@ public class DubboProtocol extends AbstractProtocol {
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
- //export an stub service for dispatching event
+ //export a stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
@@ -320,7 +320,7 @@ public class DubboProtocol extends AbstractProtocol {
checkDestroyed();
// find server.
String key = url.getAddress();
- //client can export a service which's only for server to invoke
+ // client can export a service which only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ProtocolServer server = serverMap.get(key);
@@ -356,7 +356,7 @@ public class DubboProtocol extends AbstractProtocol {
.build();
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
- if (str != null && str.length() > 0 && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(str)) {
+ if (StringUtils.isNotEmpty(str) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
@@ -368,7 +368,7 @@ public class DubboProtocol extends AbstractProtocol {
}
str = url.getParameter(CLIENT_KEY);
- if (str != null && str.length() > 0) {
+ if (StringUtils.isNotEmpty(str)) {
Set<String> supportedTypes = url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
@@ -631,7 +631,7 @@ public class DubboProtocol extends AbstractProtocol {
url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
// BIO is not allowed since it has severe performance issue.
- if (str != null && str.length() > 0 && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(str)) {
+ if (StringUtils.isNotEmpty(str) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/TraceFilter.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/TraceFilter.java
index 16e5fec..2ae46bc 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/TraceFilter.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/TraceFilter.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.Filter;
@@ -56,7 +57,7 @@ public class TraceFilter implements Filter {
public static void addTracer(Class<?> type, String method, Channel channel, int max) {
channel.setAttribute(TRACE_MAX, max);
channel.setAttribute(TRACE_COUNT, new AtomicInteger());
- String key = method != null && method.length() > 0 ? type.getName() + "." + method : type.getName();
+ String key = StringUtils.isNotEmpty(method) ? type.getName() + "." + method : type.getName();
Set<Channel> channels = TRACERS.computeIfAbsent(key, k -> new ConcurrentHashSet<>());
channels.add(channel);
}
@@ -64,7 +65,7 @@ public class TraceFilter implements Filter {
public static void removeTracer(Class<?> type, String method, Channel channel) {
channel.removeAttribute(TRACE_MAX);
channel.removeAttribute(TRACE_COUNT);
- String key = method != null && method.length() > 0 ? type.getName() + "." + method : type.getName();
+ String key = StringUtils.isNotEmpty(method) ? type.getName() + "." + method : type.getName();
Set<Channel> channels = TRACERS.get(key);
if (channels != null) {
channels.remove(channel);
@@ -79,7 +80,7 @@ public class TraceFilter implements Filter {
if (TRACERS.size() > 0) {
String key = invoker.getInterface().getName() + "." + invocation.getMethodName();
Set<Channel> channels = TRACERS.get(key);
- if (channels == null || channels.isEmpty()) {
+ if (CollectionUtils.isEmpty(channels)) {
key = invoker.getInterface().getName();
channels = TRACERS.get(key);
}
@@ -92,7 +93,7 @@ public class TraceFilter implements Filter {
if (m != null) {
max = m;
}
- int count = 0;
+ int count;
AtomicInteger c = (AtomicInteger) channel.getAttribute(TRACE_COUNT);
if (c == null) {
c = new AtomicInteger();
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/status/ThreadPoolStatusChecker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/status/ThreadPoolStatusChecker.java
index 4c56f2b..d6c1a12 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/status/ThreadPoolStatusChecker.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/status/ThreadPoolStatusChecker.java
@@ -57,8 +57,8 @@ public class ThreadPoolStatusChecker implements StatusChecker {
msg.append(';');
}
msg.append("Pool status:").append(lvl).append(", max:").append(tp.getMaximumPoolSize()).append(", core:")
- .append(tp.getCorePoolSize()).append(", largest:").append(tp.getLargestPoolSize()).append(", active:")
- .append(tp.getActiveCount()).append(", task:").append(tp.getTaskCount()).append(", service port: ").append(port);
+ .append(tp.getCorePoolSize()).append(", largest:").append(tp.getLargestPoolSize()).append(", active:")
+ .append(tp.getActiveCount()).append(", task:").append(tp.getTaskCount()).append(", service port: ").append(port);
}
}
return msg.length() == 0 ? new Status(Status.Level.UNKNOWN) : new Status(level, msg.toString());
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocationTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocationTest.java
new file mode 100644
index 0000000..523302b
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocationTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.serialize.ObjectOutput;
+import org.apache.dubbo.common.serialize.Serialization;
+import org.apache.dubbo.common.url.component.ServiceConfigURL;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+import org.apache.dubbo.remoting.buffer.ChannelBufferInputStream;
+import org.apache.dubbo.remoting.buffer.ChannelBufferOutputStream;
+import org.apache.dubbo.remoting.buffer.ChannelBuffers;
+import org.apache.dubbo.remoting.exchange.Request;
+import org.apache.dubbo.remoting.transport.CodecSupport;
+import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.apache.dubbo.rpc.protocol.dubbo.decode.MockChannel;
+import org.apache.dubbo.rpc.protocol.dubbo.support.DemoService;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+import static org.apache.dubbo.rpc.protocol.dubbo.DubboCodec.DUBBO_VERSION;
+
+/**
+ * {@link DecodeableRpcInvocation}
+ */
+public class DecodeableRpcInvocationTest {
+
+ @Test
+ public void test() throws Exception {
+ // Simulate the data called by the client(The called data is stored in invocation and written to the buffer)
+ URL url = new ServiceConfigURL("dubbo", "127.0.0.1", 9103, DemoService.class.getName(), VERSION_KEY, "1.0.0");
+ RpcInvocation inv = new RpcInvocation(null, "sayHello", DemoService.class.getName(), "", new Class<?>[]{String.class}, new String[]{"yug"});
+ inv.setObjectAttachment(PATH_KEY, url.getPath());
+ inv.setObjectAttachment(VERSION_KEY, url.getVersion());
+ inv.setObjectAttachment(DUBBO_VERSION_KEY, DUBBO_VERSION);
+ inv.setObjectAttachment("k1", "v1");
+ inv.setObjectAttachment("k2", "v2");
+ inv.setTargetServiceUniqueName(url.getServiceKey());
+ // Write the data of inv to the buffer
+ Byte proto = CodecSupport.getIDByName("hessian2");
+ ChannelBuffer buffer = writeBuffer(url, inv, proto);
+
+ FrameworkModel frameworkModel = new FrameworkModel();
+ ApplicationModel applicationModel = new ApplicationModel(frameworkModel);
+ applicationModel.getDefaultModule().getServiceRepository().registerService(DemoService.class.getName(), DemoService.class);
+ frameworkModel.getServiceRepository().registerProviderUrl(url);
+
+ // Simulate the server to decode
+ Channel channel = new MockChannel();
+ Request request = new Request(1);
+ ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, buffer.readableBytes());
+ DecodeableRpcInvocation decodeableRpcInvocation = new DecodeableRpcInvocation(frameworkModel, channel, request, is, proto);
+ decodeableRpcInvocation.decode();
+
+ // Verify that the decodeableRpcInvocation data decoded by the server is consistent with the invocation data of the client
+ Assertions.assertEquals(request.getVersion(), DUBBO_VERSION);
+ Assertions.assertEquals(decodeableRpcInvocation.getObjectAttachment(DUBBO_VERSION_KEY), DUBBO_VERSION);
+ Assertions.assertEquals(decodeableRpcInvocation.getObjectAttachment(VERSION_KEY), inv.getObjectAttachment(VERSION_KEY));
+ Assertions.assertEquals(decodeableRpcInvocation.getObjectAttachment(PATH_KEY), inv.getObjectAttachment(PATH_KEY));
+ Assertions.assertEquals(decodeableRpcInvocation.getMethodName(), inv.getMethodName());
+ Assertions.assertEquals(decodeableRpcInvocation.getParameterTypesDesc(), inv.getParameterTypesDesc());
+ Assertions.assertArrayEquals(decodeableRpcInvocation.getParameterTypes(), inv.getParameterTypes());
+ Assertions.assertArrayEquals(decodeableRpcInvocation.getArguments(), inv.getArguments());
+ Assertions.assertTrue(CollectionUtils.mapEquals(decodeableRpcInvocation.getObjectAttachments(), inv.getObjectAttachments()));
+ Assertions.assertEquals(decodeableRpcInvocation.getTargetServiceUniqueName(), inv.getTargetServiceUniqueName());
+
+ frameworkModel.destroy();
+ }
+
+ private ChannelBuffer writeBuffer(URL url, RpcInvocation inv, Byte proto) throws IOException {
+ Serialization serialization = CodecSupport.getSerializationById(proto);
+ ChannelBuffer buffer = ChannelBuffers.buffer(1024);
+ ChannelBufferOutputStream outputStream = new ChannelBufferOutputStream(buffer);
+ ObjectOutput out = serialization.serialize(url, outputStream);
+ out.writeUTF(inv.getAttachment(DUBBO_VERSION_KEY)); // dubbo version
+ out.writeUTF(inv.getAttachment(PATH_KEY)); // path
+ out.writeUTF(inv.getAttachment(VERSION_KEY)); // version
+ out.writeUTF(inv.getMethodName()); // methodName
+ out.writeUTF(inv.getParameterTypesDesc()); // parameterTypesDesc
+ Object[] args = inv.getArguments();
+ if (args != null) {
+ for (int i = 0; i < args.length; i++) {
+ out.writeObject(args[i]); // args
+ }
+ }
+ out.writeAttachments(inv.getObjectAttachments()); // attachments
+ out.flushBuffer();
+ outputStream.close();
+ return buffer;
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResultTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResultTest.java
new file mode 100644
index 0000000..3887c03
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResultTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.Version;
+import org.apache.dubbo.common.serialize.ObjectOutput;
+import org.apache.dubbo.common.serialize.Serialization;
+import org.apache.dubbo.common.url.component.ServiceConfigURL;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+import org.apache.dubbo.remoting.buffer.ChannelBufferInputStream;
+import org.apache.dubbo.remoting.buffer.ChannelBufferOutputStream;
+import org.apache.dubbo.remoting.buffer.ChannelBuffers;
+import org.apache.dubbo.remoting.exchange.Response;
+import org.apache.dubbo.remoting.transport.CodecSupport;
+import org.apache.dubbo.rpc.AppResponse;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.apache.dubbo.rpc.model.ModuleServiceRepository;
+import org.apache.dubbo.rpc.model.ProviderModel;
+import org.apache.dubbo.rpc.model.ServiceDescriptor;
+import org.apache.dubbo.rpc.protocol.dubbo.decode.MockChannel;
+import org.apache.dubbo.rpc.protocol.dubbo.support.DemoService;
+import org.apache.dubbo.rpc.protocol.dubbo.support.DemoServiceImpl;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+import static org.apache.dubbo.rpc.Constants.SERIALIZATION_ID_KEY;
+import static org.apache.dubbo.rpc.RpcException.BIZ_EXCEPTION;
+import static org.apache.dubbo.rpc.protocol.dubbo.DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS;
+import static org.apache.dubbo.rpc.protocol.dubbo.DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS;
+
+/**
+ * {@link DecodeableRpcResult}
+ */
+public class DecodeableRpcResultTest {
+ private ModuleServiceRepository repository;
+
+ @BeforeEach
+ public void setUp() {
+ repository = ApplicationModel.defaultModel().getDefaultModule().getServiceRepository();
+ }
+
+ @AfterEach
+ public void tearDown() {
+ FrameworkModel.destroyAll();
+ }
+
+ @Test
+ public void test() throws Exception {
+ // Mock a rpcInvocation, this rpcInvocation is usually generated by the client request, and stored in Request#data
+ Byte proto = CodecSupport.getIDByName("hessian2");
+ URL url = new ServiceConfigURL("dubbo", "127.0.0.1", 9103, DemoService.class.getName(), VERSION_KEY, "1.0.0");
+ ServiceDescriptor serviceDescriptor = repository.registerService(DemoService.class);
+ ProviderModel providerModel = new ProviderModel(url.getServiceKey(), new DemoServiceImpl(), serviceDescriptor, null, null);
+ RpcInvocation rpcInvocation = new RpcInvocation(providerModel, "echo", DemoService.class.getName(), "", new Class<?>[]{String.class}, new String[]{"yug"});
+ rpcInvocation.put(SERIALIZATION_ID_KEY, proto);
+
+ // Mock a response result returned from the server and write to the buffer
+ Channel channel = new MockChannel();
+ Response response = new Response(1);
+ Result result = new AppResponse();
+ result.setValue("yug");
+ response.setResult(result);
+ ChannelBuffer buffer = writeBuffer(url, response, proto, false);
+
+ // The client reads and decode the buffer
+ InputStream is = new ChannelBufferInputStream(buffer, buffer.readableBytes());
+ DecodeableRpcResult decodeableRpcResult = new DecodeableRpcResult(channel, response, is, rpcInvocation, proto);
+ decodeableRpcResult.decode();
+
+ // Verify RESPONSE_VALUE_WITH_ATTACHMENTS
+ // Verify that the decodeableRpcResult decoded by the client is consistent with the response returned by the server
+ Assertions.assertEquals(decodeableRpcResult.getValue(), result.getValue());
+ Assertions.assertTrue(CollectionUtils.mapEquals(decodeableRpcResult.getObjectAttachments(), result.getObjectAttachments()));
+
+ // Verify RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS
+ Response exResponse = new Response(2);
+ Result exResult = new AppResponse();
+ exResult.setException(new RpcException(BIZ_EXCEPTION));
+ exResponse.setResult(exResult);
+ buffer = writeBuffer(url, exResponse, proto, true);
+ is = new ChannelBufferInputStream(buffer, buffer.readableBytes());
+ decodeableRpcResult = new DecodeableRpcResult(channel, response, is, rpcInvocation, proto);
+ decodeableRpcResult.decode();
+ Assertions.assertEquals(((RpcException) decodeableRpcResult.getException()).getCode(), ((RpcException) exResult.getException()).getCode());
+ Assertions.assertTrue(CollectionUtils.mapEquals(decodeableRpcResult.getObjectAttachments(), exResult.getObjectAttachments()));
+
+ }
+
+ private ChannelBuffer writeBuffer(URL url, Response response, Byte proto, boolean writeEx) throws IOException {
+ Serialization serialization = CodecSupport.getSerializationById(proto);
+ ChannelBuffer buffer = ChannelBuffers.buffer(1024 * 10);
+ ChannelBufferOutputStream outputStream = new ChannelBufferOutputStream(buffer);
+ ObjectOutput out = serialization.serialize(url, outputStream);
+ Result result = (Result) response.getResult();
+ if (!writeEx) {
+ out.writeByte(RESPONSE_VALUE_WITH_ATTACHMENTS);
+ out.writeObject(result.getValue());
+ } else {
+ out.writeByte(RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS);
+ out.writeThrowable(result.getException());
+ }
+ result.getObjectAttachments().put(DUBBO_VERSION_KEY, Version.getProtocolVersion());
+ result.getObjectAttachments().put("k1", "v1");
+ out.writeAttachments(result.getObjectAttachments());
+ out.flushBuffer();
+ outputStream.close();
+ return buffer;
+ }
+
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCountCodecTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCountCodecTest.java
new file mode 100644
index 0000000..1d7bec2
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCountCodecTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo;
+
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.Codec2;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+import org.apache.dubbo.remoting.buffer.ChannelBuffers;
+import org.apache.dubbo.remoting.exchange.Request;
+import org.apache.dubbo.remoting.exchange.Response;
+import org.apache.dubbo.remoting.exchange.support.MultiMessage;
+import org.apache.dubbo.rpc.AppResponse;
+import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.apache.dubbo.rpc.protocol.dubbo.decode.MockChannel;
+import org.apache.dubbo.rpc.protocol.dubbo.support.DemoService;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Iterator;
+
+import static org.apache.dubbo.rpc.Constants.INPUT_KEY;
+import static org.apache.dubbo.rpc.Constants.OUTPUT_KEY;
+
+public class DubboCountCodecTest {
+
+ @Test
+ public void test() throws Exception {
+ DubboCountCodec dubboCountCodec = new DubboCountCodec(FrameworkModel.defaultModel());
+ ChannelBuffer buffer = ChannelBuffers.buffer(1024);
+ Channel channel = new MockChannel();
+ Assertions.assertEquals(Codec2.DecodeResult.NEED_MORE_INPUT, dubboCountCodec.decode(channel, buffer));
+
+ for (int i = 0; i < 10; i++) {
+ Request request = new Request(1);
+ RpcInvocation rpcInvocation = new RpcInvocation(null, "echo", DemoService.class.getName(), "", new Class<?>[]{String.class}, new String[]{"yug"});
+ request.setData(rpcInvocation);
+ dubboCountCodec.encode(channel, buffer, request);
+ }
+
+ for (int i = 0; i < 10; i++) {
+ Response response = new Response(1);
+ AppResponse appResponse = new AppResponse(i);
+ response.setResult(appResponse);
+ dubboCountCodec.encode(channel, buffer, response);
+ }
+
+ MultiMessage multiMessage = (MultiMessage) dubboCountCodec.decode(channel, buffer);
+ Assertions.assertEquals(multiMessage.size(), 20);
+ int requestCount = 0;
+ int responseCount = 0;
+ Iterator iterator = multiMessage.iterator();
+ while (iterator.hasNext()) {
+ Object result = iterator.next();
+ if (result instanceof Request) {
+ requestCount++;
+ Object bytes = ((RpcInvocation) ((Request) result).getData()).getObjectAttachment(INPUT_KEY);
+ Assertions.assertNotNull(bytes);
+ } else if (result instanceof Response) {
+ responseCount++;
+ Object bytes = ((AppResponse) ((Response) result).getResult()).getObjectAttachment(OUTPUT_KEY);
+ Assertions.assertNotNull(bytes);
+ }
+ }
+ Assertions.assertEquals(requestCount, 10);
+ Assertions.assertEquals(responseCount, 10);
+ }
+
+}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/status/ServerStatusCheckerTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/status/ServerStatusCheckerTest.java
new file mode 100644
index 0000000..36184a0
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/status/ServerStatusCheckerTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo.status;
+
+import org.apache.dubbo.common.status.Status;
+import org.apache.dubbo.remoting.RemotingServer;
+import org.apache.dubbo.rpc.ProtocolServer;
+import org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol;
+import org.apache.dubbo.rpc.protocol.dubbo.decode.MockChannel;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * {@link ServerStatusChecker}
+ */
+public class ServerStatusCheckerTest {
+
+ @Test
+ public void test() {
+ ServerStatusChecker serverStatusChecker = new ServerStatusChecker();
+ Status status = serverStatusChecker.check();
+ Assertions.assertEquals(status.getLevel(), Status.Level.UNKNOWN);
+
+ DubboProtocol dubboProtocol = Mockito.mock(DubboProtocol.class);
+ ProtocolServer protocolServer = Mockito.mock(ProtocolServer.class);
+ RemotingServer remotingServer = Mockito.mock(RemotingServer.class);
+ List<ProtocolServer> servers = Arrays.asList(protocolServer);
+ Mockito.when(dubboProtocol.getServers()).thenReturn(servers);
+ Mockito.when(protocolServer.getRemotingServer()).thenReturn(remotingServer);
+ Mockito.when(remotingServer.isBound()).thenReturn(true);
+ Mockito.when(remotingServer.getLocalAddress()).thenReturn(InetSocketAddress.createUnresolved("127.0.0.1", 9999));
+ Mockito.when(remotingServer.getChannels()).thenReturn(Arrays.asList(new MockChannel()));
+
+
+ try (MockedStatic<DubboProtocol> mockDubboProtocol = Mockito.mockStatic(DubboProtocol.class)) {
+ mockDubboProtocol.when(() -> DubboProtocol.getDubboProtocol()).thenReturn(dubboProtocol);
+ status = serverStatusChecker.check();
+ Assertions.assertEquals(status.getLevel(), Status.Level.OK);
+ Assertions.assertEquals(status.getMessage(), "127.0.0.1:9999(clients:1)");
+
+ Mockito.when(remotingServer.isBound()).thenReturn(false);
+ status = serverStatusChecker.check();
+ Assertions.assertEquals(status.getLevel(), Status.Level.ERROR);
+ Assertions.assertEquals(status.getMessage(), "127.0.0.1:9999");
+ }
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/status/ThreadPoolStatusCheckerTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/status/ThreadPoolStatusCheckerTest.java
new file mode 100644
index 0000000..b21a511
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/status/ThreadPoolStatusCheckerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo.status;
+
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.status.Status;
+import org.apache.dubbo.common.store.DataStore;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * {@link ThreadPoolStatusChecker}
+ */
+public class ThreadPoolStatusCheckerTest {
+
+ @Test
+ public void test() {
+ DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
+ ExecutorService executorService1 = Executors.newFixedThreadPool(1);
+ ExecutorService executorService2 = Executors.newFixedThreadPool(10);
+ dataStore.put(CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY, "8888", executorService1);
+ dataStore.put(CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY, "8889", executorService2);
+
+ ThreadPoolStatusChecker threadPoolStatusChecker = new ThreadPoolStatusChecker();
+ Status status = threadPoolStatusChecker.check();
+ Assertions.assertEquals(status.getLevel(), Status.Level.WARN);
+ Assertions.assertEquals(status.getMessage(),
+ "Pool status:WARN, max:1, core:1, largest:0, active:0, task:0, service port: 8888;"
+ + "Pool status:OK, max:10, core:10, largest:0, active:0, task:0, service port: 8889");
+
+ // reset
+ executorService1.shutdown();
+ executorService2.shutdown();
+ dataStore.remove(CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY, "8888");
+ dataStore.remove(CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY, "8889");
+ }
+}