You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2020/05/01 07:29:30 UTC

[dubbo] branch master updated: Retry when reexport fails (#5959)

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/master by this push:
     new c28171b  Retry when reexport fails (#5959)
c28171b is described below

commit c28171be8f162d486b7b1f8a62134b6f0be0be8d
Author: ken.lj <ke...@gmail.com>
AuthorDate: Fri May 1 15:29:14 2020 +0800

    Retry when reexport fails (#5959)
    
    fixes #5930
---
 .../java/org/apache/dubbo/registry/Registry.java   |  7 +++
 .../registry/integration/RegistryProtocol.java     | 52 ++++++++++++++--
 .../{Registry.java => retry/ReExportTask.java}     | 70 +++++++++++++---------
 .../dubbo/registry/support/FailbackRegistry.java   | 34 +++++++++++
 4 files changed, 128 insertions(+), 35 deletions(-)

diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Registry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Registry.java
index 835c27a..d5b3dbc 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Registry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Registry.java
@@ -26,4 +26,11 @@ import org.apache.dubbo.common.URL;
  * @see org.apache.dubbo.registry.support.AbstractRegistry
  */
 public interface Registry extends Node, RegistryService {
+    default void reExportRegister(URL url) {
+        register(url);
+    }
+
+    default void reExportUnregister(URL url) {
+        unregister(url);
+    }
 }
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
index 7fec8f8..76da303 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
@@ -23,6 +23,7 @@ import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
 import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.timer.HashedWheelTimer;
 import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.NamedThreadFactory;
 import org.apache.dubbo.common.utils.StringUtils;
@@ -31,6 +32,8 @@ import org.apache.dubbo.registry.NotifyListener;
 import org.apache.dubbo.registry.Registry;
 import org.apache.dubbo.registry.RegistryFactory;
 import org.apache.dubbo.registry.RegistryService;
+import org.apache.dubbo.registry.retry.ReExportTask;
+import org.apache.dubbo.registry.support.SkipFailbackWrapperException;
 import org.apache.dubbo.rpc.Exporter;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Protocol;
@@ -51,6 +54,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY;
@@ -85,9 +89,11 @@ import static org.apache.dubbo.common.utils.UrlUtils.classifyUrls;
 import static org.apache.dubbo.registry.Constants.CONFIGURATORS_SUFFIX;
 import static org.apache.dubbo.registry.Constants.CONSUMER_PROTOCOL;
 import static org.apache.dubbo.registry.Constants.DEFAULT_REGISTRY;
+import static org.apache.dubbo.registry.Constants.DEFAULT_REGISTRY_RETRY_PERIOD;
 import static org.apache.dubbo.registry.Constants.PROVIDER_PROTOCOL;
 import static org.apache.dubbo.registry.Constants.REGISTER_IP_KEY;
 import static org.apache.dubbo.registry.Constants.REGISTER_KEY;
+import static org.apache.dubbo.registry.Constants.REGISTRY_RETRY_PERIOD_KEY;
 import static org.apache.dubbo.registry.Constants.SIMPLIFIED_KEY;
 import static org.apache.dubbo.remoting.Constants.BIND_IP_KEY;
 import static org.apache.dubbo.remoting.Constants.BIND_PORT_KEY;
@@ -131,6 +137,9 @@ public class RegistryProtocol implements Protocol {
     private RegistryFactory registryFactory;
     private ProxyFactory proxyFactory;
 
+    private ConcurrentMap<URL, ReExportTask> reExportFailedTasks = new ConcurrentHashMap<>();
+    private HashedWheelTimer retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboReexportTimer", true), DEFAULT_REGISTRY_RETRY_PERIOD, TimeUnit.MILLISECONDS, 128);
+
     //Filter the parameters that do not need to be output in url(Starting with .)
     private static String[] getFilteredKeys(URL url) {
         Map<String, String> params = url.getParameters();
@@ -280,18 +289,49 @@ public class RegistryProtocol implements Protocol {
 
         // update registry
         if (!newProviderUrl.equals(registeredUrl)) {
-            if (getProviderUrl(originInvoker).getParameter(REGISTER_KEY, true)) {
-                Registry registry = getRegistry(originInvoker);
-                logger.info("Try to unregister old url: " + registeredUrl);
-                registry.unregister(registeredUrl);
+            try {
+                doReExport(originInvoker, exporter, registryUrl, registeredUrl, newProviderUrl);
+            } catch (Exception e) {
+                ReExportTask oldTask = reExportFailedTasks.get(registeredUrl);
+                if (oldTask != null) {
+                    return;
+                }
+                ReExportTask task = new ReExportTask(
+                        () -> doReExport(originInvoker, exporter, registryUrl, registeredUrl, newProviderUrl),
+                        registeredUrl,
+                        null
+                );
+                oldTask = reExportFailedTasks.putIfAbsent(registeredUrl, task);
+                if (oldTask == null) {
+                    // never has a retry task. then start a new task for retry.
+                    retryTimer.newTimeout(task, registryUrl.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD), TimeUnit.MILLISECONDS);
+                }
+            }
+        }
+    }
 
-                logger.info("Try to register new url: " + newProviderUrl);
-                registry.register(newProviderUrl);
+    private <T> void doReExport(final Invoker<T> originInvoker, ExporterChangeableWrapper<T> exporter,
+                                URL registryUrl, URL oldProviderUrl, URL newProviderUrl) {
+        if (getProviderUrl(originInvoker).getParameter(REGISTER_KEY, true)) {
+            Registry registry = null;
+            try {
+                registry = getRegistry(originInvoker);
+            } catch (Exception e) {
+                throw new SkipFailbackWrapperException(e);
             }
 
+            logger.info("Try to unregister old url: " + oldProviderUrl);
+            registry.reExportUnregister(oldProviderUrl);
+
+            logger.info("Try to register new url: " + newProviderUrl);
+            registry.reExportRegister(newProviderUrl);
+        }
+        try {
             ProviderModel.RegisterStatedURL statedUrl = getStatedUrl(registryUrl, newProviderUrl);
             statedUrl.setProviderUrl(newProviderUrl);
             exporter.setRegisterUrl(newProviderUrl);
+        } catch (Exception e) {
+            throw new SkipFailbackWrapperException(e);
         }
     }
 
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Registry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/ReExportTask.java
similarity index 59%
copy from dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Registry.java
copy to dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/ReExportTask.java
index 835c27a..952bbba 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Registry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/ReExportTask.java
@@ -1,29 +1,41 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.registry;
-
-import org.apache.dubbo.common.Node;
-import org.apache.dubbo.common.URL;
-
-/**
- * Registry. (SPI, Prototype, ThreadSafe)
- *
- * @see org.apache.dubbo.registry.RegistryFactory#getRegistry(URL)
- * @see org.apache.dubbo.registry.support.AbstractRegistry
- */
-public interface Registry extends Node, RegistryService {
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.retry;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.timer.Timeout;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+
+/**
+ *
+ */
+public class ReExportTask extends AbstractRetryTask {
+
+    private static final String NAME = "retry re-export";
+
+    private Runnable runnable;
+
+    public ReExportTask(Runnable runnable, URL oldUrl, FailbackRegistry registry) {
+        super(oldUrl, registry, NAME);
+        this.runnable = runnable;
+    }
+
+    @Override
+    protected void doRetry(URL oldUrl, FailbackRegistry registry, Timeout timeout) {
+        runnable.run();
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java
index 5a51c0b..0197b3e 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java
@@ -261,6 +261,25 @@ public abstract class FailbackRegistry extends AbstractRegistry {
     }
 
     @Override
+    public void reExportRegister(URL url) {
+        if (!acceptable(url)) {
+            logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
+            return;
+        }
+        super.register(url);
+        removeFailedRegistered(url);
+        removeFailedUnregistered(url);
+        try {
+            // Sending a registration request to the server side
+            doRegister(url);
+        } catch (Exception e) {
+            if (!(e instanceof SkipFailbackWrapperException)) {
+                throw new IllegalStateException("Failed to register (re-export) " + url + " to registry " + getUrl().getAddress() + ", cause: " + e.getMessage(), e);
+            }
+        }
+    }
+
+    @Override
     public void unregister(URL url) {
         super.unregister(url);
         removeFailedRegistered(url);
@@ -291,6 +310,21 @@ public abstract class FailbackRegistry extends AbstractRegistry {
     }
 
     @Override
+    public void reExportUnregister(URL url) {
+        super.unregister(url);
+        removeFailedRegistered(url);
+        removeFailedUnregistered(url);
+        try {
+            // Sending a cancellation request to the server side
+            doUnregister(url);
+        } catch (Exception e) {
+            if (!(e instanceof SkipFailbackWrapperException)) {
+                throw new IllegalStateException("Failed to unregister(re-export) " + url + " to registry " + getUrl().getAddress() + ", cause: " + e.getMessage(), e);
+            }
+        }
+    }
+
+    @Override
     public void subscribe(URL url, NotifyListener listener) {
         super.subscribe(url, listener);
         removeFailedSubscribed(url, listener);