You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@helix.apache.org by jx...@apache.org on 2021/11/25 03:05:12 UTC
[helix] branch master updated: refactor instanceService to clusterMaintenanceService (#1912)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new ad985ac refactor instanceService to clusterMaintenanceService (#1912)
ad985ac is described below
commit ad985acc5f8a958f829c69efbfadc4c7398be75e
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Wed Nov 24 19:05:04 2021 -0800
refactor instanceService to clusterMaintenanceService (#1912)
---
.../clusterMaintenanceService/HealthCheck.java | 74 +++++
.../MaintenanceManagementService.java | 364 ++++++++++++++++++++-
.../server/resources/helix/InstancesAccessor.java | 10 +-
.../resources/helix/PerInstanceAccessor.java | 18 +-
.../helix/rest/server/service/InstanceService.java | 49 +--
.../rest/server/service/InstanceServiceImpl.java | 328 +------------------
.../TestMaintenanceManagementService.java} | 96 ++----
7 files changed, 498 insertions(+), 441 deletions(-)
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/HealthCheck.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/HealthCheck.java
new file mode 100644
index 0000000..992fe76
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/HealthCheck.java
@@ -0,0 +1,74 @@
+package org.apache.helix.rest.clusterMaintenanceService;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+
+public enum HealthCheck {
+ /**
+ * Check if instance is alive
+ */
+ INSTANCE_NOT_ALIVE,
+ /**
+ * Check if instance is enabled both in instance config and cluster config
+ */
+ INSTANCE_NOT_ENABLED,
+ /**
+ * Check if instance is stable
+ * Stable means all the ideal state mapping matches external view (view of current state).
+ */
+ INSTANCE_NOT_STABLE,
+ /**
+ * Check if instance has 0 resource assigned
+ */
+ EMPTY_RESOURCE_ASSIGNMENT,
+ /**
+ * Check if instance has disabled partitions
+ */
+ HAS_DISABLED_PARTITION,
+ /**
+ * Check if instance has valid configuration (pre-requisite for all checks)
+ */
+ INVALID_CONFIG,
+ /**
+ * Check if instance has error partitions
+ */
+ HAS_ERROR_PARTITION,
+ /**
+ * Check if all resources hosted on the instance can still meet the min active replica
+ * constraint if this instance is shutdown
+ */
+ MIN_ACTIVE_REPLICA_CHECK_FAILED;
+
+ /**
+ * Pre-defined list of checks to test if an instance can be stopped at runtime
+ */
+ public static List<HealthCheck> STOPPABLE_CHECK_LIST = Arrays.asList(HealthCheck.values());
+ /**
+ * Pre-defined list of checks to test if an instance is in healthy running state
+ */
+ public static List<HealthCheck> STARTED_AND_HEALTH_CHECK_LIST = ImmutableList
+ .of(INVALID_CONFIG, INSTANCE_NOT_ALIVE, INSTANCE_NOT_ENABLED, INSTANCE_NOT_STABLE,
+ EMPTY_RESOURCE_ASSIGNMENT);
+}
\ No newline at end of file
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
index 0435dc2..605e8f6 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
@@ -21,13 +21,89 @@ package org.apache.helix.rest.clusterMaintenanceService;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.SharedMetricRegistries;
+import com.codahale.metrics.Timer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.RESTConfig;
+import org.apache.helix.rest.client.CustomRestClient;
+import org.apache.helix.rest.client.CustomRestClientFactory;
+import org.apache.helix.rest.common.HelixDataAccessorWrapper;
+import org.apache.helix.rest.server.json.instance.InstanceInfo;
+import org.apache.helix.rest.server.json.instance.StoppableCheck;
+import org.apache.helix.rest.server.service.InstanceService;
+import org.apache.helix.rest.server.service.InstanceServiceImpl;
+import org.apache.helix.util.InstanceValidationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-;
public class MaintenanceManagementService {
+ private static final Logger LOG = LoggerFactory.getLogger(InstanceServiceImpl.class);
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final ExecutorService POOL = Executors.newCachedThreadPool();
+
+ // Metric names for custom instance check
+ private static final String CUSTOM_INSTANCE_CHECK_HTTP_REQUESTS_ERROR_TOTAL =
+ MetricRegistry.name(InstanceService.class, "custom_instance_check_http_requests_error_total");
+ private static final String CUSTOM_INSTANCE_CHECK_HTTP_REQUESTS_DURATION =
+ MetricRegistry.name(InstanceService.class, "custom_instance_check_http_requests_duration");
+
+ private final ConfigAccessor _configAccessor;
+ private final CustomRestClient _customRestClient;
+ private final String _namespace;
+ private final boolean _skipZKRead;
+ private final boolean _continueOnFailures;
+ private final HelixDataAccessorWrapper _dataAccessor;
+
+
+ public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor, ConfigAccessor configAccessor,
+ boolean skipZKRead, String namespace) {
+ this(dataAccessor, configAccessor, CustomRestClientFactory.get(), skipZKRead, false, namespace);
+ }
+
+ public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor, ConfigAccessor configAccessor,
+ boolean skipZKRead, boolean continueOnFailures, String namespace) {
+ this(dataAccessor, configAccessor, CustomRestClientFactory.get(), skipZKRead,
+ continueOnFailures, namespace);
+ }
+
+ @VisibleForTesting
+ MaintenanceManagementService(ZKHelixDataAccessor dataAccessor, ConfigAccessor configAccessor,
+ CustomRestClient customRestClient, boolean skipZKRead, boolean continueOnFailures,
+ String namespace) {
+ _dataAccessor = new HelixDataAccessorWrapper(dataAccessor, customRestClient, namespace);
+ _configAccessor = configAccessor;
+ _customRestClient = customRestClient;
+ _skipZKRead = skipZKRead;
+ _continueOnFailures = continueOnFailures;
+ _namespace = namespace;
+ }
+
/**
* Perform health check and maintenance operation check and execution for a instance in
* one cluster.
@@ -121,4 +197,290 @@ public class MaintenanceManagementService {
throws IOException {
return null;
}
+
+ public InstanceInfo getInstanceHealthInfo(String clusterId, String instanceName,
+ List<HealthCheck> healthChecks) {
+ InstanceInfo.Builder instanceInfoBuilder = new InstanceInfo.Builder(instanceName);
+
+ InstanceConfig instanceConfig =
+ _dataAccessor.getProperty(_dataAccessor.keyBuilder().instanceConfig(instanceName));
+ LiveInstance liveInstance =
+ _dataAccessor.getProperty(_dataAccessor.keyBuilder().liveInstance(instanceName));
+ if (instanceConfig != null) {
+ instanceInfoBuilder.instanceConfig(instanceConfig.getRecord());
+ } else {
+ LOG.warn("Missing instance config for {}", instanceName);
+ }
+ if (liveInstance != null) {
+ instanceInfoBuilder.liveInstance(liveInstance.getRecord());
+ String sessionId = liveInstance.getEphemeralOwner();
+
+ List<String> resourceNames = _dataAccessor
+ .getChildNames(_dataAccessor.keyBuilder().currentStates(instanceName, sessionId));
+ instanceInfoBuilder.resources(resourceNames);
+ List<String> partitions = new ArrayList<>();
+ for (String resourceName : resourceNames) {
+ CurrentState currentState = _dataAccessor.getProperty(
+ _dataAccessor.keyBuilder().currentState(instanceName, sessionId, resourceName));
+ if (currentState != null && currentState.getPartitionStateMap() != null) {
+ partitions.addAll(currentState.getPartitionStateMap().keySet());
+ } else {
+ LOG.warn(
+ "Current state is either null or partitionStateMap is missing. InstanceName: {}, SessionId: {}, ResourceName: {}",
+ instanceName, sessionId, resourceName);
+ }
+ }
+ instanceInfoBuilder.partitions(partitions);
+ } else {
+ LOG.warn("Missing live instance for {}", instanceName);
+ }
+ try {
+ Map<String, Boolean> healthStatus =
+ getInstanceHealthStatus(clusterId, instanceName, healthChecks);
+ instanceInfoBuilder.healthStatus(healthStatus);
+ } catch (HelixException ex) {
+ LOG.error(
+ "Exception while getting health status. Cluster: {}, Instance: {}, reporting health status as unHealth",
+ clusterId, instanceName, ex);
+ instanceInfoBuilder.healthStatus(false);
+ }
+
+ return instanceInfoBuilder.build();
+ }
+
+ /**
+ * {@inheritDoc}
+ * Single instance stoppable check implementation is a special case of
+ * {@link #batchGetInstancesStoppableChecks(String, List, String)}
+ * <p>
+ * Step 1: Perform instance level Helix own health checks
+ * Step 2: Perform instance level client side health checks
+ * Step 3: Perform partition level (all partitions on the instance) client side health checks
+ * <p>
+ * Note: if the check fails at one step, the rest steps won't be executed because the instance
+ * cannot be stopped
+ */
+ public StoppableCheck getInstanceStoppableCheck(String clusterId, String instanceName,
+ String jsonContent) throws IOException {
+ return batchGetInstancesStoppableChecks(clusterId, ImmutableList.of(instanceName), jsonContent)
+ .get(instanceName);
+ }
+
+
+ public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clusterId,
+ List<String> instances, String jsonContent) throws IOException {
+ Map<String, StoppableCheck> finalStoppableChecks = new HashMap<>();
+ // helix instance check
+ List<String> instancesForCustomInstanceLevelChecks =
+ batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks);
+ // custom check
+ batchCustomInstanceStoppableCheck(clusterId, instancesForCustomInstanceLevelChecks,
+ finalStoppableChecks, getCustomPayLoads(jsonContent));
+ return finalStoppableChecks;
+ }
+
+ private List<String> batchHelixInstanceStoppableCheck(String clusterId,
+ Collection<String> instances, Map<String, StoppableCheck> finalStoppableChecks) {
+ Map<String, Future<StoppableCheck>> helixInstanceChecks = instances.stream().collect(Collectors
+ .toMap(Function.identity(),
+ instance -> POOL.submit(() -> performHelixOwnInstanceCheck(clusterId, instance))));
+ // finalStoppableChecks only contains instances that does not pass this health check
+ return filterInstancesForNextCheck(helixInstanceChecks, finalStoppableChecks);
+ }
+
+ private void batchCustomInstanceStoppableCheck(String clusterId, List<String> instances,
+ Map<String, StoppableCheck> finalStoppableChecks, Map<String, String> customPayLoads) {
+ if (instances.isEmpty() ) {
+ // if all instances failed at previous checks, then all following checks are not required.
+ return;
+ }
+ RESTConfig restConfig = _configAccessor.getRESTConfig(clusterId);
+ if (restConfig == null) {
+ String errorMessage = String.format(
+ "The cluster %s hasn't enabled client side health checks yet, "
+ + "thus the stoppable check result is inaccurate", clusterId);
+ LOG.error(errorMessage);
+ throw new HelixException(errorMessage);
+ }
+ Map<String, Future<StoppableCheck>> customInstanceLevelChecks = instances.stream().collect(
+ Collectors.toMap(Function.identity(), instance -> POOL.submit(
+ () -> performCustomInstanceCheck(clusterId, instance, restConfig.getBaseUrl(instance),
+ customPayLoads))));
+ List<String> instancesForCustomPartitionLevelChecks =
+ filterInstancesForNextCheck(customInstanceLevelChecks, finalStoppableChecks);
+ if (!instancesForCustomPartitionLevelChecks.isEmpty()) {
+ Map<String, StoppableCheck> instancePartitionLevelChecks =
+ performPartitionsCheck(instancesForCustomPartitionLevelChecks, restConfig,
+ customPayLoads);
+ for (Map.Entry<String, StoppableCheck> instancePartitionStoppableCheckEntry : instancePartitionLevelChecks
+ .entrySet()) {
+ String instance = instancePartitionStoppableCheckEntry.getKey();
+ StoppableCheck stoppableCheck = instancePartitionStoppableCheckEntry.getValue();
+ addStoppableCheck(finalStoppableChecks, instance, stoppableCheck);
+ }
+ }
+ }
+
+ private void addStoppableCheck(Map<String, StoppableCheck> stoppableChecks, String instance,
+ StoppableCheck stoppableCheck) {
+ if (!stoppableChecks.containsKey(instance)) {
+ stoppableChecks.put(instance, stoppableCheck);
+ } else {
+ // Merge two checks
+ stoppableChecks.get(instance).add(stoppableCheck);
+ }
+ }
+
+ private List<String> filterInstancesForNextCheck(
+ Map<String, Future<StoppableCheck>> futureStoppableCheckByInstance,
+ Map<String, StoppableCheck> finalStoppableCheckByInstance) {
+ List<String> instancesForNextCheck = new ArrayList<>();
+ for (Map.Entry<String, Future<StoppableCheck>> entry : futureStoppableCheckByInstance
+ .entrySet()) {
+ String instance = entry.getKey();
+ try {
+ StoppableCheck stoppableCheck = entry.getValue().get();
+ if (!stoppableCheck.isStoppable()) {
+ // put the check result of the failed-to-stop instances
+ addStoppableCheck(finalStoppableCheckByInstance, instance, stoppableCheck);
+ }
+ if (stoppableCheck.isStoppable() || _continueOnFailures){
+ // instance passed this around of check or mandatory all checks
+ // will be checked in the next round
+ instancesForNextCheck.add(instance);
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Failed to get StoppableChecks in parallel. Instance: {}", instance, e);
+ }
+ }
+
+ return instancesForNextCheck;
+ }
+
+ private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName) {
+ LOG.info("Perform helix own custom health checks for {}/{}", clusterId, instanceName);
+ Map<String, Boolean> helixStoppableCheck = getInstanceHealthStatus(clusterId, instanceName,
+ HealthCheck.STOPPABLE_CHECK_LIST);
+
+ return new StoppableCheck(helixStoppableCheck, StoppableCheck.Category.HELIX_OWN_CHECK);
+ }
+
+ private StoppableCheck performCustomInstanceCheck(String clusterId, String instanceName,
+ String baseUrl, Map<String, String> customPayLoads) {
+ LOG.info("Perform instance level client side health checks for {}/{}", clusterId, instanceName);
+ MetricRegistry metrics = SharedMetricRegistries.getOrCreate(_namespace);
+
+ // Total requests metric is included as an attribute(Count) in timers
+ try (final Timer.Context timer = metrics.timer(CUSTOM_INSTANCE_CHECK_HTTP_REQUESTS_DURATION)
+ .time()) {
+ Map<String, Boolean> instanceStoppableCheck =
+ _customRestClient.getInstanceStoppableCheck(baseUrl, customPayLoads);
+ return new StoppableCheck(instanceStoppableCheck,
+ StoppableCheck.Category.CUSTOM_INSTANCE_CHECK);
+ } catch (IOException ex) {
+ LOG.error("Custom client side instance level health check for {}/{} failed.", clusterId,
+ instanceName, ex);
+ metrics.counter(CUSTOM_INSTANCE_CHECK_HTTP_REQUESTS_ERROR_TOTAL).inc();
+ return new StoppableCheck(false, Collections.singletonList(instanceName),
+ StoppableCheck.Category.CUSTOM_INSTANCE_CHECK);
+ }
+ }
+
+ private Map<String, StoppableCheck> performPartitionsCheck(List<String> instances,
+ RESTConfig restConfig, Map<String, String> customPayLoads) {
+ Map<String, Map<String, Boolean>> allPartitionsHealthOnLiveInstance =
+ _dataAccessor.getAllPartitionsHealthOnLiveInstance(restConfig, customPayLoads, _skipZKRead);
+ List<ExternalView> externalViews =
+ _dataAccessor.getChildValues(_dataAccessor.keyBuilder().externalViews(), true);
+ Map<String, StoppableCheck> instanceStoppableChecks = new HashMap<>();
+ for (String instanceName : instances) {
+ Map<String, List<String>> unHealthyPartitions = InstanceValidationUtil
+ .perPartitionHealthCheck(externalViews, allPartitionsHealthOnLiveInstance, instanceName,
+ _dataAccessor);
+
+ List<String> unHealthyPartitionsList = new ArrayList<>();
+ for (String partitionName : unHealthyPartitions.keySet()) {
+ for (String reason : unHealthyPartitions.get(partitionName)) {
+ unHealthyPartitionsList.add(reason.toUpperCase() + ":" + partitionName);
+ }
+ }
+ StoppableCheck stoppableCheck = new StoppableCheck(unHealthyPartitionsList.isEmpty(),
+ unHealthyPartitionsList, StoppableCheck.Category.CUSTOM_PARTITION_CHECK);
+ instanceStoppableChecks.put(instanceName, stoppableCheck);
+ }
+
+ return instanceStoppableChecks;
+ }
+
+ private Map<String, String> getCustomPayLoads(String jsonContent) throws IOException {
+ Map<String, String> result = new HashMap<>();
+ if (jsonContent == null) {
+ return result;
+ }
+ JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonContent);
+ // parsing the inputs as string key value pairs
+ jsonNode.fields().forEachRemaining(kv -> result.put(kv.getKey(), kv.getValue().asText()));
+ return result;
+ }
+
+ @VisibleForTesting
+ protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
+ List<HealthCheck> healthChecks) {
+ Map<String, Boolean> healthStatus = new HashMap<>();
+ for (HealthCheck healthCheck : healthChecks) {
+ switch (healthCheck) {
+ case INVALID_CONFIG:
+ boolean validConfig;
+ try {
+ validConfig =
+ InstanceValidationUtil.hasValidConfig(_dataAccessor, clusterId, instanceName);
+ } catch (HelixException e) {
+ validConfig = false;
+ LOG.warn("Cluster {} instance {} doesn't have valid config: {}", clusterId, instanceName,
+ e.getMessage());
+ }
+
+ // TODO: should add reason to request response
+ healthStatus.put(HealthCheck.INVALID_CONFIG.name(), validConfig);
+ if (!validConfig) {
+ // No need to do remaining health checks.
+ return healthStatus;
+ }
+ break;
+ case INSTANCE_NOT_ENABLED:
+ healthStatus.put(HealthCheck.INSTANCE_NOT_ENABLED.name(),
+ InstanceValidationUtil.isEnabled(_dataAccessor, instanceName));
+ break;
+ case INSTANCE_NOT_ALIVE:
+ healthStatus.put(HealthCheck.INSTANCE_NOT_ALIVE.name(),
+ InstanceValidationUtil.isAlive(_dataAccessor, instanceName));
+ break;
+ case INSTANCE_NOT_STABLE:
+ boolean isStable = InstanceValidationUtil.isInstanceStable(_dataAccessor, instanceName);
+ healthStatus.put(HealthCheck.INSTANCE_NOT_STABLE.name(), isStable);
+ break;
+ case HAS_ERROR_PARTITION:
+ healthStatus.put(HealthCheck.HAS_ERROR_PARTITION.name(),
+ !InstanceValidationUtil.hasErrorPartitions(_dataAccessor, clusterId, instanceName));
+ break;
+ case HAS_DISABLED_PARTITION:
+ healthStatus.put(HealthCheck.HAS_DISABLED_PARTITION.name(),
+ !InstanceValidationUtil.hasDisabledPartitions(_dataAccessor, clusterId, instanceName));
+ break;
+ case EMPTY_RESOURCE_ASSIGNMENT:
+ healthStatus.put(HealthCheck.EMPTY_RESOURCE_ASSIGNMENT.name(),
+ InstanceValidationUtil.hasResourceAssigned(_dataAccessor, clusterId, instanceName));
+ break;
+ case MIN_ACTIVE_REPLICA_CHECK_FAILED:
+ healthStatus.put(HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED.name(),
+ InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName));
+ break;
+ default:
+ LOG.error("Unsupported health check: {}", healthCheck);
+ break;
+ }
+ }
+
+ return healthStatus;
+ }
}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
index 46e20a7..93537e3 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
@@ -47,14 +47,13 @@ import org.apache.helix.HelixException;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.rest.clusterMaintenanceService.MaintenanceManagementService;
import org.apache.helix.rest.common.HttpConstants;
import org.apache.helix.rest.server.json.cluster.ClusterTopology;
import org.apache.helix.rest.server.json.instance.StoppableCheck;
import org.apache.helix.rest.server.resources.exceptions.HelixHealthException;
import org.apache.helix.rest.server.service.ClusterService;
import org.apache.helix.rest.server.service.ClusterServiceImpl;
-import org.apache.helix.rest.server.service.InstanceService;
-import org.apache.helix.rest.server.service.InstanceServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -226,8 +225,9 @@ public class InstancesAccessor extends AbstractHelixResource {
result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
ObjectNode failedStoppableInstances = result.putObject(
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
- InstanceService instanceService =
- new InstanceServiceImpl((ZKHelixDataAccessor) getDataAccssor(clusterId),
+
+ MaintenanceManagementService maintenanceService =
+ new MaintenanceManagementService((ZKHelixDataAccessor) getDataAccssor(clusterId),
getConfigAccessor(), skipZKRead, continueOnFailures, getNamespace());
ClusterService clusterService = new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId);
@@ -235,7 +235,7 @@ public class InstancesAccessor extends AbstractHelixResource {
case zone_based:
List<String> zoneBasedInstance =
getZoneBasedInstances(instances, orderOfZone, clusterTopology.toZoneMapping());
- Map<String, StoppableCheck> instancesStoppableChecks = instanceService.batchGetInstancesStoppableChecks(
+ Map<String, StoppableCheck> instancesStoppableChecks = maintenanceService.batchGetInstancesStoppableChecks(
clusterId, zoneBasedInstance, customizedInput);
for (Map.Entry<String, StoppableCheck> instanceStoppableCheck : instancesStoppableChecks.entrySet()) {
String instance = instanceStoppableCheck.getKey();
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
index 02ed063..d42e8d9 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
@@ -56,11 +56,11 @@ import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.ParticipantHistory;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.rest.clusterMaintenanceService.HealthCheck;
+import org.apache.helix.rest.clusterMaintenanceService.MaintenanceManagementService;
import org.apache.helix.rest.common.HttpConstants;
import org.apache.helix.rest.server.json.instance.InstanceInfo;
import org.apache.helix.rest.server.json.instance.StoppableCheck;
-import org.apache.helix.rest.server.service.InstanceService;
-import org.apache.helix.rest.server.service.InstanceServiceImpl;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.eclipse.jetty.util.StringUtil;
import org.slf4j.Logger;
@@ -103,11 +103,11 @@ public class PerInstanceAccessor extends AbstractHelixResource {
case getInstance:
HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
// TODO reduce GC by dependency injection
- InstanceService instanceService =
- new InstanceServiceImpl((ZKHelixDataAccessor) dataAccessor, getConfigAccessor(),
+ MaintenanceManagementService service =
+ new MaintenanceManagementService((ZKHelixDataAccessor) dataAccessor, getConfigAccessor(),
Boolean.parseBoolean(skipZKRead), getNamespace());
- InstanceInfo instanceInfo = instanceService.getInstanceInfo(clusterId, instanceName,
- InstanceService.HealthCheck.STARTED_AND_HEALTH_CHECK_LIST);
+ InstanceInfo instanceInfo = service.getInstanceHealthInfo(clusterId, instanceName,
+ HealthCheck.STARTED_AND_HEALTH_CHECK_LIST);
String instanceInfoString;
try {
instanceInfoString = OBJECT_MAPPER.writeValueAsString(instanceInfo);
@@ -157,8 +157,8 @@ public class PerInstanceAccessor extends AbstractHelixResource {
@QueryParam("skipZKRead") boolean skipZKRead,
@QueryParam("continueOnFailures") boolean continueOnFailures) throws IOException {
HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
- InstanceService instanceService =
- new InstanceServiceImpl((ZKHelixDataAccessor) dataAccessor, getConfigAccessor(), skipZKRead,
+ MaintenanceManagementService maintenanceService =
+ new MaintenanceManagementService((ZKHelixDataAccessor) dataAccessor, getConfigAccessor(), skipZKRead,
continueOnFailures, getNamespace());
StoppableCheck stoppableCheck;
try {
@@ -176,7 +176,7 @@ public class PerInstanceAccessor extends AbstractHelixResource {
}
stoppableCheck =
- instanceService.getInstanceStoppableCheck(clusterId, instanceName, customizedInput);
+ maintenanceService.getInstanceStoppableCheck(clusterId, instanceName, customizedInput);
} catch (HelixException e) {
LOG.error("Current cluster: {}, instance: {} has issue with health checks!", clusterId,
instanceName, e);
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
index ce8bc5e..ae11aec 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
@@ -20,62 +20,15 @@ package org.apache.helix.rest.server.service;
*/
import java.io.IOException;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import com.google.common.collect.ImmutableList;
+import org.apache.helix.rest.clusterMaintenanceService.HealthCheck;
import org.apache.helix.rest.server.json.instance.InstanceInfo;
import org.apache.helix.rest.server.json.instance.StoppableCheck;
public interface InstanceService {
- enum HealthCheck {
- /**
- * Check if instance is alive
- */
- INSTANCE_NOT_ALIVE,
- /**
- * Check if instance is enabled both in instance config and cluster config
- */
- INSTANCE_NOT_ENABLED,
- /**
- * Check if instance is stable
- * Stable means all the ideal state mapping matches external view (view of current state).
- */
- INSTANCE_NOT_STABLE,
- /**
- * Check if instance has 0 resource assigned
- */
- EMPTY_RESOURCE_ASSIGNMENT,
- /**
- * Check if instance has disabled partitions
- */
- HAS_DISABLED_PARTITION,
- /**
- * Check if instance has valid configuration (pre-requisite for all checks)
- */
- INVALID_CONFIG,
- /**
- * Check if instance has error partitions
- */
- HAS_ERROR_PARTITION,
- /**
- * Check if all resources hosted on the instance can still meet the min active replica
- * constraint if this instance is shutdown
- */
- MIN_ACTIVE_REPLICA_CHECK_FAILED;
- /**
- * Pre-defined list of checks to test if an instance can be stopped at runtime
- */
- public static List<HealthCheck> STOPPABLE_CHECK_LIST = Arrays.asList(HealthCheck.values());
- /**
- * Pre-defined list of checks to test if an instance is in healthy running state
- */
- public static List<HealthCheck> STARTED_AND_HEALTH_CHECK_LIST = ImmutableList
- .of(INVALID_CONFIG, INSTANCE_NOT_ALIVE, INSTANCE_NOT_ENABLED, INSTANCE_NOT_STABLE,
- EMPTY_RESOURCE_ASSIGNMENT);
- }
/**
* Get the overall status of the instance
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
index 644bb4a..59d1a37 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
@@ -20,62 +20,21 @@ package org.apache.helix.rest.server.service;
*/
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.SharedMetricRegistries;
-import com.codahale.metrics.Timer;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixException;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.RESTConfig;
-import org.apache.helix.rest.client.CustomRestClient;
-import org.apache.helix.rest.client.CustomRestClientFactory;
-import org.apache.helix.rest.common.HelixDataAccessorWrapper;
+import org.apache.helix.rest.clusterMaintenanceService.HealthCheck;
+import org.apache.helix.rest.clusterMaintenanceService.MaintenanceManagementService;
import org.apache.helix.rest.common.HelixRestNamespace;
import org.apache.helix.rest.server.json.instance.InstanceInfo;
import org.apache.helix.rest.server.json.instance.StoppableCheck;
-import org.apache.helix.util.InstanceValidationUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class InstanceServiceImpl implements InstanceService {
- private static final Logger LOG = LoggerFactory.getLogger(InstanceServiceImpl.class);
-
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- private static final ExecutorService POOL = Executors.newCachedThreadPool();
-
- // Metric names for custom instance check
- private static final String CUSTOM_INSTANCE_CHECK_HTTP_REQUESTS_ERROR_TOTAL =
- MetricRegistry.name(InstanceService.class, "custom_instance_check_http_requests_error_total");
- private static final String CUSTOM_INSTANCE_CHECK_HTTP_REQUESTS_DURATION =
- MetricRegistry.name(InstanceService.class, "custom_instance_check_http_requests_duration");
- private final HelixDataAccessorWrapper _dataAccessor;
- private final ConfigAccessor _configAccessor;
- private final CustomRestClient _customRestClient;
- private final String _namespace;
- private final boolean _skipZKRead;
- private final boolean _continueOnFailures;
+public class InstanceServiceImpl implements InstanceService {
+ private MaintenanceManagementService _maintenanceManagementService;
@Deprecated
public InstanceServiceImpl(ZKHelixDataAccessor dataAccessor, ConfigAccessor configAccessor) {
@@ -90,77 +49,22 @@ public class InstanceServiceImpl implements InstanceService {
public InstanceServiceImpl(ZKHelixDataAccessor dataAccessor, ConfigAccessor configAccessor,
boolean skipZKRead, String namespace) {
- this(dataAccessor, configAccessor, CustomRestClientFactory.get(), skipZKRead, false, namespace);
+ this(dataAccessor, configAccessor, skipZKRead, false, namespace);
}
// TODO: too many params, convert to builder pattern
public InstanceServiceImpl(ZKHelixDataAccessor dataAccessor, ConfigAccessor configAccessor,
boolean skipZKRead, boolean continueOnFailures, String namespace) {
- this(dataAccessor, configAccessor, CustomRestClientFactory.get(), skipZKRead,
- continueOnFailures, namespace);
+ _maintenanceManagementService =
+ new MaintenanceManagementService(dataAccessor, configAccessor, skipZKRead,
+ continueOnFailures, namespace);
}
- @VisibleForTesting
- InstanceServiceImpl(ZKHelixDataAccessor dataAccessor, ConfigAccessor configAccessor,
- CustomRestClient customRestClient, boolean skipZKRead, boolean continueOnFailures,
- String namespace) {
- _dataAccessor = new HelixDataAccessorWrapper(dataAccessor, customRestClient, namespace);
- _configAccessor = configAccessor;
- _customRestClient = customRestClient;
- _skipZKRead = skipZKRead;
- _continueOnFailures = continueOnFailures;
- _namespace = namespace;
- }
@Override
public InstanceInfo getInstanceInfo(String clusterId, String instanceName,
List<HealthCheck> healthChecks) {
- InstanceInfo.Builder instanceInfoBuilder = new InstanceInfo.Builder(instanceName);
-
- InstanceConfig instanceConfig =
- _dataAccessor.getProperty(_dataAccessor.keyBuilder().instanceConfig(instanceName));
- LiveInstance liveInstance =
- _dataAccessor.getProperty(_dataAccessor.keyBuilder().liveInstance(instanceName));
- if (instanceConfig != null) {
- instanceInfoBuilder.instanceConfig(instanceConfig.getRecord());
- } else {
- LOG.warn("Missing instance config for {}", instanceName);
- }
- if (liveInstance != null) {
- instanceInfoBuilder.liveInstance(liveInstance.getRecord());
- String sessionId = liveInstance.getEphemeralOwner();
-
- List<String> resourceNames = _dataAccessor
- .getChildNames(_dataAccessor.keyBuilder().currentStates(instanceName, sessionId));
- instanceInfoBuilder.resources(resourceNames);
- List<String> partitions = new ArrayList<>();
- for (String resourceName : resourceNames) {
- CurrentState currentState = _dataAccessor.getProperty(
- _dataAccessor.keyBuilder().currentState(instanceName, sessionId, resourceName));
- if (currentState != null && currentState.getPartitionStateMap() != null) {
- partitions.addAll(currentState.getPartitionStateMap().keySet());
- } else {
- LOG.warn(
- "Current state is either null or partitionStateMap is missing. InstanceName: {}, SessionId: {}, ResourceName: {}",
- instanceName, sessionId, resourceName);
- }
- }
- instanceInfoBuilder.partitions(partitions);
- } else {
- LOG.warn("Missing live instance for {}", instanceName);
- }
- try {
- Map<String, Boolean> healthStatus =
- getInstanceHealthStatus(clusterId, instanceName, healthChecks);
- instanceInfoBuilder.healthStatus(healthStatus);
- } catch (HelixException ex) {
- LOG.error(
- "Exception while getting health status. Cluster: {}, Instance: {}, reporting health status as unHealth",
- clusterId, instanceName, ex);
- instanceInfoBuilder.healthStatus(false);
- }
-
- return instanceInfoBuilder.build();
+ return _maintenanceManagementService.getInstanceHealthInfo(clusterId, instanceName, healthChecks);
}
/**
@@ -185,218 +89,8 @@ public class InstanceServiceImpl implements InstanceService {
@Override
public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clusterId,
List<String> instances, String jsonContent) throws IOException {
- Map<String, StoppableCheck> finalStoppableChecks = new HashMap<>();
- // helix instance check
- List<String> instancesForCustomInstanceLevelChecks =
- batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks);
- // custom check
- batchCustomInstanceStoppableCheck(clusterId, instancesForCustomInstanceLevelChecks,
- finalStoppableChecks, getCustomPayLoads(jsonContent));
- return finalStoppableChecks;
- }
-
- private List<String> batchHelixInstanceStoppableCheck(String clusterId,
- Collection<String> instances, Map<String, StoppableCheck> finalStoppableChecks) {
- Map<String, Future<StoppableCheck>> helixInstanceChecks = instances.stream().collect(Collectors
- .toMap(Function.identity(),
- instance -> POOL.submit(() -> performHelixOwnInstanceCheck(clusterId, instance))));
- // finalStoppableChecks only contains instances that does not pass this health check
- return filterInstancesForNextCheck(helixInstanceChecks, finalStoppableChecks);
- }
-
- private void batchCustomInstanceStoppableCheck(String clusterId, List<String> instances,
- Map<String, StoppableCheck> finalStoppableChecks, Map<String, String> customPayLoads) {
- if (instances.isEmpty() ) {
- // if all instances failed at previous checks, then all following checks are not required.
- return;
- }
- RESTConfig restConfig = _configAccessor.getRESTConfig(clusterId);
- if (restConfig == null) {
- String errorMessage = String.format(
- "The cluster %s hasn't enabled client side health checks yet, "
- + "thus the stoppable check result is inaccurate", clusterId);
- LOG.error(errorMessage);
- throw new HelixException(errorMessage);
- }
- Map<String, Future<StoppableCheck>> customInstanceLevelChecks = instances.stream().collect(
- Collectors.toMap(Function.identity(), instance -> POOL.submit(
- () -> performCustomInstanceCheck(clusterId, instance, restConfig.getBaseUrl(instance),
- customPayLoads))));
- List<String> instancesForCustomPartitionLevelChecks =
- filterInstancesForNextCheck(customInstanceLevelChecks, finalStoppableChecks);
- if (!instancesForCustomPartitionLevelChecks.isEmpty()) {
- Map<String, StoppableCheck> instancePartitionLevelChecks =
- performPartitionsCheck(instancesForCustomPartitionLevelChecks, restConfig,
- customPayLoads);
- for (Map.Entry<String, StoppableCheck> instancePartitionStoppableCheckEntry : instancePartitionLevelChecks
- .entrySet()) {
- String instance = instancePartitionStoppableCheckEntry.getKey();
- StoppableCheck stoppableCheck = instancePartitionStoppableCheckEntry.getValue();
- addStoppableCheck(finalStoppableChecks, instance, stoppableCheck);
- }
- }
- }
-
- private void addStoppableCheck(Map<String, StoppableCheck> stoppableChecks, String instance,
- StoppableCheck stoppableCheck) {
- if (!stoppableChecks.containsKey(instance)) {
- stoppableChecks.put(instance, stoppableCheck);
- } else {
- // Merge two checks
- stoppableChecks.get(instance).add(stoppableCheck);
- }
- }
-
- private List<String> filterInstancesForNextCheck(
- Map<String, Future<StoppableCheck>> futureStoppableCheckByInstance,
- Map<String, StoppableCheck> finalStoppableCheckByInstance) {
- List<String> instancesForNextCheck = new ArrayList<>();
- for (Map.Entry<String, Future<StoppableCheck>> entry : futureStoppableCheckByInstance
- .entrySet()) {
- String instance = entry.getKey();
- try {
- StoppableCheck stoppableCheck = entry.getValue().get();
- if (!stoppableCheck.isStoppable()) {
- // put the check result of the failed-to-stop instances
- addStoppableCheck(finalStoppableCheckByInstance, instance, stoppableCheck);
- }
- if (stoppableCheck.isStoppable() || _continueOnFailures){
- // instance passed this around of check or mandatory all checks
- // will be checked in the next round
- instancesForNextCheck.add(instance);
- }
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Failed to get StoppableChecks in parallel. Instance: {}", instance, e);
- }
- }
-
- return instancesForNextCheck;
+ return _maintenanceManagementService
+ .batchGetInstancesStoppableChecks(clusterId, instances, jsonContent);
}
- private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName) {
- LOG.info("Perform helix own custom health checks for {}/{}", clusterId, instanceName);
- Map<String, Boolean> helixStoppableCheck = getInstanceHealthStatus(clusterId, instanceName,
- InstanceService.HealthCheck.STOPPABLE_CHECK_LIST);
-
- return new StoppableCheck(helixStoppableCheck, StoppableCheck.Category.HELIX_OWN_CHECK);
- }
-
- private StoppableCheck performCustomInstanceCheck(String clusterId, String instanceName,
- String baseUrl, Map<String, String> customPayLoads) {
- LOG.info("Perform instance level client side health checks for {}/{}", clusterId, instanceName);
- MetricRegistry metrics = SharedMetricRegistries.getOrCreate(_namespace);
-
- // Total requests metric is included as an attribute(Count) in timers
- try (final Timer.Context timer = metrics.timer(CUSTOM_INSTANCE_CHECK_HTTP_REQUESTS_DURATION)
- .time()) {
- Map<String, Boolean> instanceStoppableCheck =
- _customRestClient.getInstanceStoppableCheck(baseUrl, customPayLoads);
- return new StoppableCheck(instanceStoppableCheck,
- StoppableCheck.Category.CUSTOM_INSTANCE_CHECK);
- } catch (IOException ex) {
- LOG.error("Custom client side instance level health check for {}/{} failed.", clusterId,
- instanceName, ex);
- metrics.counter(CUSTOM_INSTANCE_CHECK_HTTP_REQUESTS_ERROR_TOTAL).inc();
- return new StoppableCheck(false, Collections.singletonList(instanceName),
- StoppableCheck.Category.CUSTOM_INSTANCE_CHECK);
- }
- }
-
- private Map<String, StoppableCheck> performPartitionsCheck(List<String> instances,
- RESTConfig restConfig, Map<String, String> customPayLoads) {
- Map<String, Map<String, Boolean>> allPartitionsHealthOnLiveInstance =
- _dataAccessor.getAllPartitionsHealthOnLiveInstance(restConfig, customPayLoads, _skipZKRead);
- List<ExternalView> externalViews =
- _dataAccessor.getChildValues(_dataAccessor.keyBuilder().externalViews(), true);
- Map<String, StoppableCheck> instanceStoppableChecks = new HashMap<>();
- for (String instanceName : instances) {
- Map<String, List<String>> unHealthyPartitions = InstanceValidationUtil
- .perPartitionHealthCheck(externalViews, allPartitionsHealthOnLiveInstance, instanceName,
- _dataAccessor);
-
- List<String> unHealthyPartitionsList = new ArrayList<>();
- for (String partitionName : unHealthyPartitions.keySet()) {
- for (String reason : unHealthyPartitions.get(partitionName)) {
- unHealthyPartitionsList.add(reason.toUpperCase() + ":" + partitionName);
- }
- }
- StoppableCheck stoppableCheck = new StoppableCheck(unHealthyPartitionsList.isEmpty(),
- unHealthyPartitionsList, StoppableCheck.Category.CUSTOM_PARTITION_CHECK);
- instanceStoppableChecks.put(instanceName, stoppableCheck);
- }
-
- return instanceStoppableChecks;
- }
-
- private Map<String, String> getCustomPayLoads(String jsonContent) throws IOException {
- Map<String, String> result = new HashMap<>();
- if (jsonContent == null) {
- return result;
- }
- JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonContent);
- // parsing the inputs as string key value pairs
- jsonNode.fields().forEachRemaining(kv -> result.put(kv.getKey(), kv.getValue().asText()));
- return result;
- }
-
- @VisibleForTesting
- protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
- List<HealthCheck> healthChecks) {
- Map<String, Boolean> healthStatus = new HashMap<>();
- for (HealthCheck healthCheck : healthChecks) {
- switch (healthCheck) {
- case INVALID_CONFIG:
- boolean validConfig;
- try {
- validConfig =
- InstanceValidationUtil.hasValidConfig(_dataAccessor, clusterId, instanceName);
- } catch (HelixException e) {
- validConfig = false;
- LOG.warn("Cluster {} instance {} doesn't have valid config: {}", clusterId, instanceName,
- e.getMessage());
- }
-
- // TODO: should add reason to request response
- healthStatus.put(HealthCheck.INVALID_CONFIG.name(), validConfig);
- if (!validConfig) {
- // No need to do remaining health checks.
- return healthStatus;
- }
- break;
- case INSTANCE_NOT_ENABLED:
- healthStatus.put(HealthCheck.INSTANCE_NOT_ENABLED.name(),
- InstanceValidationUtil.isEnabled(_dataAccessor, instanceName));
- break;
- case INSTANCE_NOT_ALIVE:
- healthStatus.put(HealthCheck.INSTANCE_NOT_ALIVE.name(),
- InstanceValidationUtil.isAlive(_dataAccessor, instanceName));
- break;
- case INSTANCE_NOT_STABLE:
- boolean isStable = InstanceValidationUtil.isInstanceStable(_dataAccessor, instanceName);
- healthStatus.put(HealthCheck.INSTANCE_NOT_STABLE.name(), isStable);
- break;
- case HAS_ERROR_PARTITION:
- healthStatus.put(HealthCheck.HAS_ERROR_PARTITION.name(),
- !InstanceValidationUtil.hasErrorPartitions(_dataAccessor, clusterId, instanceName));
- break;
- case HAS_DISABLED_PARTITION:
- healthStatus.put(HealthCheck.HAS_DISABLED_PARTITION.name(),
- !InstanceValidationUtil.hasDisabledPartitions(_dataAccessor, clusterId, instanceName));
- break;
- case EMPTY_RESOURCE_ASSIGNMENT:
- healthStatus.put(HealthCheck.EMPTY_RESOURCE_ASSIGNMENT.name(),
- InstanceValidationUtil.hasResourceAssigned(_dataAccessor, clusterId, instanceName));
- break;
- case MIN_ACTIVE_REPLICA_CHECK_FAILED:
- healthStatus.put(HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED.name(),
- InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName));
- break;
- default:
- LOG.error("Unsupported health check: {}", healthCheck);
- break;
- }
- }
-
- return healthStatus;
- }
}
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java b/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
similarity index 87%
rename from helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java
rename to helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
index 54db45c..770aeb5 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
@@ -1,4 +1,4 @@
-package org.apache.helix.rest.server.service;
+package org.apache.helix.rest.clusterMaintenanceService;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -18,14 +18,12 @@ package org.apache.helix.rest.server.service;
* specific language governing permissions and limitations
* under the License.
*/
-
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.helix.AccessOption;
@@ -48,7 +46,6 @@ import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyList;
@@ -60,7 +57,8 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
-public class TestInstanceService {
+
+public class TestMaintenanceManagementService {
private static final String TEST_CLUSTER = "TestCluster";
private static final String TEST_INSTANCE = "instance0.linkedin.com_1235";
@@ -70,7 +68,6 @@ public class TestInstanceService {
private ConfigAccessor _configAccessor;
@Mock
private CustomRestClient _customRestClient;
-
@BeforeMethod
public void beforeMethod() {
MockitoAnnotations.initMocks(this);
@@ -79,23 +76,35 @@ public class TestInstanceService {
when(_configAccessor.getRESTConfig(TEST_CLUSTER)).thenReturn(restConfig);
}
+ class MockMaintenanceManagementService extends MaintenanceManagementService {
+ public MockMaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
+ ConfigAccessor configAccessor, CustomRestClient customRestClient, boolean skipZKRead,
+ boolean continueOnFailures, String namespace) {
+ super(dataAccessor, configAccessor, customRestClient, skipZKRead, continueOnFailures,
+ namespace);
+ }
+ @Override
+ protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
+ List<HealthCheck> healthChecks) {
+ return Collections.emptyMap();
+ }
+ }
+
@Test
public void testGetInstanceStoppableCheckWhenHelixOwnCheckFail() throws IOException {
Map<String, Boolean> failedCheck = ImmutableMap.of("FailCheck", false);
- InstanceService service =
- new InstanceServiceImpl(_dataAccessor, _configAccessor, _customRestClient, false, false,
- HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
+ MockMaintenanceManagementService service =
+ new MockMaintenanceManagementService(_dataAccessor, _configAccessor, _customRestClient,
+ false, false, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
String instanceName, List<HealthCheck> healthChecks) {
return failedCheck;
}
};
-
String jsonContent = "";
StoppableCheck actual =
service.getInstanceStoppableCheck(TEST_CLUSTER, TEST_INSTANCE, jsonContent);
-
Assert.assertEquals(actual.getFailedChecks().size(), failedCheck.size());
Assert.assertFalse(actual.isStoppable());
verifyZeroInteractions(_customRestClient);
@@ -104,8 +113,8 @@ public class TestInstanceService {
@Test
public void testGetInstanceStoppableCheckWhenCustomInstanceCheckFail() throws IOException {
- InstanceService service =
- new InstanceServiceImpl(_dataAccessor, _configAccessor, _customRestClient, false, false,
+ MockMaintenanceManagementService service =
+ new MockMaintenanceManagementService(_dataAccessor, _configAccessor, _customRestClient, false, false,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
@@ -116,11 +125,9 @@ public class TestInstanceService {
Map<String, Boolean> failedCheck = ImmutableMap.of("FailCheck", false);
when(_customRestClient.getInstanceStoppableCheck(anyString(), anyMap()))
.thenReturn(failedCheck);
-
String jsonContent = "{\n" + " \"param1\": \"value1\",\n" + "\"param2\": \"value2\"\n" + "}";
StoppableCheck actual =
service.getInstanceStoppableCheck(TEST_CLUSTER, TEST_INSTANCE, jsonContent);
-
Assert.assertFalse(actual.isStoppable());
Assert.assertEquals(actual.getFailedChecks().size(), failedCheck.size());
verify(_customRestClient, times(1)).getInstanceStoppableCheck(any(), any());
@@ -129,8 +136,8 @@ public class TestInstanceService {
@Test
public void testGetInstanceStoppableCheckConnectionRefused() throws IOException {
- InstanceService service =
- new InstanceServiceImpl(_dataAccessor, _configAccessor, _customRestClient, false, false,
+ MockMaintenanceManagementService service =
+ new MockMaintenanceManagementService(_dataAccessor, _configAccessor, _customRestClient, false, false,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
@@ -138,31 +145,25 @@ public class TestInstanceService {
return Collections.emptyMap();
}
};
-
when(_customRestClient.getInstanceStoppableCheck(anyString(), anyMap()))
.thenThrow(IOException.class);
-
Map<String, String> dataMap =
ImmutableMap.of("instance", TEST_INSTANCE, "selection_base", "zone_based");
String jsonContent = new ObjectMapper().writeValueAsString(dataMap);
StoppableCheck actual =
service.getInstanceStoppableCheck(TEST_CLUSTER, TEST_INSTANCE, jsonContent);
-
int expectedFailedChecksSize = 1;
String expectedFailedCheck =
StoppableCheck.Category.CUSTOM_INSTANCE_CHECK.getPrefix() + TEST_INSTANCE;
-
Assert.assertNotNull(actual);
Assert.assertFalse(actual.isStoppable());
Assert.assertEquals(actual.getFailedChecks().size(), expectedFailedChecksSize);
Assert.assertEquals(actual.getFailedChecks().get(0), expectedFailedCheck);
}
-
@Test
public void testCustomPartitionCheckWithSkipZKRead() throws IOException {
// Let ZK result is health, but http request is unhealthy.
// We expect the check fail if we skipZKRead.
-
String testPartition = "PARTITION_0";
String siblingInstance = "instance0.linkedin.com_1236";
String jsonContent = "{\n" +
@@ -173,15 +174,12 @@ public class TestInstanceService {
ZKHelixDataAccessor zkHelixDataAccessor =
new ZKHelixDataAccessor(TEST_CLUSTER, mockAccessor);
ZNRecord successPartitionReport = new ZNRecord(HelixDataAccessorWrapper.PARTITION_HEALTH_KEY);
-
// Instance level check passed
when(_customRestClient.getInstanceStoppableCheck(anyString(), anyMap())).thenReturn(
Collections.singletonMap(TEST_INSTANCE, true));
-
// Mocks for partition level
when(mockAccessor.getChildNames(zkHelixDataAccessor.keyBuilder().liveInstances().getPath(), 2)).thenReturn(
Arrays.asList(TEST_INSTANCE, siblingInstance));
-
// Mock ZK record is healthy
Map<String, String> partition0 = new HashMap<>();
partition0.put(HelixDataAccessorWrapper.EXPIRY_KEY, String.valueOf(System.currentTimeMillis() + 100000L));
@@ -194,11 +192,9 @@ public class TestInstanceService {
.healthReport(siblingInstance, HelixDataAccessorWrapper.PARTITION_HEALTH_KEY)
.getPath()), Arrays.asList(new Stat(), new Stat()), 0, false)).thenReturn(
Arrays.asList(successPartitionReport, successPartitionReport));
-
// Mock client call result is check fail.
when(_customRestClient.getPartitionStoppableCheck(anyString(), anyList(), anyMap())).thenReturn(
Collections.singletonMap(testPartition, false));
-
// Mock data for InstanceValidationUtil
ExternalView externalView = new ExternalView("TestResource");
externalView.setState(testPartition, TEST_INSTANCE, "MASTER");
@@ -210,21 +206,20 @@ public class TestInstanceService {
AccessOption.PERSISTENT)).thenReturn(MasterSlaveSMD.build().getRecord());
// Valid data only from ZK, pass the check
- InstanceService instanceServiceReadZK =
- new MockInstanceServiceImpl(zkHelixDataAccessor, _configAccessor, _customRestClient, false,
- false);
+ MockMaintenanceManagementService instanceServiceReadZK =
+ new MockMaintenanceManagementService(zkHelixDataAccessor, _configAccessor, _customRestClient, false,
+ false, HelixRestNamespace.DEFAULT_NAMESPACE_NAME);
StoppableCheck stoppableCheck =
instanceServiceReadZK.getInstanceStoppableCheck(TEST_CLUSTER, TEST_INSTANCE, jsonContent);
Assert.assertTrue(stoppableCheck.isStoppable());
// Even ZK data is valid. Skip ZK read should fail the test.
- InstanceService instanceServiceWithoutReadZK =
- new MockInstanceServiceImpl(zkHelixDataAccessor, _configAccessor, _customRestClient, true,
- false);
+ MockMaintenanceManagementService instanceServiceWithoutReadZK =
+ new MockMaintenanceManagementService(zkHelixDataAccessor, _configAccessor, _customRestClient, true,
+ false, HelixRestNamespace.DEFAULT_NAMESPACE_NAME);
stoppableCheck = instanceServiceWithoutReadZK.getInstanceStoppableCheck(TEST_CLUSTER, TEST_INSTANCE, jsonContent);
Assert.assertFalse(stoppableCheck.isStoppable());
}
-
/*
* Tests stoppable check api when all checks query is enabled. After helix own check fails,
* the subsequent checks should be performed.
@@ -235,13 +230,12 @@ public class TestInstanceService {
BaseDataAccessor<ZNRecord> mockAccessor = mock(ZkBaseDataAccessor.class);
ZKHelixDataAccessor zkHelixDataAccessor =
new ZKHelixDataAccessor(TEST_CLUSTER, mockAccessor);
-
when(mockAccessor.getChildNames(zkHelixDataAccessor.keyBuilder().liveInstances().getPath(), 2))
.thenReturn(Arrays.asList(TEST_INSTANCE, siblingInstance));
Map<String, Boolean> instanceHealthFailedCheck = ImmutableMap.of("FailCheck", false);
- InstanceService service =
- new InstanceServiceImpl(zkHelixDataAccessor, _configAccessor, _customRestClient, true, true,
+ MockMaintenanceManagementService service =
+ new MockMaintenanceManagementService(zkHelixDataAccessor, _configAccessor, _customRestClient, true, true,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
@@ -249,31 +243,26 @@ public class TestInstanceService {
return instanceHealthFailedCheck;
}
};
-
when(_customRestClient.getInstanceStoppableCheck(anyString(), anyMap()))
.thenReturn(ImmutableMap.of("FailCheck", false));
when(_customRestClient.getPartitionStoppableCheck(anyString(), anyList(), anyMap()))
.thenReturn(ImmutableMap.of("FailCheck", false));
-
StoppableCheck actual = service.getInstanceStoppableCheck(TEST_CLUSTER, TEST_INSTANCE, "");
List<String> expectedFailedChecks =
Arrays.asList("HELIX:FailCheck", "CUSTOM_INSTANCE_HEALTH_FAILURE:FailCheck");
-
Assert.assertEquals(actual.getFailedChecks(), expectedFailedChecks);
Assert.assertFalse(actual.isStoppable());
-
// Verify the subsequent checks are called
verify(_configAccessor, times(1)).getRESTConfig(anyString());
verify(_customRestClient, times(1)).getInstanceStoppableCheck(anyString(), anyMap());
verify(_customRestClient, times(2))
.getPartitionStoppableCheck(anyString(), nullable(List.class), anyMap());
}
-
// TODO re-enable the test when partition health checks get decoupled
@Test(enabled = false)
public void testGetInstanceStoppableCheckWhenPartitionsCheckFail() throws IOException {
- InstanceService service =
- new InstanceServiceImpl(_dataAccessor, _configAccessor, _customRestClient, false, false,
+ MockMaintenanceManagementService service =
+ new MockMaintenanceManagementService(_dataAccessor, _configAccessor, _customRestClient, false, false,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
@@ -281,29 +270,14 @@ public class TestInstanceService {
return Collections.emptyMap();
}
};
-
// partition is health on the test instance but unhealthy on the sibling instance
when(_customRestClient.getInstanceStoppableCheck(anyString(), anyMap()))
.thenReturn(Collections.emptyMap());
String jsonContent = "{\n" + " \"param1\": \"value1\",\n" + "\"param2\": \"value2\"\n" + "}";
StoppableCheck actual =
service.getInstanceStoppableCheck(TEST_CLUSTER, TEST_INSTANCE, jsonContent);
-
Assert.assertFalse(actual.isStoppable());
verify(_customRestClient, times(1)).getInstanceStoppableCheck(any(), any());
}
- private static class MockInstanceServiceImpl extends InstanceServiceImpl {
- MockInstanceServiceImpl(ZKHelixDataAccessor dataAccessor, ConfigAccessor configAccessor,
- CustomRestClient customRestClient, boolean skipZKRead, boolean continueOnFailures) {
- super(dataAccessor, configAccessor, customRestClient, skipZKRead, continueOnFailures,
- HelixRestNamespace.DEFAULT_NAMESPACE_NAME);
- }
-
- @Override
- protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
- List<InstanceService.HealthCheck> healthChecks) {
- return Collections.emptyMap();
- }
- }
-}
+}
\ No newline at end of file