You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/11/08 09:24:10 UTC

[dubbo] branch 3.0 updated: [3.0] allocate invokers space at initial time of AbstractDirectory, fix ConnectivityValidationTest (#9211)

This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 8a14fb3  [3.0] allocate invokers space at initial time of AbstractDirectory, fix ConnectivityValidationTest (#9211)
8a14fb3 is described below

commit 8a14fb39304308c0b742dbc3fad60cdcc6595f6a
Author: zrlw <zr...@sina.com>
AuthorDate: Mon Nov 8 17:23:37 2021 +0800

    [3.0] allocate invokers space at initial time of AbstractDirectory, fix ConnectivityValidationTest (#9211)
    
    * allocate space for invokers and validInvokers to avoid NPE
    
    * extract operations of invokers and validInvokers
    
    * remove useless import
    
    * reset validInvokersInitialized at destroy()
    
    * add invokersInitialized for refreshInvoker
    
    * optimize codes
    
    * use local reference to avoid NPE
    
    * optimize codes
    
    * set empty instead of clearing at destroyInvokers()
    
    * remove useless lock
    
    * add lock for validInvoker
    
    * return unmodifiableList for getInvokers and getValidInvokers
    
    * return clone object to avoid being modified
    
    * fix ut
    
    * optimize code
    
    * refresh validInvokers at setInvokers
    
    * wait new task finished at checkConnectivity()
    
    * retrieve checkConnectivity and fix ConnectivityValidationTest
    
    * retrieve checkConnectivity
    
    * optimize setInvokers()
---
 .../rpc/cluster/directory/AbstractDirectory.java   | 141 +++++++++++----------
 .../rpc/cluster/directory/StaticDirectory.java     |  18 ++-
 .../rpc/cluster/directory/StaticDirectoryTest.java |   4 +-
 .../support/ConnectivityValidationTest.java        |  61 ++++++---
 .../file/FileSystemDynamicConfigurationTest.java   |   5 +-
 .../dubbo/config/bootstrap/MultiInstanceTest.java  |  17 ++-
 .../client/ServiceDiscoveryRegistryDirectory.java  |  23 ++--
 .../registry/integration/DynamicDirectory.java     |   9 +-
 .../registry/integration/RegistryDirectory.java    |  42 +++---
 9 files changed, 184 insertions(+), 136 deletions(-)

diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
index 2ec0ecc..cbce9e4 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
@@ -51,8 +51,6 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
 import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
@@ -80,17 +78,20 @@ public abstract class AbstractDirectory<T> implements Directory<T> {
 
     protected final Map<String, String> queryMap;
 
-    protected final Lock invokerLock = new ReentrantLock();
+    /**
+     * Invokers initialized flag.
+     */
+    private volatile boolean invokersInitialized = false;
 
     /**
      * All invokers from registry
      */
-    protected volatile BitList<Invoker<T>> invokers;
+    private volatile BitList<Invoker<T>> invokers = BitList.emptyList();
 
     /**
      * Valid Invoker. All invokers from registry exclude unavailable and disabled invokers.
      */
-    protected volatile BitList<Invoker<T>> validInvokers;
+    private volatile BitList<Invoker<T>> validInvokers = BitList.emptyList();
 
     /**
      * Waiting to reconnect invokers.
@@ -177,18 +178,19 @@ public abstract class AbstractDirectory<T> implements Directory<T> {
         }
 
         BitList<Invoker<T>> availableInvokers;
-        if (validInvokers != null) {
-            availableInvokers = validInvokers;
+        // pass a clone so that doList() cannot modify the shared invoker list.
+        if (invokersInitialized) {
+            availableInvokers = validInvokers.clone();
         } else {
-            availableInvokers = invokers;
+            availableInvokers = invokers.clone();
         }
 
         List<Invoker<T>> routedResult = doList(availableInvokers, invocation);
         if (routedResult.isEmpty()) {
             logger.warn("No provider available after connectivity filter for the service " + getConsumerUrl().getServiceKey()
-                + " all validInvokers' size: " + (validInvokers == null ? 0 : validInvokers.size())
+                + " all validInvokers' size: " + validInvokers.size()
                 + "/ all routed invokers' size: " + routedResult.size()
-                + "/ all invokers' size: " + (invokers == null ? 0 : invokers.size())
+                + "/ all invokers' size: " + invokers.size()
                 + " from registry " + getUrl().getAddress()
                 + " on the consumer " + NetUtils.getLocalHost()
                 + " using the dubbo version " + Version.getVersion() + ".");
@@ -230,12 +232,7 @@ public abstract class AbstractDirectory<T> implements Directory<T> {
     @Override
     public void destroy() {
         destroyed = true;
-        if (invokers != null) {
-            invokers.clear();
-        }
-        if (validInvokers != null) {
-            validInvokers.clear();
-        }
+        destroyInvokers();
         invokersToReconnect.clear();
         disabledInvokers.clear();
     }
@@ -247,17 +244,12 @@ public abstract class AbstractDirectory<T> implements Directory<T> {
 
     @Override
     public void addInvalidateInvoker(Invoker<T> invoker) {
-        invokerLock.lock();
-        try {
-            // 1. remove this invoker from validInvokers list, this invoker will not be listed in the next time
-            if (validInvokers.remove(invoker)) {
-                // 2. add this invoker to reconnect list
-                invokersToReconnect.add(invoker);
-                // 3. try start check connectivity task
-                checkConnectivity();
-            }
-        } finally {
-            invokerLock.unlock();
+        // 1. remove this invoker from validInvokers list, this invoker will not be listed in the next time
+        if (removeValidInvoker(invoker)) {
+            // 2. add this invoker to reconnect list
+            invokersToReconnect.add(invoker);
+            // 3. try start check connectivity task
+            checkConnectivity();
         }
     }
 
@@ -299,17 +291,12 @@ public abstract class AbstractDirectory<T> implements Directory<T> {
                     }
 
                     // 3. recover valid invoker
-                    invokerLock.lock();
-                    try {
-                        for (Invoker<T> tInvoker : needDeleteList) {
-                            if (invokers.contains(tInvoker)) {
-                                validInvokers.add(tInvoker);
-                                logger.info("Recover service address: " + tInvoker.getUrl() + "  from invalid list.");
-                            }
-                            invokersToReconnect.remove(tInvoker);
+                    for (Invoker<T> tInvoker : needDeleteList) {
+                        if (invokers.contains(tInvoker)) {
+                            addValidInvoker(tInvoker);
+                            logger.info("Recover service address: " + tInvoker.getUrl() + "  from invalid list.");
                         }
-                    } finally {
-                        invokerLock.unlock();
+                        invokersToReconnect.remove(tInvoker);
                     }
                 } finally {
                     checkConnectivityPermit.release();
@@ -330,20 +317,19 @@ public abstract class AbstractDirectory<T> implements Directory<T> {
      * 3. all the invokers disappeared from total invokers should be removed in the need to reconnect list
      * 4. all the invokers disappeared from total invokers should be removed in the disabled invokers list
      */
-    public synchronized void refreshInvoker() {
-        invokerLock.lock();
-        try {
-            if (invokers != null) {
-                BitList<Invoker<T>> copiedInvokers = invokers.clone();
-                refreshInvokers(copiedInvokers, invokersToReconnect);
-                refreshInvokers(copiedInvokers, disabledInvokers);
-                validInvokers = copiedInvokers;
-            }
-        } finally {
-            invokerLock.unlock();
+    public void refreshInvoker() {
+        if (invokersInitialized) {
+            refreshInvokerInternal();
         }
     }
 
+    private synchronized void refreshInvokerInternal() {
+        BitList<Invoker<T>> copiedInvokers = invokers.clone();
+        refreshInvokers(copiedInvokers, invokersToReconnect);
+        refreshInvokers(copiedInvokers, disabledInvokers);
+        validInvokers = copiedInvokers;
+    }
+
     private void refreshInvokers(BitList<Invoker<T>> targetInvokers, Collection<Invoker<T>> invokersToRemove) {
         List<Invoker<T>> needToRemove = new LinkedList<>();
         for (Invoker<T> tInvoker : invokersToRemove) {
@@ -358,32 +344,22 @@ public abstract class AbstractDirectory<T> implements Directory<T> {
 
     @Override
     public void addDisabledInvoker(Invoker<T> invoker) {
-        invokerLock.lock();
-        try {
-            if (invokers.contains(invoker)) {
-                disabledInvokers.add(invoker);
-                validInvokers.remove(invoker);
-                logger.info("Disable service address: " + invoker.getUrl() + ".");
-            }
-        } finally {
-            invokerLock.unlock();
+        if (invokers.contains(invoker)) {
+            disabledInvokers.add(invoker);
+            removeValidInvoker(invoker);
+            logger.info("Disable service address: " + invoker.getUrl() + ".");
         }
     }
 
     @Override
     public void recoverDisabledInvoker(Invoker<T> invoker) {
-        invokerLock.lock();
-        try {
-            if (disabledInvokers.remove(invoker)) {
-                try {
-                    validInvokers.add(invoker);
-                    logger.info("Recover service address: " + invoker.getUrl() + "  from disabled list.");
-                } catch (Throwable ignore) {
+        if (disabledInvokers.remove(invoker)) {
+            try {
+                addValidInvoker(invoker);
+                logger.info("Recover service address: " + invoker.getUrl() + "  from disabled list.");
+            } catch (Throwable ignore) {
 
-                }
             }
-        } finally {
-            invokerLock.unlock();
         }
     }
 
@@ -404,11 +380,13 @@ public abstract class AbstractDirectory<T> implements Directory<T> {
     }
 
     public BitList<Invoker<T>> getInvokers() {
-        return invokers;
+        // return clone to avoid being modified.
+        return invokers.clone();
     }
 
     public BitList<Invoker<T>> getValidInvokers() {
-        return validInvokers;
+        // return clone to avoid being modified.
+        return validInvokers.clone();
     }
 
     public List<Invoker<T>> getInvokersToReconnect() {
@@ -419,6 +397,31 @@ public abstract class AbstractDirectory<T> implements Directory<T> {
         return disabledInvokers;
     }
 
+    protected void setInvokers(BitList<Invoker<T>> invokers) {
+        this.invokers = invokers;
+        refreshInvokerInternal();
+        this.invokersInitialized = true;
+    }
+
+    protected void destroyInvokers() {
+        // assign new empty lists instead of clearing the existing ones, to support concurrent access.
+        this.invokers = BitList.emptyList();
+        this.validInvokers = BitList.emptyList();
+        this.invokersInitialized = false;
+    }
+
+    private boolean addValidInvoker(Invoker<T> invoker) {
+        synchronized (this.validInvokers) {
+            return this.validInvokers.add(invoker);
+        }
+    }
+
+    private boolean removeValidInvoker(Invoker<T> invoker) {
+        synchronized (this.validInvokers) {
+            return this.validInvokers.remove(invoker);
+        }
+    }
+
     protected abstract List<Invoker<T>> doList(BitList<Invoker<T>> invokers, Invocation invocation) throws RpcException;
 
 }
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectory.java
index e11f4e0..ed6eb2d 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectory.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectory.java
@@ -51,18 +51,17 @@ public class StaticDirectory<T> extends AbstractDirectory<T> {
         if (CollectionUtils.isEmpty(invokers)) {
             throw new IllegalArgumentException("invokers == null");
         }
-        this.invokers = new BitList<>(invokers);
-        this.validInvokers = this.invokers.clone();
+        this.setInvokers(new BitList<>(invokers));
     }
 
     @Override
     public Class<T> getInterface() {
-        return invokers.get(0).getInterface();
+        return getInvokers().get(0).getInterface();
     }
 
     @Override
     public List<Invoker<T>> getAllInvokers() {
-        return invokers;
+        return getInvokers();
     }
 
     @Override
@@ -70,7 +69,7 @@ public class StaticDirectory<T> extends AbstractDirectory<T> {
         if (isDestroyed()) {
             return false;
         }
-        for (Invoker<T> invoker : validInvokers) {
+        for (Invoker<T> invoker : getValidInvokers()) {
             if (invoker.isAvailable()) {
                 return true;
             }
@@ -83,7 +82,7 @@ public class StaticDirectory<T> extends AbstractDirectory<T> {
         if (isDestroyed()) {
             return;
         }
-        for (Invoker<T> invoker : invokers) {
+        for (Invoker<T> invoker : getInvokers()) {
             invoker.destroy();
         }
         super.destroy();
@@ -91,16 +90,15 @@ public class StaticDirectory<T> extends AbstractDirectory<T> {
 
     public void buildRouterChain() {
         RouterChain<T> routerChain = RouterChain.buildChain(getUrl());
-        routerChain.setInvokers(invokers);
+        routerChain.setInvokers(getInvokers());
         routerChain.loop(true);
         this.setRouterChain(routerChain);
     }
 
     public void notify(List<Invoker<T>> invokers) {
-        this.invokers = new BitList<>(invokers);
-        refreshInvoker();
+        this.setInvokers(new BitList<>(invokers));
         if (routerChain != null) {
-            routerChain.setInvokers(this.invokers);
+            routerChain.setInvokers(this.getInvokers());
         }
     }
 
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectoryTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectoryTest.java
index c9fe0c7..69022af 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectoryTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectoryTest.java
@@ -61,7 +61,7 @@ public class StaticDirectoryTest {
         List<Invoker<String>> newInvokers = staticDirectory.list(new MockDirInvocation());
         Assertions.assertTrue(newInvokers.size() > 0);
         staticDirectory.destroy();
-        Assertions.assertEquals(0, staticDirectory.invokers.size());
-        Assertions.assertEquals(0, staticDirectory.validInvokers.size());
+        Assertions.assertEquals(0, staticDirectory.getInvokers().size());
+        Assertions.assertEquals(0, staticDirectory.getValidInvokers().size());
     }
 }
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/ConnectivityValidationTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/ConnectivityValidationTest.java
index 99b1684..94f6579 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/ConnectivityValidationTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/ConnectivityValidationTest.java
@@ -29,6 +29,7 @@ import org.apache.dubbo.rpc.cluster.LoadBalance;
 import org.apache.dubbo.rpc.cluster.directory.StaticDirectory;
 
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -36,8 +37,11 @@ import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -111,6 +115,11 @@ public class ConnectivityValidationTest {
         clusterInvoker = new ConnectivityClusterInvoker(directory);
     }
 
+    @AfterEach
+    public void tearDown() {
+        clusterInvoker.destroy();
+    }
+
     private void configInvoker(Invoker invoker) {
         when(invoker.getUrl()).thenReturn(URL.valueOf(""));
         when(invoker.isAvailable()).thenReturn(true);
@@ -145,26 +154,30 @@ public class ConnectivityValidationTest {
         Assertions.assertEquals(0, directory.list(invocation).size());
 
         when(invoker1.isAvailable()).thenReturn(true);
-        waitRefresh();
+        Set<Invoker> invokerSet = new HashSet<>();
+        invokerSet.add(invoker1);
+        waitRefresh(invokerSet);
         Assertions.assertEquals(1, directory.list(invocation).size());
         Assertions.assertNotNull(clusterInvoker.select(loadBalance, invocation, directory.list(invocation), Collections.emptyList()));
 
         when(invoker2.isAvailable()).thenReturn(true);
-        waitRefresh();
+        invokerSet.add(invoker2);
+        waitRefresh(invokerSet);
         Assertions.assertEquals(2, directory.list(invocation).size());
         Assertions.assertNotNull(clusterInvoker.select(loadBalance, invocation, directory.list(invocation), Collections.emptyList()));
 
         invokerList.remove(invoker5);
         directory.notify(invokerList);
         when(invoker2.isAvailable()).thenReturn(true);
-        waitRefresh();
+        waitRefresh(invokerSet);
         Assertions.assertEquals(2, directory.list(invocation).size());
         Assertions.assertNotNull(clusterInvoker.select(loadBalance, invocation, directory.list(invocation), Collections.emptyList()));
 
         when(invoker3.isAvailable()).thenReturn(true);
         when(invoker4.isAvailable()).thenReturn(true);
-
-        waitRefresh();
+        invokerSet.add(invoker3);
+        invokerSet.add(invoker4);
+        waitRefresh(invokerSet);
         Assertions.assertEquals(4, directory.list(invocation).size());
         Assertions.assertNotNull(clusterInvoker.select(loadBalance, invocation, directory.list(invocation), Collections.emptyList()));
     }
@@ -186,7 +199,9 @@ public class ConnectivityValidationTest {
         Assertions.assertEquals(1, directory.list(invocation).size());
 
         when(invoker1.isAvailable()).thenReturn(true);
-        waitRefresh();
+        Set<Invoker> invokerSet = new HashSet<>();
+        invokerSet.add(invoker1);
+        waitRefresh(invokerSet);
         Assertions.assertEquals(2, directory.list(invocation).size());
     }
 
@@ -251,7 +266,23 @@ public class ConnectivityValidationTest {
         when(invoker14.isAvailable()).thenReturn(true);
         when(invoker15.isAvailable()).thenReturn(true);
 
-        waitRefresh();
+        Set<Invoker> invokerSet = new HashSet<>();
+        invokerSet.add(invoker1);
+        invokerSet.add(invoker2);
+        invokerSet.add(invoker3);
+        invokerSet.add(invoker4);
+        invokerSet.add(invoker5);
+        invokerSet.add(invoker6);
+        invokerSet.add(invoker7);
+        invokerSet.add(invoker8);
+        invokerSet.add(invoker9);
+        invokerSet.add(invoker10);
+        invokerSet.add(invoker11);
+        invokerSet.add(invoker12);
+        invokerSet.add(invoker13);
+        invokerSet.add(invoker14);
+        invokerSet.add(invoker15);
+        waitRefresh(invokerSet);
         Assertions.assertTrue(directory.list(invocation).size() > 1);
     }
 
@@ -271,15 +302,15 @@ public class ConnectivityValidationTest {
         }
     }
 
-    private void waitRefresh() throws InterruptedException {
-        ScheduledFuture future = directory.getConnectivityCheckFuture();
-        while (!future.isDone()) {
-            Thread.sleep(10);
-        }
+    private void waitRefresh(Set<Invoker> invokerSet) throws InterruptedException {
         directory.checkConnectivity();
-        future = directory.getConnectivityCheckFuture();
-        while (!future.isDone()) {
-            Thread.sleep(10);
+        while (true) {
+            List<Invoker> reconnectList = directory.getInvokersToReconnect();
+            if (reconnectList.stream().anyMatch(invoker -> invokerSet.contains(invoker))) {
+                Thread.sleep(10);
+                continue;
+            }
+            break;
         }
     }
 
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/configcenter/file/FileSystemDynamicConfigurationTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/config/configcenter/file/FileSystemDynamicConfigurationTest.java
index 97f55d2..4d3caea 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/configcenter/file/FileSystemDynamicConfigurationTest.java
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/config/configcenter/file/FileSystemDynamicConfigurationTest.java
@@ -22,8 +22,8 @@ import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable;
 
 import java.io.File;
 import java.io.IOException;
@@ -44,7 +44,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  * {@link FileSystemDynamicConfiguration} Test
  */
 // Test often failed on Github Actions Platform because of file system on Azure
-@DisabledIfEnvironmentVariable(named = "DISABLE_FILE_SYSTEM_TEST", matches = "true")
+// Changed to @Disabled because @DisabledIfEnvironmentVariable does not work on GitHub.
+@Disabled
 public class FileSystemDynamicConfigurationTest {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
diff --git a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/bootstrap/MultiInstanceTest.java b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/bootstrap/MultiInstanceTest.java
index 4aacacd..b4f4698 100644
--- a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/bootstrap/MultiInstanceTest.java
+++ b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/bootstrap/MultiInstanceTest.java
@@ -34,6 +34,8 @@ import org.apache.dubbo.config.api.DemoService;
 import org.apache.dubbo.config.api.Greeting;
 import org.apache.dubbo.config.mock.GreetingLocal2;
 import org.apache.dubbo.config.provider.impl.DemoServiceImpl;
+import org.apache.dubbo.registry.client.migration.MigrationInvoker;
+import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.model.ApplicationModel;
 import org.apache.dubbo.rpc.model.FrameworkModel;
 import org.apache.dubbo.rpc.model.FrameworkServiceRepository;
@@ -801,7 +803,8 @@ public class MultiInstanceTest {
                 .service(serviceConfig)
                 .asyncStart();
             logger.warn("provider app has start async");
-            Assertions.assertFalse(serviceConfig.getScopeModel().getDeployer().isStarted(), "Async export seems something wrong");
+            // the deployer might already be started when running on a fast machine.
+            // Assertions.assertFalse(serviceConfig.getScopeModel().getDeployer().isStarted(), "Async export seems something wrong");
 
             // consumer app
             Future consumerFuture = consumerBootstrap
@@ -810,7 +813,8 @@ public class MultiInstanceTest {
                 .reference(referenceConfig)
                 .asyncStart();
             logger.warn("consumer app has start async");
-            Assertions.assertFalse(referenceConfig.getScopeModel().getDeployer().isStarted(), "Async refer seems something wrong");
+            // the deployer might already be started when running on a fast machine.
+            // Assertions.assertFalse(referenceConfig.getScopeModel().getDeployer().isStarted(), "Async refer seems something wrong");
 
             // wait for provider app startup
             providerFuture.get();
@@ -824,6 +828,15 @@ public class MultiInstanceTest {
             logger.warn("consumer app is startup");
             Object target = referenceConfig.getServiceMetadata().getTarget();
             Assertions.assertNotNull(target);
+            // wait for invokers notified from registry
+            MigrationInvoker migrationInvoker = (MigrationInvoker) referenceConfig.getInvoker(); 
+            for (int i = 0; i < 10; i++) {
+                if (((List<Invoker>) migrationInvoker.getDirectory().getAllInvokers())
+                        .stream().anyMatch(invoker -> invoker.getInterface() == Greeting.class)) {
+                    break;
+                }
+                Thread.sleep(100);
+            }
             Greeting greetingService = (Greeting) target;
             String result = greetingService.hello();
             Assertions.assertEquals("local", result);
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
index 29f07bf..2e51f2d 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
@@ -204,9 +204,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
         if (invokerUrls.size() == 0) {
             logger.info("Received empty url list...");
             this.forbidden = true; // Forbid to access
-            this.invokers = BitList.emptyList();
-            this.validInvokers = BitList.emptyList();
-            routerChain.setInvokers(this.invokers);
+            routerChain.setInvokers(BitList.emptyList());
             destroyAllInvokers(); // Close all invokers
         } else {
             this.forbidden = false; // Allow accessing
@@ -214,12 +212,14 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
                 return;
             }
 
-            // can't use local reference because this.urlInvokerMap might be accessed at isAvailable() by main thread concurrently.
+            // use a local reference to avoid an NPE, as this.urlInvokerMap may be set to null concurrently by destroyAllInvokers().
+            Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
+            // copy the map instead of reusing the local reference, because toInvokers() may remove mappings from oldUrlInvokerMap directly.
             Map<String, Invoker<T>> oldUrlInvokerMap = null;
-            if (this.urlInvokerMap != null) {
+            if (localUrlInvokerMap != null) {
                 // the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.
-                oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + this.urlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));
-                this.urlInvokerMap.forEach(oldUrlInvokerMap::put);
+                oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + localUrlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));
+                localUrlInvokerMap.forEach(oldUrlInvokerMap::put);
             }
             Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map
             logger.info("Refreshed invoker size " + newUrlInvokerMap.size());
@@ -229,9 +229,9 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
                 return;
             }
             List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
-            this.invokers = multiGroup ? new BitList<>(toMergeInvokerList(newInvokers)) : new BitList<>(newInvokers);
+            this.setInvokers(multiGroup ? new BitList<>(toMergeInvokerList(newInvokers)) : new BitList<>(newInvokers));
             // pre-route and build cache
-            routerChain.setInvokers(this.invokers);
+            routerChain.setInvokers(this.getInvokers());
             this.urlInvokerMap = newUrlInvokerMap;
 
             if (oldUrlInvokerMap != null) {
@@ -251,7 +251,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
      * Turn urls into invokers, and if url has been refer, will not re-reference.
      * the items that will be put into newUrlInvokeMap will be removed from oldUrlInvokerMap.
      *
-     * @param oldUrlInvokerMap
+     * @param oldUrlInvokerMap it might be modified during the process.
      * @param urls
      * @return invokers
      */
@@ -350,8 +350,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
         }
 
         this.urlInvokerMap = null;
-        this.invokers = null;
-        this.validInvokers = null;
+        this.destroyInvokers();
     }
 
     /**
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
index 1d887a8..8689cbd 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
@@ -39,7 +39,6 @@ import org.apache.dubbo.rpc.cluster.RouterFactory;
 import org.apache.dubbo.rpc.cluster.directory.AbstractDirectory;
 import org.apache.dubbo.rpc.cluster.router.state.BitList;
 
-import java.util.Collections;
 import java.util.List;
 
 import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
@@ -175,7 +174,7 @@ public abstract class DynamicDirectory<T> extends AbstractDirectory<T> implement
         }
 
         if (multiGroup) {
-            return this.invokers == null ? BitList.emptyList() : this.invokers;
+            return this.getInvokers();
         }
 
         try {
@@ -195,7 +194,7 @@ public abstract class DynamicDirectory<T> extends AbstractDirectory<T> implement
 
     @Override
     public List<Invoker<T>> getAllInvokers() {
-        return this.invokers == null ? Collections.emptyList() : this.invokers;
+        return this.getInvokers();
     }
 
     /**
@@ -258,8 +257,8 @@ public abstract class DynamicDirectory<T> extends AbstractDirectory<T> implement
         if (isDestroyed() || this.forbidden) {
             return false;
         }
-        return CollectionUtils.isNotEmpty(validInvokers)
-            && validInvokers.stream().anyMatch(Invoker::isAvailable);
+        return CollectionUtils.isNotEmpty(getValidInvokers())
+            && getValidInvokers().stream().anyMatch(Invoker::isAvailable);
     }
 
     @Override
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
index d02823b..e53c900 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
@@ -203,9 +203,7 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> {
                 && invokerUrls.get(0) != null
                 && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
             this.forbidden = true; // Forbid to access
-            this.invokers = BitList.emptyList();
-            this.validInvokers = BitList.emptyList();
-            routerChain.setInvokers(this.invokers);
+            routerChain.setInvokers(BitList.emptyList());
             destroyAllInvokers(); // Close all invokers
         } else {
             this.forbidden = false; // Allow to access
@@ -213,22 +211,27 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> {
             if (invokerUrls == Collections.<URL>emptyList()) {
                 invokerUrls = new ArrayList<>();
             }
-            if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
-                invokerUrls.addAll(this.cachedInvokerUrls);
+            // use a local reference to avoid an NPE, as this.cachedInvokerUrls may be set to null by destroyAllInvokers().
+            Set<URL> localCachedInvokerUrls = this.cachedInvokerUrls;
+            if (invokerUrls.isEmpty() && localCachedInvokerUrls != null) {
+                invokerUrls.addAll(localCachedInvokerUrls);
             } else {
-                this.cachedInvokerUrls = new HashSet<>();
-                this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
+                localCachedInvokerUrls = new HashSet<>();
+                localCachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
+                this.cachedInvokerUrls = localCachedInvokerUrls;
             }
             if (invokerUrls.isEmpty()) {
                 return;
             }
-            
-            // can't use local reference because this.urlInvokerMap might be accessed at isAvailable() by main thread concurrently.
+
+            // use a local reference to avoid an NPE, as this.urlInvokerMap may be set to null concurrently by destroyAllInvokers().
+            Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
+            // can't pass localUrlInvokerMap itself as oldUrlInvokerMap: toInvokers() removes mappings from that argument, so hand it a copy.
             Map<URL, Invoker<T>> oldUrlInvokerMap = null;
-            if (this.urlInvokerMap != null) {
+            if (localUrlInvokerMap != null) {
                 // the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.
-                oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + this.urlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));
-                this.urlInvokerMap.forEach(oldUrlInvokerMap::put);
+                oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + localUrlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));
+                localUrlInvokerMap.forEach(oldUrlInvokerMap::put);
             }
             Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map
 
@@ -247,9 +250,9 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> {
             }
 
             List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
-            this.invokers = multiGroup ? new BitList<>(toMergeInvokerList(newInvokers)) : new BitList<>(newInvokers);
+            this.setInvokers(multiGroup ? new BitList<>(toMergeInvokerList(newInvokers)) : new BitList<>(newInvokers));
             // pre-route and build cache
-            routerChain.setInvokers(this.invokers);
+            routerChain.setInvokers(this.getInvokers());
             this.urlInvokerMap = newUrlInvokerMap;
 
             try {
@@ -322,7 +325,7 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> {
      * Turn urls into invokers, and if url has been refer, will not re-reference.
      * the items that will be put into newUrlInvokeMap will be removed from oldUrlInvokerMap.
      *
-     * @param oldUrlInvokerMap     
+     * @param oldUrlInvokerMap it might be modified during the process.
      * @param urls
      * @return invokers
      */
@@ -485,9 +488,10 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> {
             }
             localUrlInvokerMap.clear();
         }
-        invokers = null;
-        validInvokers = null;
-        cachedInvokerUrls = null;
+
+        this.urlInvokerMap = null;
+        this.cachedInvokerUrls = null;
+        destroyInvokers();
     }
 
     private void destroyUnusedInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, Map<URL, Invoker<T>> newUrlInvokerMap) {
@@ -524,7 +528,7 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> {
 
     @Override
     public List<Invoker<T>> getAllInvokers() {
-        return this.invokers == null ? Collections.emptyList() : this.invokers;
+        return this.getInvokers();
     }
 
     @Override