You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by vi...@apache.org on 2019/07/26 02:26:18 UTC
[dubbo] 04/05: temporily commit
This is an automated email from the ASF dual-hosted git repository.
victory pushed a commit to branch cloud-native
in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit 1aae6956dcca2b9d4982d8ec107dbf4ae2481fda
Author: cvictory <sh...@gmail.com>
AuthorDate: Mon Jul 22 16:08:33 2019 +0800
temporily commit
---
.../bootstrap/DubboServiceProviderBootstrap.java | 4 ++--
.../dubbo/registry/etcd/EtcdServiceDiscovery.java | 19 +++++++++++++++++--
.../org/apache/dubbo/remoting/etcd/EtcdClient.java | 8 ++++++++
.../dubbo/remoting/etcd/jetcd/JEtcdClient.java | 5 +++++
.../remoting/etcd/jetcd/JEtcdClientWrapper.java | 20 ++++++++++++++++++++
5 files changed, 52 insertions(+), 4 deletions(-)
diff --git a/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceProviderBootstrap.java b/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceProviderBootstrap.java
index c68fbd0..4e46a67 100644
--- a/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceProviderBootstrap.java
+++ b/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceProviderBootstrap.java
@@ -38,8 +38,8 @@ public class DubboServiceProviderBootstrap {
// .metadataReport(MetadataReportBuilder.newBuilder().address("zookeeper://127.0.0.1:2181").build())
.metadataReport(MetadataReportBuilder.newBuilder().address("zookeeper://127.0.0.1:2181").build())
// .application(ApplicationBuilder.newBuilder().name("dubbo-provider-demo").build())
-// .registry(RegistryBuilder.newBuilder().address("zookeeper://127.0.0.1:2181?registry-type=service").build())
- .registry(RegistryBuilder.newBuilder().address("etcd3://127.0.0.1:2379?registry-type=service").build())
+ .registry(RegistryBuilder.newBuilder().address("zookeeper://127.0.0.1:2181?registry-type=service").build())
+// .registry(RegistryBuilder.newBuilder().address("etcd3://127.0.0.1:2379?registry-type=service").build())
.protocol(ProtocolBuilder.newBuilder().port(-1).name("dubbo").build())
.service(ServiceBuilder.newBuilder().id("test").interfaceClass(EchoService.class).ref(new EchoServiceImpl()).build())
.start()
diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java
index 9fcc5d4..4cce645 100644
--- a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java
@@ -38,6 +38,8 @@ import com.google.gson.Gson;
import java.io.File;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -62,6 +64,7 @@ public class EtcdServiceDiscovery implements ServiceDiscovery, EventListener<Ser
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> etcdListeners = new ConcurrentHashMap<>();
private final EtcdClient etcdClient;
private final EventDispatcher dispatcher;
+ private ServiceInstance serviceInstance;
public EtcdServiceDiscovery(URL url, EtcdTransporter etcdTransporter) {
if (url.isAnyHost()) {
@@ -72,7 +75,7 @@ public class EtcdServiceDiscovery implements ServiceDiscovery, EventListener<Ser
etcdClient.addStateListener(state -> {
if (state == StateListener.CONNECTED) {
try {
-// recover();
+ recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
@@ -101,8 +104,9 @@ public class EtcdServiceDiscovery implements ServiceDiscovery, EventListener<Ser
@Override
public void register(ServiceInstance serviceInstance) throws RuntimeException {
try {
+ this.serviceInstance = serviceInstance;
String path = toPath(serviceInstance);
- etcdClient.put(path, new Gson().toJson(serviceInstance));
+ etcdClient.putEphemeral(path, new Gson().toJson(serviceInstance));
services.add(serviceInstance.getServiceName());
} catch (Throwable e) {
throw new RpcException("Failed to register " + serviceInstance + " to etcd " + etcdClient.getUrl()
@@ -137,6 +141,7 @@ public class EtcdServiceDiscovery implements ServiceDiscovery, EventListener<Ser
String path = toPath(serviceInstance);
etcdClient.delete(path);
services.remove(serviceInstance.getServiceName());
+ this.serviceInstance = null;
} catch (Throwable e) {
throw new RpcException("Failed to unregister " + serviceInstance + " to etcd " + etcdClient.getUrl() + ", cause: " + e.getMessage(), e);
}
@@ -175,4 +180,14 @@ public class EtcdServiceDiscovery implements ServiceDiscovery, EventListener<Ser
*/
List<String> children = etcdClient.addChildListener(path, childListener);
}
+
+ private void recover() throws Exception {
+ // register
+ if (serviceInstance != null) {
+ if (logger.isInfoEnabled()) {
+ logger.info("Recover application register: " + serviceInstance);
+ }
+ register(serviceInstance);
+ }
+ }
}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java
index 286be93..e23b870 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java
@@ -180,4 +180,12 @@ public interface EtcdClient {
*/
boolean put(String key, String value);
+ /**
+ * Put the key value pair to etcd (Ephemeral)
+ * @param key the specified key
+ * @param value the paired value
+ * @return true if put success
+ */
+ boolean putEphemeral(String key, String value);
+
}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
index 4c055b4..01a6025 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
@@ -203,6 +203,11 @@ public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
return clientWrapper.put(key, value);
}
+ @Override
+ public boolean putEphemeral(String key, String value) {
+ return clientWrapper.put(key, value);
+ }
+
public ManagedChannel getChannel() {
return clientWrapper.getChannel();
}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
index 7ef6777..f635005 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
@@ -667,6 +667,26 @@ public class JEtcdClientWrapper {
return false;
}
+ public boolean putEphemeral(final String key, String value) {
+ try {
+ return RetryLoops.invokeWithRetry(
+ () -> {
+ requiredNotNull(client, failed);
+ // recovery an retry
+ keepAlive();
+ final long leaseId = globalLeaseId;
+ client.getKVClient()
+ .put(ByteSequence.from(key, UTF_8)
+ , ByteSequence.from(String.valueOf(value), UTF_8)
+ , PutOption.newBuilder().withLeaseId(leaseId).build())
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+ return true;
+ }, retryPolicy);
+ } catch (Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
private void retry() {
if (!failedRegistered.isEmpty()) {
Set<String> failed = new HashSet<String>(failedRegistered);