You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ya...@apache.org on 2020/03/09 16:14:59 UTC
[servicecomb-java-chassis] 10/19: [SCB-1691] ServiceRegistry use serviceRegistryCache
This is an automated email from the ASF dual-hosted git repository.
yaohaishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
commit d4e8db6970c4a8e5b383c0b9ebc114118497757e
Author: yhs0092 <yh...@163.com>
AuthorDate: Sat Feb 15 19:07:07 2020 +0800
[SCB-1691] ServiceRegistry use serviceRegistryCache
---
.../servicecomb/serviceregistry/RegistryUtils.java | 33 +++++++++
.../consumer/MicroserviceVersions.java | 2 +-
.../registry/AbstractServiceRegistry.java | 80 ++++++++++------------
.../registry/RemoteServiceRegistry.java | 5 +-
.../serviceregistry/RegistryUtilsTest.java | 77 +++++++++++++++++++++
5 files changed, 151 insertions(+), 46 deletions(-)
diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java
index 46de489..2f93bd6 100644
--- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java
+++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java
@@ -20,6 +20,7 @@ package org.apache.servicecomb.serviceregistry;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -33,6 +34,7 @@ import org.apache.servicecomb.foundation.common.net.IpPort;
import org.apache.servicecomb.foundation.common.net.NetUtils;
import org.apache.servicecomb.serviceregistry.api.registry.Microservice;
import org.apache.servicecomb.serviceregistry.api.registry.MicroserviceInstance;
+import org.apache.servicecomb.serviceregistry.api.response.FindInstancesResponse;
import org.apache.servicecomb.serviceregistry.cache.InstanceCacheManager;
import org.apache.servicecomb.serviceregistry.cache.InstanceCacheManagerNew;
import org.apache.servicecomb.serviceregistry.client.ServiceRegistryClient;
@@ -41,6 +43,7 @@ import org.apache.servicecomb.serviceregistry.config.ServiceRegistryConfig;
import org.apache.servicecomb.serviceregistry.consumer.AppManager;
import org.apache.servicecomb.serviceregistry.definition.MicroserviceDefinition;
import org.apache.servicecomb.serviceregistry.registry.ServiceRegistryFactory;
+import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCache;
import org.apache.servicecomb.serviceregistry.swagger.SwaggerLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -250,6 +253,36 @@ public final class RegistryUtils {
return serviceRegistry.findServiceInstances(appId, serviceName, versionRule, revision);
}
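+ /**
+ * Converts a {@link MicroserviceCache} into the legacy {@link MicroserviceInstances} form, for compatibility. Returns null unless the cache status is SERVICE_NOT_FOUND, NO_CHANGE or REFRESHED.
+ */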
+ public static MicroserviceInstances convertCacheToMicroserviceInstances(MicroserviceCache microserviceCache) {
+ MicroserviceInstances microserviceInstances = new MicroserviceInstances();
+ switch (microserviceCache.getStatus()) {
+ case SERVICE_NOT_FOUND:
+ microserviceInstances.setMicroserviceNotExist(true);
+ microserviceInstances.setNeedRefresh(false);
+ microserviceInstances.setRevision("");
+ microserviceInstances.setInstancesResponse(null);
+ return microserviceInstances;
+ case NO_CHANGE:
+ microserviceInstances.setMicroserviceNotExist(false);
+ microserviceInstances.setNeedRefresh(false);
+ microserviceInstances.setRevision(microserviceCache.getRevisionId());
+ return microserviceInstances;
+ case REFRESHED:
+ microserviceInstances.setMicroserviceNotExist(false);
+ microserviceInstances.setNeedRefresh(true);
+ microserviceInstances.setRevision(microserviceCache.getRevisionId());
+ FindInstancesResponse findInstancesResponse = new FindInstancesResponse();
+ findInstancesResponse.setInstances(new ArrayList<>(microserviceCache.getInstances()));
+ microserviceInstances.setInstancesResponse(findInstancesResponse);
+ return microserviceInstances;
+ default:
+ return null;
+ }
+ }
+
public static String calcSchemaSummary(String schemaContent) {
return Hashing.sha256().newHasher().putString(schemaContent, Charsets.UTF_8).hash().toString();
}
diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/consumer/MicroserviceVersions.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/consumer/MicroserviceVersions.java
index 8802a15..5dc1cec 100644
--- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/consumer/MicroserviceVersions.java
+++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/consumer/MicroserviceVersions.java
@@ -166,7 +166,7 @@ public class MicroserviceVersions {
return;
}
- if (!microserviceInstances.isNeedRefresh()) {
+ if (null != revision && revision.equals(microserviceInstances.getRevision())) {
return;
}
diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
index a37fdbc..214968a 100644
--- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
+++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
+import org.apache.servicecomb.foundation.common.concurrency.SuppressedRunnableWrapper;
import org.apache.servicecomb.serviceregistry.Features;
import org.apache.servicecomb.serviceregistry.RegistryUtils;
import org.apache.servicecomb.serviceregistry.ServiceRegistry;
@@ -48,6 +49,9 @@ import org.apache.servicecomb.serviceregistry.definition.MicroserviceDefinition;
import org.apache.servicecomb.serviceregistry.definition.MicroserviceNameParser;
import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCache;
import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCacheKey;
+import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCacheRefreshedEvent;
+import org.apache.servicecomb.serviceregistry.registry.cache.RefreshableServiceRegistryCache;
+import org.apache.servicecomb.serviceregistry.registry.cache.ServiceRegistryCache;
import org.apache.servicecomb.serviceregistry.task.MicroserviceServiceCenterTask;
import org.apache.servicecomb.serviceregistry.task.ServiceCenterTask;
import org.apache.servicecomb.serviceregistry.task.event.RecoveryEvent;
@@ -85,6 +89,8 @@ public abstract class AbstractServiceRegistry implements ServiceRegistry {
private String name;
+ RefreshableServiceRegistryCache serviceRegistryCache;
+
public AbstractServiceRegistry(EventBus eventBus, ServiceRegistryConfig serviceRegistryConfig,
MicroserviceDefinition microserviceDefinition) {
setName(serviceRegistryConfig.getRegistryName());
@@ -104,6 +110,14 @@ public abstract class AbstractServiceRegistry implements ServiceRegistry {
createServiceCenterTask();
eventBus.register(this);
+
+ initCache();
+ }
+
+ private void initCache() {
+ serviceRegistryCache = new RefreshableServiceRegistryCache(microservice, srClient);
+ serviceRegistryCache.setCacheRefreshedWatcher(
+ caches -> eventBus.post(new MicroserviceCacheRefreshedEvent(caches)));
}
@Override
@@ -207,50 +221,15 @@ public abstract class AbstractServiceRegistry implements ServiceRegistry {
public MicroserviceInstances findServiceInstances(String appId, String serviceName,
String versionRule, String revision) {
- MicroserviceInstances microserviceInstances = srClient.findServiceInstances(microservice.getServiceId(),
- appId,
- serviceName,
- versionRule,
- revision);
-
- if (microserviceInstances == null) {
- LOGGER.error("Can not find any instances from service center due to previous errors. service={}/{}/{}",
- appId,
- serviceName,
- versionRule);
- return null;
- }
-
- if (microserviceInstances.isMicroserviceNotExist()) {
- return microserviceInstances;
- }
-
- if (!microserviceInstances.isNeedRefresh()) {
- LOGGER.debug("instances revision is not changed, service={}/{}/{}", appId, serviceName, versionRule);
- return microserviceInstances;
- }
-
- List<MicroserviceInstance> instances = microserviceInstances.getInstancesResponse().getInstances();
- LOGGER.info("find instances[{}] from service center success. service={}/{}/{}, old revision={}, new revision={}",
- instances.size(),
- appId,
- serviceName,
- versionRule,
- revision,
- microserviceInstances.getRevision());
- for (MicroserviceInstance instance : instances) {
- LOGGER.info("service id={}, instance id={}, endpoints={}",
- instance.getServiceId(),
- instance.getInstanceId(),
- instance.getEndpoints());
- }
- return microserviceInstances;
+ MicroserviceCache microserviceCache = serviceRegistryCache
+ .findServiceCache(MicroserviceCacheKey.builder()
+ .serviceName(serviceName).appId(appId).env(microservice.getEnvironment()).build());
+ return RegistryUtils.convertCacheToMicroserviceInstances(microserviceCache);
}
@Override
public MicroserviceCache findMicroserviceCache(MicroserviceCacheKey microserviceCacheKey) {
- // TODO find MicroserviceCache from ServiceRegistryCache
- return null;
+ return serviceRegistryCache.findServiceCache(microserviceCacheKey);
}
@Override
@@ -333,6 +312,10 @@ public abstract class AbstractServiceRegistry implements ServiceRegistry {
this.name = name;
}
+ public ServiceRegistryCache getServiceRegistryCache() {
+ return serviceRegistryCache;
+ }
+
@Subscribe
public void onShutdown(ShutdownEvent event) {
LOGGER.info("service center task is shutdown.");
@@ -342,17 +325,28 @@ public abstract class AbstractServiceRegistry implements ServiceRegistry {
// post from watch eventloop, should refresh the exact microservice instances immediately
@Subscribe
public void onMicroserviceInstanceChanged(MicroserviceInstanceChangedEvent changedEvent) {
- executorService.execute(() -> RegistryUtils.getAppManager().onMicroserviceInstanceChanged(changedEvent));
+ executorService.execute(new SuppressedRunnableWrapper(
+ () -> {
+ serviceRegistryCache.onMicroserviceInstanceChanged(changedEvent);
+ RegistryUtils.getAppManager().onMicroserviceInstanceChanged(changedEvent);
+ }));
}
// post from watch eventloop, should refresh all instances immediately
@Subscribe
public void serviceRegistryRecovery(RecoveryEvent event) {
- executorService.execute(RegistryUtils.getAppManager()::pullInstances);
+ executorService.execute(() -> {
+ serviceRegistryCache.forceRefreshCache();
+ RegistryUtils.getAppManager().pullInstances();
+ });
}
@Subscribe
public void onSafeModeChanged(SafeModeChangeEvent modeChangeEvent) {
- executorService.execute(() -> RegistryUtils.getAppManager().onSafeModeChanged(modeChangeEvent));
+ executorService.execute(() -> {
+ LOGGER.warn("receive SafeModeChangeEvent, current mode={}", modeChangeEvent.getCurrentMode());
+ serviceRegistryCache.onSafeModeChanged(modeChangeEvent);
+ RegistryUtils.getAppManager().onSafeModeChanged(modeChangeEvent);
+ });
}
}
diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
index b6af226..2002177 100644
--- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
+++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
@@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.servicecomb.foundation.common.concurrency.SuppressedRunnableWrapper;
import org.apache.servicecomb.foundation.common.utils.SPIServiceUtils;
-import org.apache.servicecomb.serviceregistry.RegistryUtils;
import org.apache.servicecomb.serviceregistry.client.ServiceRegistryClient;
import org.apache.servicecomb.serviceregistry.client.http.ServiceRegistryClientImpl;
import org.apache.servicecomb.serviceregistry.config.ServiceRegistryConfig;
@@ -85,7 +84,9 @@ public class RemoteServiceRegistry extends AbstractServiceRegistry {
TimeUnit.SECONDS);
taskPool.scheduleAtFixedRate(
- new SuppressedRunnableWrapper(RegistryUtils.getAppManager()::pullInstances),
+ new SuppressedRunnableWrapper(() -> {
+ serviceRegistryCache.refreshCache();
+ }),
serviceRegistryConfig.getInstancePullInterval(),
serviceRegistryConfig.getInstancePullInterval(),
TimeUnit.SECONDS);
diff --git a/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/RegistryUtilsTest.java b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/RegistryUtilsTest.java
new file mode 100644
index 0000000..ab7efc5
--- /dev/null
+++ b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/RegistryUtilsTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.serviceregistry;
+
+import java.util.Collections;
+
+import org.apache.servicecomb.serviceregistry.api.registry.MicroserviceInstance;
+import org.apache.servicecomb.serviceregistry.client.http.MicroserviceInstances;
+import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCache.MicroserviceCacheStatus;
+import org.apache.servicecomb.serviceregistry.registry.cache.MockedMicroserviceCache;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RegistryUtilsTest {
+ @Test
+ public void convertCacheToMicroserviceInstances() {
+ MockedMicroserviceCache microserviceCache = new MockedMicroserviceCache();
+ microserviceCache.setStatus(MicroserviceCacheStatus.CLIENT_ERROR);
+ MicroserviceInstances microserviceInstances = RegistryUtils
+ .convertCacheToMicroserviceInstances(microserviceCache);
+ Assert.assertNull(microserviceInstances);
+
+ microserviceCache = new MockedMicroserviceCache();
+ microserviceCache.setStatus(MicroserviceCacheStatus.SETTING_CACHE_ERROR);
+ microserviceInstances = RegistryUtils.convertCacheToMicroserviceInstances(microserviceCache);
+ Assert.assertNull(microserviceInstances);
+
+ microserviceCache = new MockedMicroserviceCache();
+ microserviceCache.setStatus(MicroserviceCacheStatus.INIT);
+ microserviceInstances = RegistryUtils.convertCacheToMicroserviceInstances(microserviceCache);
+ Assert.assertNull(microserviceInstances);
+
+ microserviceCache = new MockedMicroserviceCache();
+ microserviceCache.setStatus(MicroserviceCacheStatus.SERVICE_NOT_FOUND);
+ microserviceInstances = RegistryUtils.convertCacheToMicroserviceInstances(microserviceCache);
+ Assert.assertTrue(microserviceInstances.isMicroserviceNotExist());
+ Assert.assertFalse(microserviceInstances.isNeedRefresh());
+ Assert.assertEquals("", microserviceInstances.getRevision());
+ Assert.assertNull(microserviceInstances.getInstancesResponse());
+
+ microserviceCache = new MockedMicroserviceCache();
+ microserviceCache.setStatus(MicroserviceCacheStatus.REFRESHED);
+ microserviceCache.setRevisionId("0166f3c18702617d5e55cf911e4e412cc8760dab");
+ MicroserviceInstance microserviceInstance = new MicroserviceInstance();
+ microserviceCache.setInstances(Collections.singletonList(microserviceInstance));
+ microserviceInstances = RegistryUtils.convertCacheToMicroserviceInstances(microserviceCache);
+ Assert.assertFalse(microserviceInstances.isMicroserviceNotExist());
+ Assert.assertTrue(microserviceInstances.isNeedRefresh());
+ Assert.assertEquals("0166f3c18702617d5e55cf911e4e412cc8760dab", microserviceInstances.getRevision());
+ Assert.assertEquals(1, microserviceInstances.getInstancesResponse().getInstances().size());
+ Assert.assertSame(microserviceInstance, microserviceInstances.getInstancesResponse().getInstances().get(0));
+
+ microserviceCache = new MockedMicroserviceCache();
+ microserviceCache.setStatus(MicroserviceCacheStatus.NO_CHANGE);
+ microserviceCache.setRevisionId("0166f3c18702617d5e55cf911e4e412cc8760dab");
+ microserviceInstances = RegistryUtils.convertCacheToMicroserviceInstances(microserviceCache);
+ Assert.assertFalse(microserviceInstances.isMicroserviceNotExist());
+ Assert.assertFalse(microserviceInstances.isNeedRefresh());
+ Assert.assertEquals("0166f3c18702617d5e55cf911e4e412cc8760dab", microserviceInstances.getRevision());
+ Assert.assertNull(microserviceInstances.getInstancesResponse());
+ }
+}
\ No newline at end of file