You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2020/05/01 07:29:30 UTC
[dubbo] branch master updated: Retry when reexport fails (#5959)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/master by this push:
new c28171b Retry when reexport fails (#5959)
c28171b is described below
commit c28171be8f162d486b7b1f8a62134b6f0be0be8d
Author: ken.lj <ke...@gmail.com>
AuthorDate: Fri May 1 15:29:14 2020 +0800
Retry when reexport fails (#5959)
fixes #5930
---
.../java/org/apache/dubbo/registry/Registry.java | 7 +++
.../registry/integration/RegistryProtocol.java | 52 ++++++++++++++--
.../{Registry.java => retry/ReExportTask.java} | 70 +++++++++++++---------
.../dubbo/registry/support/FailbackRegistry.java | 34 +++++++++++
4 files changed, 128 insertions(+), 35 deletions(-)
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Registry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Registry.java
index 835c27a..d5b3dbc 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Registry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Registry.java
@@ -26,4 +26,11 @@ import org.apache.dubbo.common.URL;
* @see org.apache.dubbo.registry.support.AbstractRegistry
*/
public interface Registry extends Node, RegistryService {
+ default void reExportRegister(URL url) {
+ register(url);
+ }
+
+ default void reExportUnregister(URL url) {
+ unregister(url);
+ }
}
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
index 7fec8f8..76da303 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
@@ -23,6 +23,7 @@ import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.timer.HashedWheelTimer;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.StringUtils;
@@ -31,6 +32,8 @@ import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.Registry;
import org.apache.dubbo.registry.RegistryFactory;
import org.apache.dubbo.registry.RegistryService;
+import org.apache.dubbo.registry.retry.ReExportTask;
+import org.apache.dubbo.registry.support.SkipFailbackWrapperException;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
@@ -51,6 +54,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY;
@@ -85,9 +89,11 @@ import static org.apache.dubbo.common.utils.UrlUtils.classifyUrls;
import static org.apache.dubbo.registry.Constants.CONFIGURATORS_SUFFIX;
import static org.apache.dubbo.registry.Constants.CONSUMER_PROTOCOL;
import static org.apache.dubbo.registry.Constants.DEFAULT_REGISTRY;
+import static org.apache.dubbo.registry.Constants.DEFAULT_REGISTRY_RETRY_PERIOD;
import static org.apache.dubbo.registry.Constants.PROVIDER_PROTOCOL;
import static org.apache.dubbo.registry.Constants.REGISTER_IP_KEY;
import static org.apache.dubbo.registry.Constants.REGISTER_KEY;
+import static org.apache.dubbo.registry.Constants.REGISTRY_RETRY_PERIOD_KEY;
import static org.apache.dubbo.registry.Constants.SIMPLIFIED_KEY;
import static org.apache.dubbo.remoting.Constants.BIND_IP_KEY;
import static org.apache.dubbo.remoting.Constants.BIND_PORT_KEY;
@@ -131,6 +137,9 @@ public class RegistryProtocol implements Protocol {
private RegistryFactory registryFactory;
private ProxyFactory proxyFactory;
+ private ConcurrentMap<URL, ReExportTask> reExportFailedTasks = new ConcurrentHashMap<>();
+ private HashedWheelTimer retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboReexportTimer", true), DEFAULT_REGISTRY_RETRY_PERIOD, TimeUnit.MILLISECONDS, 128);
+
//Filter the parameters that do not need to be output in url(Starting with .)
private static String[] getFilteredKeys(URL url) {
Map<String, String> params = url.getParameters();
@@ -280,18 +289,49 @@ public class RegistryProtocol implements Protocol {
// update registry
if (!newProviderUrl.equals(registeredUrl)) {
- if (getProviderUrl(originInvoker).getParameter(REGISTER_KEY, true)) {
- Registry registry = getRegistry(originInvoker);
- logger.info("Try to unregister old url: " + registeredUrl);
- registry.unregister(registeredUrl);
+ try {
+ doReExport(originInvoker, exporter, registryUrl, registeredUrl, newProviderUrl);
+ } catch (Exception e) {
+ ReExportTask oldTask = reExportFailedTasks.get(registeredUrl);
+ if (oldTask != null) {
+ return;
+ }
+ ReExportTask task = new ReExportTask(
+ () -> doReExport(originInvoker, exporter, registryUrl, registeredUrl, newProviderUrl),
+ registeredUrl,
+ null
+ );
+ oldTask = reExportFailedTasks.putIfAbsent(registeredUrl, task);
+ if (oldTask == null) {
+ // No retry task was registered for this URL yet, so schedule the new task for retry.
+ retryTimer.newTimeout(task, registryUrl.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD), TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+ }
- logger.info("Try to register new url: " + newProviderUrl);
- registry.register(newProviderUrl);
+ private <T> void doReExport(final Invoker<T> originInvoker, ExporterChangeableWrapper<T> exporter,
+ URL registryUrl, URL oldProviderUrl, URL newProviderUrl) {
+ if (getProviderUrl(originInvoker).getParameter(REGISTER_KEY, true)) {
+ Registry registry = null;
+ try {
+ registry = getRegistry(originInvoker);
+ } catch (Exception e) {
+ throw new SkipFailbackWrapperException(e);
}
+ logger.info("Try to unregister old url: " + oldProviderUrl);
+ registry.reExportUnregister(oldProviderUrl);
+
+ logger.info("Try to register new url: " + newProviderUrl);
+ registry.reExportRegister(newProviderUrl);
+ }
+ try {
ProviderModel.RegisterStatedURL statedUrl = getStatedUrl(registryUrl, newProviderUrl);
statedUrl.setProviderUrl(newProviderUrl);
exporter.setRegisterUrl(newProviderUrl);
+ } catch (Exception e) {
+ throw new SkipFailbackWrapperException(e);
}
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Registry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/ReExportTask.java
similarity index 59%
copy from dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Registry.java
copy to dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/ReExportTask.java
index 835c27a..952bbba 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Registry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/retry/ReExportTask.java
@@ -1,29 +1,41 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.registry;
-
-import org.apache.dubbo.common.Node;
-import org.apache.dubbo.common.URL;
-
-/**
- * Registry. (SPI, Prototype, ThreadSafe)
- *
- * @see org.apache.dubbo.registry.RegistryFactory#getRegistry(URL)
- * @see org.apache.dubbo.registry.support.AbstractRegistry
- */
-public interface Registry extends Node, RegistryService {
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.retry;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.timer.Timeout;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+
+/**
+ * Retry task whose retry step runs the re-export action supplied as a {@link Runnable}.
+ */
+public class ReExportTask extends AbstractRetryTask {
+
+ private static final String NAME = "retry re-export";
+
+ private Runnable runnable;
+
+ public ReExportTask(Runnable runnable, URL oldUrl, FailbackRegistry registry) {
+ super(oldUrl, registry, NAME);
+ this.runnable = runnable;
+ }
+
+ @Override
+ protected void doRetry(URL oldUrl, FailbackRegistry registry, Timeout timeout) {
+ runnable.run();
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java
index 5a51c0b..0197b3e 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java
@@ -261,6 +261,25 @@ public abstract class FailbackRegistry extends AbstractRegistry {
}
@Override
+ public void reExportRegister(URL url) {
+ if (!acceptable(url)) {
+ logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
+ return;
+ }
+ super.register(url);
+ removeFailedRegistered(url);
+ removeFailedUnregistered(url);
+ try {
+ // Sending a registration request to the server side
+ doRegister(url);
+ } catch (Exception e) {
+ if (!(e instanceof SkipFailbackWrapperException)) {
+ throw new IllegalStateException("Failed to register (re-export) " + url + " to registry " + getUrl().getAddress() + ", cause: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
public void unregister(URL url) {
super.unregister(url);
removeFailedRegistered(url);
@@ -291,6 +310,21 @@ public abstract class FailbackRegistry extends AbstractRegistry {
}
@Override
+ public void reExportUnregister(URL url) {
+ super.unregister(url);
+ removeFailedRegistered(url);
+ removeFailedUnregistered(url);
+ try {
+ // Sending a cancellation request to the server side
+ doUnregister(url);
+ } catch (Exception e) {
+ if (!(e instanceof SkipFailbackWrapperException)) {
+ throw new IllegalStateException("Failed to unregister(re-export) " + url + " to registry " + getUrl().getAddress() + ", cause: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);