You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by vi...@apache.org on 2019/07/26 02:26:18 UTC

[dubbo] 04/05: temporily commit

This is an automated email from the ASF dual-hosted git repository.

victory pushed a commit to branch cloud-native
in repository https://gitbox.apache.org/repos/asf/dubbo.git

commit 1aae6956dcca2b9d4982d8ec107dbf4ae2481fda
Author: cvictory <sh...@gmail.com>
AuthorDate: Mon Jul 22 16:08:33 2019 +0800

    temporily commit
---
 .../bootstrap/DubboServiceProviderBootstrap.java     |  4 ++--
 .../dubbo/registry/etcd/EtcdServiceDiscovery.java    | 19 +++++++++++++++++--
 .../org/apache/dubbo/remoting/etcd/EtcdClient.java   |  8 ++++++++
 .../dubbo/remoting/etcd/jetcd/JEtcdClient.java       |  5 +++++
 .../remoting/etcd/jetcd/JEtcdClientWrapper.java      | 20 ++++++++++++++++++++
 5 files changed, 52 insertions(+), 4 deletions(-)

diff --git a/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceProviderBootstrap.java b/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceProviderBootstrap.java
index c68fbd0..4e46a67 100644
--- a/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceProviderBootstrap.java
+++ b/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceProviderBootstrap.java
@@ -38,8 +38,8 @@ public class DubboServiceProviderBootstrap {
 //                .metadataReport(MetadataReportBuilder.newBuilder().address("zookeeper://127.0.0.1:2181").build())
                 .metadataReport(MetadataReportBuilder.newBuilder().address("zookeeper://127.0.0.1:2181").build())
 //                .application(ApplicationBuilder.newBuilder().name("dubbo-provider-demo").build())
-//                .registry(RegistryBuilder.newBuilder().address("zookeeper://127.0.0.1:2181?registry-type=service").build())
-                .registry(RegistryBuilder.newBuilder().address("etcd3://127.0.0.1:2379?registry-type=service").build())
+                .registry(RegistryBuilder.newBuilder().address("zookeeper://127.0.0.1:2181?registry-type=service").build())
+//                .registry(RegistryBuilder.newBuilder().address("etcd3://127.0.0.1:2379?registry-type=service").build())
                 .protocol(ProtocolBuilder.newBuilder().port(-1).name("dubbo").build())
                 .service(ServiceBuilder.newBuilder().id("test").interfaceClass(EchoService.class).ref(new EchoServiceImpl()).build())
                 .start()
diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java
index 9fcc5d4..4cce645 100644
--- a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java
@@ -38,6 +38,8 @@ import com.google.gson.Gson;
 
 import java.io.File;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -62,6 +64,7 @@ public class EtcdServiceDiscovery implements ServiceDiscovery, EventListener<Ser
     private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> etcdListeners = new ConcurrentHashMap<>();
     private final EtcdClient etcdClient;
     private final EventDispatcher dispatcher;
+    private ServiceInstance serviceInstance;
 
     public EtcdServiceDiscovery(URL url, EtcdTransporter etcdTransporter) {
         if (url.isAnyHost()) {
@@ -72,7 +75,7 @@ public class EtcdServiceDiscovery implements ServiceDiscovery, EventListener<Ser
         etcdClient.addStateListener(state -> {
             if (state == StateListener.CONNECTED) {
                 try {
-//                    recover();
+                    recover();
                 } catch (Exception e) {
                     logger.error(e.getMessage(), e);
                 }
@@ -101,8 +104,9 @@ public class EtcdServiceDiscovery implements ServiceDiscovery, EventListener<Ser
     @Override
     public void register(ServiceInstance serviceInstance) throws RuntimeException {
         try {
+            this.serviceInstance = serviceInstance;
             String path = toPath(serviceInstance);
-            etcdClient.put(path, new Gson().toJson(serviceInstance));
+            etcdClient.putEphemeral(path, new Gson().toJson(serviceInstance));
             services.add(serviceInstance.getServiceName());
         } catch (Throwable e) {
             throw new RpcException("Failed to register " + serviceInstance + " to etcd " + etcdClient.getUrl()
@@ -137,6 +141,7 @@ public class EtcdServiceDiscovery implements ServiceDiscovery, EventListener<Ser
             String path = toPath(serviceInstance);
             etcdClient.delete(path);
             services.remove(serviceInstance.getServiceName());
+            this.serviceInstance = null;
         } catch (Throwable e) {
             throw new RpcException("Failed to unregister " + serviceInstance + " to etcd " + etcdClient.getUrl() + ", cause: " + e.getMessage(), e);
         }
@@ -175,4 +180,14 @@ public class EtcdServiceDiscovery implements ServiceDiscovery, EventListener<Ser
          */
         List<String> children = etcdClient.addChildListener(path, childListener);
     }
+
+    private void recover() throws Exception {
+        // register
+        if (serviceInstance != null) {
+            if (logger.isInfoEnabled()) {
+                logger.info("Recover application register: " + serviceInstance);
+            }
+            register(serviceInstance);
+        }
+    }
 }
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java
index 286be93..e23b870 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java
@@ -180,4 +180,12 @@ public interface EtcdClient {
      */
     boolean put(String key, String value);
 
+    /**
+     * Put the key value pair to etcd (Ephemeral)
+     * @param key the specified key
+     * @param value the paired value
+     * @return true if put success
+     */
+    boolean putEphemeral(String key, String value);
+
 }
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
index 4c055b4..01a6025 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
@@ -203,6 +203,11 @@ public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
         return clientWrapper.put(key, value);
     }
 
+    @Override
+    public boolean putEphemeral(String key, String value) {
+        return clientWrapper.put(key, value);
+    }
+
     public ManagedChannel getChannel() {
         return clientWrapper.getChannel();
     }
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
index 7ef6777..f635005 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
@@ -667,6 +667,26 @@ public class JEtcdClientWrapper {
         return false;
     }
 
+    public boolean putEphemeral(final String key, String value) {
+        try {
+            return RetryLoops.invokeWithRetry(
+                    () -> {
+                        requiredNotNull(client, failed);
+                        // recovery an retry
+                        keepAlive();
+                        final long leaseId = globalLeaseId;
+                        client.getKVClient()
+                                .put(ByteSequence.from(key, UTF_8)
+                                        , ByteSequence.from(String.valueOf(value), UTF_8)
+                                        , PutOption.newBuilder().withLeaseId(leaseId).build())
+                                .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+                        return true;
+                    }, retryPolicy);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
     private void retry() {
         if (!failedRegistered.isEmpty()) {
             Set<String> failed = new HashSet<String>(failedRegistered);