You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by il...@apache.org on 2019/03/16 05:12:55 UTC
[incubator-dubbo] branch 2.7.1-release updated: [Dubbo-3653] etcd
as config center (#3663)
This is an automated email from the ASF dual-hosted git repository.
iluo pushed a commit to branch 2.7.1-release
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/2.7.1-release by this push:
new cf23d61 [Dubbo-3653] etcd as config center (#3663)
cf23d61 is described below
commit cf23d612f9ba3f1c508071b420260eb695a787d8
Author: Huxing Zhang <hu...@gmail.com>
AuthorDate: Sat Mar 16 13:11:33 2019 +0800
[Dubbo-3653] etcd as config center (#3663)
* Minor refactor, no functional change.
* Separate ConnectionStateListener
* Simplify code
* Fix typo
* Support get external config from etcd config center
* Polish diamond operator
* Initial etcd support as config center
* Add a put interface for JEtcdClient
* Enhanced Etcd config center support with the ability to watch and cancel watch
* Polish code
* Distinguish modification event and delete event
* Add etcd registry and configcenter to dubbo-all
* Watch again when connection is re-established
---
dubbo-all/pom.xml | 16 ++
dubbo-bom/pom.xml | 5 +
dubbo-configcenter/dubbo-configcenter-etcd/pom.xml | 46 ++++
.../support/etcd/EtcdDynamicConfiguration.java | 187 ++++++++++++++
.../etcd/EtcdDynamicConfigurationFactory.java | 33 +++
....dubbo.configcenter.DynamicConfigurationFactory | 1 +
.../support/etcd/EtcdDynamicConfigurationTest.java | 141 +++++++++++
dubbo-configcenter/pom.xml | 1 +
.../apache/dubbo/registry/etcd/EtcdRegistry.java | 18 +-
.../org/apache/dubbo/remoting/etcd/EtcdClient.java | 16 ++
.../etcd/jetcd/ConnectionStateListener.java | 31 +++
.../dubbo/remoting/etcd/jetcd/JEtcdClient.java | 22 +-
.../remoting/etcd/jetcd/JEtcdClientWrapper.java | 282 +++++++++++----------
.../remoting/etcd/support/AbstractEtcdClient.java | 4 +-
.../dubbo/remoting/etcd/jetcd/JEtcdClientTest.java | 166 ++++++++++++
15 files changed, 811 insertions(+), 158 deletions(-)
diff --git a/dubbo-all/pom.xml b/dubbo-all/pom.xml
index edb0f36..a4a0b5c 100644
--- a/dubbo-all/pom.xml
+++ b/dubbo-all/pom.xml
@@ -243,6 +243,13 @@
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-registry-etcd3</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-monitor-api</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
@@ -362,6 +369,13 @@
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-configcenter-etcd</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-compatible</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
@@ -494,6 +508,7 @@
<include>org.apache.dubbo:dubbo-registry-zookeeper</include>
<include>org.apache.dubbo:dubbo-registry-redis</include>
<include>org.apache.dubbo:dubbo-registry-consul</include>
+ <include>org.apache.dubbo:dubbo-registry-etcd3</include>
<include>org.apache.dubbo:dubbo-monitor-api</include>
<include>org.apache.dubbo:dubbo-monitor-default</include>
<include>org.apache.dubbo:dubbo-config-api</include>
@@ -515,6 +530,7 @@
<include>org.apache.dubbo:dubbo-configcenter-apollo</include>
<include>org.apache.dubbo:dubbo-configcenter-zookeeper</include>
<include>org.apache.dubbo:dubbo-configcenter-consul</include>
+ <include>org.apache.dubbo:dubbo-configcenter-etcd</include>
<include>org.apache.dubbo:dubbo-metadata-report-api</include>
<include>org.apache.dubbo:dubbo-metadata-definition</include>
<include>org.apache.dubbo:dubbo-metadata-report-redis</include>
diff --git a/dubbo-bom/pom.xml b/dubbo-bom/pom.xml
index d04495a..47875b8 100644
--- a/dubbo-bom/pom.xml
+++ b/dubbo-bom/pom.xml
@@ -345,6 +345,11 @@
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-configcenter-etcd</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-metadata-definition</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/pom.xml b/dubbo-configcenter/dubbo-configcenter-etcd/pom.xml
new file mode 100644
index 0000000..60efc8e
--- /dev/null
+++ b/dubbo-configcenter/dubbo-configcenter-etcd/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>dubbo-configcenter</artifactId>
+ <groupId>org.apache.dubbo</groupId>
+ <version>2.7.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>dubbo-configcenter-etcd</artifactId>
+ <packaging>jar</packaging>
+ <name>${project.artifactId}</name>
+ <description>The etcd implementation of the config-center api</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-configcenter-api</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-remoting-etcd3</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java
new file mode 100644
index 0000000..18e9088
--- /dev/null
+++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.configcenter.support.etcd;
+
+import com.google.protobuf.ByteString;
+import io.etcd.jetcd.api.Event;
+import io.etcd.jetcd.api.WatchCancelRequest;
+import io.etcd.jetcd.api.WatchCreateRequest;
+import io.etcd.jetcd.api.WatchGrpc;
+import io.etcd.jetcd.api.WatchRequest;
+import io.etcd.jetcd.api.WatchResponse;
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.configcenter.ConfigChangeEvent;
+import org.apache.dubbo.configcenter.ConfigChangeType;
+import org.apache.dubbo.configcenter.ConfigurationListener;
+import org.apache.dubbo.configcenter.DynamicConfiguration;
+import org.apache.dubbo.remoting.etcd.StateListener;
+import org.apache.dubbo.remoting.etcd.jetcd.JEtcdClient;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY;
+import static org.apache.dubbo.common.Constants.PATH_SEPARATOR;
+
+/**
+ * The etcd implementation of {@link DynamicConfiguration}.
+ * <p>
+ * Values are read through a {@link JEtcdClient}. Change notifications are delivered by one
+ * {@link EtcdConfigWatcher} per registered {@link ConfigurationListener}; all watchers are
+ * re-established whenever the client reports {@link StateListener#CONNECTED}.
+ */
+public class EtcdDynamicConfiguration implements DynamicConfiguration {
+
+    /**
+     * The final root path would be: /$NAME_SPACE/config
+     */
+    private final String rootPath;
+
+    /**
+     * The etcd client
+     */
+    private final JEtcdClient etcdClient;
+
+    /**
+     * Maps each registered {@link ConfigurationListener} to the {@link EtcdConfigWatcher} serving it
+     */
+    private final ConcurrentMap<ConfigurationListener, EtcdConfigWatcher> watchListenerMap;
+
+    EtcdDynamicConfiguration(URL url) {
+        rootPath = "/" + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config";
+        // Create the map before the state listener is registered: the listener calls recover(),
+        // which iterates this map, so it must never be able to observe the field unassigned.
+        watchListenerMap = new ConcurrentHashMap<>();
+        etcdClient = new JEtcdClient(url);
+        etcdClient.addStateListener(state -> {
+            if (state == StateListener.CONNECTED) {
+                try {
+                    recover();
+                } catch (Exception ignored) {
+                    // Best effort: a failed re-watch must not propagate into the client's
+                    // state-notification callback.
+                    // NOTE(review): this class has no logger; consider logging the failure.
+                }
+            }
+        });
+    }
+
+    /**
+     * Registers {@code listener} for changes of {@code key} and starts watching it.
+     * A listener that is already registered is left untouched.
+     * <p>
+     * NOTE(review): {@code group} is not used when building the watched path, unlike
+     * {@link #getConfig(String, String, long)} — confirm this is intended.
+     *
+     * @param key      the config key; the text after the last '.' becomes the last path segment
+     * @param group    currently ignored
+     * @param listener the listener to notify
+     */
+    @Override
+    public void addListener(String key, String group, ConfigurationListener listener) {
+        EtcdConfigWatcher watcher = new EtcdConfigWatcher(convertKey(key), listener);
+        // putIfAbsent makes "check and register" one atomic step, so two concurrent calls with the
+        // same listener cannot both open a watch.
+        if (watchListenerMap.putIfAbsent(listener, watcher) == null) {
+            watcher.watch();
+        }
+    }
+
+    /**
+     * Cancels the watch that belongs to {@code listener} and forgets the listener, so that it is
+     * not watched again by {@link #recover()}. Unknown listeners are ignored.
+     *
+     * @param key      unused; the watcher is looked up by listener
+     * @param group    unused
+     * @param listener the listener to unregister
+     */
+    @Override
+    public void removeListener(String key, String group, ConfigurationListener listener) {
+        EtcdConfigWatcher watcher = watchListenerMap.remove(listener);
+        if (watcher != null) {
+            watcher.cancelWatch();
+        }
+    }
+
+    /**
+     * Reads the value stored for {@code key}.
+     * <p>
+     * With a non-empty {@code group} the path is {@code rootPath/group/key}; otherwise the key is
+     * mapped by {@link #convertKey(String)}.
+     *
+     * @param key     the config key
+     * @param group   optional group
+     * @param timeout not used by this implementation
+     * @return whatever {@link #getInternalProperty(String)} returns for the resolved path
+     */
+    // TODO Abstract the logic into super class
+    @Override
+    public String getConfig(String key, String group, long timeout) throws IllegalStateException {
+        String path = StringUtils.isNotEmpty(group)
+                ? rootPath + PATH_SEPARATOR + group + PATH_SEPARATOR + key
+                : convertKey(key);
+        return (String) getInternalProperty(path);
+    }
+
+    /**
+     * Looks up {@code key} (a full etcd path) via {@link JEtcdClient#getKVValue(String)}.
+     */
+    @Override
+    public Object getInternalProperty(String key) {
+        return etcdClient.getKVValue(key);
+    }
+
+    /**
+     * Maps a dotted config key to its etcd path: {@code a.b.c} becomes {@code rootPath/a.b/c}.
+     * A key without any '.' is appended to the root path unchanged instead of failing in
+     * {@code substring}.
+     */
+    private String convertKey(String key) {
+        int index = key.lastIndexOf('.');
+        if (index < 0) {
+            return rootPath + PATH_SEPARATOR + key;
+        }
+        return rootPath + PATH_SEPARATOR + key.substring(0, index) + PATH_SEPARATOR + key.substring(index + 1);
+    }
+
+    /**
+     * Opens a fresh watch for every registered listener; invoked when the connection is
+     * (re-)established.
+     */
+    private void recover() {
+        for (EtcdConfigWatcher watcher : watchListenerMap.values()) {
+            watcher.watch();
+        }
+    }
+
+    /**
+     * Watches a single etcd key over a gRPC watch stream and forwards every received event to the
+     * wrapped {@link ConfigurationListener}.
+     */
+    public class EtcdConfigWatcher implements StreamObserver<WatchResponse> {
+
+        private final ConfigurationListener listener;
+        protected WatchGrpc.WatchStub watchStub;
+        private StreamObserver<WatchRequest> observer;
+        protected long watchId;
+        private final ManagedChannel channel;
+        private final String key;
+
+        public EtcdConfigWatcher(String key, ConfigurationListener listener) {
+            this.key = key;
+            this.listener = listener;
+            this.channel = etcdClient.getChannel();
+        }
+
+        /**
+         * Remembers the watch id carried by the response and hands each contained event to the
+         * listener; DELETE events are reported as {@link ConfigChangeType#DELETED}, everything
+         * else as {@link ConfigChangeType#MODIFIED}.
+         */
+        @Override
+        public void onNext(WatchResponse watchResponse) {
+            this.watchId = watchResponse.getWatchId();
+            for (Event etcdEvent : watchResponse.getEventsList()) {
+                ConfigChangeType type = etcdEvent.getType() == Event.EventType.DELETE
+                        ? ConfigChangeType.DELETED
+                        : ConfigChangeType.MODIFIED;
+                listener.process(new ConfigChangeEvent(
+                        etcdEvent.getKv().getKey().toString(UTF_8),
+                        etcdEvent.getKv().getValue().toString(UTF_8), type));
+            }
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            // ignore
+        }
+
+        @Override
+        public void onCompleted() {
+            // ignore
+        }
+
+        public long getWatchId() {
+            return watchId;
+        }
+
+        /**
+         * Opens a new watch stream on the channel and sends a create request for {@link #key}
+         * with progress notifications enabled.
+         */
+        private void watch() {
+            watchStub = WatchGrpc.newStub(channel);
+            observer = watchStub.watch(this);
+            WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
+                    .setKey(ByteString.copyFromUtf8(key))
+                    .setProgressNotify(true);
+            WatchRequest req = WatchRequest.newBuilder().setCreateRequest(builder).build();
+            observer.onNext(req);
+        }
+
+        /**
+         * Sends a cancel request for the last known watch id. Does nothing when no watch stream
+         * has been opened yet.
+         */
+        private void cancelWatch() {
+            if (observer == null) {
+                return;
+            }
+            WatchCancelRequest watchCancelRequest =
+                    WatchCancelRequest.newBuilder().setWatchId(watchId).build();
+            WatchRequest cancelRequest = WatchRequest.newBuilder()
+                    .setCancelRequest(watchCancelRequest).build();
+            observer.onNext(cancelRequest);
+        }
+    }
+}
diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationFactory.java b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationFactory.java
new file mode 100644
index 0000000..02e91a6
--- /dev/null
+++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.configcenter.support.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.configcenter.AbstractDynamicConfigurationFactory;
+import org.apache.dubbo.configcenter.DynamicConfiguration;
+
+/**
+ * {@link AbstractDynamicConfigurationFactory} that produces etcd-backed configurations.
+ */
+public class EtcdDynamicConfigurationFactory extends AbstractDynamicConfigurationFactory {
+
+    /**
+     * Builds a new {@link EtcdDynamicConfiguration} for the given URL.
+     *
+     * @param url the config center URL handed to the {@link EtcdDynamicConfiguration} constructor
+     * @return the newly created configuration
+     */
+    @Override
+    protected DynamicConfiguration createDynamicConfiguration(URL url) {
+        final DynamicConfiguration etcdConfiguration = new EtcdDynamicConfiguration(url);
+        return etcdConfiguration;
+    }
+}
diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.configcenter.DynamicConfigurationFactory b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.configcenter.DynamicConfigurationFactory
new file mode 100644
index 0000000..d84b1ae
--- /dev/null
+++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.configcenter.DynamicConfigurationFactory
@@ -0,0 +1 @@
+etcd=org.apache.dubbo.configcenter.support.etcd.EtcdDynamicConfigurationFactory
\ No newline at end of file
diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java b/dubbo-configcenter/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java
new file mode 100644
index 0000000..87143fd
--- /dev/null
+++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.configcenter.support.etcd;
+
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.configcenter.ConfigChangeEvent;
+import org.apache.dubbo.configcenter.ConfigurationListener;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Unit test for etcd config center support
+ * TODO Integrate with https://github.com/etcd-io/jetcd#launcher or using mock data.
+ */
+@Disabled
+public class EtcdDynamicConfigurationTest {
+
+    private static final String ENDPOINT = "http://127.0.0.1:2379";
+
+    private static EtcdDynamicConfiguration config;
+
+    private static Client etcdClient;
+
+    /**
+     * Values written directly with the jetcd client must be readable through the configuration,
+     * both with and without an explicit group.
+     */
+    @Test
+    public void testGetConfig() {
+        put("/dubbo/config/org.apache.dubbo.etcd.testService/configurators", "hello");
+        put("/dubbo/config/test/dubbo.properties", "aaa=bbb");
+        Assertions.assertEquals("hello", config.getConfig("org.apache.dubbo.etcd.testService.configurators"));
+        Assertions.assertEquals("aaa=bbb", config.getConfig("dubbo.properties", "test"));
+    }
+
+    /**
+     * Four listeners on two keys must each see exactly one event for their own key; the write to
+     * the unrelated key "/dubbo/config/testapp" must not add to any of the asserted counters.
+     */
+    @Test
+    public void testAddListener() throws Exception {
+        CountDownLatch latch = new CountDownLatch(4);
+        TestListener listener1 = new TestListener(latch);
+        TestListener listener2 = new TestListener(latch);
+        TestListener listener3 = new TestListener(latch);
+        TestListener listener4 = new TestListener(latch);
+        config.addListener("AService.configurators", listener1);
+        config.addListener("AService.configurators", listener2);
+        config.addListener("testapp.tagrouters", listener3);
+        config.addListener("testapp.tagrouters", listener4);
+
+        put("/dubbo/config/AService/configurators", "new value1");
+        Thread.sleep(200);
+        put("/dubbo/config/testapp/tagrouters", "new value2");
+        Thread.sleep(200);
+        put("/dubbo/config/testapp", "new value3");
+
+        Thread.sleep(1000);
+
+        Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS));
+        Assertions.assertEquals(1, listener1.getCount("/dubbo/config/AService/configurators"));
+        Assertions.assertEquals(1, listener2.getCount("/dubbo/config/AService/configurators"));
+        Assertions.assertEquals(1, listener3.getCount("/dubbo/config/testapp/tagrouters"));
+        Assertions.assertEquals(1, listener4.getCount("/dubbo/config/testapp/tagrouters"));
+
+        Assertions.assertEquals("new value1", listener1.getValue());
+        Assertions.assertEquals("new value1", listener2.getValue());
+        Assertions.assertEquals("new value2", listener3.getValue());
+        Assertions.assertEquals("new value2", listener4.getValue());
+    }
+
+    /**
+     * Listener that counts events per key, remembers the last value and counts down the shared
+     * latch once per event. It needs nothing from the enclosing test instance, hence static.
+     */
+    private static class TestListener implements ConfigurationListener {
+        private final CountDownLatch latch;
+        private String value;
+        private final Map<String, Integer> countMap = new HashMap<>();
+
+        public TestListener(CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        @Override
+        public void process(ConfigChangeEvent event) {
+            countMap.merge(event.getKey(), 1, Integer::sum);
+            value = event.getValue();
+            // Count down last so the updates above are visible to the thread returning from await().
+            latch.countDown();
+        }
+
+        /**
+         * @return the number of events seen for {@code key}, 0 when none was received (instead of
+         *         failing with a NullPointerException while unboxing)
+         */
+        public int getCount(String key) {
+            return countMap.getOrDefault(key, 0);
+        }
+
+        public String getValue() {
+            return value;
+        }
+    }
+
+    /**
+     * Writes {@code value} under {@code key} and waits for the write to complete. A failed write
+     * aborts the test right away rather than letting a later assertion fail for an unrelated
+     * looking reason.
+     */
+    static void put(String key, String value) {
+        try {
+            etcdClient.getKVClient()
+                    .put(ByteSequence.from(key, UTF_8), ByteSequence.from(value, UTF_8))
+                    .get();
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // keep the interrupt visible to the test runner
+                Thread.currentThread().interrupt();
+            }
+            throw new IllegalStateException("Error put value to etcd, key: " + key, e);
+        }
+    }
+
+    @BeforeAll
+    static void setUp() {
+        etcdClient = Client.builder().endpoints(ENDPOINT).build();
+        // timeout in 15 seconds.
+        URL url = URL.valueOf("etcd3://127.0.0.1:2379/org.apache.dubbo.etcd.testService")
+                .addParameter(Constants.SESSION_TIMEOUT_KEY, 15000);
+        config = new EtcdDynamicConfiguration(url);
+    }
+
+    @AfterAll
+    static void tearDown() {
+        etcdClient.close();
+    }
+}
diff --git a/dubbo-configcenter/pom.xml b/dubbo-configcenter/pom.xml
index fa703be..92f727d 100644
--- a/dubbo-configcenter/pom.xml
+++ b/dubbo-configcenter/pom.xml
@@ -34,5 +34,6 @@
<module>dubbo-configcenter-zookeeper</module>
<module>dubbo-configcenter-apollo</module>
<module>dubbo-configcenter-consul</module>
+ <module>dubbo-configcenter-etcd</module>
</modules>
</project>
diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java
index 504d521..f0d9406 100644
--- a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java
+++ b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java
@@ -71,7 +71,7 @@ public class EtcdRegistry extends FailbackRegistry {
private final Set<String> anyServices = new ConcurrentHashSet<String>();
- private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> etcdListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
+ private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> etcdListeners = new ConcurrentHashMap<>();
private final EtcdClient etcdClient;
private long expirePeriod;
@@ -86,14 +86,12 @@ public class EtcdRegistry extends FailbackRegistry {
}
this.root = group;
etcdClient = etcdTransporter.connect(url);
- etcdClient.addStateListener(new StateListener() {
- public void stateChanged(int state) {
- if (state == CONNECTED) {
- try {
- recover();
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- }
+ etcdClient.addStateListener(state -> {
+ if (state == StateListener.CONNECTED) {
+ try {
+ recover();
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
}
}
});
@@ -345,7 +343,7 @@ public class EtcdRegistry extends FailbackRegistry {
}
protected List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
- List<URL> urls = new ArrayList<URL>();
+ List<URL> urls = new ArrayList<>();
if (providers != null && providers.size() > 0) {
for (String provider : providers) {
provider = URL.decode(provider);
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java
index b1e765d..286be93 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java
@@ -164,4 +164,20 @@ public interface EtcdClient {
*/
void revokeLease(long lease);
+
+ /**
+ * Get the value of the specified key.
+ * @param key the specified key
+ * @return null if the value is not found
+ */
+ String getKVValue(String key);
+
+ /**
+ * Put the key value pair to etcd
+ * @param key the specified key
+ * @param value the paired value
+ * @return true if put success
+ */
+ boolean put(String key, String value);
+
}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/ConnectionStateListener.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/ConnectionStateListener.java
new file mode 100644
index 0000000..788aa40
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/ConnectionStateListener.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import io.etcd.jetcd.Client;
+
+/**
+ * Callback for connection state changes of a jetcd {@link Client}.
+ */
+public interface ConnectionStateListener {
+
+ /**
+ * Called when there is a state change in the connection
+ *
+ * @param client the client
+ * @param newState the new state
+ * (NOTE(review): presumably one of the StateListener constants such as CONNECTED — confirm)
+ */
+ void stateChanged(Client client, int newState);
+}
\ No newline at end of file
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
index d07cad0..ff4c118 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.remoting.etcd.jetcd;
+import io.grpc.ManagedChannel;
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
@@ -185,6 +186,20 @@ public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
}
}
+ /**
+ * Delegates the lookup of {@code key} to {@code clientWrapper.getKVValue(key)}.
+ */
+ @Override
+ public String getKVValue(String key) {
+ return clientWrapper.getKVValue(key);
+ }
+
+ /**
+ * Delegates storing the key/value pair to {@code clientWrapper.put(key, value)} and returns its result.
+ */
+ @Override
+ public boolean put(String key, String value) {
+ return clientWrapper.put(key, value);
+ }
+
+ /**
+ * Exposes the gRPC channel obtained from {@code clientWrapper.getChannel()}, so callers can
+ * create their own gRPC stubs on it (the etcd config center builds its watch stub this way).
+ */
+ public ManagedChannel getChannel() {
+ return clientWrapper.getChannel();
+ }
+
public class EtcdWatcher implements StreamObserver<WatchResponse> {
protected WatchGrpc.WatchStub watchStub;
@@ -233,12 +248,7 @@ public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
}
}
if (modified > 0) {
- notifyExecutor.execute(new Runnable() {
- @Override
- public void run() {
- listener.childChanged(path, new ArrayList<>(urls));
- }
- });
+ notifyExecutor.execute(() -> listener.childChanged(path, new ArrayList<>(urls)));
}
}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
index 8515b61..c7f472d 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.remoting.etcd.jetcd;
+import io.etcd.jetcd.kv.PutResponse;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
@@ -31,9 +32,11 @@ import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.ClientBuilder;
import io.etcd.jetcd.CloseableClient;
+import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Observers;
import io.etcd.jetcd.common.exception.ErrorCode;
import io.etcd.jetcd.common.exception.EtcdException;
+import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
@@ -87,7 +90,8 @@ public class JEtcdClientWrapper {
private RuntimeException failed;
private final ScheduledFuture<?> retryFuture;
- private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Etcd3RegistryKeepAliveFailedRetryTimer", true));
+ private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1,
+ new NamedThreadFactory("Etcd3RegistryKeepAliveFailedRetryTimer", true));
private final Set<String> failedRegistered = new ConcurrentHashSet<String>();
@@ -117,28 +121,26 @@ public class JEtcdClientWrapper {
this.failed = new IllegalStateException("Etcd3 registry is not connected yet, url:" + url);
int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
- this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
- public void run() {
- try {
- retry();
- } catch (Throwable t) {
- logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
- }
+ this.retryFuture = retryExecutor.scheduleWithFixedDelay(() -> {
+ try {
+ retry();
+ } catch (Throwable t) {
+ logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
}
}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
private Client prepareClient(URL url) {
- int maxInboudSize = DEFAULT_INBOUT_SIZE;
- if (StringUtils.isNotEmpty(System.getProperty(GRPC_MAX_INBOUD_SIZE_KEY))) {
- maxInboudSize = Integer.valueOf(System.getProperty(GRPC_MAX_INBOUD_SIZE_KEY));
+ int maxInboundSize = DEFAULT_INBOUND_SIZE;
+ if (StringUtils.isNotEmpty(System.getProperty(GRPC_MAX_INBOUND_SIZE_KEY))) {
+ maxInboundSize = Integer.valueOf(System.getProperty(GRPC_MAX_INBOUND_SIZE_KEY));
}
ClientBuilder clientBuilder = Client.builder()
.loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
.endpoints(endPoints(url.getBackupAddress()))
- .maxInboundMessageSize(maxInboudSize);
+ .maxInboundMessageSize(maxInboundSize);
return clientBuilder.build();
}
@@ -170,29 +172,26 @@ public class JEtcdClientWrapper {
public List<String> getChildren(String path) {
try {
return RetryLoops.invokeWithRetry(
- new Callable<List<String>>() {
- @Override
- public List<String> call() throws Exception {
- requiredNotNull(client, failed);
- int len = path.length();
- return client.getKVClient()
- .get(ByteSequence.from(path, UTF_8),
- GetOption.newBuilder().withPrefix(ByteSequence.from(path, UTF_8)).build())
- .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
- .getKvs().stream().parallel()
- .filter(pair -> {
- String key = pair.getKey().toString(UTF_8);
- int index = len, count = 0;
- if (key.length() > len) {
- for (; (index = key.indexOf(Constants.PATH_SEPARATOR, index)) != -1; ++index) {
- if (count++ > 1) break;
- }
+ () -> {
+ requiredNotNull(client, failed);
+ int len = path.length();
+ return client.getKVClient()
+ .get(ByteSequence.from(path, UTF_8),
+ GetOption.newBuilder().withPrefix(ByteSequence.from(path, UTF_8)).build())
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+ .getKvs().stream().parallel()
+ .filter(pair -> {
+ String key = pair.getKey().toString(UTF_8);
+ int index = len, count = 0;
+ if (key.length() > len) {
+ for (; (index = key.indexOf(Constants.PATH_SEPARATOR, index)) != -1; ++index) {
+ if (count++ > 1) break;
}
- return count == 1;
- })
- .map(pair -> pair.getKey().toString(UTF_8))
- .collect(toList());
- }
+ }
+ return count == 1;
+ })
+ .map(pair -> pair.getKey().toString(UTF_8))
+ .collect(toList());
}, retryPolicy);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
@@ -207,15 +206,12 @@ public class JEtcdClientWrapper {
public long createLease(long second) {
try {
return RetryLoops.invokeWithRetry(
- new Callable<Long>() {
- @Override
- public Long call() throws Exception {
- requiredNotNull(client, failed);
- return client.getLeaseClient()
- .grant(second)
- .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
- .getID();
- }
+ () -> {
+ requiredNotNull(client, failed);
+ return client.getLeaseClient()
+ .grant(second)
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+ .getID();
}, retryPolicy);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
@@ -225,15 +221,12 @@ public class JEtcdClientWrapper {
public void revokeLease(long lease) {
try {
RetryLoops.invokeWithRetry(
- new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- requiredNotNull(client, failed);
- client.getLeaseClient()
- .revoke(lease)
- .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
- return null;
- }
+ (Callable<Void>) () -> {
+ requiredNotNull(client, failed);
+ client.getLeaseClient()
+ .revoke(lease)
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+ return null;
}, retryPolicy);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
@@ -260,15 +253,12 @@ public class JEtcdClientWrapper {
public boolean checkExists(String path) {
try {
return RetryLoops.invokeWithRetry(
- new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- requiredNotNull(client, failed);
- return client.getKVClient()
- .get(ByteSequence.from(path, UTF_8), GetOption.newBuilder().withCountOnly(true).build())
- .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
- .getCount() > 0;
- }
+ () -> {
+ requiredNotNull(client, failed);
+ return client.getKVClient()
+ .get(ByteSequence.from(path, UTF_8), GetOption.newBuilder().withCountOnly(true).build())
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+ .getCount() > 0;
}, retryPolicy);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
@@ -281,17 +271,14 @@ public class JEtcdClientWrapper {
protected Long find(String path) {
try {
return RetryLoops.invokeWithRetry(
- new Callable<Long>() {
- @Override
- public Long call() throws Exception {
- requiredNotNull(client, failed);
- return client.getKVClient()
- .get(ByteSequence.from(path, UTF_8))
- .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
- .getKvs().stream()
- .mapToLong(keyValue -> Long.valueOf(keyValue.getValue().toString(UTF_8)))
- .findFirst().getAsLong();
- }
+ () -> {
+ requiredNotNull(client, failed);
+ return client.getKVClient()
+ .get(ByteSequence.from(path, UTF_8))
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+ .getKvs().stream()
+ .mapToLong(keyValue -> Long.valueOf(keyValue.getValue().toString(UTF_8)))
+ .findFirst().getAsLong();
}, retryPolicy);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
@@ -301,16 +288,13 @@ public class JEtcdClientWrapper {
public void createPersistent(String path) {
try {
RetryLoops.invokeWithRetry(
- new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- requiredNotNull(client, failed);
- client.getKVClient()
- .put(ByteSequence.from(path, UTF_8),
- ByteSequence.from(String.valueOf(path.hashCode()), UTF_8))
- .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
- return null;
- }
+ (Callable<Void>) () -> {
+ requiredNotNull(client, failed);
+ client.getKVClient()
+ .put(ByteSequence.from(path, UTF_8),
+ ByteSequence.from(String.valueOf(path.hashCode()), UTF_8))
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+ return null;
}, retryPolicy);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
@@ -328,21 +312,18 @@ public class JEtcdClientWrapper {
public long createEphemeral(String path) {
try {
return RetryLoops.invokeWithRetry(
- new Callable<Long>() {
- @Override
- public Long call() throws Exception {
- requiredNotNull(client, failed);
-
- registeredPaths.add(path);
- keepAlive();
- final long leaseId = globalLeaseId;
- client.getKVClient()
- .put(ByteSequence.from(path, UTF_8)
- , ByteSequence.from(String.valueOf(leaseId), UTF_8)
- , PutOption.newBuilder().withLeaseId(leaseId).build())
- .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
- return leaseId;
- }
+ () -> {
+ requiredNotNull(client, failed);
+
+ registeredPaths.add(path);
+ keepAlive();
+ final long leaseId = globalLeaseId;
+ client.getKVClient()
+ .put(ByteSequence.from(path, UTF_8)
+ , ByteSequence.from(String.valueOf(leaseId), UTF_8)
+ , PutOption.newBuilder().withLeaseId(leaseId).build())
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+ return leaseId;
}, retryPolicy);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
@@ -354,6 +335,7 @@ public class JEtcdClientWrapper {
this.keepAlive(lease, null);
}
+ @SuppressWarnings("unchecked")
private <T> void keepAlive(long lease, Consumer<T> onFailed) {
final StreamObserver<LeaseKeepAliveResponse> observer = new Observers.Builder()
.onError((e) -> {
@@ -471,16 +453,13 @@ public class JEtcdClientWrapper {
public void delete(String path) {
try {
RetryLoops.invokeWithRetry(
- new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- requiredNotNull(client, failed);
- client.getKVClient()
- .delete(ByteSequence.from(path, UTF_8))
- .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
- registeredPaths.remove(path);
- return null;
- }
+ (Callable<Void>) () -> {
+ requiredNotNull(client, failed);
+ client.getKVClient()
+ .delete(ByteSequence.from(path, UTF_8))
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+ registeredPaths.remove(path);
+ return null;
}, retryPolicy);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
@@ -494,13 +473,13 @@ public class JEtcdClientWrapper {
public String[] endPoints(String backupAddress) {
String[] endpoints = backupAddress.split(Constants.COMMA_SEPARATOR);
- List<String> addressess = Arrays.stream(endpoints)
- .map(address -> address.indexOf(Constants.HTTP_SUBFIX_KEY) > -1
+ List<String> addresses = Arrays.stream(endpoints) // one entry per piece of backupAddress split on Constants.COMMA_SEPARATOR
+ .map(address -> address.contains(Constants.HTTP_SUBFIX_KEY) // entries already containing HTTP_SUBFIX_KEY are kept as-is; all others get HTTP_KEY prepended
? address
: Constants.HTTP_KEY + address)
.collect(toList());
- Collections.shuffle(addressess);
- return addressess.toArray(new String[0]);
+ Collections.shuffle(addresses); // reorders the list in place, so the returned endpoint order is random
+ return addresses.toArray(new String[0]);
}
/**
@@ -527,26 +506,22 @@ public class JEtcdClientWrapper {
}
try {
- this.future = reconnectNotify.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- boolean connected = isConnected();
- if (connectState != connected) {
- int notifyState = connected ? StateListener.CONNECTED : StateListener.DISCONNECTED;
- if (connectionStateListener != null) {
- try {
- if (connected) {
- clearKeepAlive();
- }
- connectionStateListener.stateChanged(getClient(), notifyState);
- } finally {
- cancelKeepAlive = false;
+ this.future = reconnectNotify.scheduleWithFixedDelay(() -> {
+ boolean connected = isConnected();
+ if (connectState != connected) {
+ int notifyState = connected ? StateListener.CONNECTED : StateListener.DISCONNECTED;
+ if (connectionStateListener != null) {
+ try {
+ if (connected) {
+ clearKeepAlive();
}
+ connectionStateListener.stateChanged(getClient(), notifyState);
+ } finally {
+ cancelKeepAlive = false;
}
- connectState = connected;
}
+ connectState = connected;
}
-
}, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
logger.error("monitor reconnect status failed.", t);
@@ -575,7 +550,9 @@ public class JEtcdClientWrapper {
try {
cancelKeepAlive = true;
- revokeLease(this.globalLeaseId);
+ if (globalLeaseId > 0) {
+ revokeLease(this.globalLeaseId);
+ }
} catch (Exception e) {
logger.warn("revoke global lease '" + globalLeaseId + "' failed, registry: " + url, e);
}
@@ -638,6 +615,41 @@ public class JEtcdClientWrapper {
}
}
+ /** Returns the UTF-8 value of the first pair found for {@code key}, or {@code null} if the key is null, nothing is found, or the request fails or times out. */
+ public String getKVValue(String key) {
+ if (null == key) {
+ return null;
+ }
+ CompletableFuture<GetResponse> responseFuture = this.client.getKVClient().get(ByteSequence.from(key, UTF_8));
+
+ try {
+ List<KeyValue> result = responseFuture.get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS).getKvs();
+ if (!result.isEmpty()) {
+ return result.get(0).getValue().toString(UTF_8);
+ }
+ } catch (Exception e) {
+ logger.warn("Failed to get value of key '" + key + "' from etcd, returning null", e);
+ }
+
+ return null;
+ }
+
+ /** Writes {@code value} under {@code key}; returns {@code true} once the put completes within the request timeout, {@code false} for null arguments or on failure. */
+ public boolean put(String key, String value) {
+ if (key == null || value == null) {
+ return false;
+ }
+ CompletableFuture<PutResponse> putFuture =
+ this.client.getKVClient().put(ByteSequence.from(key, UTF_8), ByteSequence.from(value, UTF_8));
+ try {
+ putFuture.get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+ return true;
+ } catch (Exception e) {
+ logger.warn("Failed to put value for key '" + key + "' to etcd", e);
+ }
+ return false;
+ }
+
private void retry() {
if (!failedRegistered.isEmpty()) {
Set<String> failed = new HashSet<String>(failedRegistered);
@@ -679,24 +691,14 @@ public class JEtcdClientWrapper {
}
}
- public interface ConnectionStateListener {
- /**
- * Called when there is a state change in the connection
- *
- * @param client the client
- * @param newState the new state
- */
- public void stateChanged(Client client, int newState);
- }
-
/**
* default request timeout
*/
public static final long DEFAULT_REQUEST_TIMEOUT = obtainRequestTimeout();
- public static final int DEFAULT_INBOUT_SIZE = 100 * 1024 * 1024;
+ public static final int DEFAULT_INBOUND_SIZE = 100 * 1024 * 1024;
- public static final String GRPC_MAX_INBOUD_SIZE_KEY = "grpc.max.inbound.size";
+ public static final String GRPC_MAX_INBOUND_SIZE_KEY = "grpc.max.inbound.size";
public static final String ETCD_REQUEST_TIMEOUT_KEY = "etcd.request.timeout";
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java
index 31752bf..5fecd14 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java
@@ -57,7 +57,7 @@ public abstract class AbstractEtcdClient<WatcherListener> implements EtcdClient
private final Set<StateListener> stateListeners = new ConcurrentHashSet<>();
- private final ConcurrentMap<String, ConcurrentMap<ChildListener, WatcherListener>> childListeners = new ConcurrentHashMap<String, ConcurrentMap<ChildListener, WatcherListener>>();
+ private final ConcurrentMap<String, ConcurrentMap<ChildListener, WatcherListener>> childListeners = new ConcurrentHashMap<>();
private final List<String> categroies = Arrays.asList(Constants.PROVIDERS_CATEGORY
, Constants.CONSUMERS_CATEGORY
, Constants.ROUTERS_CATEGORY
@@ -99,7 +99,7 @@ public abstract class AbstractEtcdClient<WatcherListener> implements EtcdClient
public List<String> addChildListener(String path, final ChildListener listener) {
ConcurrentMap<ChildListener, WatcherListener> listeners = childListeners.get(path);
if (listeners == null) {
- childListeners.putIfAbsent(path, new ConcurrentHashMap<ChildListener, WatcherListener>());
+ childListeners.putIfAbsent(path, new ConcurrentHashMap<>());
listeners = childListeners.get(path);
}
WatcherListener targetListener = listeners.get(listener);
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java
index 19254ab..9674fee 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java
@@ -33,8 +33,21 @@
*/
package org.apache.dubbo.remoting.etcd.jetcd;
+import com.google.protobuf.ByteString;
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.Watch;
+import io.etcd.jetcd.api.Event;
+import io.etcd.jetcd.api.WatchCancelRequest;
+import io.etcd.jetcd.api.WatchCreateRequest;
+import io.etcd.jetcd.api.WatchGrpc;
+import io.etcd.jetcd.api.WatchRequest;
+import io.etcd.jetcd.api.WatchResponse;
import io.etcd.jetcd.common.exception.ClosedClientException;
+import io.etcd.jetcd.watch.WatchEvent;
+import io.grpc.ManagedChannel;
import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.remoting.etcd.ChildListener;
@@ -44,8 +57,13 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
@Disabled
public class JEtcdClientTest {
@@ -76,6 +94,139 @@ public class JEtcdClientTest {
}
@Test
+ public void test_watch_when_modify() {
+ // Watches a key with the jetcd Watch client, writes "Hello" to it and expects a matching PUT event; the wait is bounded so a missed or mismatched event fails the test instead of hanging it.
+ String path = "/dubbo/config/jetcd-client-unit-test/configurators";
+ String endpoint = "http://127.0.0.1:2379";
+ CountDownLatch latch = new CountDownLatch(1);
+ ByteSequence key = ByteSequence.from(path, UTF_8);
+
+ Watch.Listener listener = Watch.listener(response -> {
+ for (WatchEvent event : response.getEvents()) {
+ Assertions.assertEquals("PUT", event.getEventType().toString());
+ Assertions.assertEquals(path, event.getKeyValue().getKey().toString(UTF_8));
+ Assertions.assertEquals("Hello", event.getKeyValue().getValue().toString(UTF_8));
+ latch.countDown();
+ }
+ });
+
+ try (Client client = Client.builder().endpoints(endpoint).build();
+ Watch watch = client.getWatchClient();
+ Watch.Watcher watcher = watch.watch(key, listener)) {
+ // try to modify the key
+ client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8));
+ Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS), "no PUT event received within 5 seconds");
+ } catch (Exception e) {
+ Assertions.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testWatchWithGrpc() {
+ String path = "/dubbo/config/test_watch_with_grpc/configurators";
+ String endpoint = "http://127.0.0.1:2379";
+ CountDownLatch latch = new CountDownLatch(1);
+ try (Client client = Client.builder().endpoints(endpoint).build()) {
+ ManagedChannel channel = getChannel(client);
+ StreamObserver<WatchRequest> observer = WatchGrpc.newStub(channel).watch(new StreamObserver<WatchResponse>() {
+ @Override
+ public void onNext(WatchResponse response) {
+ for (Event event : response.getEventsList()) {
+ Assertions.assertEquals("PUT", event.getType().toString());
+ Assertions.assertEquals(path, event.getKv().getKey().toString(UTF_8));
+ Assertions.assertEquals("Hello", event.getKv().getValue().toString(UTF_8));
+ latch.countDown();
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+
+ }
+
+ @Override
+ public void onCompleted() {
+
+ }
+ });
+ WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
+ .setKey(ByteString.copyFrom(path, UTF_8));
+
+ observer.onNext(WatchRequest.newBuilder().setCreateRequest(builder).build());
+
+ // try to modify the key
+ client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8));
+ latch.await(5, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ Assertions.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testCancelWatchWithGrpc() {
+ String path = "/dubbo/config/testCancelWatchWithGrpc/configurators";
+ String endpoint = "http://127.0.0.1:2379";
+ CountDownLatch updateLatch = new CountDownLatch(1);
+ CountDownLatch cancelLatch = new CountDownLatch(1);
+ final AtomicLong watchID = new AtomicLong(-1L);
+ try (Client client = Client.builder().endpoints(endpoint).build()) {
+ ManagedChannel channel = getChannel(client);
+ StreamObserver<WatchRequest> observer = WatchGrpc.newStub(channel).watch(new StreamObserver<WatchResponse>() {
+ @Override
+ public void onNext(WatchResponse response) {
+ watchID.set(response.getWatchId());
+ for (Event event : response.getEventsList()) {
+ Assertions.assertEquals("PUT", event.getType().toString());
+ Assertions.assertEquals(path, event.getKv().getKey().toString(UTF_8));
+ Assertions.assertEquals("Hello", event.getKv().getValue().toString(UTF_8));
+ updateLatch.countDown();
+ }
+ if (response.getCanceled()) {
+ // received the cancel response
+ cancelLatch.countDown();
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+
+ }
+
+ @Override
+ public void onCompleted() {
+
+ }
+ });
+ // create
+ WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
+ .setKey(ByteString.copyFrom(path, UTF_8));
+
+ // make the grpc call to watch the key
+ observer.onNext(WatchRequest.newBuilder().setCreateRequest(builder).build());
+
+ // try to put the value
+ client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8));
+
+ // response received, latch counts down to zero
+ updateLatch.await();
+
+ WatchCancelRequest watchCancelRequest =
+ WatchCancelRequest.newBuilder().setWatchId(watchID.get()).build();
+ WatchRequest cancelRequest = WatchRequest.newBuilder()
+ .setCancelRequest(watchCancelRequest).build();
+ observer.onNext(cancelRequest);
+
+ // try to put the value
+ client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello world", UTF_8));
+
+ cancelLatch.await();
+ } catch (Exception e) {
+ Assertions.fail(e.getMessage());
+ }
+
+ }
+
+ @Test
public void test_watch_when_create_wrong_path() throws InterruptedException {
String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/providers";
@@ -257,4 +408,19 @@ public class JEtcdClientTest {
return ++value;
}
}
+
+ /** Returns the gRPC channel held by {@code client}, obtained reflectively; throws {@link IllegalStateException} if the reflective lookup fails. */
+ private ManagedChannel getChannel(Client client) {
+ try {
+ // hack: use reflection to read the client's connectionManager field and invoke its getChannel method to get the shared channel.
+ Field connectionField = client.getClass().getDeclaredField("connectionManager");
+ connectionField.setAccessible(true);
+ Object connection = connectionField.get(client);
+ Method channelMethod = connection.getClass().getDeclaredMethod("getChannel");
+ channelMethod.setAccessible(true);
+ return (ManagedChannel) channelMethod.invoke(connection);
+ } catch (Exception e) {
+ throw new IllegalStateException("Failed to obtain the gRPC channel from the jetcd client via reflection", e);
+ }
+ }
}