You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by vi...@apache.org on 2019/08/16 09:55:21 UTC
[dubbo] 03/06: etcd test
This is an automated email from the ASF dual-hosted git repository.
victory pushed a commit to branch cloud-native
in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit 790de7467e40c12cf4bff209a2218e16841fbc34
Author: cvictory <sh...@gmail.com>
AuthorDate: Fri Aug 16 15:19:52 2019 +0800
etcd test
---
.../EtcdDubboServiceConsumerBootstrap.java | 1 +
dubbo-dependencies-bom/pom.xml | 5 +
...g.apache.dubbo.registry.client.ServiceDiscovery | 2 +-
dubbo-remoting/dubbo-remoting-etcd3/pom.xml | 15 +++
.../dubbo/remoting/etcd/jetcd/JEtcdClient.java | 2 +-
.../dubbo/remoting/etcd/jetcd/LeaseTest.java | 122 +++++++++++++++++++++
6 files changed, 145 insertions(+), 2 deletions(-)
diff --git a/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/EtcdDubboServiceConsumerBootstrap.java b/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/EtcdDubboServiceConsumerBootstrap.java
index db4e152..7b0fc47 100644
--- a/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/EtcdDubboServiceConsumerBootstrap.java
+++ b/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/EtcdDubboServiceConsumerBootstrap.java
@@ -33,6 +33,7 @@ public class EtcdDubboServiceConsumerBootstrap {
new DubboBootstrap()
.application("dubbo-consumer-demo")
// Zookeeper
+ .protocol(builder -> builder.port(20887).name("dubbo"))
.registry("zookeeper", builder -> builder.address("etcd3://127.0.0.1:2379?registry.type=service&subscribed.services=dubbo-provider-demo"))
.metadataReport(new MetadataReportConfig("etcd://127.0.0.1:2379"))
// Nacos
diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml
index 35c808f..ea55a65 100644
--- a/dubbo-dependencies-bom/pom.xml
+++ b/dubbo-dependencies-bom/pom.xml
@@ -428,6 +428,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-launcher</artifactId>
+ <version>${jetcd_version}</version>
+ </dependency>
<!-- Log libs -->
<dependency>
<groupId>org.slf4j</groupId>
diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery b/dubbo-registry/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery
index 60f47ac..804f787 100644
--- a/dubbo-registry/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery
+++ b/dubbo-registry/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery
@@ -1 +1 @@
-etcd=org.apache.dubbo.registry.etcd.EtcdServiceDiscovery
\ No newline at end of file
+etcd3=org.apache.dubbo.registry.etcd.EtcdServiceDiscovery
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/pom.xml b/dubbo-remoting/dubbo-remoting-etcd3/pom.xml
index 285854b..266e3f6 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/pom.xml
+++ b/dubbo-remoting/dubbo-remoting-etcd3/pom.xml
@@ -32,6 +32,7 @@
<description>The etcd3 remoting module of Dubbo project</description>
<properties>
<skip_maven_deploy>false</skip_maven_deploy>
+ <assertj.version>3.13.2</assertj.version>
</properties>
<dependencies>
<dependency>
@@ -48,10 +49,24 @@
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
</dependency>
+ <!-- https://mvnrepository.com/artifact/io.etcd/jetcd-launcher -->
+ <dependency>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-launcher</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>${assertj.version}</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
index d4512d4..784b5a0 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
@@ -205,7 +205,7 @@ public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
@Override
public boolean putEphemeral(String key, String value) {
- return clientWrapper.put(key, value);
+ return clientWrapper.putEphemeral(key, value);
}
public ManagedChannel getChannel() {
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/LeaseTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/LeaseTest.java
new file mode 100644
index 0000000..2e0d5af
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/LeaseTest.java
@@ -0,0 +1,122 @@
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import com.google.common.base.Charsets;
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.CloseableClient;
+import io.etcd.jetcd.KV;
+import io.etcd.jetcd.Lease;
+import io.etcd.jetcd.Observers;
+import io.etcd.jetcd.launcher.EtcdCluster;
+import io.etcd.jetcd.launcher.EtcdClusterFactory;
+import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
+import io.etcd.jetcd.options.PutOption;
+import io.grpc.stub.StreamObserver;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * @author cvictory ON 2019-08-16
+ */
+public class LeaseTest {
+
+ private static EtcdCluster cluster;
+
+ private KV kvClient;
+ private Client client;
+ private Lease leaseClient;
+
+ private static final ByteSequence KEY = ByteSequence.from("foo", Charsets.UTF_8);
+ private static final ByteSequence KEY_2 = ByteSequence.from("foo2", Charsets.UTF_8);
+ private static final ByteSequence VALUE = ByteSequence.from("bar", Charsets.UTF_8);
+
+ @BeforeAll
+ public static void beforeClass() {
+ cluster = EtcdClusterFactory.buildCluster("etcd-lease", 3, false);
+ cluster.start();
+ }
+
+ @AfterAll
+ public static void afterClass() {
+ cluster.close();
+ }
+
+ @BeforeEach
+ public void setUp() {
+ client = Client.builder().endpoints(cluster.getClientEndpoints()).build();
+ kvClient = client.getKVClient();
+ leaseClient = client.getLeaseClient();
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (client != null) {
+ client.close();
+ }
+
+ }
+
+ @Test
+ public void testGrant() throws Exception {
+ long leaseID = leaseClient.grant(5).get().getID();
+
+ kvClient.put(KEY, VALUE, PutOption.newBuilder().withLeaseId(leaseID).build()).get();
+ assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(1);
+
+ Thread.sleep(6000);
+ assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(0);
+ }
+
+ @Test
+ public void testRevoke() throws Exception {
+ long leaseID = leaseClient.grant(5).get().getID();
+ kvClient.put(KEY, VALUE, PutOption.newBuilder().withLeaseId(leaseID).build()).get();
+ assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(1);
+ leaseClient.revoke(leaseID).get();
+ assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(0);
+ }
+
+ @Test
+ public void testKeepAliveOnce() throws ExecutionException, InterruptedException {
+ long leaseID = leaseClient.grant(2).get().getID();
+ kvClient.put(KEY, VALUE, PutOption.newBuilder().withLeaseId(leaseID).build()).get();
+ assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(1);
+ LeaseKeepAliveResponse rp = leaseClient.keepAliveOnce(leaseID).get();
+ assertThat(rp.getTTL()).isGreaterThan(0);
+ }
+
+    @Test
+    public void testKeepAlive() throws ExecutionException, InterruptedException {
+        long leaseID = leaseClient.grant(2).get().getID();
+        kvClient.put(KEY, VALUE, PutOption.newBuilder().withLeaseId(leaseID).build()).get();
+        assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(1);
+        // The observer stores the first keep-alive response and releases the latch.
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<LeaseKeepAliveResponse> responseRef = new AtomicReference<>();
+        StreamObserver<LeaseKeepAliveResponse> observer = Observers.observer(response -> {
+            responseRef.set(response);
+            latch.countDown();
+        });
+        // try-with-resources closes the keep-alive handle when the block exits.
+        try (CloseableClient c = leaseClient.keepAlive(leaseID, observer)) {
+            // Fail with a clear assertion (not a NullPointerException) if nothing arrives in 5s.
+            assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
+            assertThat(responseRef.get().getTTL()).isGreaterThan(0);
+        }
+        // With the handle closed, the 2-second lease is expected to lapse during this wait.
+        Thread.sleep(3000);
+        assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(0);
+    }
+
+}