You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/08/31 02:25:25 UTC

[dubbo] branch 3.0-multi-instances updated: Add Callback Service build ProviderModel

This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.0-multi-instances
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0-multi-instances by this push:
     new a33b08e  Add Callback Service build ProviderModel
a33b08e is described below

commit a33b08e56aafbbb74b0cac7641371512bf1e90e6
Author: Albumen Kevin <jh...@gmail.com>
AuthorDate: Tue Aug 31 10:25:06 2021 +0800

    Add Callback Service build ProviderModel
---
 .../org/apache/dubbo/rpc/model/ConsumerModel.java  |  5 +++--
 .../org/apache/dubbo/rpc/model/ProviderModel.java  |  5 +++--
 .../org/apache/dubbo/rpc/model/ServiceModel.java   | 15 ++++++++++----
 .../apache/dubbo/rpc/model/ServiceRepository.java  |  6 +++---
 .../org/apache/dubbo/config/ReferenceConfig.java   | 17 +++++++++++++++-
 .../org/apache/dubbo/config/ServiceConfig.java     | 15 ++++++++++++++
 .../rpc/protocol/dubbo/CallbackServiceCodec.java   | 23 ++++++++++++++++++----
 7 files changed, 70 insertions(+), 16 deletions(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ConsumerModel.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ConsumerModel.java
index f6df44d..3158250 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ConsumerModel.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ConsumerModel.java
@@ -52,7 +52,7 @@ public class ConsumerModel extends ServiceModel {
                          ReferenceConfigBase<?> referenceConfig,
                          Map<String, AsyncMethodInfo> methodConfigs) {
 
-        super(proxyObject, serviceKey, serviceModel, referenceConfig);
+        super(proxyObject, serviceKey, serviceModel, ApplicationModel.defaultModel().getDefaultModule(), referenceConfig);
         Assert.notEmptyString(serviceKey, "Service name can't be null or blank");
 
         this.methodConfigs = methodConfigs == null ? new HashMap<>() : methodConfigs;
@@ -63,9 +63,10 @@ public class ConsumerModel extends ServiceModel {
                          ServiceDescriptor serviceModel,
                          ReferenceConfigBase<?> referenceConfig,
                          ServiceMetadata metadata,
+                         ModuleModel moduleModel,
                          Map<String, AsyncMethodInfo> methodConfigs) {
 
-        super(proxyObject, serviceKey, serviceModel, referenceConfig, metadata);
+        super(proxyObject, serviceKey, serviceModel, moduleModel, referenceConfig, metadata);
         Assert.notEmptyString(serviceKey, "Service name can't be null or blank");
 
         this.methodConfigs = methodConfigs == null ? new HashMap<>() : methodConfigs;
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ProviderModel.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ProviderModel.java
index 2aab894..2c506eb 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ProviderModel.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ProviderModel.java
@@ -38,7 +38,7 @@ public class ProviderModel extends ServiceModel {
                          Object serviceInstance,
                          ServiceDescriptor serviceModel,
                          ServiceConfigBase<?> serviceConfig) {
-        super(serviceInstance, serviceKey, serviceModel, serviceConfig);
+        super(serviceInstance, serviceKey, serviceModel, ApplicationModel.defaultModel().getDefaultModule(), serviceConfig);
         if (null == serviceInstance) {
             throw new IllegalArgumentException("Service[" + serviceKey + "]Target is NULL.");
         }
@@ -49,9 +49,10 @@ public class ProviderModel extends ServiceModel {
     public ProviderModel(String serviceKey,
                          Object serviceInstance,
                          ServiceDescriptor serviceModel,
+                         ModuleModel moduleModel,
                          ServiceConfigBase<?> serviceConfig,
                          ServiceMetadata serviceMetadata) {
-        super(serviceInstance, serviceKey, serviceModel, serviceConfig, serviceMetadata);
+        super(serviceInstance, serviceKey, serviceModel, moduleModel, serviceConfig, serviceMetadata);
         if (null == serviceInstance) {
             throw new IllegalArgumentException("Service[" + serviceKey + "]Target is NULL.");
         }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceModel.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceModel.java
index 3bba258..c70d43f 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceModel.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceModel.java
@@ -27,22 +27,25 @@ import java.util.Set;
 public class ServiceModel {
     private String serviceKey;
     private Object proxyObject;
+    private final ModuleModel moduleModel;
     private final ServiceDescriptor serviceModel;
     private final AbstractInterfaceConfig config;
 
     private ServiceMetadata serviceMetadata;
 
-    public ServiceModel(Object proxyObject, String serviceKey, ServiceDescriptor serviceModel, AbstractInterfaceConfig config) {
+    public ServiceModel(Object proxyObject, String serviceKey, ServiceDescriptor serviceModel, ModuleModel moduleModel, AbstractInterfaceConfig config) {
         this.proxyObject = proxyObject;
         this.serviceKey = serviceKey;
         this.serviceModel = serviceModel;
+        this.moduleModel = moduleModel;
         this.config = config;
     }
 
-    public ServiceModel(Object proxyObject, String serviceKey, ServiceDescriptor serviceModel, AbstractInterfaceConfig config, ServiceMetadata serviceMetadata) {
+    public ServiceModel(Object proxyObject, String serviceKey, ServiceDescriptor serviceModel, ModuleModel moduleModel, AbstractInterfaceConfig config, ServiceMetadata serviceMetadata) {
         this.proxyObject = proxyObject;
         this.serviceKey = serviceKey;
         this.serviceModel = serviceModel;
+        this.moduleModel = moduleModel;
         this.config = config;
         this.serviceMetadata = serviceMetadata;
     }
@@ -86,7 +89,7 @@ public class ServiceModel {
     }
 
     public ReferenceConfigBase<?> getReferenceConfig() {
-        if(config instanceof ReferenceConfigBase) {
+        if (config instanceof ReferenceConfigBase) {
             return (ReferenceConfigBase<?>) config;
         } else {
             throw new IllegalArgumentException("Current ServiceModel is not a ConsumerModel");
@@ -94,7 +97,7 @@ public class ServiceModel {
     }
 
     public ServiceConfigBase<?> getServiceConfig() {
-        if(config instanceof ServiceConfigBase) {
+        if (config instanceof ServiceConfigBase) {
             return (ServiceConfigBase<?>) config;
         } else {
             throw new IllegalArgumentException("Current ServiceModel is not a ProviderModel");
@@ -119,4 +122,8 @@ public class ServiceModel {
     public ServiceMetadata getServiceMetadata() {
         return serviceMetadata;
     }
+
+    public ModuleModel getModuleModel() {
+        return moduleModel;
+    }
 }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceRepository.java
index 7207f1d..9b93c7b 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceRepository.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceRepository.java
@@ -111,7 +111,7 @@ public class ServiceRepository extends LifecycleAdapter implements FrameworkExt,
                                  Object proxy,
                                  ServiceMetadata serviceMetadata) {
         ConsumerModel consumerModel = new ConsumerModel(serviceMetadata.getServiceKey(), proxy, serviceDescriptor, rc,
-            serviceMetadata, null);
+            serviceMetadata, ApplicationModel.defaultModel().getDefaultModule(), null);
         consumers.putIfAbsent(serviceKey, consumerModel);
     }
 
@@ -132,8 +132,8 @@ public class ServiceRepository extends LifecycleAdapter implements FrameworkExt,
                                  ServiceDescriptor serviceModel,
                                  ServiceConfigBase<?> serviceConfig,
                                  ServiceMetadata serviceMetadata) {
-        ProviderModel providerModel = new ProviderModel(serviceKey, serviceInstance, serviceModel, serviceConfig,
-            serviceMetadata);
+        ProviderModel providerModel = new ProviderModel(serviceKey, serviceInstance, serviceModel, ApplicationModel.defaultModel().getDefaultModule(),
+            serviceConfig, serviceMetadata);
         providers.putIfAbsent(serviceKey, providerModel);
         providersWithoutGroup.putIfAbsent(keyWithoutGroup(serviceKey), providerModel);
     }
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
index 171745f..01e89af 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
@@ -40,8 +40,11 @@ import org.apache.dubbo.rpc.cluster.Cluster;
 import org.apache.dubbo.rpc.cluster.directory.StaticDirectory;
 import org.apache.dubbo.rpc.cluster.support.ClusterUtils;
 import org.apache.dubbo.rpc.cluster.support.registry.ZoneAwareCluster;
+import org.apache.dubbo.rpc.model.ApplicationModel;
 import org.apache.dubbo.rpc.model.AsyncMethodInfo;
 import org.apache.dubbo.rpc.model.ConsumerModel;
+import org.apache.dubbo.rpc.model.ModuleModel;
+import org.apache.dubbo.rpc.model.ScopeModel;
 import org.apache.dubbo.rpc.model.ServiceDescriptor;
 import org.apache.dubbo.rpc.model.ServiceRepository;
 import org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol;
@@ -260,7 +263,7 @@ public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
         ServiceRepository repository = getApplicationModel().getApplicationServiceRepository();
         ServiceDescriptor serviceDescriptor = repository.registerService(interfaceClass);
         consumerModel = new ConsumerModel(serviceMetadata.getServiceKey(), proxy, serviceDescriptor, this,
-            serviceMetadata, createAsyncMethodInfo());
+            serviceMetadata, getScopeModel(), createAsyncMethodInfo());
 
         repository.registerConsumer(consumerModel);
 
@@ -279,6 +282,18 @@ public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
         checkInvokerAvailable();
     }
 
+    @Override
+    public ModuleModel getScopeModel() {
+        ScopeModel scopeModel = super.getScopeModel();
+        if (scopeModel instanceof ApplicationModel) {
+            return ((ApplicationModel) scopeModel).getDefaultModule();
+        } else if (scopeModel instanceof ModuleModel) {
+            return (ModuleModel) scopeModel;
+        } else {
+            throw new IllegalStateException("scope model is invalid: " + scopeModel);
+        }
+    }
+
     /**
      * convert and aggregate async method info
      *
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
index 8e5213d..e287452 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
@@ -43,7 +43,10 @@ import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Protocol;
 import org.apache.dubbo.rpc.ProxyFactory;
 import org.apache.dubbo.rpc.cluster.ConfiguratorFactory;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ModuleModel;
 import org.apache.dubbo.rpc.model.ProviderModel;
+import org.apache.dubbo.rpc.model.ScopeModel;
 import org.apache.dubbo.rpc.model.ServiceDescriptor;
 import org.apache.dubbo.rpc.model.ServiceRepository;
 import org.apache.dubbo.rpc.service.GenericService;
@@ -361,6 +364,7 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
         providerModel = new ProviderModel(getUniqueServiceName(),
             ref,
             serviceDescriptor,
+            getScopeModel(),
             this,
             serviceMetadata);
 
@@ -378,6 +382,17 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
         }
     }
 
+    @Override
+    public ModuleModel getScopeModel() {
+        ScopeModel scopeModel = super.getScopeModel();
+        if (scopeModel instanceof ApplicationModel) {
+            return ((ApplicationModel) scopeModel).getDefaultModule();
+        } else if (scopeModel instanceof ModuleModel) {
+            return (ModuleModel) scopeModel;
+        } else {
+            throw new IllegalStateException("scope model is invalid: " + scopeModel);
+        }
+    }
 
     private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
         Map<String, String> map = buildAttributes(protocolConfig);
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java
index deaeec9..7386d0f 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java
@@ -16,6 +16,7 @@
  */
 package org.apache.dubbo.rpc.protocol.dubbo;
 
+import org.apache.dubbo.common.BaseServiceMetadata;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.bytecode.Wrapper;
 import org.apache.dubbo.common.extension.ExtensionLoader;
@@ -33,7 +34,11 @@ import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Protocol;
 import org.apache.dubbo.rpc.ProxyFactory;
 import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.model.ModuleModel;
+import org.apache.dubbo.rpc.model.ProviderModel;
 import org.apache.dubbo.rpc.model.ScopeModelUtil;
+import org.apache.dubbo.rpc.model.ServiceDescriptor;
+import org.apache.dubbo.rpc.model.ServiceMetadata;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -93,7 +98,7 @@ class CallbackServiceCodec {
      * @throws IOException
      */
     @SuppressWarnings({"unchecked", "rawtypes"})
-    private static String exportOrUnexportCallbackService(Channel channel, URL url, Class clazz, Object inst, Boolean export) throws IOException {
+    private static String exportOrUnexportCallbackService(Channel channel, RpcInvocation inv, URL url, Class clazz, Object inst, Boolean export) throws IOException {
         int instid = System.identityHashCode(inst);
 
         Map<String, String> params = new HashMap<>(3);
@@ -129,7 +134,17 @@ class CallbackServiceCodec {
             // one channel can have multiple callback instances, no need to re-export for different instance.
             if (!channel.hasAttribute(cacheKey)) {
                 if (!isInstancesOverLimit(channel, url, clazz.getName(), instid, false)) {
-                    ScopeModelUtil.getApplicationModel(url.getScopeModel()).getApplicationServiceRepository().registerService(clazz);
+                    ModuleModel moduleModel = inv.getServiceModel().getModuleModel();
+                    ServiceDescriptor serviceDescriptor = ScopeModelUtil.getApplicationModel(moduleModel).getApplicationServiceRepository().registerService(clazz);
+                    ProviderModel providerModel = new ProviderModel(BaseServiceMetadata.buildServiceKey(exportUrl.getPath(), group, exportUrl.getVersion()),
+                        inst,
+                        serviceDescriptor,
+                        moduleModel,
+                        null,
+                        new ServiceMetadata(clazz.getName() + "." + instid, exportUrl.getGroup(), exportUrl.getVersion(), clazz));
+                    moduleModel.getApplicationModel().getApplicationServiceRepository().registerProvider(providerModel);
+                    exportUrl = exportUrl.setScopeModel(moduleModel);
+                    exportUrl = exportUrl.setServiceModel(providerModel);
                     Invoker<?> invoker = PROXY_FACTORY.getInvoker(inst, clazz, exportUrl);
                     // should destroy resource?
                     Exporter<?> exporter = PROTOCOL.export(invoker);
@@ -276,10 +291,10 @@ class CallbackServiceCodec {
         Class<?>[] pts = inv.getParameterTypes();
         switch (callbackStatus) {
             case CallbackServiceCodec.CALLBACK_CREATE:
-                inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrUnexportCallbackService(channel, url, pts[paraIndex], args[paraIndex], true));
+                inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrUnexportCallbackService(channel, inv,  url, pts[paraIndex], args[paraIndex], true));
                 return null;
             case CallbackServiceCodec.CALLBACK_DESTROY:
-                inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrUnexportCallbackService(channel, url, pts[paraIndex], args[paraIndex], false));
+                inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrUnexportCallbackService(channel, inv,  url, pts[paraIndex], args[paraIndex], false));
                 return null;
             default:
                 return args[paraIndex];