You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/09/17 03:43:09 UTC
[dubbo] branch 3.0 updated: Adapt Triple Protocol to Multi-Instances (#8829)
This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 87a03cc Adapt Triple Protocol to Multi-Instances (#8829)
87a03cc is described below
commit 87a03cccdfcd732294762800ac911f940970b712
Author: Albumen Kevin <jh...@gmail.com>
AuthorDate: Fri Sep 17 11:42:49 2021 +0800
Adapt Triple Protocol to Multi-Instances (#8829)
* Adapt Triple Protocol to Multi-Instances
* opt import
---
.../rpc/protocol/tri/AbstractServerStream.java | 8 +-
.../dubbo/rpc/protocol/tri/AbstractStream.java | 5 +-
.../rpc/protocol/tri/TripleClientHandler.java | 2 +-
.../dubbo/rpc/protocol/tri/TripleConstant.java | 3 +-
.../dubbo/rpc/protocol/tri/TripleInvoker.java | 6 +-
.../dubbo/rpc/protocol/tri/TripleProtocol.java | 22 +++---
.../apache/dubbo/rpc/protocol/tri/TripleUtil.java | 17 +++--
.../protocol/tri/service/TriBuiltinService.java | 87 ++++++++++++----------
.../rpc/protocol/tri/service/TriHealthImpl.java | 5 +-
9 files changed, 82 insertions(+), 73 deletions(-)
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index f2309ef..9c8bd0e 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -19,7 +19,6 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.HeaderFilter;
@@ -45,8 +44,6 @@ import static org.apache.dubbo.common.constants.CommonConstants.HEADER_FILTER_KE
public abstract class AbstractServerStream extends AbstractStream implements Stream {
- protected static final ExecutorRepository EXECUTOR_REPOSITORY =
- ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
private final ProviderModel providerModel;
private List<MethodDescriptor> methodDescriptors;
private Invoker<?> invoker;
@@ -73,11 +70,12 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
executor = (ExecutorService) providerModel.getServiceMetadata()
.getAttribute(CommonConstants.THREADPOOL_KEY);
}
+ ExecutorRepository executorRepository = url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
if (executor == null) {
- executor = EXECUTOR_REPOSITORY.getExecutor(url);
+ executor = executorRepository.getExecutor(url);
}
if (executor == null) {
- executor = EXECUTOR_REPOSITORY.createExecutorIfAbsent(url);
+ executor = executorRepository.createExecutorIfAbsent(url);
}
return executor;
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index 822d655..4ea0d45 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -19,7 +19,6 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.serialize.MultipleSerialization;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
@@ -83,7 +82,7 @@ public abstract class AbstractStream implements Stream {
this.url = url;
this.executor = executor;
final String value = url.getParameter(Constants.MULTI_SERIALIZATION_KEY, CommonConstants.DEFAULT_KEY);
- this.multipleSerialization = ExtensionLoader.getExtensionLoader(MultipleSerialization.class)
+ this.multipleSerialization = url.getOrDefaultFrameworkModel().getExtensionLoader(MultipleSerialization.class)
.getExtension(value);
this.streamObserver = createStreamObserver();
this.transportObserver = createTransportObserver();
@@ -130,7 +129,7 @@ public abstract class AbstractStream implements Stream {
}
public AbstractStream serialize(String serializeType) {
- if (serializeType.equals("hessian4")) {
+ if ("hessian4".equals(serializeType)) {
serializeType = "hessian2";
}
this.serializeType = serializeType;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
index 68b7ea1..0b9d45d 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
@@ -45,7 +45,7 @@ import java.util.List;
public class TripleClientHandler extends ChannelDuplexHandler {
- private FrameworkModel frameworkModel;
+ private final FrameworkModel frameworkModel;
public TripleClientHandler(FrameworkModel frameworkModel) {
this.frameworkModel = frameworkModel;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
index 7943012..586efab 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
@@ -16,10 +16,11 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
+import org.apache.dubbo.common.constants.CommonConstants;
+
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.util.AsciiString;
import io.netty.util.AttributeKey;
-import org.apache.dubbo.common.constants.CommonConstants;
public interface TripleConstant {
String CONTENT_PROTO = "application/grpc+proto";
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index da25ae0..4df7370 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -18,7 +18,6 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
-import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
@@ -64,8 +63,6 @@ import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
*/
public class TripleInvoker<T> extends AbstractInvoker<T> {
- private static final ConnectionManager CONNECTION_MANAGER = ExtensionLoader.getExtensionLoader(
- ConnectionManager.class).getExtension("multiple");
private final Connection connection;
private final ReentrantLock destroyLock = new ReentrantLock();
@@ -74,7 +71,8 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
public TripleInvoker(Class<T> serviceType, URL url, Set<Invoker<?>> invokers) throws RemotingException {
super(serviceType, url, new String[]{INTERFACE_KEY, GROUP_KEY, TOKEN_KEY});
this.invokers = invokers;
- this.connection = CONNECTION_MANAGER.connect(url);
+ ConnectionManager connectionManager = url.getOrDefaultFrameworkModel().getExtensionLoader(ConnectionManager.class).getExtension("multiple");
+ this.connection = connectionManager.connect(url);
}
@Override
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
index 5b86902..8a5a6c8 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
@@ -16,9 +16,7 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
-import grpc.health.v1.HealthCheckResponse;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
@@ -29,10 +27,13 @@ import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.protocol.AbstractExporter;
import org.apache.dubbo.rpc.protocol.AbstractProtocol;
import org.apache.dubbo.rpc.protocol.tri.service.TriBuiltinService;
+import grpc.health.v1.HealthCheckResponse;
+
import java.util.ArrayList;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL;
@@ -44,13 +45,12 @@ import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
public class TripleProtocol extends AbstractProtocol implements Protocol {
private static final Logger logger = LoggerFactory.getLogger(TripleProtocol.class);
- private final PathResolver pathResolver = ExtensionLoader.getExtensionLoader(PathResolver.class)
- .getDefaultExtension();
- private final ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class)
- .getDefaultExtension();
+ private final PathResolver pathResolver;
+ private final TriBuiltinService triBuiltinService;
- static {
- TriBuiltinService.init();
+ public TripleProtocol(FrameworkModel frameworkModel) {
+ this.triBuiltinService = new TriBuiltinService(frameworkModel);
+ this.pathResolver = frameworkModel.getExtensionLoader(PathResolver.class).getDefaultExtension();
}
@Override
@@ -79,8 +79,8 @@ public class TripleProtocol extends AbstractProtocol implements Protocol {
pathResolver.add(url.getServiceInterface(), invoker);
// set service status
- TriBuiltinService.getHealthStatusManager().setStatus(url.getServiceKey(), HealthCheckResponse.ServingStatus.SERVING);
- TriBuiltinService.getHealthStatusManager().setStatus(url.getServiceInterface(), HealthCheckResponse.ServingStatus.SERVING);
+ triBuiltinService.getHealthStatusManager().setStatus(url.getServiceKey(), HealthCheckResponse.ServingStatus.SERVING);
+ triBuiltinService.getHealthStatusManager().setStatus(url.getServiceInterface(), HealthCheckResponse.ServingStatus.SERVING);
PortUnificationExchanger.bind(invoker.getUrl());
return exporter;
@@ -92,6 +92,8 @@ public class TripleProtocol extends AbstractProtocol implements Protocol {
try {
url = ExecutorUtil.setThreadName(url, "DubboClientHandler");
url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
+ ExecutorRepository executorRepository = url.getOrDefaultApplicationModel()
+ .getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
executorRepository.createExecutorIfAbsent(url);
invoker = new TripleInvoker<>(type, url, invokers);
} catch (RemotingException e) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
index 373861a..796cc83 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
@@ -16,6 +16,13 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.serialize.MultipleSerialization;
+import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.model.MethodDescriptor;
+import org.apache.dubbo.triple.TripleWrapper;
+
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -30,12 +37,6 @@ import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.util.AttributeKey;
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.serialize.MultipleSerialization;
-import org.apache.dubbo.remoting.Constants;
-import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.model.MethodDescriptor;
-import org.apache.dubbo.triple.TripleWrapper;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -340,14 +341,14 @@ public class TripleUtil {
}
public static String convertHessianToWrapper(String serializeType) {
- if (serializeType.equals("hessian2")) {
+ if ("hessian2".equals(serializeType)) {
return "hessian4";
}
return serializeType;
}
public static String convertHessianFromWrapper(String serializeType) {
- if (serializeType.equals("hessian4")) {
+ if ("hessian4".equals(serializeType)) {
return "hessian2";
}
return serializeType;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriBuiltinService.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriBuiltinService.java
index c897d6d..fff08c7 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriBuiltinService.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriBuiltinService.java
@@ -18,11 +18,12 @@ package org.apache.dubbo.rpc.protocol.tri.service;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.url.component.ServiceConfigURL;
+import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.ProxyFactory;
import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.ModuleServiceRepository;
import org.apache.dubbo.rpc.model.ProviderModel;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
@@ -31,8 +32,7 @@ import org.apache.dubbo.rpc.protocol.tri.PathResolver;
import grpc.health.v1.Health;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_VALUE;
@@ -42,54 +42,63 @@ import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_VALUE;
**/
public class TriBuiltinService {
- private static final ProxyFactory PROXY_FACTORY =
- ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
+ private final ProxyFactory proxyFactory;
- private static final PathResolver PATH_RESOLVER = ExtensionLoader.getExtensionLoader(PathResolver.class)
- .getDefaultExtension();
+ private final PathResolver pathResolver;
- private static final ModuleServiceRepository repository = ApplicationModel.defaultModel().getDefaultModule().getServiceRepository();
+ private final ModuleServiceRepository repository;
- private static final Map<Class<?>, Object> TRI_SERVICES = new HashMap<>();
+ private final Health healthService;
- private static final HealthStatusManager HEALTH_STATUS_MANAGER;
+ private final HealthStatusManager healthStatusManager;
- private static final AtomicBoolean init = new AtomicBoolean();
+ private final AtomicBoolean init = new AtomicBoolean();
- static {
- HEALTH_STATUS_MANAGER = new HealthStatusManager(new TriHealthImpl());
- TRI_SERVICES.put(Health.class, HEALTH_STATUS_MANAGER.getHealthService());
+ public TriBuiltinService(FrameworkModel frameworkModel) {
+ healthStatusManager = new HealthStatusManager(new TriHealthImpl());
+ healthService = healthStatusManager.getHealthService();
+ proxyFactory = frameworkModel.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
+ pathResolver = frameworkModel.getExtensionLoader(PathResolver.class).getDefaultExtension();
+ List<ApplicationModel> applicationModels = frameworkModel.getApplicationModels();
+ if (CollectionUtils.isEmpty(applicationModels)) {
+ throw new IllegalStateException("Should have at least one applicationModel in frameworkModel. FrameworkModel:" + frameworkModel);
+ }
+ repository = applicationModels.get(0).getInternalModule().getServiceRepository();
+ init();
}
- public static void init() {
+ public void init() {
if (init.compareAndSet(false, true)) {
- TRI_SERVICES.forEach((clz, impl) -> {
- ServiceDescriptor serviceDescriptor = repository.registerService(clz);
- ServiceMetadata serviceMetadata = new ServiceMetadata();
- serviceMetadata.setServiceType(clz);
- serviceMetadata.setTarget(impl);
- serviceMetadata.setServiceInterfaceName(clz.getName());
- serviceMetadata.generateServiceKey();
- ProviderModel providerModel = new ProviderModel(
- clz.getName(),
- impl,
- serviceDescriptor,
- null,
- serviceMetadata);
- repository.registerProvider(providerModel);
- int port = 0;
- URL url = new ServiceConfigURL(CommonConstants.TRIPLE, null,
- null, ANYHOST_VALUE, port, clz.getName());
- url.setServiceModel(providerModel);
- url.setScopeModel(ApplicationModel.defaultModel().getInternalModule());
- Invoker<?> invoker = PROXY_FACTORY.getInvoker(impl, (Class) clz, url);
- PATH_RESOLVER.add(url.getServiceKey(), invoker);
- PATH_RESOLVER.add(url.getServiceInterface(), invoker);
+ ServiceDescriptor serviceDescriptor = repository.registerService(Health.class);
+ ServiceMetadata serviceMetadata = new ServiceMetadata();
+ serviceMetadata.setServiceType(Health.class);
+ serviceMetadata.setTarget(healthService);
+ serviceMetadata.setServiceInterfaceName(Health.class.getName());
+ serviceMetadata.generateServiceKey();
+ ProviderModel providerModel = new ProviderModel(
+ Health.class.getName(),
+ healthService,
+ serviceDescriptor,
+ null,
+ serviceMetadata);
+ repository.registerProvider(providerModel);
+ int port = 0;
+ URL url = new ServiceConfigURL(CommonConstants.TRIPLE, null,
+ null, ANYHOST_VALUE, port, Health.class.getName());
+ url.setServiceModel(providerModel);
+ url.setScopeModel(ApplicationModel.defaultModel().getInternalModule());
+ Invoker<?> invoker = proxyFactory.getInvoker(healthService, Health.class, url);
+ pathResolver.add(url.getServiceKey(), invoker);
+ pathResolver.add(url.getServiceInterface(), invoker);
+ providerModel.setDestroyCaller(()->{
+ pathResolver.remove(url.getServiceKey());
+ pathResolver.remove(url.getServiceInterface());
+ return null;
});
}
}
- public static HealthStatusManager getHealthStatusManager() {
- return HEALTH_STATUS_MANAGER;
+ public HealthStatusManager getHealthStatusManager() {
+ return healthStatusManager;
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriHealthImpl.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriHealthImpl.java
index 0d4da5c..c83852f 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriHealthImpl.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriHealthImpl.java
@@ -17,11 +17,12 @@
package org.apache.dubbo.rpc.protocol.tri.service;
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.rpc.RpcException;
+
import grpc.health.v1.Health;
import grpc.health.v1.HealthCheckRequest;
import grpc.health.v1.HealthCheckResponse;
-import org.apache.dubbo.common.stream.StreamObserver;
-import org.apache.dubbo.rpc.RpcException;
import java.util.HashMap;
import java.util.IdentityHashMap;