You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ya...@apache.org on 2019/05/24 07:12:50 UTC

[servicecomb-java-chassis] 01/02: [SCB-1274]when service center is unavailable, service should enter safe mode.

This is an automated email from the ASF dual-hosted git repository.

yaohaishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git

commit 84d8979302b8994088d0aa3bdd8db804fcad0ed9
Author: heyile <25...@qq.com>
AuthorDate: Tue Apr 30 10:40:19 2019 +0800

    [SCB-1274]when service center is unavailable, service should enter safe mode.
---
 .../consumer/MicroserviceVersions.java             | 39 ++++++++++++------
 .../registry/AbstractServiceRegistry.java          |  3 +-
 .../serviceregistry/task/ServiceCenterTask.java    | 40 +++++++++++++++++-
 .../task/event/SafeModeChangeEvent.java            | 32 +++++++++++++++
 .../task/TestServiceCenterTask.java                | 48 +++++++++++++++++++++-
 5 files changed, 147 insertions(+), 15 deletions(-)

diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/consumer/MicroserviceVersions.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/consumer/MicroserviceVersions.java
index a870554..16bce48 100644
--- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/consumer/MicroserviceVersions.java
+++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/consumer/MicroserviceVersions.java
@@ -17,11 +17,11 @@
 
 package org.apache.servicecomb.serviceregistry.consumer;
 
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
 import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
 import org.apache.servicecomb.foundation.common.utils.SPIServiceUtils;
@@ -34,6 +34,7 @@ import org.apache.servicecomb.serviceregistry.config.ServiceRegistryConfig;
 import org.apache.servicecomb.serviceregistry.definition.DefinitionConst;
 import org.apache.servicecomb.serviceregistry.task.event.MicroserviceNotExistEvent;
 import org.apache.servicecomb.serviceregistry.task.event.PullMicroserviceVersionsInstancesEvent;
+import org.apache.servicecomb.serviceregistry.task.event.SafeModeChangeEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +54,9 @@ public class MicroserviceVersions {
 
   private List<MicroserviceInstance> pulledInstances;
 
+  // while safe mode is on, merging only adds pulled instances and never drops in-use ones
+  private boolean safeMode = false;
+
   // instances not always equals to pulledInstances
   // in the future:
   //  pulledInstances means all instance
@@ -193,20 +197,26 @@ public class MicroserviceVersions {
 
   private List<MicroserviceInstance> mergeInstances(List<MicroserviceInstance> pulledInstances,
       List<MicroserviceInstance> inUseInstances) {
-    List<MicroserviceInstance> upInstances = pulledInstances
-        .stream()
-        .collect(Collectors.toList());
+    List<MicroserviceInstance> upInstances = new ArrayList<>(pulledInstances);
+    if (safeMode) {
+      // in safe mode, keep every in-use instance and only add newly pulled ones; nothing is deleted
+      upInstances.forEach(instance -> {
+        if (!inUseInstances.contains(instance)) {
+          inUseInstances.add(instance);
+        }
+      });
+      return inUseInstances;
+    }
     if (upInstances.isEmpty() && inUseInstances != null && ServiceRegistryConfig.INSTANCE
         .isEmptyInstanceProtectionEnabled()) {
       MicroserviceInstancePing ping = SPIServiceUtils.getPriorityHighestService(MicroserviceInstancePing.class);
-      inUseInstances.stream()
-          .forEach(instance -> {
-            if (!upInstances.contains(instance)) {
-              if (ping.ping(instance)) {
-                upInstances.add(instance);
-              }
-            }
-          });
+      inUseInstances.forEach(instance -> {
+        if (!upInstances.contains(instance)) {
+          if (ping.ping(instance)) {
+            upInstances.add(instance);
+          }
+        }
+      });
     }
     return upInstances;
   }
@@ -254,6 +264,11 @@ public class MicroserviceVersions {
     postPullInstanceEvent(0);
   }
 
+  @Subscribe
+  public void onSafeModeChanged(SafeModeChangeEvent modeChangeEvent) {
+    this.safeMode = modeChangeEvent.getCurrentMode();
+  }
+
   protected boolean isEventAccept(MicroserviceInstanceChangedEvent changedEvent) {
     return (appId.equals(changedEvent.getKey().getAppId()) &&
         microserviceName.equals(changedEvent.getKey().getServiceName())) ||
diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
index 4f65821..c538da6 100644
--- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
+++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
@@ -200,7 +200,8 @@ public abstract class AbstractServiceRegistry implements ServiceRegistry {
   private void createServiceCenterTask() {
     MicroserviceServiceCenterTask task =
         new MicroserviceServiceCenterTask(eventBus, serviceRegistryConfig, srClient, microservice);
-    serviceCenterTask = new ServiceCenterTask(eventBus, serviceRegistryConfig.getHeartbeatInterval(), task);
+    serviceCenterTask = new ServiceCenterTask(eventBus, serviceRegistryConfig.getHeartbeatInterval(),
+        serviceRegistryConfig.getResendHeartBeatTimes(), task);
   }
 
   public boolean unregisterInstance() {
diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/task/ServiceCenterTask.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/task/ServiceCenterTask.java
index 58f85d9..48fa775 100644
--- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/task/ServiceCenterTask.java
+++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/task/ServiceCenterTask.java
@@ -16,10 +16,14 @@
  */
 package org.apache.servicecomb.serviceregistry.task;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.servicecomb.serviceregistry.task.event.ExceptionEvent;
+import org.apache.servicecomb.serviceregistry.task.event.SafeModeChangeEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
 
@@ -30,16 +34,25 @@ public class ServiceCenterTask implements Runnable {
 
   private int interval;
 
+  private int checkTimes;
+
+  private AtomicLong consecutiveFailedTimes = new AtomicLong();
+
+  private AtomicLong consecutiveSucceededTimes = new AtomicLong();
+
+  private boolean safeMode = false;
+
   private MicroserviceServiceCenterTask microserviceServiceCenterTask;
 
   private boolean registerInstanceSuccess = false;
 
   private ServiceCenterTaskMonitor serviceCenterTaskMonitor = new ServiceCenterTaskMonitor();
 
-  public ServiceCenterTask(EventBus eventBus, int interval,
+  public ServiceCenterTask(EventBus eventBus, int interval, int checkTimes,
       MicroserviceServiceCenterTask microserviceServiceCenterTask) {
     this.eventBus = eventBus;
     this.interval = interval;
+    this.checkTimes = checkTimes;
     this.microserviceServiceCenterTask = microserviceServiceCenterTask;
 
     this.eventBus.register(this);
@@ -62,7 +75,27 @@ public class ServiceCenterTask implements Runnable {
     if (task.getHeartbeatResult() != HeartbeatResult.SUCCESS) {
       LOGGER.info("read MicroserviceInstanceHeartbeatTask status is {}", task.taskStatus);
       onException();
+      if (!safeMode && consecutiveFailedTimes.incrementAndGet() > checkTimes) {
+        LOGGER.warn("service center is unavailable, enter safe mode");
+        eventBus.post(new SafeModeChangeEvent(true));
+      }
+      if (consecutiveSucceededTimes.get() != 0) {
+        consecutiveSucceededTimes.set(0);
+      }
+      return;
+    }
+    if (safeMode && consecutiveSucceededTimes.incrementAndGet() > checkTimes) {
+      LOGGER.warn("service center is recovery, exit safe mode");
+      eventBus.post(new SafeModeChangeEvent(false));
     }
+    if (consecutiveFailedTimes.get() != 0) {
+      consecutiveFailedTimes.set(0);
+    }
+  }
+
+  @Subscribe
+  public void onSafeModeChanged(SafeModeChangeEvent modeChangeEvent) {
+    safeMode = modeChangeEvent.getCurrentMode();
   }
 
   // messages given in watch error
@@ -92,4 +125,9 @@ public class ServiceCenterTask implements Runnable {
       LOGGER.error("unexpected exception caught from service center task. ", e);
     }
   }
+
+  @VisibleForTesting
+  public boolean getSafeMode() {
+    return safeMode;
+  }
 }
diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/task/event/SafeModeChangeEvent.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/task/event/SafeModeChangeEvent.java
new file mode 100644
index 0000000..31c4d75
--- /dev/null
+++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/task/event/SafeModeChangeEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.serviceregistry.task.event;
+
+/**
+ * Posted when safe mode is entered (true) or exited (false). In safe mode, the consumer does not delete instances.
+ */
+public class SafeModeChangeEvent {
+  private boolean currentMode;
+
+  public SafeModeChangeEvent(boolean currentMode) {
+    this.currentMode = currentMode;
+  }
+
+  public boolean getCurrentMode() {
+    return currentMode;
+  }
+}
diff --git a/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/task/TestServiceCenterTask.java b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/task/TestServiceCenterTask.java
index a7913d1..c31d57d 100644
--- a/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/task/TestServiceCenterTask.java
+++ b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/task/TestServiceCenterTask.java
@@ -16,8 +16,13 @@
  */
 package org.apache.servicecomb.serviceregistry.task;
 
+import java.util.EventListener;
+
+import javax.xml.ws.Holder;
+
 import org.apache.servicecomb.foundation.test.scaffolding.config.ArchaiusUtils;
 import org.apache.servicecomb.serviceregistry.config.ServiceRegistryConfig;
+import org.apache.servicecomb.serviceregistry.task.event.SafeModeChangeEvent;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -25,6 +30,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
 
 import mockit.Deencapsulation;
 import mockit.Expectations;
@@ -51,7 +57,7 @@ public class TestServiceCenterTask {
   @Before
   public void init() {
     serviceCenterTask =
-        new ServiceCenterTask(eventBus, ServiceRegistryConfig.INSTANCE.getHeartbeatInterval(),
+        new ServiceCenterTask(eventBus, ServiceRegistryConfig.INSTANCE.getHeartbeatInterval(), 3,
             microserviceServiceCenterTask);
   }
 
@@ -92,4 +98,44 @@ public class TestServiceCenterTask {
     eventBus.post(heartBeatEvent);
     Assert.assertTrue(Deencapsulation.getField(serviceCenterTask, "registerInstanceSuccess"));
   }
+
+  @Test
+  public void testSafeMode(@Mocked MicroserviceInstanceHeartbeatTask succeededTask,
+      @Mocked MicroserviceInstanceHeartbeatTask failedTask) {
+    new Expectations() {
+      {
+        succeededTask.getHeartbeatResult();
+        result = HeartbeatResult.SUCCESS;
+        failedTask.getHeartbeatResult();
+        result = HeartbeatResult.DISCONNECTED;
+      }
+    };
+    Holder<Integer> count = new Holder<>(0);
+    EventListener eventListener = new EventListener() {
+      @Subscribe
+      public void onModeChanged(SafeModeChangeEvent modeChangeEvent) {
+        count.value++;
+      }
+    };
+    eventBus.register(eventListener);
+    Assert.assertEquals(0, count.value.intValue());
+    eventBus.post(failedTask);
+    eventBus.post(failedTask);
+    eventBus.post(failedTask);
+    Assert.assertEquals(0, count.value.intValue());
+    Assert.assertFalse(serviceCenterTask.getSafeMode());
+    eventBus.post(failedTask);
+    Assert.assertEquals(1, count.value.intValue());
+    Assert.assertTrue(serviceCenterTask.getSafeMode());
+
+    eventBus.post(succeededTask);
+    eventBus.post(succeededTask);
+    eventBus.post(succeededTask);
+    Assert.assertTrue(serviceCenterTask.getSafeMode());
+    Assert.assertEquals(1, count.value.intValue());
+    eventBus.post(succeededTask);
+    Assert.assertFalse(serviceCenterTask.getSafeMode());
+    Assert.assertEquals(2, count.value.intValue());
+    eventBus.unregister(eventListener);
+  }
 }