You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2021/03/23 04:53:27 UTC

[dubbo] branch master updated: Tuning invoker operation in RegistryDirectory (#7324)

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/master by this push:
     new 576b3ab  Tuning invoker operation in RegistryDirectory (#7324)
576b3ab is described below

commit 576b3abbe4ad73ab7aad666ab405691b7aff707e
Author: ken.lj <ke...@gmail.com>
AuthorDate: Tue Mar 23 12:53:13 2021 +0800

    Tuning invoker operation in RegistryDirectory (#7324)
---
 .../main/java/org/apache/dubbo/common/Version.java   |  7 ++++++-
 .../store/InMemoryWritableMetadataService.java       |  7 ++++++-
 .../registry/integration/RegistryDirectory.java      | 19 +++++++++++--------
 .../dubbo/remoting/transport/AbstractServer.java     |  3 +--
 .../dubbo/remoting/transport/netty4/NettyServer.java | 20 +++++++++++---------
 .../dubbo/rpc/protocol/dubbo/DubboProtocol.java      |  2 +-
 .../protocol/dubbo/LazyConnectExchangeClient.java    |  4 ++--
 .../protocol/dubbo/ReferenceCountExchangeClient.java |  2 +-
 8 files changed, 39 insertions(+), 25 deletions(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Version.java b/dubbo-common/src/main/java/org/apache/dubbo/common/Version.java
index b949e6d..ddbed77 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/Version.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Version.java
@@ -181,7 +181,12 @@ public final class Version {
                 return defaultVersion;
             }
 
-            String file = codeSource.getLocation().getFile();
+            URL location = codeSource.getLocation();
+            if (location == null){
+                logger.info("No location for class " + cls.getName() + " when getVersion, use default version " + defaultVersion);
+                return defaultVersion;
+            }
+            String file =  location.getFile();
             if (!StringUtils.isEmpty(file) && file.endsWith(".jar")) {
                 version = getFromFile(file);
             }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/InMemoryWritableMetadataService.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/InMemoryWritableMetadataService.java
index a4b69e7..2c018d6 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/InMemoryWritableMetadataService.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/InMemoryWritableMetadataService.java
@@ -48,8 +48,10 @@ import java.util.concurrent.locks.ReentrantLock;
 import static java.util.Collections.emptySortedSet;
 import static java.util.Collections.unmodifiableSortedSet;
 import static org.apache.dubbo.common.URL.buildKey;
+import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
 import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
 import static org.apache.dubbo.common.utils.CollectionUtils.isEmpty;
 import static org.apache.dubbo.rpc.Constants.GENERIC_KEY;
 
@@ -165,7 +167,7 @@ public class InMemoryWritableMetadataService implements WritableMetadataService
     @Override
     public void publishServiceDefinition(URL providerUrl) {
         try {
-            if(!ProtocolUtils.isGeneric(providerUrl.getParameter(GENERIC_KEY))){
+            if (!ProtocolUtils.isGeneric(providerUrl.getParameter(GENERIC_KEY))) {
                 String interfaceName = providerUrl.getParameter(INTERFACE_KEY);
                 if (StringUtils.isNotEmpty(interfaceName)) {
                     Class interfaceClass = Class.forName(interfaceName);
@@ -176,6 +178,9 @@ public class InMemoryWritableMetadataService implements WritableMetadataService
                     return;
                 }
                 logger.error("publishProvider interfaceName is empty . providerUrl: " + providerUrl.toFullString());
+            } else if (CONSUMER_SIDE.equalsIgnoreCase(providerUrl.getParameter(SIDE_KEY))) {
+                // generic reference on the consumer side: return early to avoid an error
+                return;
             }
         } catch (ClassNotFoundException e) {
             //ignore error
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
index 4c65c6a..ba47904 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
@@ -43,7 +43,6 @@ import org.apache.dubbo.rpc.model.ApplicationModel;
 import org.apache.dubbo.rpc.protocol.InvokerWrapper;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -52,6 +51,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 import static org.apache.dubbo.common.constants.CommonConstants.DISABLED_KEY;
@@ -291,7 +291,7 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> implements NotifyL
      * @return invokers
      */
     private Map<URL, Invoker<T>> toInvokers(List<URL> urls) {
-        Map<URL, Invoker<T>> newUrlInvokerMap = new HashMap<>();
+        Map<URL, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>();
         if (urls == null || urls.isEmpty()) {
             return newUrlInvokerMap;
         }
@@ -450,9 +450,8 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> implements NotifyL
         // check deleted invoker
         List<URL> deleted = null;
         if (oldUrlInvokerMap != null) {
-            Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values();
             for (Map.Entry<URL, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
-                if (!newInvokers.contains(entry.getValue())) {
+                if (!newUrlInvokerMap.containsKey(entry.getKey())) {
                     if (deleted == null) {
                         deleted = new ArrayList<>();
                     }
@@ -464,7 +463,7 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> implements NotifyL
         if (deleted != null) {
             for (URL url : deleted) {
                 if (url != null) {
-                    Invoker<T> invoker = oldUrlInvokerMap.remove(url);
+                    Invoker<T> invoker = oldUrlInvokerMap.get(url);
                     if (invoker != null) {
                         try {
                             invoker.destroy();
@@ -541,10 +540,14 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> implements NotifyL
         }
         Map<URL, Invoker<T>> localUrlInvokerMap = urlInvokerMap;
         if (localUrlInvokerMap != null && localUrlInvokerMap.size() > 0) {
-            for (Invoker<T> invoker : new ArrayList<>(localUrlInvokerMap.values())) {
-                if (invoker.isAvailable()) {
-                    return true;
+            try {
+                for (Map.Entry<URL,Invoker<T>> entry : localUrlInvokerMap.entrySet()){
+                    if (entry.getValue().isAvailable()) {
+                        return true;
+                    }
                 }
+            }catch (Throwable throwable){
+                return true;
             }
         }
         return false;
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
index ef448db..967d8d2 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
@@ -172,8 +172,7 @@ public abstract class AbstractServer extends AbstractEndpoint implements Remotin
             return;
         }
 
-        Collection<Channel> channels = getChannels();
-        if (accepts > 0 && channels.size() > accepts) {
+        if (accepts > 0 && getChannels().size() > accepts) {
             logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
             ch.close();
             return;
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java
index 5781f7a..f9369fd 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java
@@ -40,8 +40,8 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.timeout.IdleStateHandler;
 
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.Map;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -171,14 +171,16 @@ public class NettyServer extends AbstractServer implements RemotingServer {
 
     @Override
     public Collection<Channel> getChannels() {
-        Collection<Channel> chs = new HashSet<Channel>();
-        for (Channel channel : this.channels.values()) {
-            if (channel.isConnected()) {
-                chs.add(channel);
-            } else {
-                channels.remove(NetUtils.toAddressString(channel.getRemoteAddress()));
-            }
-        }
+        Collection<Channel> chs = new ArrayList<>(this.channels.size());
+        chs.addAll(this.channels.values());
+        // checking the connection status is unnecessary since we are using the channels in NettyServerHandler
+//        for (Channel channel : this.channels.values()) {
+//            if (channel.isConnected()) {
+//                chs.add(channel);
+//            } else {
+//                channels.remove(NetUtils.toAddressString(channel.getRemoteAddress()));
+//            }
+//        }
         return chs;
     }
 
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
index 74115be..5a6ff12 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
@@ -509,7 +509,7 @@ public class DubboProtocol extends AbstractProtocol {
 
         for (ReferenceCountExchangeClient referenceCountExchangeClient : referenceCountExchangeClients) {
             // As long as one client is not available, you need to replace the unavailable client with the available one.
-            if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
+            if (referenceCountExchangeClient == null || referenceCountExchangeClient.getCount() <= 0 || referenceCountExchangeClient.isClosed()) {
                 return false;
             }
         }
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
index 8af87c8..2947e7e 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
@@ -35,8 +35,8 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.dubbo.remoting.Constants.SEND_RECONNECT_KEY;
-import static org.apache.dubbo.rpc.protocol.dubbo.Constants.LAZY_CONNECT_INITIAL_STATE_KEY;
 import static org.apache.dubbo.rpc.protocol.dubbo.Constants.DEFAULT_LAZY_CONNECT_INITIAL_STATE;
+import static org.apache.dubbo.rpc.protocol.dubbo.Constants.LAZY_CONNECT_INITIAL_STATE_KEY;
 
 /**
  * dubbo protocol support class.
@@ -135,7 +135,7 @@ final class LazyConnectExchangeClient implements ExchangeClient {
     private void warning() {
         if (requestWithWarning) {
             if (warningcount.get() % warning_period == 0) {
-                logger.warn(new IllegalStateException("safe guard client , should not be called ,must have a bug."));
+                logger.warn(url.getAddress() + " " + url.getServiceKey() + " safe guard client , should not be called ,must have a bug.");
             }
             warningcount.incrementAndGet();
         }
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
index 22b99ea..ef3e00f 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
@@ -204,7 +204,7 @@ final class ReferenceCountExchangeClient implements ExchangeClient {
         referenceCount.incrementAndGet();
     }
 
-    public int getCount(){
+    public int getCount() {
         return referenceCount.get();
     }
 }