You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ya...@apache.org on 2020/03/09 16:14:59 UTC

[servicecomb-java-chassis] 10/19: [SCB-1691] ServiceRegistry use serviceRegistryCache

This is an automated email from the ASF dual-hosted git repository.

yaohaishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git

commit d4e8db6970c4a8e5b383c0b9ebc114118497757e
Author: yhs0092 <yh...@163.com>
AuthorDate: Sat Feb 15 19:07:07 2020 +0800

    [SCB-1691] ServiceRegistry use serviceRegistryCache
---
 .../servicecomb/serviceregistry/RegistryUtils.java | 33 +++++++++
 .../consumer/MicroserviceVersions.java             |  2 +-
 .../registry/AbstractServiceRegistry.java          | 80 ++++++++++------------
 .../registry/RemoteServiceRegistry.java            |  5 +-
 .../serviceregistry/RegistryUtilsTest.java         | 77 +++++++++++++++++++++
 5 files changed, 151 insertions(+), 46 deletions(-)

diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java
index 46de489..2f93bd6 100644
--- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java
+++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java
@@ -20,6 +20,7 @@ package org.apache.servicecomb.serviceregistry;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -33,6 +34,7 @@ import org.apache.servicecomb.foundation.common.net.IpPort;
 import org.apache.servicecomb.foundation.common.net.NetUtils;
 import org.apache.servicecomb.serviceregistry.api.registry.Microservice;
 import org.apache.servicecomb.serviceregistry.api.registry.MicroserviceInstance;
+import org.apache.servicecomb.serviceregistry.api.response.FindInstancesResponse;
 import org.apache.servicecomb.serviceregistry.cache.InstanceCacheManager;
 import org.apache.servicecomb.serviceregistry.cache.InstanceCacheManagerNew;
 import org.apache.servicecomb.serviceregistry.client.ServiceRegistryClient;
@@ -41,6 +43,7 @@ import org.apache.servicecomb.serviceregistry.config.ServiceRegistryConfig;
 import org.apache.servicecomb.serviceregistry.consumer.AppManager;
 import org.apache.servicecomb.serviceregistry.definition.MicroserviceDefinition;
 import org.apache.servicecomb.serviceregistry.registry.ServiceRegistryFactory;
+import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCache;
 import org.apache.servicecomb.serviceregistry.swagger.SwaggerLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -250,6 +253,36 @@ public final class RegistryUtils {
     return serviceRegistry.findServiceInstances(appId, serviceName, versionRule, revision);
   }
 
+  /**
+   * for compatibility
+   */
+  public static MicroserviceInstances convertCacheToMicroserviceInstances(MicroserviceCache microserviceCache) {
+    MicroserviceInstances microserviceInstances = new MicroserviceInstances();
+    switch (microserviceCache.getStatus()) {
+      case SERVICE_NOT_FOUND:
+        microserviceInstances.setMicroserviceNotExist(true);
+        microserviceInstances.setNeedRefresh(false);
+        microserviceInstances.setRevision("");
+        microserviceInstances.setInstancesResponse(null);
+        return microserviceInstances;
+      case NO_CHANGE:
+        microserviceInstances.setMicroserviceNotExist(false);
+        microserviceInstances.setNeedRefresh(false);
+        microserviceInstances.setRevision(microserviceCache.getRevisionId());
+        return microserviceInstances;
+      case REFRESHED:
+        microserviceInstances.setMicroserviceNotExist(false);
+        microserviceInstances.setNeedRefresh(true);
+        microserviceInstances.setRevision(microserviceCache.getRevisionId());
+        FindInstancesResponse findInstancesResponse = new FindInstancesResponse();
+        findInstancesResponse.setInstances(new ArrayList<>(microserviceCache.getInstances()));
+        microserviceInstances.setInstancesResponse(findInstancesResponse);
+        return microserviceInstances;
+      default:
+        return null;
+    }
+  }
+
   public static String calcSchemaSummary(String schemaContent) {
     return Hashing.sha256().newHasher().putString(schemaContent, Charsets.UTF_8).hash().toString();
   }
diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/consumer/MicroserviceVersions.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/consumer/MicroserviceVersions.java
index 8802a15..5dc1cec 100644
--- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/consumer/MicroserviceVersions.java
+++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/consumer/MicroserviceVersions.java
@@ -166,7 +166,7 @@ public class MicroserviceVersions {
       return;
     }
 
-    if (!microserviceInstances.isNeedRefresh()) {
+    if (null != revision && revision.equals(microserviceInstances.getRevision())) {
       return;
     }
 
diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
index a37fdbc..214968a 100644
--- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
+++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.servicecomb.foundation.common.concurrency.SuppressedRunnableWrapper;
 import org.apache.servicecomb.serviceregistry.Features;
 import org.apache.servicecomb.serviceregistry.RegistryUtils;
 import org.apache.servicecomb.serviceregistry.ServiceRegistry;
@@ -48,6 +49,9 @@ import org.apache.servicecomb.serviceregistry.definition.MicroserviceDefinition;
 import org.apache.servicecomb.serviceregistry.definition.MicroserviceNameParser;
 import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCache;
 import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCacheKey;
+import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCacheRefreshedEvent;
+import org.apache.servicecomb.serviceregistry.registry.cache.RefreshableServiceRegistryCache;
+import org.apache.servicecomb.serviceregistry.registry.cache.ServiceRegistryCache;
 import org.apache.servicecomb.serviceregistry.task.MicroserviceServiceCenterTask;
 import org.apache.servicecomb.serviceregistry.task.ServiceCenterTask;
 import org.apache.servicecomb.serviceregistry.task.event.RecoveryEvent;
@@ -85,6 +89,8 @@ public abstract class AbstractServiceRegistry implements ServiceRegistry {
 
   private String name;
 
+  RefreshableServiceRegistryCache serviceRegistryCache;
+
   public AbstractServiceRegistry(EventBus eventBus, ServiceRegistryConfig serviceRegistryConfig,
       MicroserviceDefinition microserviceDefinition) {
     setName(serviceRegistryConfig.getRegistryName());
@@ -104,6 +110,14 @@ public abstract class AbstractServiceRegistry implements ServiceRegistry {
     createServiceCenterTask();
 
     eventBus.register(this);
+
+    initCache();
+  }
+
+  private void initCache() {
+    serviceRegistryCache = new RefreshableServiceRegistryCache(microservice, srClient);
+    serviceRegistryCache.setCacheRefreshedWatcher(
+        caches -> eventBus.post(new MicroserviceCacheRefreshedEvent(caches)));
   }
 
   @Override
@@ -207,50 +221,15 @@ public abstract class AbstractServiceRegistry implements ServiceRegistry {
 
   public MicroserviceInstances findServiceInstances(String appId, String serviceName,
       String versionRule, String revision) {
-    MicroserviceInstances microserviceInstances = srClient.findServiceInstances(microservice.getServiceId(),
-        appId,
-        serviceName,
-        versionRule,
-        revision);
-
-    if (microserviceInstances == null) {
-      LOGGER.error("Can not find any instances from service center due to previous errors. service={}/{}/{}",
-          appId,
-          serviceName,
-          versionRule);
-      return null;
-    }
-
-    if (microserviceInstances.isMicroserviceNotExist()) {
-      return microserviceInstances;
-    }
-
-    if (!microserviceInstances.isNeedRefresh()) {
-      LOGGER.debug("instances revision is not changed, service={}/{}/{}", appId, serviceName, versionRule);
-      return microserviceInstances;
-    }
-
-    List<MicroserviceInstance> instances = microserviceInstances.getInstancesResponse().getInstances();
-    LOGGER.info("find instances[{}] from service center success. service={}/{}/{}, old revision={}, new revision={}",
-        instances.size(),
-        appId,
-        serviceName,
-        versionRule,
-        revision,
-        microserviceInstances.getRevision());
-    for (MicroserviceInstance instance : instances) {
-      LOGGER.info("service id={}, instance id={}, endpoints={}",
-          instance.getServiceId(),
-          instance.getInstanceId(),
-          instance.getEndpoints());
-    }
-    return microserviceInstances;
+    MicroserviceCache microserviceCache = serviceRegistryCache
+        .findServiceCache(MicroserviceCacheKey.builder()
+            .serviceName(serviceName).appId(appId).env(microservice.getEnvironment()).build());
+    return RegistryUtils.convertCacheToMicroserviceInstances(microserviceCache);
   }
 
   @Override
   public MicroserviceCache findMicroserviceCache(MicroserviceCacheKey microserviceCacheKey) {
-    // TODO find MicroserviceCache from ServiceRegistryCache
-    return null;
+    return serviceRegistryCache.findServiceCache(microserviceCacheKey);
   }
 
   @Override
@@ -333,6 +312,10 @@ public abstract class AbstractServiceRegistry implements ServiceRegistry {
     this.name = name;
   }
 
+  public ServiceRegistryCache getServiceRegistryCache() {
+    return serviceRegistryCache;
+  }
+
   @Subscribe
   public void onShutdown(ShutdownEvent event) {
     LOGGER.info("service center task is shutdown.");
@@ -342,17 +325,28 @@ public abstract class AbstractServiceRegistry implements ServiceRegistry {
   // post from watch eventloop, should refresh the exact microservice instances immediately
   @Subscribe
   public void onMicroserviceInstanceChanged(MicroserviceInstanceChangedEvent changedEvent) {
-    executorService.execute(() -> RegistryUtils.getAppManager().onMicroserviceInstanceChanged(changedEvent));
+    executorService.execute(new SuppressedRunnableWrapper(
+        () -> {
+          serviceRegistryCache.onMicroserviceInstanceChanged(changedEvent);
+          RegistryUtils.getAppManager().onMicroserviceInstanceChanged(changedEvent);
+        }));
   }
 
   // post from watch eventloop, should refresh all instances immediately
   @Subscribe
   public void serviceRegistryRecovery(RecoveryEvent event) {
-    executorService.execute(RegistryUtils.getAppManager()::pullInstances);
+    executorService.execute(() -> {
+      serviceRegistryCache.forceRefreshCache();
+      RegistryUtils.getAppManager().pullInstances();
+    });
   }
 
   @Subscribe
   public void onSafeModeChanged(SafeModeChangeEvent modeChangeEvent) {
-    executorService.execute(() -> RegistryUtils.getAppManager().onSafeModeChanged(modeChangeEvent));
+    executorService.execute(() -> {
+      LOGGER.warn("receive SafeModeChangeEvent, current mode={}", modeChangeEvent.getCurrentMode());
+      serviceRegistryCache.onSafeModeChanged(modeChangeEvent);
+      RegistryUtils.getAppManager().onSafeModeChanged(modeChangeEvent);
+    });
   }
 }
diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
index b6af226..2002177 100644
--- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
+++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
@@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.servicecomb.foundation.common.concurrency.SuppressedRunnableWrapper;
 import org.apache.servicecomb.foundation.common.utils.SPIServiceUtils;
-import org.apache.servicecomb.serviceregistry.RegistryUtils;
 import org.apache.servicecomb.serviceregistry.client.ServiceRegistryClient;
 import org.apache.servicecomb.serviceregistry.client.http.ServiceRegistryClientImpl;
 import org.apache.servicecomb.serviceregistry.config.ServiceRegistryConfig;
@@ -85,7 +84,9 @@ public class RemoteServiceRegistry extends AbstractServiceRegistry {
         TimeUnit.SECONDS);
 
     taskPool.scheduleAtFixedRate(
-        new SuppressedRunnableWrapper(RegistryUtils.getAppManager()::pullInstances),
+        new SuppressedRunnableWrapper(() -> {
+          serviceRegistryCache.refreshCache();
+        }),
         serviceRegistryConfig.getInstancePullInterval(),
         serviceRegistryConfig.getInstancePullInterval(),
         TimeUnit.SECONDS);
diff --git a/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/RegistryUtilsTest.java b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/RegistryUtilsTest.java
new file mode 100644
index 0000000..ab7efc5
--- /dev/null
+++ b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/RegistryUtilsTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.serviceregistry;
+
+import java.util.Collections;
+
+import org.apache.servicecomb.serviceregistry.api.registry.MicroserviceInstance;
+import org.apache.servicecomb.serviceregistry.client.http.MicroserviceInstances;
+import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCache.MicroserviceCacheStatus;
+import org.apache.servicecomb.serviceregistry.registry.cache.MockedMicroserviceCache;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RegistryUtilsTest {
+  @Test
+  public void convertCacheToMicroserviceInstances() {
+    MockedMicroserviceCache microserviceCache = new MockedMicroserviceCache();
+    microserviceCache.setStatus(MicroserviceCacheStatus.CLIENT_ERROR);
+    MicroserviceInstances microserviceInstances = RegistryUtils
+        .convertCacheToMicroserviceInstances(microserviceCache);
+    Assert.assertNull(microserviceInstances);
+
+    microserviceCache = new MockedMicroserviceCache();
+    microserviceCache.setStatus(MicroserviceCacheStatus.SETTING_CACHE_ERROR);
+    microserviceInstances = RegistryUtils.convertCacheToMicroserviceInstances(microserviceCache);
+    Assert.assertNull(microserviceInstances);
+
+    microserviceCache = new MockedMicroserviceCache();
+    microserviceCache.setStatus(MicroserviceCacheStatus.INIT);
+    microserviceInstances = RegistryUtils.convertCacheToMicroserviceInstances(microserviceCache);
+    Assert.assertNull(microserviceInstances);
+
+    microserviceCache = new MockedMicroserviceCache();
+    microserviceCache.setStatus(MicroserviceCacheStatus.SERVICE_NOT_FOUND);
+    microserviceInstances = RegistryUtils.convertCacheToMicroserviceInstances(microserviceCache);
+    Assert.assertTrue(microserviceInstances.isMicroserviceNotExist());
+    Assert.assertFalse(microserviceInstances.isNeedRefresh());
+    Assert.assertEquals("", microserviceInstances.getRevision());
+    Assert.assertNull(microserviceInstances.getInstancesResponse());
+
+    microserviceCache = new MockedMicroserviceCache();
+    microserviceCache.setStatus(MicroserviceCacheStatus.REFRESHED);
+    microserviceCache.setRevisionId("0166f3c18702617d5e55cf911e4e412cc8760dab");
+    MicroserviceInstance microserviceInstance = new MicroserviceInstance();
+    microserviceCache.setInstances(Collections.singletonList(microserviceInstance));
+    microserviceInstances = RegistryUtils.convertCacheToMicroserviceInstances(microserviceCache);
+    Assert.assertFalse(microserviceInstances.isMicroserviceNotExist());
+    Assert.assertTrue(microserviceInstances.isNeedRefresh());
+    Assert.assertEquals("0166f3c18702617d5e55cf911e4e412cc8760dab", microserviceInstances.getRevision());
+    Assert.assertEquals(1, microserviceInstances.getInstancesResponse().getInstances().size());
+    Assert.assertSame(microserviceInstance, microserviceInstances.getInstancesResponse().getInstances().get(0));
+
+    microserviceCache = new MockedMicroserviceCache();
+    microserviceCache.setStatus(MicroserviceCacheStatus.NO_CHANGE);
+    microserviceCache.setRevisionId("0166f3c18702617d5e55cf911e4e412cc8760dab");
+    microserviceInstances = RegistryUtils.convertCacheToMicroserviceInstances(microserviceCache);
+    Assert.assertFalse(microserviceInstances.isMicroserviceNotExist());
+    Assert.assertFalse(microserviceInstances.isNeedRefresh());
+    Assert.assertEquals("0166f3c18702617d5e55cf911e4e412cc8760dab", microserviceInstances.getRevision());
+    Assert.assertNull(microserviceInstances.getInstancesResponse());
+  }
+}
\ No newline at end of file