You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/08/31 02:25:25 UTC
[dubbo] branch 3.0-multi-instances updated: Add Callback Service
build ProviderModel
This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.0-multi-instances
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0-multi-instances by this push:
new a33b08e Add Callback Service build ProviderModel
a33b08e is described below
commit a33b08e56aafbbb74b0cac7641371512bf1e90e6
Author: Albumen Kevin <jh...@gmail.com>
AuthorDate: Tue Aug 31 10:25:06 2021 +0800
Add Callback Service build ProviderModel
---
.../org/apache/dubbo/rpc/model/ConsumerModel.java | 5 +++--
.../org/apache/dubbo/rpc/model/ProviderModel.java | 5 +++--
.../org/apache/dubbo/rpc/model/ServiceModel.java | 15 ++++++++++----
.../apache/dubbo/rpc/model/ServiceRepository.java | 6 +++---
.../org/apache/dubbo/config/ReferenceConfig.java | 17 +++++++++++++++-
.../org/apache/dubbo/config/ServiceConfig.java | 15 ++++++++++++++
.../rpc/protocol/dubbo/CallbackServiceCodec.java | 23 ++++++++++++++++++----
7 files changed, 70 insertions(+), 16 deletions(-)
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ConsumerModel.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ConsumerModel.java
index f6df44d..3158250 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ConsumerModel.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ConsumerModel.java
@@ -52,7 +52,7 @@ public class ConsumerModel extends ServiceModel {
ReferenceConfigBase<?> referenceConfig,
Map<String, AsyncMethodInfo> methodConfigs) {
- super(proxyObject, serviceKey, serviceModel, referenceConfig);
+ super(proxyObject, serviceKey, serviceModel, ApplicationModel.defaultModel().getDefaultModule(), referenceConfig);
Assert.notEmptyString(serviceKey, "Service name can't be null or blank");
this.methodConfigs = methodConfigs == null ? new HashMap<>() : methodConfigs;
@@ -63,9 +63,10 @@ public class ConsumerModel extends ServiceModel {
ServiceDescriptor serviceModel,
ReferenceConfigBase<?> referenceConfig,
ServiceMetadata metadata,
+ ModuleModel moduleModel,
Map<String, AsyncMethodInfo> methodConfigs) {
- super(proxyObject, serviceKey, serviceModel, referenceConfig, metadata);
+ super(proxyObject, serviceKey, serviceModel, moduleModel, referenceConfig, metadata);
Assert.notEmptyString(serviceKey, "Service name can't be null or blank");
this.methodConfigs = methodConfigs == null ? new HashMap<>() : methodConfigs;
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ProviderModel.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ProviderModel.java
index 2aab894..2c506eb 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ProviderModel.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ProviderModel.java
@@ -38,7 +38,7 @@ public class ProviderModel extends ServiceModel {
Object serviceInstance,
ServiceDescriptor serviceModel,
ServiceConfigBase<?> serviceConfig) {
- super(serviceInstance, serviceKey, serviceModel, serviceConfig);
+ super(serviceInstance, serviceKey, serviceModel, ApplicationModel.defaultModel().getDefaultModule(), serviceConfig);
if (null == serviceInstance) {
throw new IllegalArgumentException("Service[" + serviceKey + "]Target is NULL.");
}
@@ -49,9 +49,10 @@ public class ProviderModel extends ServiceModel {
public ProviderModel(String serviceKey,
Object serviceInstance,
ServiceDescriptor serviceModel,
+ ModuleModel moduleModel,
ServiceConfigBase<?> serviceConfig,
ServiceMetadata serviceMetadata) {
- super(serviceInstance, serviceKey, serviceModel, serviceConfig, serviceMetadata);
+ super(serviceInstance, serviceKey, serviceModel, moduleModel, serviceConfig, serviceMetadata);
if (null == serviceInstance) {
throw new IllegalArgumentException("Service[" + serviceKey + "]Target is NULL.");
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceModel.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceModel.java
index 3bba258..c70d43f 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceModel.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceModel.java
@@ -27,22 +27,25 @@ import java.util.Set;
public class ServiceModel {
private String serviceKey;
private Object proxyObject;
+ private final ModuleModel moduleModel;
private final ServiceDescriptor serviceModel;
private final AbstractInterfaceConfig config;
private ServiceMetadata serviceMetadata;
- public ServiceModel(Object proxyObject, String serviceKey, ServiceDescriptor serviceModel, AbstractInterfaceConfig config) {
+ public ServiceModel(Object proxyObject, String serviceKey, ServiceDescriptor serviceModel, ModuleModel moduleModel, AbstractInterfaceConfig config) {
this.proxyObject = proxyObject;
this.serviceKey = serviceKey;
this.serviceModel = serviceModel;
+ this.moduleModel = moduleModel;
this.config = config;
}
- public ServiceModel(Object proxyObject, String serviceKey, ServiceDescriptor serviceModel, AbstractInterfaceConfig config, ServiceMetadata serviceMetadata) {
+ public ServiceModel(Object proxyObject, String serviceKey, ServiceDescriptor serviceModel, ModuleModel moduleModel, AbstractInterfaceConfig config, ServiceMetadata serviceMetadata) {
this.proxyObject = proxyObject;
this.serviceKey = serviceKey;
this.serviceModel = serviceModel;
+ this.moduleModel = moduleModel;
this.config = config;
this.serviceMetadata = serviceMetadata;
}
@@ -86,7 +89,7 @@ public class ServiceModel {
}
public ReferenceConfigBase<?> getReferenceConfig() {
- if(config instanceof ReferenceConfigBase) {
+ if (config instanceof ReferenceConfigBase) {
return (ReferenceConfigBase<?>) config;
} else {
throw new IllegalArgumentException("Current ServiceModel is not a ConsumerModel");
@@ -94,7 +97,7 @@ public class ServiceModel {
}
public ServiceConfigBase<?> getServiceConfig() {
- if(config instanceof ServiceConfigBase) {
+ if (config instanceof ServiceConfigBase) {
return (ServiceConfigBase<?>) config;
} else {
throw new IllegalArgumentException("Current ServiceModel is not a ProviderModel");
@@ -119,4 +122,8 @@ public class ServiceModel {
public ServiceMetadata getServiceMetadata() {
return serviceMetadata;
}
+
+ public ModuleModel getModuleModel() {
+ return moduleModel;
+ }
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceRepository.java
index 7207f1d..9b93c7b 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceRepository.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceRepository.java
@@ -111,7 +111,7 @@ public class ServiceRepository extends LifecycleAdapter implements FrameworkExt,
Object proxy,
ServiceMetadata serviceMetadata) {
ConsumerModel consumerModel = new ConsumerModel(serviceMetadata.getServiceKey(), proxy, serviceDescriptor, rc,
- serviceMetadata, null);
+ serviceMetadata, ApplicationModel.defaultModel().getDefaultModule(), null);
consumers.putIfAbsent(serviceKey, consumerModel);
}
@@ -132,8 +132,8 @@ public class ServiceRepository extends LifecycleAdapter implements FrameworkExt,
ServiceDescriptor serviceModel,
ServiceConfigBase<?> serviceConfig,
ServiceMetadata serviceMetadata) {
- ProviderModel providerModel = new ProviderModel(serviceKey, serviceInstance, serviceModel, serviceConfig,
- serviceMetadata);
+ ProviderModel providerModel = new ProviderModel(serviceKey, serviceInstance, serviceModel, ApplicationModel.defaultModel().getDefaultModule(),
+ serviceConfig, serviceMetadata);
providers.putIfAbsent(serviceKey, providerModel);
providersWithoutGroup.putIfAbsent(keyWithoutGroup(serviceKey), providerModel);
}
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
index 171745f..01e89af 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
@@ -40,8 +40,11 @@ import org.apache.dubbo.rpc.cluster.Cluster;
import org.apache.dubbo.rpc.cluster.directory.StaticDirectory;
import org.apache.dubbo.rpc.cluster.support.ClusterUtils;
import org.apache.dubbo.rpc.cluster.support.registry.ZoneAwareCluster;
+import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.AsyncMethodInfo;
import org.apache.dubbo.rpc.model.ConsumerModel;
+import org.apache.dubbo.rpc.model.ModuleModel;
+import org.apache.dubbo.rpc.model.ScopeModel;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
import org.apache.dubbo.rpc.model.ServiceRepository;
import org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol;
@@ -260,7 +263,7 @@ public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
ServiceRepository repository = getApplicationModel().getApplicationServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(interfaceClass);
consumerModel = new ConsumerModel(serviceMetadata.getServiceKey(), proxy, serviceDescriptor, this,
- serviceMetadata, createAsyncMethodInfo());
+ serviceMetadata, getScopeModel(), createAsyncMethodInfo());
repository.registerConsumer(consumerModel);
@@ -279,6 +282,18 @@ public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
checkInvokerAvailable();
}
+ @Override
+ public ModuleModel getScopeModel() {
+ ScopeModel scopeModel = super.getScopeModel();
+ if (scopeModel instanceof ApplicationModel) {
+ return ((ApplicationModel) scopeModel).getDefaultModule();
+ } else if (scopeModel instanceof ModuleModel) {
+ return (ModuleModel) scopeModel;
+ } else {
+ throw new IllegalStateException("scope model is invalid: " + scopeModel);
+ }
+ }
+
/**
* convert and aggregate async method info
*
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
index 8e5213d..e287452 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
@@ -43,7 +43,10 @@ import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.ProxyFactory;
import org.apache.dubbo.rpc.cluster.ConfiguratorFactory;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ModuleModel;
import org.apache.dubbo.rpc.model.ProviderModel;
+import org.apache.dubbo.rpc.model.ScopeModel;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
import org.apache.dubbo.rpc.model.ServiceRepository;
import org.apache.dubbo.rpc.service.GenericService;
@@ -361,6 +364,7 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
providerModel = new ProviderModel(getUniqueServiceName(),
ref,
serviceDescriptor,
+ getScopeModel(),
this,
serviceMetadata);
@@ -378,6 +382,17 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
}
}
+ @Override
+ public ModuleModel getScopeModel() {
+ ScopeModel scopeModel = super.getScopeModel();
+ if (scopeModel instanceof ApplicationModel) {
+ return ((ApplicationModel) scopeModel).getDefaultModule();
+ } else if (scopeModel instanceof ModuleModel) {
+ return (ModuleModel) scopeModel;
+ } else {
+ throw new IllegalStateException("scope model is invalid: " + scopeModel);
+ }
+ }
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
Map<String, String> map = buildAttributes(protocolConfig);
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java
index deaeec9..7386d0f 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java
@@ -16,6 +16,7 @@
*/
package org.apache.dubbo.rpc.protocol.dubbo;
+import org.apache.dubbo.common.BaseServiceMetadata;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.bytecode.Wrapper;
import org.apache.dubbo.common.extension.ExtensionLoader;
@@ -33,7 +34,11 @@ import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.ProxyFactory;
import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.model.ModuleModel;
+import org.apache.dubbo.rpc.model.ProviderModel;
import org.apache.dubbo.rpc.model.ScopeModelUtil;
+import org.apache.dubbo.rpc.model.ServiceDescriptor;
+import org.apache.dubbo.rpc.model.ServiceMetadata;
import java.io.IOException;
import java.util.HashMap;
@@ -93,7 +98,7 @@ class CallbackServiceCodec {
* @throws IOException
*/
@SuppressWarnings({"unchecked", "rawtypes"})
- private static String exportOrUnexportCallbackService(Channel channel, URL url, Class clazz, Object inst, Boolean export) throws IOException {
+ private static String exportOrUnexportCallbackService(Channel channel, RpcInvocation inv, URL url, Class clazz, Object inst, Boolean export) throws IOException {
int instid = System.identityHashCode(inst);
Map<String, String> params = new HashMap<>(3);
@@ -129,7 +134,17 @@ class CallbackServiceCodec {
// one channel can have multiple callback instances, no need to re-export for different instance.
if (!channel.hasAttribute(cacheKey)) {
if (!isInstancesOverLimit(channel, url, clazz.getName(), instid, false)) {
- ScopeModelUtil.getApplicationModel(url.getScopeModel()).getApplicationServiceRepository().registerService(clazz);
+ ModuleModel moduleModel = inv.getServiceModel().getModuleModel();
+ ServiceDescriptor serviceDescriptor = ScopeModelUtil.getApplicationModel(moduleModel).getApplicationServiceRepository().registerService(clazz);
+ ProviderModel providerModel = new ProviderModel(BaseServiceMetadata.buildServiceKey(exportUrl.getPath(), group, exportUrl.getVersion()),
+ inst,
+ serviceDescriptor,
+ moduleModel,
+ null,
+ new ServiceMetadata(clazz.getName() + "." + instid, exportUrl.getGroup(), exportUrl.getVersion(), clazz));
+ moduleModel.getApplicationModel().getApplicationServiceRepository().registerProvider(providerModel);
+ exportUrl = exportUrl.setScopeModel(moduleModel);
+ exportUrl = exportUrl.setServiceModel(providerModel);
Invoker<?> invoker = PROXY_FACTORY.getInvoker(inst, clazz, exportUrl);
// should destroy resource?
Exporter<?> exporter = PROTOCOL.export(invoker);
@@ -276,10 +291,10 @@ class CallbackServiceCodec {
Class<?>[] pts = inv.getParameterTypes();
switch (callbackStatus) {
case CallbackServiceCodec.CALLBACK_CREATE:
- inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrUnexportCallbackService(channel, url, pts[paraIndex], args[paraIndex], true));
+ inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrUnexportCallbackService(channel, inv, url, pts[paraIndex], args[paraIndex], true));
return null;
case CallbackServiceCodec.CALLBACK_DESTROY:
- inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrUnexportCallbackService(channel, url, pts[paraIndex], args[paraIndex], false));
+ inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrUnexportCallbackService(channel, inv, url, pts[paraIndex], args[paraIndex], false));
return null;
default:
return args[paraIndex];