You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2021/04/09 09:39:32 UTC
[servicecomb-java-chassis] branch master updated: [SCB-2254]common
service center client support websocket (#2350)
This is an automated email from the ASF dual-hosted git repository.
liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
The following commit(s) were added to refs/heads/master by this push:
new 31bc3f1 [SCB-2254]common service center client support websocket (#2350)
31bc3f1 is described below
commit 31bc3f11ecc44629e5a4eb534ce0e045411ad5fc
Author: bao liu <bi...@qq.com>
AuthorDate: Fri Apr 9 17:38:15 2021 +0800
[SCB-2254]common service center client support websocket (#2350)
---
clients/http-client-common/pom.xml | 5 +
.../http/client/common/WebSocketListener.java} | 36 +---
.../http/client/common/WebSocketTransport.java | 69 ++++++++
.../servicecomb/http/client/task/AbstractTask.java | 4 +-
.../service/center/client/AddressManager.java | 13 +-
.../service/center/client/DiscoveryEvents.java | 7 +
.../service/center/client/ServiceCenterClient.java | 2 +-
.../center/client/ServiceCenterDiscovery.java | 117 +++++++------
.../center/client/ServiceCenterRawClient.java | 8 +-
.../center/client/ServiceCenterRegistration.java | 7 +-
.../service/center/client/ServiceCenterWatch.java | 187 +++++++++++++++++++++
.../center/client/model/MicroserviceInstance.java | 18 ++
.../service/center/client/model/SchemaInfo.java | 10 ++
.../RegistryClientTest.java | 5 +-
dependencies/default/pom.xml | 7 +
15 files changed, 400 insertions(+), 95 deletions(-)
diff --git a/clients/http-client-common/pom.xml b/clients/http-client-common/pom.xml
index a4fcc10..e813110 100644
--- a/clients/http-client-common/pom.xml
+++ b/clients/http-client-common/pom.xml
@@ -68,6 +68,11 @@
</dependency>
<dependency>
+ <groupId>org.java-websocket</groupId>
+ <artifactId>Java-WebSocket</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/model/SchemaInfo.java b/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/WebSocketListener.java
old mode 100755
new mode 100644
similarity index 59%
copy from clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/model/SchemaInfo.java
copy to clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/WebSocketListener.java
index 5ed5bb4..de79c2f
--- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/model/SchemaInfo.java
+++ b/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/WebSocketListener.java
@@ -15,36 +15,16 @@
* limitations under the License.
*/
-package org.apache.servicecomb.service.center.client.model;
+package org.apache.servicecomb.http.client.common;
-public class SchemaInfo {
- private String schema;
+import org.java_websocket.handshake.ServerHandshake;
- private String schemaId;
+public interface WebSocketListener {
+ void onMessage(String s);
- private String summary;
+ void onError(Exception e);
- public String getSchema() {
- return schema;
- }
+ void onClose(int code, String reason, boolean remote);
- public void setSchema(String schema) {
- this.schema = schema;
- }
-
- public String getSchemaId() {
- return schemaId;
- }
-
- public void setSchemaId(String schemaId) {
- this.schemaId = schemaId;
- }
-
- public String getSummary() {
- return summary;
- }
-
- public void setSummary(String summary) {
- this.summary = summary;
- }
-}
\ No newline at end of file
+ void onOpen(ServerHandshake serverHandshake);
+}
diff --git a/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/WebSocketTransport.java b/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/WebSocketTransport.java
new file mode 100644
index 0000000..c24d963
--- /dev/null
+++ b/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/WebSocketTransport.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.http.client.common;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.servicecomb.foundation.ssl.SSLManager;
+import org.java_websocket.client.WebSocketClient;
+import org.java_websocket.drafts.Draft_6455;
+import org.java_websocket.handshake.ServerHandshake;
+
+public class WebSocketTransport extends WebSocketClient { // adapter: forwards every WebSocketClient callback to a WebSocketListener
+ public static final int CONNECT_TIMEOUT = 5000; // handed to the WebSocketClient constructor as its connect timeout; presumably milliseconds — TODO confirm
+
+ private WebSocketListener webSocketListener; // callback target; assigned once at the end of the constructor
+
+ public WebSocketTransport(String serverUri, HttpConfiguration.SSLProperties sslProperties,
+ Map<String, String> headers, WebSocketListener webSocketListener)
+ throws URISyntaxException { // propagated from new URI(serverUri)
+ super(new URI(serverUri), new Draft_6455(), headers, CONNECT_TIMEOUT);
+
+ if (sslProperties.isEnabled()) { // a custom socket factory is installed only when SSL is enabled
+ SSLSocketFactory sslSocketFactory = SSLManager
+ .createSSLSocketFactory(sslProperties.getSslOption(), sslProperties.getSslCustom());
+ setSocketFactory(sslSocketFactory);
+ }
+
+ this.webSocketListener = webSocketListener;
+ }
+
+ @Override
+ public void onOpen(ServerHandshake serverHandshake) { // pure delegation to the listener
+ this.webSocketListener.onOpen(serverHandshake);
+ }
+
+ @Override
+ public void onMessage(String s) { // pure delegation to the listener
+ this.webSocketListener.onMessage(s);
+ }
+
+ @Override
+ public void onClose(int code, String reason, boolean remote) { // pure delegation to the listener
+ this.webSocketListener.onClose(code, reason, remote);
+ }
+
+ @Override
+ public void onError(Exception e) { // pure delegation to the listener
+ this.webSocketListener.onError(e);
+ }
+}
diff --git a/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/task/AbstractTask.java b/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/task/AbstractTask.java
index 097ac45..e05eef2 100644
--- a/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/task/AbstractTask.java
+++ b/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/task/AbstractTask.java
@@ -28,14 +28,14 @@ public class AbstractTask {
public class BackOffSleepTask implements Task {
final long base = 3000;
- final long max = 60000;
+ final long max = 10 * 60 * 10000;
long waitTime;
Task nextTask;
public BackOffSleepTask(int failedCount, Task nextTask) {
- this.waitTime = failedCount * base;
+ this.waitTime = failedCount * failedCount * base;
this.nextTask = nextTask;
}
diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/AddressManager.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/AddressManager.java
index 11966c9..107af8d 100644
--- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/AddressManager.java
+++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/AddressManager.java
@@ -45,33 +45,26 @@ public class AddressManager {
}
}
- public String nextAddress() {
+ public void changeAddress() {
synchronized (this) {
this.index++;
if (this.index >= addresses.size()) {
this.index = 0;
}
}
- return address();
}
public boolean sslEnabled() {
return address().startsWith("https://");
}
- private String address() {
- synchronized (this) {
- return formatAddress(addresses.get(index));
- }
- }
-
- private String nonFormattedAddress() {
+ public String address() {
synchronized (this) {
return addresses.get(index);
}
}
public String formatUrl(String url, boolean absoluteUrl) {
- return absoluteUrl ? nonFormattedAddress() + url : address() + url;
+ return absoluteUrl ? address() + url : formatAddress(address()) + url;
}
}
diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java
index e8f9149..eac6880 100644
--- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java
+++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java
@@ -47,4 +47,11 @@ public abstract class DiscoveryEvents {
return instances;
}
}
+
+ /**
+ * Internal event requesting an immediate instance pull.
+ */
+ public static class PullInstanceEvent extends DiscoveryEvents {
+
+ }
}
diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterClient.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterClient.java
index 1b2cd37..8db2565 100755
--- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterClient.java
+++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterClient.java
@@ -295,7 +295,7 @@ public class ServiceCenterClient implements ServiceCenterOperation {
HttpResponse response = httpClient
.deleteHttpRequest("/registry/microservices/" + serviceId + "/instances/" + instanceId, null, null);
if (response.getStatusCode() == HttpStatus.SC_OK) {
- LOGGER.info("DELETE SERVICE INSTANCE OK");
+ LOGGER.info("Delete service instance successfully.");
} else {
throw new OperationException(
"delete service instance fails, statusCode = " + response.getStatusCode() + "; message = " + response
diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterDiscovery.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterDiscovery.java
index b6d218a..1b63035 100644
--- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterDiscovery.java
+++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterDiscovery.java
@@ -25,13 +25,14 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.servicecomb.http.client.task.AbstractTask;
import org.apache.servicecomb.http.client.task.Task;
import org.apache.servicecomb.service.center.client.DiscoveryEvents.InstanceChangedEvent;
+import org.apache.servicecomb.service.center.client.DiscoveryEvents.PullInstanceEvent;
import org.apache.servicecomb.service.center.client.model.FindMicroserviceInstancesResponse;
-import org.apache.servicecomb.service.center.client.model.Microservice;
import org.apache.servicecomb.service.center.client.model.MicroserviceInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
public class ServiceCenterDiscovery extends AbstractTask {
private static final String ALL_VERSION = "0+";
@@ -40,12 +41,12 @@ public class ServiceCenterDiscovery extends AbstractTask {
private boolean started = false;
- class SubscriptionKey {
+ public static class SubscriptionKey {
final String appId;
final String serviceName;
- SubscriptionKey(String appId, String serviceName) {
+ public SubscriptionKey(String appId, String serviceName) {
this.appId = appId;
this.serviceName = serviceName;
}
@@ -69,7 +70,7 @@ public class ServiceCenterDiscovery extends AbstractTask {
}
}
- class SubscriptionValue {
+ public static class SubscriptionValue {
String revision;
List<MicroserviceInstance> instancesCache;
@@ -81,7 +82,7 @@ public class ServiceCenterDiscovery extends AbstractTask {
private final EventBus eventBus;
- private Microservice myself;
+ private String myselfServiceId;
private final Map<SubscriptionKey, SubscriptionValue> instancesCache = new ConcurrentHashMap<>();
@@ -89,10 +90,11 @@ public class ServiceCenterDiscovery extends AbstractTask {
super("service-center-discovery-task");
this.serviceCenterClient = serviceCenterClient;
this.eventBus = eventBus;
+ this.eventBus.register(this);
}
- public void updateMySelf(Microservice myself) {
- this.myself = myself;
+ public void updateMyselfServiceId(String myselfServiceId) {
+ this.myselfServiceId = myselfServiceId;
}
public void startDiscovery() {
@@ -102,58 +104,79 @@ public class ServiceCenterDiscovery extends AbstractTask {
}
}
- public void register(Microservice microservice) {
- SubscriptionKey subscriptionKey = new SubscriptionKey(microservice.getAppId(), microservice.getServiceName());
+ public void register(SubscriptionKey subscriptionKey) {
this.instancesCache.computeIfAbsent(subscriptionKey, (key) -> new SubscriptionValue());
+ pullInstance(subscriptionKey, this.instancesCache.get(subscriptionKey));
+ }
+
+ public List<MicroserviceInstance> getInstanceCache(SubscriptionKey key) {
+ return this.instancesCache.get(key).instancesCache;
+ }
+
+ public boolean isRegistered(SubscriptionKey key) {
+ return this.instancesCache.get(key) != null;
+ }
+
+ @Subscribe
+ public void onPullInstanceEvent(PullInstanceEvent event) {
+ pullAllInstance();
+ }
+
+ private void pullInstance(SubscriptionKey k, SubscriptionValue v) {
+ try {
+ FindMicroserviceInstancesResponse instancesResponse = serviceCenterClient
+ .findMicroserviceInstance(myselfServiceId, k.appId, k.serviceName, ALL_VERSION, v.revision);
+ if (instancesResponse.isModified()) {
+ // java chassis 实现了空实例保护,这里暂时不实现。
+ LOGGER.info("Instance changed event, "
+ + "current: revision={}, instances={}; "
+ + "origin: revision={}, instances={}; "
+ + "appId={}, serviceName={}",
+ instancesResponse.getRevision(),
+ instanceToString(instancesResponse.getMicroserviceInstancesResponse().getInstances()),
+ v.revision,
+ instanceToString(v.instancesCache),
+ k.appId,
+ k.serviceName
+ );
+ v.instancesCache = instancesResponse.getMicroserviceInstancesResponse().getInstances();
+ v.revision = instancesResponse.getRevision();
+ eventBus.post(new InstanceChangedEvent(k.appId, k.serviceName,
+ v.instancesCache));
+ }
+ } catch (Exception e) {
+ LOGGER.error("find service instance failed.", e);
+ }
}
class PullInstanceTask implements Task {
@Override
public void execute() {
- instancesCache.forEach((k, v) -> {
- try {
- FindMicroserviceInstancesResponse instancesResponse = serviceCenterClient
- .findMicroserviceInstance(myself.getServiceId(), k.appId, k.serviceName, ALL_VERSION, v.revision);
- if (instancesResponse.isModified()) {
- // java chassis 实现了空实例保护,这里暂时不实现。
- LOGGER.info("Instance changed event, "
- + "current: revision={}, instances={}; "
- + "origin: revision={}, instances={}; "
- + "appId={}, serviceName={}",
- instancesResponse.getRevision(),
- instanceToString(instancesResponse.getMicroserviceInstancesResponse().getInstances()),
- v.revision,
- instanceToString(v.instancesCache),
- k.appId,
- k.serviceName
- );
- v.instancesCache = instancesResponse.getMicroserviceInstancesResponse().getInstances();
- v.revision = instancesResponse.getRevision();
- eventBus.post(new InstanceChangedEvent(k.appId, k.serviceName,
- v.instancesCache));
- }
- } catch (Exception e) {
- LOGGER.error("find service instance failed.", e);
- }
- });
+ pullAllInstance();
startTask(new BackOffSleepTask(POLL_INTERVAL, new PullInstanceTask()));
}
+ }
- private String instanceToString(List<MicroserviceInstance> instances) {
- if (instances == null) {
- return "";
- }
+ private synchronized void pullAllInstance() {
+ instancesCache.forEach((k, v) -> {
+ pullInstance(k, v);
+ });
+ }
+
+ private static String instanceToString(List<MicroserviceInstance> instances) {
+ if (instances == null) {
+ return "";
+ }
- StringBuilder sb = new StringBuilder();
- for (MicroserviceInstance instance : instances) {
- for (String endpoint : instance.getEndpoints()) {
- sb.append(endpoint.length() > 64 ? endpoint.substring(0, 64) : endpoint);
- sb.append("|");
- }
+ StringBuilder sb = new StringBuilder();
+ for (MicroserviceInstance instance : instances) {
+ for (String endpoint : instance.getEndpoints()) {
+ sb.append(endpoint.length() > 64 ? endpoint.substring(0, 64) : endpoint);
+ sb.append("|");
}
- sb.append("#");
- return sb.toString();
}
+ sb.append("#");
+ return sb.toString();
}
}
diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterRawClient.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterRawClient.java
index 91e39ac..303dfda 100755
--- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterRawClient.java
+++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterRawClient.java
@@ -80,16 +80,16 @@ public class ServiceCenterRawClient {
try {
return httpTransport.doRequest(httpRequest);
} catch (IOException e) {
- addressManager.nextAddress();
+ addressManager.changeAddress();
String retryAddress = addressManager.formatUrl(url, absoluteUrl);
LOGGER.warn("send request to {} failed and retry to {} once. ", address,
retryAddress, e);
- httpRequest = new HttpRequest(address, headers, content, method);
+ httpRequest = new HttpRequest(retryAddress, headers, content, method);
try {
return httpTransport.doRequest(httpRequest);
} catch (IOException ioException) {
- LOGGER.warn("retry to {} failed again, and change next address {}. ", retryAddress
- , addressManager.nextAddress());
+ addressManager.changeAddress();
+ LOGGER.warn("retry to {} failed again. ", retryAddress, e);
throw ioException;
}
}
diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterRegistration.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterRegistration.java
index dbad8f0..a9d8fbf 100644
--- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterRegistration.java
+++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterRegistration.java
@@ -92,6 +92,7 @@ public class ServiceCenterRegistration extends AbstractTask {
} else {
microservice.setServiceId(response.getServiceId());
microserviceInstance.setServiceId(response.getServiceId());
+ microserviceInstance.setMicroservice(microservice);
eventBus.post(new MicroserviceRegistrationEvent(true));
startTask(new RegisterSchemaTask(0));
}
@@ -104,8 +105,10 @@ public class ServiceCenterRegistration extends AbstractTask {
}
microservice.setServiceId(serviceResponse.getServiceId());
microserviceInstance.setServiceId(serviceResponse.getServiceId());
+ microserviceInstance.setMicroservice(microservice);
eventBus.post(new MicroserviceRegistrationEvent(true));
- startTask(new RegisterSchemaTask(0));
+ LOGGER.info("microservice is already registered, meta info like swagger contents will not be updated.");
+ startTask(new RegisterMicroserviceInstanceTask(0));
}
} catch (IllegalStateException e) {
throw e;
@@ -180,6 +183,8 @@ public class ServiceCenterRegistration extends AbstractTask {
startTask(new BackOffSleepTask(failedCount + 1, new RegisterMicroserviceInstanceTask(failedCount + 1)));
} else {
microserviceInstance.setInstanceId(instance.getInstanceId());
+ LOGGER.info("register microservice successfully, service id={}, instance id={}", microservice.getServiceId(),
+ microserviceInstance.getInstanceId());
eventBus.post(new MicroserviceInstanceRegistrationEvent(true));
startTask(new SendHeartBeatTask(0));
}
diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterWatch.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterWatch.java
new file mode 100644
index 0000000..e09af10
--- /dev/null
+++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterWatch.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.service.center.client;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.servicecomb.http.client.auth.RequestAuthHeaderProvider;
+import org.apache.servicecomb.http.client.common.HttpConfiguration.SSLProperties;
+import org.apache.servicecomb.http.client.common.WebSocketListener;
+import org.apache.servicecomb.http.client.common.WebSocketTransport;
+import org.apache.servicecomb.service.center.client.DiscoveryEvents.PullInstanceEvent;
+import org.java_websocket.handshake.ServerHandshake;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.EventBus;
+
+public class ServiceCenterWatch implements WebSocketListener { // opens a web socket to the watcher URL and posts a PullInstanceEvent for every message received
+ private static final Logger LOGGER = LoggerFactory.getLogger(ServiceCenterWatch.class);
+
+ private static final String HTTP = "http://"; // scheme prefix rewritten to WS by convertAddress()
+
+ private static final String HTTPS = "https://"; // scheme prefix rewritten to WSS by convertAddress()
+
+ private static final String WS = "ws://";
+
+ private static final String WSS = "wss://";
+
+ private static final String WATCH = "/v4/%s/registry/microservices/%s/watcher"; // placeholders are filled with project and serviceId, in that order
+
+ private static final long SLEEP_BASE = 3000; // back-off unit passed to Thread.sleep, i.e. milliseconds
+
+ private static final long SLEEP_MAX = 10 * 60 * 10000; // NOTE(review): evaluates to 6,000,000 ms (100 minutes); 10 * 60 * 1000 (10 minutes) may have been intended — confirm
+
+ private AddressManager addressManager;
+
+ private SSLProperties sslProperties;
+
+ private RequestAuthHeaderProvider requestAuthHeaderProvider;
+
+ private String tenantName; // sent as the "x-domain-name" header
+
+ private Map<String, String> extraGlobalHeaders;
+
+ private WebSocketTransport webSocketTransport; // current connection; replaced on every startWatch() run
+
+ private String project;
+
+ private String serviceId;
+
+ private int continuousError = 0; // incremented in reconnect(), reset in onOpen(), read in backOff(); NOTE(review): plain int shared between the connector task and listener callbacks — confirm visibility is acceptable
+
+ private AtomicBoolean reconnecting = new AtomicBoolean(false); // set in reconnect(), cleared only in onOpen()
+
+ private EventBus eventBus;
+
+ private ExecutorService connector = Executors.newFixedThreadPool(1, (r) -> new
+ Thread(r, "web-socket-connector")); // single worker thread; NOTE(review): no shutdown of this executor is visible in this class
+
+ public ServiceCenterWatch(AddressManager addressManager,
+ SSLProperties sslProperties,
+ RequestAuthHeaderProvider requestAuthHeaderProvider,
+ String tenantName,
+ Map<String, String> extraGlobalHeaders,
+ EventBus eventBus) { // stores collaborators only; nothing is connected until startWatch(project, serviceId)
+ this.addressManager = addressManager;
+ this.sslProperties = sslProperties;
+ this.requestAuthHeaderProvider = requestAuthHeaderProvider;
+ this.tenantName = tenantName;
+ this.extraGlobalHeaders = extraGlobalHeaders;
+ this.eventBus = eventBus;
+ }
+
+ public void startWatch(String project, String serviceId) { // records the watch target, then schedules the first connection attempt
+ this.project = project;
+ this.serviceId = serviceId;
+
+ startWatch();
+ }
+
+ private void startWatch() { // submits one connection attempt to the connector executor
+ connector.submit(() -> {
+ backOff(); // sleeps first when previous attempts failed
+
+ try {
+ Map<String, String> headers = new HashMap<>();
+ headers.put("x-domain-name", this.tenantName);
+ headers.putAll(this.extraGlobalHeaders);
+ headers.putAll(this.requestAuthHeaderProvider.loadAuthHeader(null));
+ LOGGER.info("start watch to address {}", addressManager.address());
+ webSocketTransport = new WebSocketTransport(convertAddress(), sslProperties,
+ headers, this);
+ webSocketTransport.connectBlocking(); // NOTE(review): the return value is ignored here — confirm a failed connect is always reported through onError
+ } catch (Exception e) {
+ LOGGER.error("start watch failed. ", e); // logged only; no retry is scheduled from this catch block
+ }
+ });
+ }
+
+ private String convertAddress() { // builds the watcher URL: http:// -> ws://, https:// -> wss://, any other prefix is kept as is
+ String address = addressManager.address();
+ String url = String.format(WATCH, project, serviceId);
+ if (address.startsWith(HTTP)) {
+ return WS + address.substring(HTTP.length()) + url;
+ }
+
+ if (address.startsWith(HTTPS)) {
+ return WSS + address.substring(HTTPS.length()) + url;
+ }
+
+ return address + url;
+ }
+
+ public void stop() { // closes the current transport if one exists; the connector executor is left running
+ if (webSocketTransport != null) {
+ webSocketTransport.close();
+ }
+ }
+
+ private void reconnect() { // closes the current transport, switches address and schedules a new attempt
+ if (reconnecting.getAndSet(true)) { // a reconnect is already pending: do nothing
+ return;
+ }
+ continuousError++;
+ if (webSocketTransport != null) {
+ webSocketTransport.close();
+ }
+ addressManager.changeAddress();
+ startWatch(); // NOTE(review): reconnecting is cleared only in onOpen(); if this attempt never opens, later onError calls return early above — confirm intended
+ }
+
+ private void backOff() { // sleeps min(SLEEP_MAX, continuousError^2 * SLEEP_BASE) when there were errors
+ if (this.continuousError <= 0) {
+ return;
+ }
+ try {
+ Thread.sleep(Math.min(SLEEP_MAX, this.continuousError * this.continuousError * SLEEP_BASE)); // the square is computed in int before widening to long
+ } catch (InterruptedException e) {
+ // do not care — NOTE(review): the interrupt status is not restored here
+ }
+ }
+
+ @Override
+ public void onMessage(String s) { // the message content is only logged; every message triggers a pull
+ LOGGER.info("web socket receive message [{}], start query instance", s);
+ this.eventBus.post(new PullInstanceEvent());
+ }
+
+ @Override
+ public void onError(Exception e) { // logs the error message (no stack trace) and starts a reconnect
+ LOGGER.warn("web socket receive error [{}], will restart.", e.getMessage());
+ reconnect();
+ }
+
+ @Override
+ public void onClose(int code, String reason, boolean remote) { // logs only; NOTE(review): no reconnect is started on close — confirm intended
+ LOGGER.warn("web socket closed, code={}, reason={}.", code, reason);
+ }
+
+ @Override
+ public void onOpen(ServerHandshake serverHandshake) { // a successful connection resets the error counter and the reconnect guard
+ LOGGER.info("web socket connected to server {}, status={}, message={}", addressManager.address(),
+ serverHandshake.getHttpStatus(),
+ serverHandshake.getHttpStatusMessage());
+ continuousError = 0;
+ reconnecting.set(false);
+ }
+}
diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/model/MicroserviceInstance.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/model/MicroserviceInstance.java
index 3fd4c29..068501a 100755
--- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/model/MicroserviceInstance.java
+++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/model/MicroserviceInstance.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonRootName;
@JsonRootName("instance")
@@ -51,6 +52,9 @@ public class MicroserviceInstance {
private String modTimestamp;
+ @JsonIgnore
+ private Microservice microservice;
+
public String getInstanceId() {
return instanceId;
}
@@ -138,4 +142,18 @@ public class MicroserviceInstance {
public void setDataCenterInfo(DataCenterInfo dataCenterInfo) {
this.dataCenterInfo = dataCenterInfo;
}
+
+ public void setMicroservice(Microservice microservice) {
+ this.microservice = microservice;
+ }
+
+ @JsonIgnore
+ public String getServiceName() {
+ return this.microservice.getServiceName();
+ }
+
+ @JsonIgnore
+ public String getApplicationName() {
+ return this.microservice.getAppId();
+ }
}
diff --git a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/model/SchemaInfo.java b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/model/SchemaInfo.java
index 5ed5bb4..720e0e9 100755
--- a/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/model/SchemaInfo.java
+++ b/clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/model/SchemaInfo.java
@@ -24,6 +24,16 @@ public class SchemaInfo {
private String summary;
+ public SchemaInfo() {
+
+ }
+
+ public SchemaInfo(String schemaId, String schema, String summary) {
+ this.schemaId = schemaId;
+ this.schema = schema;
+ this.summary = summary;
+ }
+
public String getSchema() {
return schema;
}
diff --git a/demo/demo-multi-service-center/demo-multi-service-center-client/src/main/java/org/apache/servicecomb/demo/multiServiceCenterClient/RegistryClientTest.java b/demo/demo-multi-service-center/demo-multi-service-center-client/src/main/java/org/apache/servicecomb/demo/multiServiceCenterClient/RegistryClientTest.java
index 85a6296..87ec262 100644
--- a/demo/demo-multi-service-center/demo-multi-service-center-client/src/main/java/org/apache/servicecomb/demo/multiServiceCenterClient/RegistryClientTest.java
+++ b/demo/demo-multi-service-center/demo-multi-service-center-client/src/main/java/org/apache/servicecomb/demo/multiServiceCenterClient/RegistryClientTest.java
@@ -37,6 +37,7 @@ import org.apache.servicecomb.service.center.client.RegistrationEvents.Microserv
import org.apache.servicecomb.service.center.client.RegistrationEvents.SchemaRegistrationEvent;
import org.apache.servicecomb.service.center.client.ServiceCenterClient;
import org.apache.servicecomb.service.center.client.ServiceCenterDiscovery;
+import org.apache.servicecomb.service.center.client.ServiceCenterDiscovery.SubscriptionKey;
import org.apache.servicecomb.service.center.client.ServiceCenterRegistration;
import org.apache.servicecomb.service.center.client.model.Microservice;
import org.apache.servicecomb.service.center.client.model.MicroserviceInstance;
@@ -127,9 +128,9 @@ public class RegistryClientTest implements CategorizedTestCase {
}
ServiceCenterDiscovery discovery = new ServiceCenterDiscovery(serviceCenterClient, eventBus);
- discovery.updateMySelf(microservice);
+ discovery.updateMyselfServiceId(microservice.getServiceId());
discovery.startDiscovery();
- discovery.register(microservice);
+ discovery.register(new SubscriptionKey(microservice.getAppId(), microservice.getServiceName()));
discoveryCounter.await(30000, TimeUnit.MILLISECONDS);
TestMgr.check(instances != null, true);
TestMgr.check(instances.size(), 1);
diff --git a/dependencies/default/pom.xml b/dependencies/default/pom.xml
index e2fc458..708bb6b 100644
--- a/dependencies/default/pom.xml
+++ b/dependencies/default/pom.xml
@@ -112,6 +112,7 @@
<zookeeper.version>3.4.14</zookeeper.version>
<nacos-client.version>1.1.4</nacos-client.version>
<resilience4j.versioin>1.5.0</resilience4j.versioin>
+ <java-websocket.version>1.5.1</java-websocket.version>
<!-- Base dir of main -->
<main.basedir>${basedir}/../..</main.basedir>
</properties>
@@ -874,6 +875,12 @@
</dependency>
<dependency>
+ <groupId>org.java-websocket</groupId>
+ <artifactId>Java-WebSocket</artifactId>
+ <version>${java-websocket.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.servicecomb</groupId>
<artifactId>java-chassis-bom</artifactId>
<version>${project.version}</version>