You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by vi...@apache.org on 2019/08/16 09:55:21 UTC

[dubbo] 03/06: etcd test

This is an automated email from the ASF dual-hosted git repository.

victory pushed a commit to branch cloud-native
in repository https://gitbox.apache.org/repos/asf/dubbo.git

commit 790de7467e40c12cf4bff209a2218e16841fbc34
Author: cvictory <sh...@gmail.com>
AuthorDate: Fri Aug 16 15:19:52 2019 +0800

    etcd test
---
 .../EtcdDubboServiceConsumerBootstrap.java         |   1 +
 dubbo-dependencies-bom/pom.xml                     |   5 +
 ...g.apache.dubbo.registry.client.ServiceDiscovery |   2 +-
 dubbo-remoting/dubbo-remoting-etcd3/pom.xml        |  15 +++
 .../dubbo/remoting/etcd/jetcd/JEtcdClient.java     |   2 +-
 .../dubbo/remoting/etcd/jetcd/LeaseTest.java       | 122 +++++++++++++++++++++
 6 files changed, 145 insertions(+), 2 deletions(-)

diff --git a/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/EtcdDubboServiceConsumerBootstrap.java b/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/EtcdDubboServiceConsumerBootstrap.java
index db4e152..7b0fc47 100644
--- a/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/EtcdDubboServiceConsumerBootstrap.java
+++ b/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/EtcdDubboServiceConsumerBootstrap.java
@@ -33,6 +33,7 @@ public class EtcdDubboServiceConsumerBootstrap {
         new DubboBootstrap()
                 .application("dubbo-consumer-demo")
                 // Zookeeper
+                .protocol(builder -> builder.port(20887).name("dubbo"))
                 .registry("zookeeper", builder -> builder.address("etcd3://127.0.0.1:2379?registry.type=service&subscribed.services=dubbo-provider-demo"))
                 .metadataReport(new MetadataReportConfig("etcd://127.0.0.1:2379"))
                 // Nacos
diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml
index 35c808f..ea55a65 100644
--- a/dubbo-dependencies-bom/pom.xml
+++ b/dubbo-dependencies-bom/pom.xml
@@ -428,6 +428,11 @@
                     </exclusion>
                 </exclusions>
             </dependency>
+            <dependency>
+                <groupId>io.etcd</groupId>
+                <artifactId>jetcd-launcher</artifactId>
+                <version>${jetcd_version}</version>
+            </dependency>
             <!-- Log libs -->
             <dependency>
                 <groupId>org.slf4j</groupId>
diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery b/dubbo-registry/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery
index 60f47ac..804f787 100644
--- a/dubbo-registry/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery
+++ b/dubbo-registry/dubbo-registry-etcd3/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery
@@ -1 +1 @@
-etcd=org.apache.dubbo.registry.etcd.EtcdServiceDiscovery
\ No newline at end of file
+etcd3=org.apache.dubbo.registry.etcd.EtcdServiceDiscovery
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/pom.xml b/dubbo-remoting/dubbo-remoting-etcd3/pom.xml
index 285854b..266e3f6 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/pom.xml
+++ b/dubbo-remoting/dubbo-remoting-etcd3/pom.xml
@@ -32,6 +32,7 @@
     <description>The etcd3 remoting module of Dubbo project</description>
     <properties>
         <skip_maven_deploy>false</skip_maven_deploy>
+        <assertj.version>3.13.2</assertj.version>
     </properties>
     <dependencies>
         <dependency>
@@ -48,10 +49,24 @@
             <groupId>io.etcd</groupId>
             <artifactId>jetcd-core</artifactId>
         </dependency>
+        <!-- https://mvnrepository.com/artifact/io.etcd/jetcd-launcher -->
+        <dependency>
+            <groupId>io.etcd</groupId>
+            <artifactId>jetcd-launcher</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>io.netty</groupId>
             <artifactId>netty-all</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>${assertj.version}</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
 
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
index d4512d4..784b5a0 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
@@ -205,7 +205,7 @@ public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
 
     @Override
     public boolean putEphemeral(String key, String value) {
-        return clientWrapper.put(key, value);
+        return clientWrapper.putEphemeral(key, value);
     }
 
     public ManagedChannel getChannel() {
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/LeaseTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/LeaseTest.java
new file mode 100644
index 0000000..2e0d5af
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/LeaseTest.java
@@ -0,0 +1,122 @@
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import com.google.common.base.Charsets;
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.CloseableClient;
+import io.etcd.jetcd.KV;
+import io.etcd.jetcd.Lease;
+import io.etcd.jetcd.Observers;
+import io.etcd.jetcd.launcher.EtcdCluster;
+import io.etcd.jetcd.launcher.EtcdClusterFactory;
+import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
+import io.etcd.jetcd.options.PutOption;
+import io.grpc.stub.StreamObserver;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * @author cvictory ON 2019-08-16
+ */
+public class LeaseTest {
+
+    // etcd cluster shared by all tests: started in beforeClass(), closed in afterClass().
+    private static EtcdCluster cluster;
+
+    // Per-test clients: created in setUp(); tearDown() closes `client`.
+    private KV kvClient;
+    private Client client;
+    private Lease leaseClient;
+
+    private static final ByteSequence KEY = ByteSequence.from("foo", Charsets.UTF_8);
+    private static final ByteSequence KEY_2 = ByteSequence.from("foo2", Charsets.UTF_8);
+    private static final ByteSequence VALUE = ByteSequence.from("bar", Charsets.UTF_8);
+
+    @BeforeAll
+    public static void beforeClass() {
+        cluster = EtcdClusterFactory.buildCluster("etcd-lease", 3, false);
+        cluster.start();
+    }
+
+    @AfterAll
+    public static void afterClass() {
+        cluster.close();
+    }
+
+    @BeforeEach
+    public void setUp() {
+        client = Client.builder().endpoints(cluster.getClientEndpoints()).build();
+        kvClient = client.getKVClient();
+        leaseClient = client.getLeaseClient();
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    // A key put under a lease granted with TTL 5 is readable at first and gone after a 6s wait.
+    @Test
+    public void testGrant() throws Exception {
+        long leaseID = leaseClient.grant(5).get().getID();
+        kvClient.put(KEY, VALUE, PutOption.newBuilder().withLeaseId(leaseID).build()).get();
+        assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(1);
+        Thread.sleep(6000); // wait longer than the granted TTL so the lease can expire
+        assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(0);
+    }
+
+    // Once the lease is revoked, the key that was put under it is no longer returned.
+    @Test
+    public void testRevoke() throws Exception {
+        long leaseID = leaseClient.grant(5).get().getID();
+        kvClient.put(KEY, VALUE, PutOption.newBuilder().withLeaseId(leaseID).build()).get();
+        assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(1);
+        leaseClient.revoke(leaseID).get();
+        assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(0);
+    }
+
+    // A single keep-alive request on a live lease answers with a positive TTL.
+    @Test
+    public void testKeepAliveOnce() throws ExecutionException, InterruptedException {
+        long leaseID = leaseClient.grant(2).get().getID();
+        kvClient.put(KEY, VALUE, PutOption.newBuilder().withLeaseId(leaseID).build()).get();
+        assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(1);
+        LeaseKeepAliveResponse rp = leaseClient.keepAliveOnce(leaseID).get();
+        assertThat(rp.getTTL()).isGreaterThan(0);
+    }
+
+    // Streaming keep-alive yields a response with positive TTL; after it is closed the key expires.
+    @Test
+    public void testKeepAlive() throws ExecutionException, InterruptedException {
+        long leaseID = leaseClient.grant(2).get().getID();
+        kvClient.put(KEY, VALUE, PutOption.newBuilder().withLeaseId(leaseID).build()).get();
+        assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(1);
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<LeaseKeepAliveResponse> responseRef = new AtomicReference<>();
+        StreamObserver<LeaseKeepAliveResponse> observer = Observers.observer(response -> {
+            responseRef.set(response);
+            latch.countDown();
+        });
+
+        try (CloseableClient c = leaseClient.keepAlive(leaseID, observer)) {
+            assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); // no response in 5s: fail here, not via NPE below
+            LeaseKeepAliveResponse response = responseRef.get();
+            assertThat(response.getTTL()).isGreaterThan(0);
+        }
+        Thread.sleep(3000); // keep-alive is closed above; wait longer than the granted TTL of 2
+        assertThat(kvClient.get(KEY).get().getCount()).isEqualTo(0);
+    }
+}