You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by hu...@apache.org on 2019/03/11 14:29:57 UTC

[incubator-dubbo] branch master updated: Fix some etcd3 registry bugs. (#3632)

This is an automated email from the ASF dual-hosted git repository.

huxing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git


The following commit(s) were added to refs/heads/master by this push:
     new fed47bf  Fix some etcd3 registry bugs. (#3632)
fed47bf is described below

commit fed47bf0a864f0922c5086e3d10b37acec563f3d
Author: yì jí <yi...@apache.org>
AuthorDate: Mon Mar 11 22:29:15 2019 +0800

    Fix some etcd3 registry bugs. (#3632)
    
    * fix some bugs.
    * fix typo
    * cancel keep alive if recovery failed.
    * remove duplicate license header.
---
 .../java/org/apache/dubbo/common/Constants.java    |  11 +-
 .../dubbo/remoting/etcd/jetcd/JEtcdClient.java     | 113 +++++++++++++--------
 .../remoting/etcd/jetcd/JEtcdClientWrapper.java    | 104 ++++++++++---------
 3 files changed, 131 insertions(+), 97 deletions(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
index aa6ae05..e54fcf8 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
@@ -831,8 +831,13 @@ public class Constants {
      * Production environment key.
      */
     public static final String PRODUCTION_ENVIRONMENT = "product";
-    /*
-     * private Constants(){ }
-     */
+
+    public static final String ETCD3_NOTIFY_MAXTHREADS_KEYS = "etcd3.notify.maxthreads";
+
+    public static final int DEFAULT_ETCD3_NOTIFY_THREADS = DEFAULT_IO_THREADS;
+
+    public static final String DEFAULT_ETCD3_NOTIFY_QUEUES_KEY = "etcd3.notify.queues";
+
+    public static final int DEFAULT_GRPC_QUEUES = 300_0000;
 
 }
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
index 979caee..d07cad0 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
@@ -15,24 +15,19 @@
  * limitations under the License.
  */
 
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package org.apache.dubbo.remoting.etcd.jetcd;
 
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ExecutorUtil;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.remoting.etcd.ChildListener;
+import org.apache.dubbo.remoting.etcd.StateListener;
+import org.apache.dubbo.remoting.etcd.option.OptionUtil;
+import org.apache.dubbo.remoting.etcd.support.AbstractEtcdClient;
+
 import com.google.protobuf.ByteString;
 import io.etcd.jetcd.ByteSequence;
 import io.etcd.jetcd.api.Event;
@@ -46,15 +41,6 @@ import io.etcd.jetcd.common.exception.ClosedClientException;
 import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
 import io.netty.util.internal.ConcurrentSet;
-import org.apache.dubbo.common.Constants;
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.logger.Logger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.utils.NamedThreadFactory;
-import org.apache.dubbo.remoting.etcd.ChildListener;
-import org.apache.dubbo.remoting.etcd.StateListener;
-import org.apache.dubbo.remoting.etcd.option.OptionUtil;
-import org.apache.dubbo.remoting.etcd.support.AbstractEtcdClient;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -63,10 +49,14 @@ import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static java.util.stream.Collectors.toList;
 import static org.apache.dubbo.remoting.etcd.jetcd.JEtcdClientWrapper.UTF_8;
@@ -79,6 +69,8 @@ public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
     private JEtcdClientWrapper clientWrapper;
     private ScheduledExecutorService reconnectSchedule;
 
+    private ExecutorService notifyExecutor;
+
     private int delayPeriod;
     private Logger logger = LoggerFactory.getLogger(JEtcdClient.class);
 
@@ -95,7 +87,16 @@ public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
             });
             delayPeriod = getUrl().getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
             reconnectSchedule = Executors.newScheduledThreadPool(1,
-                    new NamedThreadFactory("auto-reconnect"));
+                    new NamedThreadFactory("etcd3-watch-auto-reconnect"));
+
+            notifyExecutor = new ThreadPoolExecutor(
+                    1
+                    , url.getParameter(Constants.ETCD3_NOTIFY_MAXTHREADS_KEYS, Constants.DEFAULT_ETCD3_NOTIFY_THREADS)
+                    , Constants.DEFAULT_SESSION_TIMEOUT
+                    , TimeUnit.MILLISECONDS
+                    , new LinkedBlockingQueue<Runnable>(url.getParameter(Constants.DEFAULT_ETCD3_NOTIFY_QUEUES_KEY, Constants.DEFAULT_GRPC_QUEUES * 3))
+                    , new NamedThreadFactory("etcd3-notify", true));
+
             clientWrapper.start();
         } catch (Exception e) {
             throw new IllegalStateException(e.getMessage(), e);
@@ -166,9 +167,19 @@ public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
     @Override
     public void doClose() {
         try {
-            reconnectSchedule.shutdownNow();
+            if (notifyExecutor != null) {
+                ExecutorUtil.shutdownNow(notifyExecutor, 100);
+            }
         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+        }
 
+        try {
+            if (reconnectSchedule != null) {
+                ExecutorUtil.shutdownNow(reconnectSchedule, 100);
+            }
+        } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
         } finally {
             clientWrapper.doClose();
         }
@@ -181,9 +192,11 @@ public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
         protected long watchId;
         protected String path;
         protected Throwable throwable;
-        protected Set<String> urls = new ConcurrentSet<>();
+        protected volatile Set<String> urls = new ConcurrentSet<>();
         private ChildListener listener;
 
+        protected ReentrantLock lock = new ReentrantLock(true);
+
         public EtcdWatcher(ChildListener listener) {
             this.listener = listener;
         }
@@ -220,7 +233,12 @@ public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
                     }
                 }
                 if (modified > 0) {
-                    listener.childChanged(path, new ArrayList<>(urls));
+                    notifyExecutor.execute(new Runnable() {
+                        @Override
+                        public void run() {
+                            listener.childChanged(path, new ArrayList<>(urls));
+                        }
+                    });
                 }
 
             }
@@ -257,37 +275,42 @@ public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
             if (!isConnected()) {
                 throw new ClosedClientException("watch client has been closed, path '" + path + "'");
             }
-
             if (this.path != null) {
-                if (this.path.equals(path)) {
-                    return clientWrapper.getChildren(path);
-                }
                 unwatch();
             }
 
-            this.watchStub = WatchGrpc.newStub(clientWrapper.getChannel());
-            this.watchRequest = watchStub.watch(this);
             this.path = path;
-            this.watchRequest.onNext(nextRequest());
 
-            List<String> children = clientWrapper.getChildren(path);
+            lock.lock();
+            try {
 
-            /**
-             * caching the current service
-             */
-            if (!children.isEmpty()) {
-                this.urls.addAll(filterChildren(children));
-            }
+                this.watchStub = WatchGrpc.newStub(clientWrapper.getChannel());
+                this.watchRequest = watchStub.watch(this);
+                this.watchRequest.onNext(nextRequest());
 
-            return new ArrayList<>(urls);
+                List<String> children = clientWrapper.getChildren(path);
+                /**
+                 * caching the current service
+                 */
+                if (!children.isEmpty()) {
+                    this.urls.addAll(filterChildren(children));
+                }
+
+                return new ArrayList<>(urls);
+            } finally {
+                lock.unlock();
+            }
         }
 
         private boolean safeUpdate(String service, boolean add) {
-            synchronized (this) {
+            lock.lock();
+            try {
                 /**
                  * If the collection already contains the specified service, do nothing
                  */
                 return add ? this.urls.add(service) : this.urls.remove(service);
+            } finally {
+                lock.unlock();
             }
         }
 
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
index e563cc2..8515b61 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
@@ -15,24 +15,18 @@
  * limitations under the License.
  */
 
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package org.apache.dubbo.remoting.etcd.jetcd;
 
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.etcd.RetryPolicy;
+import org.apache.dubbo.remoting.etcd.StateListener;
+import org.apache.dubbo.remoting.etcd.option.Constants;
+
 import io.etcd.jetcd.ByteSequence;
 import io.etcd.jetcd.Client;
 import io.etcd.jetcd.ClientBuilder;
@@ -45,22 +39,15 @@ import io.etcd.jetcd.options.GetOption;
 import io.etcd.jetcd.options.PutOption;
 import io.grpc.ConnectivityState;
 import io.grpc.ManagedChannel;
+import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
 import io.grpc.util.RoundRobinLoadBalancerFactory;
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.logger.Logger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.utils.ConcurrentHashSet;
-import org.apache.dubbo.common.utils.NamedThreadFactory;
-import org.apache.dubbo.common.utils.StringUtils;
-import org.apache.dubbo.remoting.etcd.RetryPolicy;
-import org.apache.dubbo.remoting.etcd.StateListener;
-import org.apache.dubbo.remoting.etcd.option.Constants;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.nio.charset.Charset;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -346,14 +333,15 @@ public class JEtcdClientWrapper {
                         public Long call() throws Exception {
                             requiredNotNull(client, failed);
 
-                            keepAlive();
                             registeredPaths.add(path);
+                            keepAlive();
+                            final long leaseId = globalLeaseId;
                             client.getKVClient()
                                     .put(ByteSequence.from(path, UTF_8)
-                                            , ByteSequence.from(String.valueOf(globalLeaseId), UTF_8)
-                                            , PutOption.newBuilder().withLeaseId(globalLeaseId).build())
+                                            , ByteSequence.from(String.valueOf(leaseId), UTF_8)
+                                            , PutOption.newBuilder().withLeaseId(leaseId).build())
                                     .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
-                            return globalLeaseId;
+                            return leaseId;
                         }
                     }, retryPolicy);
         } catch (Exception e) {
@@ -426,27 +414,28 @@ public class JEtcdClientWrapper {
              * causing the extreme scene service to be dropped.
              *
              */
+            long leaseId = globalLeaseId;
             try {
                 if (logger.isWarnEnabled()) {
-                    logger.warn("Failed to keep alive for global lease, waiting for retry again.");
+                    logger.warn("Failed to keep alive for global lease '" + leaseId + "', waiting for retry again.");
                 }
                 onFailed.accept(null);
             } catch (Exception ignored) {
-                logger.warn("Failed to recover from global lease expired or lease deadline exceeded.", ignored);
+                logger.warn("Failed to recover from global lease expired or lease deadline exceeded. lease '" + leaseId + "'", ignored);
             }
         }
     }
 
     private void recovery() {
 
-        /**
-         * The client is processing reconnection
-         */
-        if (cancelKeepAlive) return;
+        try {
+            /**
+             * The client is processing reconnection
+             */
+            if (cancelKeepAlive) return;
 
-        cancelKeepAlive();
+            cancelKeepAlive();
 
-        try {
             Set<String> ephemeralPaths = new HashSet<String>(registeredPaths);
             if (!ephemeralPaths.isEmpty()) {
                 for (String path : ephemeralPaths) {
@@ -460,11 +449,17 @@ public class JEtcdClientWrapper {
 
                         createEphemeral(path);
                         failedRegistered.remove(path);
-                    } catch (Exception ignored) {
+                    } catch (Exception e) {
+
                         /**
                          * waiting for retry again
                          */
                         failedRegistered.add(path);
+
+                        Status status = Status.fromThrowable(e);
+                        if (status.getCode() == Status.Code.NOT_FOUND) {
+                            cancelKeepAlive();
+                        }
                     }
                 }
             }
@@ -499,12 +494,13 @@ public class JEtcdClientWrapper {
 
     public String[] endPoints(String backupAddress) {
         String[] endpoints = backupAddress.split(Constants.COMMA_SEPARATOR);
-        return Arrays.stream(endpoints)
+        List<String> addressess = Arrays.stream(endpoints)
                 .map(address -> address.indexOf(Constants.HTTP_SUBFIX_KEY) > -1
                         ? address
                         : Constants.HTTP_KEY + address)
-                .collect(toList())
-                .toArray(new String[0]);
+                .collect(toList());
+        Collections.shuffle(addressess);
+        return addressess.toArray(new String[0]);
     }
 
     /**
@@ -538,11 +534,14 @@ public class JEtcdClientWrapper {
                         if (connectState != connected) {
                             int notifyState = connected ? StateListener.CONNECTED : StateListener.DISCONNECTED;
                             if (connectionStateListener != null) {
-                                if (connected) {
-                                    clearKeepAlive();
+                                try {
+                                    if (connected) {
+                                        clearKeepAlive();
+                                    }
+                                    connectionStateListener.stateChanged(getClient(), notifyState);
+                                } finally {
+                                    cancelKeepAlive = false;
                                 }
-                                connectionStateListener.stateChanged(getClient(), notifyState);
-                                cancelKeepAlive = false;
                             }
                             connectState = connected;
                         }
@@ -566,9 +565,8 @@ public class JEtcdClientWrapper {
         }
     }
 
-    private synchronized void clearKeepAlive() {
+    private void clearKeepAlive() {
         cancelKeepAlive = true;
-        registeredPaths.clear();
         failedRegistered.clear();
         cancelKeepAlive();
     }
@@ -662,8 +660,16 @@ public class JEtcdClientWrapper {
 
                             createEphemeral(path);
                             failedRegistered.remove(path);
-                        } catch (Throwable t) {
-                            logger.warn("Failed to retry register(keep alive) for path '" + path + "', waiting for again, cause: " + t.getMessage(), t);
+                        } catch (Throwable e) {
+
+                            failedRegistered.add(path);
+
+                            Status status = Status.fromThrowable(e);
+                            if (status.getCode() == Status.Code.NOT_FOUND) {
+                                cancelKeepAlive();
+                            }
+
+                            logger.warn("Failed to retry register(keep alive) for path '" + path + "', waiting for again, cause: " + e.getMessage(), e);
                         }
                     }
                 } catch (Throwable t) {