You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ya...@apache.org on 2019/05/24 07:12:50 UTC
[servicecomb-java-chassis] 01/02: [SCB-1274]when service center is
unavailable, service should enter safe mode.
This is an automated email from the ASF dual-hosted git repository.
yaohaishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
commit 84d8979302b8994088d0aa3bdd8db804fcad0ed9
Author: heyile <25...@qq.com>
AuthorDate: Tue Apr 30 10:40:19 2019 +0800
[SCB-1274]when service center is unavailable, service should enter safe mode.
---
.../consumer/MicroserviceVersions.java | 39 ++++++++++++------
.../registry/AbstractServiceRegistry.java | 3 +-
.../serviceregistry/task/ServiceCenterTask.java | 40 +++++++++++++++++-
.../task/event/SafeModeChangeEvent.java | 32 +++++++++++++++
.../task/TestServiceCenterTask.java | 48 +++++++++++++++++++++-
5 files changed, 147 insertions(+), 15 deletions(-)
diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/consumer/MicroserviceVersions.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/consumer/MicroserviceVersions.java
index a870554..16bce48 100644
--- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/consumer/MicroserviceVersions.java
+++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/consumer/MicroserviceVersions.java
@@ -17,11 +17,11 @@
package org.apache.servicecomb.serviceregistry.consumer;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
import org.apache.servicecomb.foundation.common.utils.SPIServiceUtils;
@@ -34,6 +34,7 @@ import org.apache.servicecomb.serviceregistry.config.ServiceRegistryConfig;
import org.apache.servicecomb.serviceregistry.definition.DefinitionConst;
import org.apache.servicecomb.serviceregistry.task.event.MicroserviceNotExistEvent;
import org.apache.servicecomb.serviceregistry.task.event.PullMicroserviceVersionsInstancesEvent;
+import org.apache.servicecomb.serviceregistry.task.event.SafeModeChangeEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +54,9 @@ public class MicroserviceVersions {
private List<MicroserviceInstance> pulledInstances;
+ // in safe mode, instances will never be deleted
+ private boolean safeMode = false;
+
// instances not always equals to pulledInstances
// in the future:
// pulledInstances means all instance
@@ -193,20 +197,26 @@ public class MicroserviceVersions {
private List<MicroserviceInstance> mergeInstances(List<MicroserviceInstance> pulledInstances,
List<MicroserviceInstance> inUseInstances) {
- List<MicroserviceInstance> upInstances = pulledInstances
- .stream()
- .collect(Collectors.toList());
+ List<MicroserviceInstance> upInstances = new ArrayList<>(pulledInstances);
+ if (safeMode) {
+ // in safe mode, instances will never be deleted
+ upInstances.forEach(instance -> {
+ if (!inUseInstances.contains(instance)) {
+ inUseInstances.add(instance);
+ }
+ });
+ return inUseInstances;
+ }
if (upInstances.isEmpty() && inUseInstances != null && ServiceRegistryConfig.INSTANCE
.isEmptyInstanceProtectionEnabled()) {
MicroserviceInstancePing ping = SPIServiceUtils.getPriorityHighestService(MicroserviceInstancePing.class);
- inUseInstances.stream()
- .forEach(instance -> {
- if (!upInstances.contains(instance)) {
- if (ping.ping(instance)) {
- upInstances.add(instance);
- }
- }
- });
+ inUseInstances.forEach(instance -> {
+ if (!upInstances.contains(instance)) {
+ if (ping.ping(instance)) {
+ upInstances.add(instance);
+ }
+ }
+ });
}
return upInstances;
}
@@ -254,6 +264,11 @@ public class MicroserviceVersions {
postPullInstanceEvent(0);
}
+ @Subscribe
+ public void onSafeModeChanged(SafeModeChangeEvent modeChangeEvent) {
+ this.safeMode = modeChangeEvent.getCurrentMode();
+ }
+
protected boolean isEventAccept(MicroserviceInstanceChangedEvent changedEvent) {
return (appId.equals(changedEvent.getKey().getAppId()) &&
microserviceName.equals(changedEvent.getKey().getServiceName())) ||
diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
index 4f65821..c538da6 100644
--- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
+++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
@@ -200,7 +200,8 @@ public abstract class AbstractServiceRegistry implements ServiceRegistry {
private void createServiceCenterTask() {
MicroserviceServiceCenterTask task =
new MicroserviceServiceCenterTask(eventBus, serviceRegistryConfig, srClient, microservice);
- serviceCenterTask = new ServiceCenterTask(eventBus, serviceRegistryConfig.getHeartbeatInterval(), task);
+ serviceCenterTask = new ServiceCenterTask(eventBus, serviceRegistryConfig.getHeartbeatInterval(),
+ serviceRegistryConfig.getResendHeartBeatTimes(), task);
}
public boolean unregisterInstance() {
diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/task/ServiceCenterTask.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/task/ServiceCenterTask.java
index 58f85d9..48fa775 100644
--- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/task/ServiceCenterTask.java
+++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/task/ServiceCenterTask.java
@@ -16,10 +16,14 @@
*/
package org.apache.servicecomb.serviceregistry.task;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.servicecomb.serviceregistry.task.event.ExceptionEvent;
+import org.apache.servicecomb.serviceregistry.task.event.SafeModeChangeEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
@@ -30,16 +34,25 @@ public class ServiceCenterTask implements Runnable {
private int interval;
+ private int checkTimes;
+
+ private AtomicLong consecutiveFailedTimes = new AtomicLong();
+
+ private AtomicLong consecutiveSucceededTimes = new AtomicLong();
+
+ private boolean safeMode = false;
+
private MicroserviceServiceCenterTask microserviceServiceCenterTask;
private boolean registerInstanceSuccess = false;
private ServiceCenterTaskMonitor serviceCenterTaskMonitor = new ServiceCenterTaskMonitor();
- public ServiceCenterTask(EventBus eventBus, int interval,
+ public ServiceCenterTask(EventBus eventBus, int interval, int checkTimes,
MicroserviceServiceCenterTask microserviceServiceCenterTask) {
this.eventBus = eventBus;
this.interval = interval;
+ this.checkTimes = checkTimes;
this.microserviceServiceCenterTask = microserviceServiceCenterTask;
this.eventBus.register(this);
@@ -62,7 +75,27 @@ public class ServiceCenterTask implements Runnable {
if (task.getHeartbeatResult() != HeartbeatResult.SUCCESS) {
LOGGER.info("read MicroserviceInstanceHeartbeatTask status is {}", task.taskStatus);
onException();
+ if (!safeMode && consecutiveFailedTimes.incrementAndGet() > checkTimes) {
+ LOGGER.warn("service center is unavailable, enter safe mode");
+ eventBus.post(new SafeModeChangeEvent(true));
+ }
+ if (consecutiveSucceededTimes.get() != 0) {
+ consecutiveSucceededTimes.set(0);
+ }
+ return;
+ }
+ if (safeMode && consecutiveSucceededTimes.incrementAndGet() > checkTimes) {
+ LOGGER.warn("service center is recovery, exit safe mode");
+ eventBus.post(new SafeModeChangeEvent(false));
}
+ if (consecutiveFailedTimes.get() != 0) {
+ consecutiveFailedTimes.set(0);
+ }
+ }
+
+ @Subscribe
+ public void onSafeModeChanged(SafeModeChangeEvent modeChangeEvent) {
+ safeMode = modeChangeEvent.getCurrentMode();
}
// messages given in watch error
@@ -92,4 +125,9 @@ public class ServiceCenterTask implements Runnable {
LOGGER.error("unexpected exception caught from service center task. ", e);
}
}
+
+ @VisibleForTesting
+ public boolean getSafeMode() {
+ return safeMode;
+ }
}
diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/task/event/SafeModeChangeEvent.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/task/event/SafeModeChangeEvent.java
new file mode 100644
index 0000000..31c4d75
--- /dev/null
+++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/task/event/SafeModeChangeEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.serviceregistry.task.event;
+
+/**
+ * In safe mode, consumer will not delete instances.
+ */
+public class SafeModeChangeEvent {
+ private boolean currentMode;
+
+ public SafeModeChangeEvent(boolean currentMode) {
+ this.currentMode = currentMode;
+ }
+
+ public boolean getCurrentMode() {
+ return currentMode;
+ }
+}
diff --git a/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/task/TestServiceCenterTask.java b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/task/TestServiceCenterTask.java
index a7913d1..c31d57d 100644
--- a/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/task/TestServiceCenterTask.java
+++ b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/task/TestServiceCenterTask.java
@@ -16,8 +16,13 @@
*/
package org.apache.servicecomb.serviceregistry.task;
+import java.util.EventListener;
+
+import javax.xml.ws.Holder;
+
import org.apache.servicecomb.foundation.test.scaffolding.config.ArchaiusUtils;
import org.apache.servicecomb.serviceregistry.config.ServiceRegistryConfig;
+import org.apache.servicecomb.serviceregistry.task.event.SafeModeChangeEvent;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
@@ -25,6 +30,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
import mockit.Deencapsulation;
import mockit.Expectations;
@@ -51,7 +57,7 @@ public class TestServiceCenterTask {
@Before
public void init() {
serviceCenterTask =
- new ServiceCenterTask(eventBus, ServiceRegistryConfig.INSTANCE.getHeartbeatInterval(),
+ new ServiceCenterTask(eventBus, ServiceRegistryConfig.INSTANCE.getHeartbeatInterval(), 3,
microserviceServiceCenterTask);
}
@@ -92,4 +98,44 @@ public class TestServiceCenterTask {
eventBus.post(heartBeatEvent);
Assert.assertTrue(Deencapsulation.getField(serviceCenterTask, "registerInstanceSuccess"));
}
+
+ @Test
+ public void testSafeMode(@Mocked MicroserviceInstanceHeartbeatTask succeededTask,
+ @Mocked MicroserviceInstanceHeartbeatTask failedTask) {
+ new Expectations() {
+ {
+ succeededTask.getHeartbeatResult();
+ result = HeartbeatResult.SUCCESS;
+ failedTask.getHeartbeatResult();
+ result = HeartbeatResult.DISCONNECTED;
+ }
+ };
+ Holder<Integer> count = new Holder<>(0);
+ EventListener eventListener = new EventListener() {
+ @Subscribe
+ public void onModeChanged(SafeModeChangeEvent modeChangeEvent) {
+ count.value++;
+ }
+ };
+ eventBus.register(eventListener);
+ Assert.assertEquals(0, count.value.intValue());
+ eventBus.post(failedTask);
+ eventBus.post(failedTask);
+ eventBus.post(failedTask);
+ Assert.assertEquals(0, count.value.intValue());
+ Assert.assertFalse(serviceCenterTask.getSafeMode());
+ eventBus.post(failedTask);
+ Assert.assertEquals(1, count.value.intValue());
+ Assert.assertTrue(serviceCenterTask.getSafeMode());
+
+ eventBus.post(succeededTask);
+ eventBus.post(succeededTask);
+ eventBus.post(succeededTask);
+ Assert.assertTrue(serviceCenterTask.getSafeMode());
+ Assert.assertEquals(1, count.value.intValue());
+ eventBus.post(succeededTask);
+ Assert.assertFalse(serviceCenterTask.getSafeMode());
+ Assert.assertEquals(2, count.value.intValue());
+ eventBus.unregister(eventListener);
+ }
}