You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2021/04/09 09:39:32 UTC

[servicecomb-java-chassis] branch master updated: [SCB-2254]common service center client support websocket (#2350)

This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git


The following commit(s) were added to refs/heads/master by this push:
     new 31bc3f1  [SCB-2254]common service center client support websocket (#2350)
31bc3f1 is described below

commit 31bc3f11ecc44629e5a4eb534ce0e045411ad5fc
Author: bao liu <bi...@qq.com>
AuthorDate: Fri Apr 9 17:38:15 2021 +0800

    [SCB-2254]common service center client support websocket (#2350)
---
 clients/http-client-common/pom.xml                 |   5 +
 .../http/client/common/WebSocketListener.java}     |  36 +---
 .../http/client/common/WebSocketTransport.java     |  69 ++++++++
 .../servicecomb/http/client/task/AbstractTask.java |   4 +-
 .../service/center/client/AddressManager.java      |  13 +-
 .../service/center/client/DiscoveryEvents.java     |   7 +
 .../service/center/client/ServiceCenterClient.java |   2 +-
 .../center/client/ServiceCenterDiscovery.java      | 117 +++++++------
 .../center/client/ServiceCenterRawClient.java      |   8 +-
 .../center/client/ServiceCenterRegistration.java   |   7 +-
 .../service/center/client/ServiceCenterWatch.java  | 187 +++++++++++++++++++++
 .../center/client/model/MicroserviceInstance.java  |  18 ++
 .../service/center/client/model/SchemaInfo.java    |  10 ++
 .../RegistryClientTest.java                        |   5 +-
 dependencies/default/pom.xml                       |   7 +
 15 files changed, 400 insertions(+), 95 deletions(-)

diff --git a/clients/http-client-common/pom.xml b/clients/http-client-common/pom.xml
index a4fcc10..e813110 100644
--- a/clients/http-client-common/pom.xml
+++ b/clients/http-client-common/pom.xml
@@ -68,6 +68,11 @@
     </dependency>
 
     <dependency>
+      <groupId>org.java-websocket</groupId>
+      <artifactId>Java-WebSocket</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/model/SchemaInfo.java b/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/WebSocketListener.java
old mode 100755
new mode 100644
similarity index 59%
copy from clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/model/SchemaInfo.java
copy to clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/WebSocketListener.java
index 5ed5bb4..de79c2f
--- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/model/SchemaInfo.java
+++ b/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/WebSocketListener.java
@@ -15,36 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.service.center.client.model;
+package org.apache.servicecomb.http.client.common;
 
-public class SchemaInfo {
-  private String schema;
+import org.java_websocket.handshake.ServerHandshake;
 
-  private String schemaId;
+public interface WebSocketListener {
+  void onMessage(String s);
 
-  private String summary;
+  void onError(Exception e);
 
-  public String getSchema() {
-    return schema;
-  }
+  void onClose(int code, String reason, boolean remote);
 
-  public void setSchema(String schema) {
-    this.schema = schema;
-  }
-
-  public String getSchemaId() {
-    return schemaId;
-  }
-
-  public void setSchemaId(String schemaId) {
-    this.schemaId = schemaId;
-  }
-
-  public String getSummary() {
-    return summary;
-  }
-
-  public void setSummary(String summary) {
-    this.summary = summary;
-  }
-}
\ No newline at end of file
+  void onOpen(ServerHandshake serverHandshake);
+}
diff --git a/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/WebSocketTransport.java b/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/WebSocketTransport.java
new file mode 100644
index 0000000..c24d963
--- /dev/null
+++ b/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/WebSocketTransport.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.http.client.common;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.servicecomb.foundation.ssl.SSLManager;
+import org.java_websocket.client.WebSocketClient;
+import org.java_websocket.drafts.Draft_6455;
+import org.java_websocket.handshake.ServerHandshake;
+
+public class WebSocketTransport extends WebSocketClient {
+  public static final int CONNECT_TIMEOUT = 5000;
+
+  private WebSocketListener webSocketListener;
+
+  public WebSocketTransport(String serverUri, HttpConfiguration.SSLProperties sslProperties,
+      Map<String, String> headers, WebSocketListener webSocketListener)
+      throws URISyntaxException {
+    super(new URI(serverUri), new Draft_6455(), headers, CONNECT_TIMEOUT);
+
+    if (sslProperties.isEnabled()) {
+      SSLSocketFactory sslSocketFactory = SSLManager
+          .createSSLSocketFactory(sslProperties.getSslOption(), sslProperties.getSslCustom());
+      setSocketFactory(sslSocketFactory);
+    }
+
+    this.webSocketListener = webSocketListener;
+  }
+
+  @Override
+  public void onOpen(ServerHandshake serverHandshake) {
+    this.webSocketListener.onOpen(serverHandshake);
+  }
+
+  @Override
+  public void onMessage(String s) {
+    this.webSocketListener.onMessage(s);
+  }
+
+  @Override
+  public void onClose(int code, String reason, boolean remote) {
+    this.webSocketListener.onClose(code, reason, remote);
+  }
+
+  @Override
+  public void onError(Exception e) {
+    this.webSocketListener.onError(e);
+  }
+}
diff --git a/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/task/AbstractTask.java b/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/task/AbstractTask.java
index 097ac45..e05eef2 100644
--- a/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/task/AbstractTask.java
+++ b/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/task/AbstractTask.java
@@ -28,14 +28,14 @@ public class AbstractTask {
   public class BackOffSleepTask implements Task {
     final long base = 3000;
 
-    final long max = 60000;
+    final long max = 10 * 60 * 10000;
 
     long waitTime;
 
     Task nextTask;
 
     public BackOffSleepTask(int failedCount, Task nextTask) {
-      this.waitTime = failedCount * base;
+      this.waitTime = failedCount * failedCount * base;
       this.nextTask = nextTask;
     }
 
diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/AddressManager.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/AddressManager.java
index 11966c9..107af8d 100644
--- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/AddressManager.java
+++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/AddressManager.java
@@ -45,33 +45,26 @@ public class AddressManager {
     }
   }
 
-  public String nextAddress() {
+  public void changeAddress() {
     synchronized (this) {
       this.index++;
       if (this.index >= addresses.size()) {
         this.index = 0;
       }
     }
-    return address();
   }
 
   public boolean sslEnabled() {
     return address().startsWith("https://");
   }
 
-  private String address() {
-    synchronized (this) {
-      return formatAddress(addresses.get(index));
-    }
-  }
-
-  private String nonFormattedAddress() {
+  public String address() {
     synchronized (this) {
       return addresses.get(index);
     }
   }
 
   public String formatUrl(String url, boolean absoluteUrl) {
-    return absoluteUrl ? nonFormattedAddress() + url : address() + url;
+    return absoluteUrl ? address() + url : formatAddress(address()) + url;
   }
 }
diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java
index e8f9149..eac6880 100644
--- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java
+++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java
@@ -47,4 +47,11 @@ public abstract class DiscoveryEvents {
       return instances;
     }
   }
+
+  /**
+   * Internal event that asks for an immediate instance pull.
+   */
+  public static class PullInstanceEvent extends DiscoveryEvents {
+
+  }
 }
diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterClient.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterClient.java
index 1b2cd37..8db2565 100755
--- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterClient.java
+++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterClient.java
@@ -295,7 +295,7 @@ public class ServiceCenterClient implements ServiceCenterOperation {
       HttpResponse response = httpClient
           .deleteHttpRequest("/registry/microservices/" + serviceId + "/instances/" + instanceId, null, null);
       if (response.getStatusCode() == HttpStatus.SC_OK) {
-        LOGGER.info("DELETE SERVICE INSTANCE OK");
+        LOGGER.info("Delete service instance successfully.");
       } else {
         throw new OperationException(
             "delete service instance fails, statusCode = " + response.getStatusCode() + "; message = " + response
diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterDiscovery.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterDiscovery.java
index b6d218a..1b63035 100644
--- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterDiscovery.java
+++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterDiscovery.java
@@ -25,13 +25,14 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.servicecomb.http.client.task.AbstractTask;
 import org.apache.servicecomb.http.client.task.Task;
 import org.apache.servicecomb.service.center.client.DiscoveryEvents.InstanceChangedEvent;
+import org.apache.servicecomb.service.center.client.DiscoveryEvents.PullInstanceEvent;
 import org.apache.servicecomb.service.center.client.model.FindMicroserviceInstancesResponse;
-import org.apache.servicecomb.service.center.client.model.Microservice;
 import org.apache.servicecomb.service.center.client.model.MicroserviceInstance;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
 
 public class ServiceCenterDiscovery extends AbstractTask {
   private static final String ALL_VERSION = "0+";
@@ -40,12 +41,12 @@ public class ServiceCenterDiscovery extends AbstractTask {
 
   private boolean started = false;
 
-  class SubscriptionKey {
+  public static class SubscriptionKey {
     final String appId;
 
     final String serviceName;
 
-    SubscriptionKey(String appId, String serviceName) {
+    public SubscriptionKey(String appId, String serviceName) {
       this.appId = appId;
       this.serviceName = serviceName;
     }
@@ -69,7 +70,7 @@ public class ServiceCenterDiscovery extends AbstractTask {
     }
   }
 
-  class SubscriptionValue {
+  public static class SubscriptionValue {
     String revision;
 
     List<MicroserviceInstance> instancesCache;
@@ -81,7 +82,7 @@ public class ServiceCenterDiscovery extends AbstractTask {
 
   private final EventBus eventBus;
 
-  private Microservice myself;
+  private String myselfServiceId;
 
   private final Map<SubscriptionKey, SubscriptionValue> instancesCache = new ConcurrentHashMap<>();
 
@@ -89,10 +90,11 @@ public class ServiceCenterDiscovery extends AbstractTask {
     super("service-center-discovery-task");
     this.serviceCenterClient = serviceCenterClient;
     this.eventBus = eventBus;
+    this.eventBus.register(this);
   }
 
-  public void updateMySelf(Microservice myself) {
-    this.myself = myself;
+  public void updateMyselfServiceId(String myselfServiceId) {
+    this.myselfServiceId = myselfServiceId;
   }
 
   public void startDiscovery() {
@@ -102,58 +104,79 @@ public class ServiceCenterDiscovery extends AbstractTask {
     }
   }
 
-  public void register(Microservice microservice) {
-    SubscriptionKey subscriptionKey = new SubscriptionKey(microservice.getAppId(), microservice.getServiceName());
+  public void register(SubscriptionKey subscriptionKey) {
     this.instancesCache.computeIfAbsent(subscriptionKey, (key) -> new SubscriptionValue());
+    pullInstance(subscriptionKey, this.instancesCache.get(subscriptionKey));
+  }
+
+  public List<MicroserviceInstance> getInstanceCache(SubscriptionKey key) {
+    return this.instancesCache.get(key).instancesCache;
+  }
+
+  public boolean isRegistered(SubscriptionKey key) {
+    return this.instancesCache.get(key) != null;
+  }
+
+  @Subscribe
+  public void onPullInstanceEvent(PullInstanceEvent event) {
+    pullAllInstance();
+  }
+
+  private void pullInstance(SubscriptionKey k, SubscriptionValue v) {
+    try {
+      FindMicroserviceInstancesResponse instancesResponse = serviceCenterClient
+          .findMicroserviceInstance(myselfServiceId, k.appId, k.serviceName, ALL_VERSION, v.revision);
+      if (instancesResponse.isModified()) {
+        // java chassis implements empty-instance protection; it is not implemented here for now.
+        LOGGER.info("Instance changed event, "
+                + "current: revision={}, instances={}; "
+                + "origin: revision={}, instances={}; "
+                + "appId={}, serviceName={}",
+            instancesResponse.getRevision(),
+            instanceToString(instancesResponse.getMicroserviceInstancesResponse().getInstances()),
+            v.revision,
+            instanceToString(v.instancesCache),
+            k.appId,
+            k.serviceName
+        );
+        v.instancesCache = instancesResponse.getMicroserviceInstancesResponse().getInstances();
+        v.revision = instancesResponse.getRevision();
+        eventBus.post(new InstanceChangedEvent(k.appId, k.serviceName,
+            v.instancesCache));
+      }
+    } catch (Exception e) {
+      LOGGER.error("find service instance failed.", e);
+    }
   }
 
   class PullInstanceTask implements Task {
     @Override
     public void execute() {
-      instancesCache.forEach((k, v) -> {
-        try {
-          FindMicroserviceInstancesResponse instancesResponse = serviceCenterClient
-              .findMicroserviceInstance(myself.getServiceId(), k.appId, k.serviceName, ALL_VERSION, v.revision);
-          if (instancesResponse.isModified()) {
-            // java chassis 实现了空实例保护,这里暂时不实现。
-            LOGGER.info("Instance changed event, "
-                    + "current: revision={}, instances={}; "
-                    + "origin: revision={}, instances={}; "
-                    + "appId={}, serviceName={}",
-                instancesResponse.getRevision(),
-                instanceToString(instancesResponse.getMicroserviceInstancesResponse().getInstances()),
-                v.revision,
-                instanceToString(v.instancesCache),
-                k.appId,
-                k.serviceName
-            );
-            v.instancesCache = instancesResponse.getMicroserviceInstancesResponse().getInstances();
-            v.revision = instancesResponse.getRevision();
-            eventBus.post(new InstanceChangedEvent(k.appId, k.serviceName,
-                v.instancesCache));
-          }
-        } catch (Exception e) {
-          LOGGER.error("find service instance failed.", e);
-        }
-      });
+      pullAllInstance();
 
       startTask(new BackOffSleepTask(POLL_INTERVAL, new PullInstanceTask()));
     }
+  }
 
-    private String instanceToString(List<MicroserviceInstance> instances) {
-      if (instances == null) {
-        return "";
-      }
+  private synchronized void pullAllInstance() {
+    instancesCache.forEach((k, v) -> {
+      pullInstance(k, v);
+    });
+  }
+
+  private static String instanceToString(List<MicroserviceInstance> instances) {
+    if (instances == null) {
+      return "";
+    }
 
-      StringBuilder sb = new StringBuilder();
-      for (MicroserviceInstance instance : instances) {
-        for (String endpoint : instance.getEndpoints()) {
-          sb.append(endpoint.length() > 64 ? endpoint.substring(0, 64) : endpoint);
-          sb.append("|");
-        }
+    StringBuilder sb = new StringBuilder();
+    for (MicroserviceInstance instance : instances) {
+      for (String endpoint : instance.getEndpoints()) {
+        sb.append(endpoint.length() > 64 ? endpoint.substring(0, 64) : endpoint);
+        sb.append("|");
       }
-      sb.append("#");
-      return sb.toString();
     }
+    sb.append("#");
+    return sb.toString();
   }
 }
diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterRawClient.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterRawClient.java
index 91e39ac..303dfda 100755
--- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterRawClient.java
+++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterRawClient.java
@@ -80,16 +80,16 @@ public class ServiceCenterRawClient {
     try {
       return httpTransport.doRequest(httpRequest);
     } catch (IOException e) {
-      addressManager.nextAddress();
+      addressManager.changeAddress();
       String retryAddress = addressManager.formatUrl(url, absoluteUrl);
       LOGGER.warn("send request to {} failed and retry to {} once. ", address,
           retryAddress, e);
-      httpRequest = new HttpRequest(address, headers, content, method);
+      httpRequest = new HttpRequest(retryAddress, headers, content, method);
       try {
         return httpTransport.doRequest(httpRequest);
       } catch (IOException ioException) {
-        LOGGER.warn("retry to {} failed again, and change next address {}. ", retryAddress
-            , addressManager.nextAddress());
+        addressManager.changeAddress();
+        LOGGER.warn("retry to {} failed again. ", retryAddress, e);
         throw ioException;
       }
     }
diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterRegistration.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterRegistration.java
index dbad8f0..a9d8fbf 100644
--- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterRegistration.java
+++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterRegistration.java
@@ -92,6 +92,7 @@ public class ServiceCenterRegistration extends AbstractTask {
           } else {
             microservice.setServiceId(response.getServiceId());
             microserviceInstance.setServiceId(response.getServiceId());
+            microserviceInstance.setMicroservice(microservice);
             eventBus.post(new MicroserviceRegistrationEvent(true));
             startTask(new RegisterSchemaTask(0));
           }
@@ -104,8 +105,10 @@ public class ServiceCenterRegistration extends AbstractTask {
           }
           microservice.setServiceId(serviceResponse.getServiceId());
           microserviceInstance.setServiceId(serviceResponse.getServiceId());
+          microserviceInstance.setMicroservice(microservice);
           eventBus.post(new MicroserviceRegistrationEvent(true));
-          startTask(new RegisterSchemaTask(0));
+          LOGGER.info("microservice is already registered, meta info like swagger contents will not be updated.");
+          startTask(new RegisterMicroserviceInstanceTask(0));
         }
       } catch (IllegalStateException e) {
         throw e;
@@ -180,6 +183,8 @@ public class ServiceCenterRegistration extends AbstractTask {
           startTask(new BackOffSleepTask(failedCount + 1, new RegisterMicroserviceInstanceTask(failedCount + 1)));
         } else {
           microserviceInstance.setInstanceId(instance.getInstanceId());
+          LOGGER.info("register microservice successfully, service id={}, instance id={}", microservice.getServiceId(),
+              microserviceInstance.getInstanceId());
           eventBus.post(new MicroserviceInstanceRegistrationEvent(true));
           startTask(new SendHeartBeatTask(0));
         }
diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterWatch.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterWatch.java
new file mode 100644
index 0000000..e09af10
--- /dev/null
+++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterWatch.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.service.center.client;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.servicecomb.http.client.auth.RequestAuthHeaderProvider;
+import org.apache.servicecomb.http.client.common.HttpConfiguration.SSLProperties;
+import org.apache.servicecomb.http.client.common.WebSocketListener;
+import org.apache.servicecomb.http.client.common.WebSocketTransport;
+import org.apache.servicecomb.service.center.client.DiscoveryEvents.PullInstanceEvent;
+import org.java_websocket.handshake.ServerHandshake;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.EventBus;
+
+public class ServiceCenterWatch implements WebSocketListener {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ServiceCenterWatch.class);
+
+  private static final String HTTP = "http://";
+
+  private static final String HTTPS = "https://";
+
+  private static final String WS = "ws://";
+
+  private static final String WSS = "wss://";
+
+  private static final String WATCH = "/v4/%s/registry/microservices/%s/watcher";
+
+  private static final long SLEEP_BASE = 3000;
+
+  private static final long SLEEP_MAX = 10 * 60 * 10000;
+
+  private AddressManager addressManager;
+
+  private SSLProperties sslProperties;
+
+  private RequestAuthHeaderProvider requestAuthHeaderProvider;
+
+  private String tenantName;
+
+  private Map<String, String> extraGlobalHeaders;
+
+  private WebSocketTransport webSocketTransport;
+
+  private String project;
+
+  private String serviceId;
+
+  private int continuousError = 0;
+
+  private AtomicBoolean reconnecting = new AtomicBoolean(false);
+
+  private EventBus eventBus;
+
+  private ExecutorService connector = Executors.newFixedThreadPool(1, (r) -> new
+      Thread(r, "web-socket-connector"));
+
+  public ServiceCenterWatch(AddressManager addressManager,
+      SSLProperties sslProperties,
+      RequestAuthHeaderProvider requestAuthHeaderProvider,
+      String tenantName,
+      Map<String, String> extraGlobalHeaders,
+      EventBus eventBus) {
+    this.addressManager = addressManager;
+    this.sslProperties = sslProperties;
+    this.requestAuthHeaderProvider = requestAuthHeaderProvider;
+    this.tenantName = tenantName;
+    this.extraGlobalHeaders = extraGlobalHeaders;
+    this.eventBus = eventBus;
+  }
+
+  public void startWatch(String project, String serviceId) {
+    this.project = project;
+    this.serviceId = serviceId;
+
+    startWatch();
+  }
+
+  private void startWatch() {
+    connector.submit(() -> {
+      backOff();
+
+      try {
+        Map<String, String> headers = new HashMap<>();
+        headers.put("x-domain-name", this.tenantName);
+        headers.putAll(this.extraGlobalHeaders);
+        headers.putAll(this.requestAuthHeaderProvider.loadAuthHeader(null));
+        LOGGER.info("start watch to address {}", addressManager.address());
+        webSocketTransport = new WebSocketTransport(convertAddress(), sslProperties,
+            headers, this);
+        webSocketTransport.connectBlocking();
+      } catch (Exception e) {
+        LOGGER.error("start watch failed. ", e);
+      }
+    });
+  }
+
+  private String convertAddress() {
+    String address = addressManager.address();
+    String url = String.format(WATCH, project, serviceId);
+    if (address.startsWith(HTTP)) {
+      return WS + address.substring(HTTP.length()) + url;
+    }
+
+    if (address.startsWith(HTTPS)) {
+      return WSS + address.substring(HTTPS.length()) + url;
+    }
+
+    return address + url;
+  }
+
+  public void stop() {
+    if (webSocketTransport != null) {
+      webSocketTransport.close();
+    }
+  }
+
+  private void reconnect() {
+    if (reconnecting.getAndSet(true)) {
+      return;
+    }
+    continuousError++;
+    if (webSocketTransport != null) {
+      webSocketTransport.close();
+    }
+    addressManager.changeAddress();
+    startWatch();
+  }
+
+  private void backOff() {
+    if (this.continuousError <= 0) {
+      return;
+    }
+    try {
+      Thread.sleep(Math.min(SLEEP_MAX, this.continuousError * this.continuousError * SLEEP_BASE));
+    } catch (InterruptedException e) {
+      // do not care
+    }
+  }
+
+  @Override
+  public void onMessage(String s) {
+    LOGGER.info("web socket receive message [{}], start query instance", s);
+    this.eventBus.post(new PullInstanceEvent());
+  }
+
+  @Override
+  public void onError(Exception e) {
+    LOGGER.warn("web socket receive error [{}], will restart.", e.getMessage());
+    reconnect();
+  }
+
+  @Override
+  public void onClose(int code, String reason, boolean remote) {
+    LOGGER.warn("web socket closed, code={}, reason={}.", code, reason);
+  }
+
+  @Override
+  public void onOpen(ServerHandshake serverHandshake) {
+    LOGGER.info("web socket connected to server {}, status={}, message={}", addressManager.address(),
+        serverHandshake.getHttpStatus(),
+        serverHandshake.getHttpStatusMessage());
+    continuousError = 0;
+    reconnecting.set(false);
+  }
+}
diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/model/MicroserviceInstance.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/model/MicroserviceInstance.java
index 3fd4c29..068501a 100755
--- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/model/MicroserviceInstance.java
+++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/model/MicroserviceInstance.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonRootName;
 
 @JsonRootName("instance")
@@ -51,6 +52,9 @@ public class MicroserviceInstance {
 
   private String modTimestamp;
 
+  @JsonIgnore
+  private Microservice microservice;
+
   public String getInstanceId() {
     return instanceId;
   }
@@ -138,4 +142,18 @@ public class MicroserviceInstance {
   public void setDataCenterInfo(DataCenterInfo dataCenterInfo) {
     this.dataCenterInfo = dataCenterInfo;
   }
+
+  public void setMicroservice(Microservice microservice) {
+    this.microservice = microservice;
+  }
+
+  @JsonIgnore
+  public String getServiceName() {
+    return this.microservice.getServiceName();
+  }
+
+  @JsonIgnore
+  public String getApplicationName() {
+    return this.microservice.getAppId();
+  }
 }
diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/model/SchemaInfo.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/model/SchemaInfo.java
index 5ed5bb4..720e0e9 100755
--- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/model/SchemaInfo.java
+++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/model/SchemaInfo.java
@@ -24,6 +24,16 @@ public class SchemaInfo {
 
   private String summary;
 
+  public SchemaInfo() {
+
+  }
+
+  public SchemaInfo(String schemaId, String schema, String summary) {
+    this.schemaId = schemaId;
+    this.schema = schema;
+    this.summary = summary;
+  }
+
   public String getSchema() {
     return schema;
   }
diff --git a/demo/demo-multi-service-center/demo-multi-service-center-client/src/main/java/org/apache/servicecomb/demo/multiServiceCenterClient/RegistryClientTest.java b/demo/demo-multi-service-center/demo-multi-service-center-client/src/main/java/org/apache/servicecomb/demo/multiServiceCenterClient/RegistryClientTest.java
index 85a6296..87ec262 100644
--- a/demo/demo-multi-service-center/demo-multi-service-center-client/src/main/java/org/apache/servicecomb/demo/multiServiceCenterClient/RegistryClientTest.java
+++ b/demo/demo-multi-service-center/demo-multi-service-center-client/src/main/java/org/apache/servicecomb/demo/multiServiceCenterClient/RegistryClientTest.java
@@ -37,6 +37,7 @@ import org.apache.servicecomb.service.center.client.RegistrationEvents.Microserv
 import org.apache.servicecomb.service.center.client.RegistrationEvents.SchemaRegistrationEvent;
 import org.apache.servicecomb.service.center.client.ServiceCenterClient;
 import org.apache.servicecomb.service.center.client.ServiceCenterDiscovery;
+import org.apache.servicecomb.service.center.client.ServiceCenterDiscovery.SubscriptionKey;
 import org.apache.servicecomb.service.center.client.ServiceCenterRegistration;
 import org.apache.servicecomb.service.center.client.model.Microservice;
 import org.apache.servicecomb.service.center.client.model.MicroserviceInstance;
@@ -127,9 +128,9 @@ public class RegistryClientTest implements CategorizedTestCase {
     }
 
     ServiceCenterDiscovery discovery = new ServiceCenterDiscovery(serviceCenterClient, eventBus);
-    discovery.updateMySelf(microservice);
+    discovery.updateMyselfServiceId(microservice.getServiceId());
     discovery.startDiscovery();
-    discovery.register(microservice);
+    discovery.register(new SubscriptionKey(microservice.getAppId(), microservice.getServiceName()));
     discoveryCounter.await(30000, TimeUnit.MILLISECONDS);
     TestMgr.check(instances != null, true);
     TestMgr.check(instances.size(), 1);
diff --git a/dependencies/default/pom.xml b/dependencies/default/pom.xml
index e2fc458..708bb6b 100644
--- a/dependencies/default/pom.xml
+++ b/dependencies/default/pom.xml
@@ -112,6 +112,7 @@
     <zookeeper.version>3.4.14</zookeeper.version>
     <nacos-client.version>1.1.4</nacos-client.version>
     <resilience4j.versioin>1.5.0</resilience4j.versioin>
+    <java-websocket.version>1.5.1</java-websocket.version>
     <!-- Base dir of main -->
     <main.basedir>${basedir}/../..</main.basedir>
   </properties>
@@ -874,6 +875,12 @@
       </dependency>
 
       <dependency>
+        <groupId>org.java-websocket</groupId>
+        <artifactId>Java-WebSocket</artifactId>
+        <version>${java-websocket.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>org.apache.servicecomb</groupId>
         <artifactId>java-chassis-bom</artifactId>
         <version>${project.version}</version>