You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by hu...@apache.org on 2019/03/11 14:29:57 UTC
[incubator-dubbo] branch master updated: Fix some etcd3 registry bugs. (#3632)
This is an automated email from the ASF dual-hosted git repository.
huxing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/master by this push:
new fed47bf Fix some etcd3 registry bugs. (#3632)
fed47bf is described below
commit fed47bf0a864f0922c5086e3d10b37acec563f3d
Author: yì jí <yi...@apache.org>
AuthorDate: Mon Mar 11 22:29:15 2019 +0800
Fix some etcd3 registry bugs. (#3632)
* fix some bugs.
* fix typo
* cancel keep alive if recovery failed.
* remove duplicate license header.
---
.../java/org/apache/dubbo/common/Constants.java | 11 +-
.../dubbo/remoting/etcd/jetcd/JEtcdClient.java | 113 +++++++++++++--------
.../remoting/etcd/jetcd/JEtcdClientWrapper.java | 104 ++++++++++---------
3 files changed, 131 insertions(+), 97 deletions(-)
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
index aa6ae05..e54fcf8 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
@@ -831,8 +831,13 @@ public class Constants {
* Production environment key.
*/
public static final String PRODUCTION_ENVIRONMENT = "product";
- /*
- * private Constants(){ }
- */
+
+ public static final String ETCD3_NOTIFY_MAXTHREADS_KEYS = "etcd3.notify.maxthreads";
+
+ public static final int DEFAULT_ETCD3_NOTIFY_THREADS = DEFAULT_IO_THREADS;
+
+ public static final String DEFAULT_ETCD3_NOTIFY_QUEUES_KEY = "etcd3.notify.queues";
+
+ public static final int DEFAULT_GRPC_QUEUES = 300_0000;
}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
index 979caee..d07cad0 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
@@ -15,24 +15,19 @@
* limitations under the License.
*/
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
package org.apache.dubbo.remoting.etcd.jetcd;
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ExecutorUtil;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.remoting.etcd.ChildListener;
+import org.apache.dubbo.remoting.etcd.StateListener;
+import org.apache.dubbo.remoting.etcd.option.OptionUtil;
+import org.apache.dubbo.remoting.etcd.support.AbstractEtcdClient;
+
import com.google.protobuf.ByteString;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.api.Event;
@@ -46,15 +41,6 @@ import io.etcd.jetcd.common.exception.ClosedClientException;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.netty.util.internal.ConcurrentSet;
-import org.apache.dubbo.common.Constants;
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.logger.Logger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.utils.NamedThreadFactory;
-import org.apache.dubbo.remoting.etcd.ChildListener;
-import org.apache.dubbo.remoting.etcd.StateListener;
-import org.apache.dubbo.remoting.etcd.option.OptionUtil;
-import org.apache.dubbo.remoting.etcd.support.AbstractEtcdClient;
import java.util.ArrayList;
import java.util.Collections;
@@ -63,10 +49,14 @@ import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantLock;
import static java.util.stream.Collectors.toList;
import static org.apache.dubbo.remoting.etcd.jetcd.JEtcdClientWrapper.UTF_8;
@@ -79,6 +69,8 @@ public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
private JEtcdClientWrapper clientWrapper;
private ScheduledExecutorService reconnectSchedule;
+ private ExecutorService notifyExecutor;
+
private int delayPeriod;
private Logger logger = LoggerFactory.getLogger(JEtcdClient.class);
@@ -95,7 +87,16 @@ public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
});
delayPeriod = getUrl().getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
reconnectSchedule = Executors.newScheduledThreadPool(1,
- new NamedThreadFactory("auto-reconnect"));
+ new NamedThreadFactory("etcd3-watch-auto-reconnect"));
+
+ notifyExecutor = new ThreadPoolExecutor(
+ 1
+ , url.getParameter(Constants.ETCD3_NOTIFY_MAXTHREADS_KEYS, Constants.DEFAULT_ETCD3_NOTIFY_THREADS)
+ , Constants.DEFAULT_SESSION_TIMEOUT
+ , TimeUnit.MILLISECONDS
+ , new LinkedBlockingQueue<Runnable>(url.getParameter(Constants.DEFAULT_ETCD3_NOTIFY_QUEUES_KEY, Constants.DEFAULT_GRPC_QUEUES * 3))
+ , new NamedThreadFactory("etcd3-notify", true));
+
clientWrapper.start();
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
@@ -166,9 +167,19 @@ public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
@Override
public void doClose() {
try {
- reconnectSchedule.shutdownNow();
+ if (notifyExecutor != null) {
+ ExecutorUtil.shutdownNow(notifyExecutor, 100);
+ }
} catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ try {
+ if (reconnectSchedule != null) {
+ ExecutorUtil.shutdownNow(reconnectSchedule, 100);
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
} finally {
clientWrapper.doClose();
}
@@ -181,9 +192,11 @@ public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
protected long watchId;
protected String path;
protected Throwable throwable;
- protected Set<String> urls = new ConcurrentSet<>();
+ protected volatile Set<String> urls = new ConcurrentSet<>();
private ChildListener listener;
+ protected ReentrantLock lock = new ReentrantLock(true);
+
public EtcdWatcher(ChildListener listener) {
this.listener = listener;
}
@@ -220,7 +233,12 @@ public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
}
}
if (modified > 0) {
- listener.childChanged(path, new ArrayList<>(urls));
+ notifyExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ listener.childChanged(path, new ArrayList<>(urls));
+ }
+ });
}
}
@@ -257,37 +275,42 @@ public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
if (!isConnected()) {
throw new ClosedClientException("watch client has been closed, path '" + path + "'");
}
-
if (this.path != null) {
- if (this.path.equals(path)) {
- return clientWrapper.getChildren(path);
- }
unwatch();
}
- this.watchStub = WatchGrpc.newStub(clientWrapper.getChannel());
- this.watchRequest = watchStub.watch(this);
this.path = path;
- this.watchRequest.onNext(nextRequest());
- List<String> children = clientWrapper.getChildren(path);
+ lock.lock();
+ try {
- /**
- * caching the current service
- */
- if (!children.isEmpty()) {
- this.urls.addAll(filterChildren(children));
- }
+ this.watchStub = WatchGrpc.newStub(clientWrapper.getChannel());
+ this.watchRequest = watchStub.watch(this);
+ this.watchRequest.onNext(nextRequest());
- return new ArrayList<>(urls);
+ List<String> children = clientWrapper.getChildren(path);
+ /**
+ * caching the current service
+ */
+ if (!children.isEmpty()) {
+ this.urls.addAll(filterChildren(children));
+ }
+
+ return new ArrayList<>(urls);
+ } finally {
+ lock.unlock();
+ }
}
private boolean safeUpdate(String service, boolean add) {
- synchronized (this) {
+ lock.lock();
+ try {
/**
* If the collection already contains the specified service, do nothing
*/
return add ? this.urls.add(service) : this.urls.remove(service);
+ } finally {
+ lock.unlock();
}
}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
index e563cc2..8515b61 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
@@ -15,24 +15,18 @@
* limitations under the License.
*/
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
package org.apache.dubbo.remoting.etcd.jetcd;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.etcd.RetryPolicy;
+import org.apache.dubbo.remoting.etcd.StateListener;
+import org.apache.dubbo.remoting.etcd.option.Constants;
+
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.ClientBuilder;
@@ -45,22 +39,15 @@ import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
+import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.grpc.util.RoundRobinLoadBalancerFactory;
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.logger.Logger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.utils.ConcurrentHashSet;
-import org.apache.dubbo.common.utils.NamedThreadFactory;
-import org.apache.dubbo.common.utils.StringUtils;
-import org.apache.dubbo.remoting.etcd.RetryPolicy;
-import org.apache.dubbo.remoting.etcd.StateListener;
-import org.apache.dubbo.remoting.etcd.option.Constants;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.charset.Charset;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -346,14 +333,15 @@ public class JEtcdClientWrapper {
public Long call() throws Exception {
requiredNotNull(client, failed);
- keepAlive();
registeredPaths.add(path);
+ keepAlive();
+ final long leaseId = globalLeaseId;
client.getKVClient()
.put(ByteSequence.from(path, UTF_8)
- , ByteSequence.from(String.valueOf(globalLeaseId), UTF_8)
- , PutOption.newBuilder().withLeaseId(globalLeaseId).build())
+ , ByteSequence.from(String.valueOf(leaseId), UTF_8)
+ , PutOption.newBuilder().withLeaseId(leaseId).build())
.get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
- return globalLeaseId;
+ return leaseId;
}
}, retryPolicy);
} catch (Exception e) {
@@ -426,27 +414,28 @@ public class JEtcdClientWrapper {
* causing the extreme scene service to be dropped.
*
*/
+ long leaseId = globalLeaseId;
try {
if (logger.isWarnEnabled()) {
- logger.warn("Failed to keep alive for global lease, waiting for retry again.");
+ logger.warn("Failed to keep alive for global lease '" + leaseId + "', waiting for retry again.");
}
onFailed.accept(null);
} catch (Exception ignored) {
- logger.warn("Failed to recover from global lease expired or lease deadline exceeded.", ignored);
+ logger.warn("Failed to recover from global lease expired or lease deadline exceeded. lease '" + leaseId + "'", ignored);
}
}
}
private void recovery() {
- /**
- * The client is processing reconnection
- */
- if (cancelKeepAlive) return;
+ try {
+ /**
+ * The client is processing reconnection
+ */
+ if (cancelKeepAlive) return;
- cancelKeepAlive();
+ cancelKeepAlive();
- try {
Set<String> ephemeralPaths = new HashSet<String>(registeredPaths);
if (!ephemeralPaths.isEmpty()) {
for (String path : ephemeralPaths) {
@@ -460,11 +449,17 @@ public class JEtcdClientWrapper {
createEphemeral(path);
failedRegistered.remove(path);
- } catch (Exception ignored) {
+ } catch (Exception e) {
+
/**
* waiting for retry again
*/
failedRegistered.add(path);
+
+ Status status = Status.fromThrowable(e);
+ if (status.getCode() == Status.Code.NOT_FOUND) {
+ cancelKeepAlive();
+ }
}
}
}
@@ -499,12 +494,13 @@ public class JEtcdClientWrapper {
public String[] endPoints(String backupAddress) {
String[] endpoints = backupAddress.split(Constants.COMMA_SEPARATOR);
- return Arrays.stream(endpoints)
+ List<String> addressess = Arrays.stream(endpoints)
.map(address -> address.indexOf(Constants.HTTP_SUBFIX_KEY) > -1
? address
: Constants.HTTP_KEY + address)
- .collect(toList())
- .toArray(new String[0]);
+ .collect(toList());
+ Collections.shuffle(addressess);
+ return addressess.toArray(new String[0]);
}
/**
@@ -538,11 +534,14 @@ public class JEtcdClientWrapper {
if (connectState != connected) {
int notifyState = connected ? StateListener.CONNECTED : StateListener.DISCONNECTED;
if (connectionStateListener != null) {
- if (connected) {
- clearKeepAlive();
+ try {
+ if (connected) {
+ clearKeepAlive();
+ }
+ connectionStateListener.stateChanged(getClient(), notifyState);
+ } finally {
+ cancelKeepAlive = false;
}
- connectionStateListener.stateChanged(getClient(), notifyState);
- cancelKeepAlive = false;
}
connectState = connected;
}
@@ -566,9 +565,8 @@ public class JEtcdClientWrapper {
}
}
- private synchronized void clearKeepAlive() {
+ private void clearKeepAlive() {
cancelKeepAlive = true;
- registeredPaths.clear();
failedRegistered.clear();
cancelKeepAlive();
}
@@ -662,8 +660,16 @@ public class JEtcdClientWrapper {
createEphemeral(path);
failedRegistered.remove(path);
- } catch (Throwable t) {
- logger.warn("Failed to retry register(keep alive) for path '" + path + "', waiting for again, cause: " + t.getMessage(), t);
+ } catch (Throwable e) {
+
+ failedRegistered.add(path);
+
+ Status status = Status.fromThrowable(e);
+ if (status.getCode() == Status.Code.NOT_FOUND) {
+ cancelKeepAlive();
+ }
+
+ logger.warn("Failed to retry register(keep alive) for path '" + path + "', waiting for again, cause: " + e.getMessage(), e);
}
}
} catch (Throwable t) {