You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hz...@apache.org on 2021/04/15 02:52:53 UTC
[helix] branch master updated: Add option to continue checks on
failures for stoppable api (#1689)
This is an automated email from the ASF dual-hosted git repository.
hzlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 35b5ec1 Add option to continue checks on failures for stoppable api (#1689)
35b5ec1 is described below
commit 35b5ec15c6fa0839a4f1e34c34ca798f151d3f15
Author: Huizhi Lu <51...@users.noreply.github.com>
AuthorDate: Wed Apr 14 19:49:54 2021 -0700
Add option to continue checks on failures for stoppable api (#1689)
This commit adds an option to the Helix stoppable check API: when the option is set, Helix always performs all checks and returns all failed checks, even if an earlier check has already failed. Query param: continueOnFailures.
---
.../rest/server/json/instance/StoppableCheck.java | 5 ++
.../server/resources/helix/InstancesAccessor.java | 15 +++--
.../resources/helix/PerInstanceAccessor.java | 38 ++++++++----
.../rest/server/service/InstanceServiceImpl.java | 43 ++++++++++----
.../helix/rest/server/TestInstancesAccessor.java | 2 -
.../helix/rest/server/TestPerInstanceAccessor.java | 9 ++-
.../rest/server/service/TestInstanceService.java | 68 ++++++++++++++++++----
7 files changed, 138 insertions(+), 42 deletions(-)
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java b/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java
index ee554c1..150e87b 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java
@@ -78,4 +78,9 @@ public class StoppableCheck {
public List<String> getFailedChecks() {
return failedChecks;
}
+
+ public void add(StoppableCheck other) {
+ failedChecks.addAll(other.getFailedChecks());
+ isStoppable = failedChecks.isEmpty();
+ }
}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
index bacae0d..233cc54 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
@@ -148,8 +148,12 @@ public class InstancesAccessor extends AbstractHelixResource {
@ResponseMetered(name = HttpConstants.WRITE_REQUEST)
@Timed(name = HttpConstants.WRITE_REQUEST)
@POST
- public Response instancesOperations(@PathParam("clusterId") String clusterId,
- @QueryParam("skipZKRead") String skipZKRead, @QueryParam("command") String command, String content) {
+ public Response instancesOperations(
+ @PathParam("clusterId") String clusterId,
+ @QueryParam("command") String command,
+ @QueryParam("continueOnFailures") boolean continueOnFailures,
+ @QueryParam("skipZKRead") boolean skipZKRead,
+ String content) {
Command cmd;
try {
cmd = Command.valueOf(command);
@@ -177,7 +181,7 @@ public class InstancesAccessor extends AbstractHelixResource {
admin.enableInstance(clusterId, enableInstances, false);
break;
case stoppable:
- return batchGetStoppableInstances(clusterId, node, Boolean.valueOf(skipZKRead));
+ return batchGetStoppableInstances(clusterId, node, skipZKRead, continueOnFailures);
default:
_logger.error("Unsupported command :" + command);
return badRequest("Unsupported command :" + command);
@@ -193,7 +197,8 @@ public class InstancesAccessor extends AbstractHelixResource {
return OK();
}
- private Response batchGetStoppableInstances(String clusterId, JsonNode node, boolean skipZKRead) throws IOException {
+ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boolean skipZKRead,
+ boolean continueOnFailures) throws IOException {
try {
// TODO: Process input data from the content
InstancesAccessor.InstanceHealthSelectionBase selectionBase =
@@ -223,7 +228,7 @@ public class InstancesAccessor extends AbstractHelixResource {
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
InstanceService instanceService =
new InstanceServiceImpl((ZKHelixDataAccessor) getDataAccssor(clusterId),
- getConfigAccessor(), skipZKRead, getNamespace());
+ getConfigAccessor(), skipZKRead, continueOnFailures, getNamespace());
ClusterService clusterService = new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId);
switch (selectionBase) {
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
index 1560d74..d588f54 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
@@ -39,7 +39,6 @@ import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -101,7 +100,6 @@ public class PerInstanceAccessor extends AbstractHelixResource {
switch (cmd) {
case getInstance:
- ObjectMapper objectMapper = new ObjectMapper();
HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
// TODO reduce GC by dependency injection
InstanceService instanceService =
@@ -111,7 +109,7 @@ public class PerInstanceAccessor extends AbstractHelixResource {
InstanceService.HealthCheck.STARTED_AND_HEALTH_CHECK_LIST);
String instanceInfoString;
try {
- instanceInfoString = objectMapper.writeValueAsString(instanceInfo);
+ instanceInfoString = OBJECT_MAPPER.writeValueAsString(instanceInfo);
} catch (JsonProcessingException e) {
return serverError(e);
}
@@ -133,28 +131,44 @@ public class PerInstanceAccessor extends AbstractHelixResource {
}
}
+ /**
+ * Performs health checks for an instance to answer if it is stoppable.
+ *
+ * @param jsonContent json payload
+ * @param clusterId cluster id
+ * @param instanceName Instance name to be checked
+ * @param skipZKRead skip reading from zk server
+ * @param continueOnFailures whether to continue performing the subsequent checks if a previous
+ * check fails. If false, when Helix's own check fails, the subsequent
+ * custom checks will not be performed.
+ * @return json response representing if queried instance is stoppable
+ * @throws IOException if there is any IO/network error
+ */
@ResponseMetered(name = HttpConstants.WRITE_REQUEST)
@Timed(name = HttpConstants.WRITE_REQUEST)
@POST
@Path("stoppable")
@Consumes(MediaType.APPLICATION_JSON)
- public Response isInstanceStoppable(String jsonContent, @PathParam("clusterId") String clusterId,
- @PathParam("instanceName") String instanceName, @QueryParam("skipZKRead") String skipZKRead) throws IOException {
- ObjectMapper objectMapper = new ObjectMapper();
+ public Response isInstanceStoppable(
+ String jsonContent,
+ @PathParam("clusterId") String clusterId,
+ @PathParam("instanceName") String instanceName,
+ @QueryParam("skipZKRead") boolean skipZKRead,
+ @QueryParam("continueOnFailures") boolean continueOnFailures) throws IOException {
HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
InstanceService instanceService =
- new InstanceServiceImpl((ZKHelixDataAccessor) dataAccessor, getConfigAccessor(),
- Boolean.parseBoolean(skipZKRead), getNamespace());
- StoppableCheck stoppableCheck = null;
+ new InstanceServiceImpl((ZKHelixDataAccessor) dataAccessor, getConfigAccessor(), skipZKRead,
+ continueOnFailures, getNamespace());
+ StoppableCheck stoppableCheck;
try {
stoppableCheck =
instanceService.getInstanceStoppableCheck(clusterId, instanceName, jsonContent);
} catch (HelixException e) {
- LOG.error(String.format("Current cluster %s has issue with health checks!", clusterId),
- e);
+ LOG.error("Current cluster: {}, instance: {} has issue with health checks!", clusterId,
+ instanceName, e);
return serverError(e);
}
- return OK(objectMapper.writeValueAsString(stoppableCheck));
+ return OK(OBJECT_MAPPER.writeValueAsString(stoppableCheck));
}
@ResponseMetered(name = HttpConstants.WRITE_REQUEST)
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
index de280b0..6cc5592 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
@@ -72,8 +72,9 @@ public class InstanceServiceImpl implements InstanceService {
private final HelixDataAccessorWrapper _dataAccessor;
private final ConfigAccessor _configAccessor;
private final CustomRestClient _customRestClient;
- private String _namespace;
- private boolean _skipZKRead;
+ private final String _namespace;
+ private final boolean _skipZKRead;
+ private final boolean _continueOnFailures;
@Deprecated
public InstanceServiceImpl(ZKHelixDataAccessor dataAccessor, ConfigAccessor configAccessor) {
@@ -88,16 +89,25 @@ public class InstanceServiceImpl implements InstanceService {
public InstanceServiceImpl(ZKHelixDataAccessor dataAccessor, ConfigAccessor configAccessor,
boolean skipZKRead, String namespace) {
- this(dataAccessor, configAccessor, CustomRestClientFactory.get(), skipZKRead, namespace);
+ this(dataAccessor, configAccessor, CustomRestClientFactory.get(), skipZKRead, false, namespace);
+ }
+
+ // TODO: too many params, convert to builder pattern
+ public InstanceServiceImpl(ZKHelixDataAccessor dataAccessor, ConfigAccessor configAccessor,
+ boolean skipZKRead, boolean continueOnFailures, String namespace) {
+ this(dataAccessor, configAccessor, CustomRestClientFactory.get(), skipZKRead,
+ continueOnFailures, namespace);
}
@VisibleForTesting
InstanceServiceImpl(ZKHelixDataAccessor dataAccessor, ConfigAccessor configAccessor,
- CustomRestClient customRestClient, boolean skipZKRead, String namespace) {
+ CustomRestClient customRestClient, boolean skipZKRead, boolean continueOnFailures,
+ String namespace) {
_dataAccessor = new HelixDataAccessorWrapper(dataAccessor, customRestClient, namespace);
_configAccessor = configAccessor;
_customRestClient = customRestClient;
_skipZKRead = skipZKRead;
+ _continueOnFailures = continueOnFailures;
_namespace = namespace;
}
@@ -181,7 +191,7 @@ public class InstanceServiceImpl implements InstanceService {
List<String> instancesForCustomInstanceLevelChecks =
filterInstancesForNextCheck(helixInstanceChecks, finalStoppableChecks);
if (instancesForCustomInstanceLevelChecks.isEmpty()) {
- // if all instances failed at helix custom level checks
+ // if all instances failed the helix instance level checks and running all checks is not required
return finalStoppableChecks;
}
@@ -206,14 +216,25 @@ public class InstanceServiceImpl implements InstanceService {
instancesForCustomPartitionLevelChecks, restConfig, customPayLoads);
for (Map.Entry<String, StoppableCheck> instancePartitionStoppableCheckEntry : instancePartitionLevelChecks
.entrySet()) {
- finalStoppableChecks.put(instancePartitionStoppableCheckEntry.getKey(),
- instancePartitionStoppableCheckEntry.getValue());
+ String instance = instancePartitionStoppableCheckEntry.getKey();
+ StoppableCheck stoppableCheck = instancePartitionStoppableCheckEntry.getValue();
+ addStoppableCheck(finalStoppableChecks, instance, stoppableCheck);
}
}
return finalStoppableChecks;
}
+ private void addStoppableCheck(Map<String, StoppableCheck> stoppableChecks, String instance,
+ StoppableCheck stoppableCheck) {
+ if (!stoppableChecks.containsKey(instance)) {
+ stoppableChecks.put(instance, stoppableCheck);
+ } else {
+ // Merge two checks
+ stoppableChecks.get(instance).add(stoppableCheck);
+ }
+ }
+
private List<String> filterInstancesForNextCheck(
Map<String, Future<StoppableCheck>> futureStoppableCheckByInstance,
Map<String, StoppableCheck> finalStoppableCheckByInstance) {
@@ -225,9 +246,11 @@ public class InstanceServiceImpl implements InstanceService {
StoppableCheck stoppableCheck = entry.getValue().get();
if (!stoppableCheck.isStoppable()) {
// put the check result of the failed-to-stop instances
- finalStoppableCheckByInstance.put(instance, stoppableCheck);
- } else {
- // instance passed this around of check will be checked in the next round
+ addStoppableCheck(finalStoppableCheckByInstance, instance, stoppableCheck);
+ }
+ if (stoppableCheck.isStoppable() || _continueOnFailures){
+ // an instance that passed this round of checks, or any instance when all checks are mandatory,
+ // will be checked in the next round
instancesForNextCheck.add(instance);
}
} catch (InterruptedException | ExecutionException e) {
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
index 761ec26..912774b 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
@@ -31,7 +31,6 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.helix.TestHelper;
@@ -45,7 +44,6 @@ import org.testng.annotations.Test;
public class TestInstancesAccessor extends AbstractTestClass {
private final static String CLUSTER_NAME = "TestCluster_0";
- private ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Test
public void testInstancesStoppable_zoneBased() throws IOException {
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
index 8809e1c..ecc83dc 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
@@ -62,8 +62,13 @@ public class TestPerInstanceAccessor extends AbstractTestClass {
Response response = new JerseyUriRequestBuilder("clusters/{}/instances/{}/stoppable")
.format(STOPPABLE_CLUSTER, "instance1").post(this, entity);
String stoppableCheckResult = response.readEntity(String.class);
- Assert.assertEquals(stoppableCheckResult,
- "{\"stoppable\":false,\"failedChecks\":[\"HELIX:EMPTY_RESOURCE_ASSIGNMENT\",\"HELIX:INSTANCE_NOT_ENABLED\",\"HELIX:INSTANCE_NOT_STABLE\"]}");
+
+ Map<String, Object> actualMap = OBJECT_MAPPER.readValue(stoppableCheckResult, Map.class);
+ List<String> failedChecks = Arrays.asList("HELIX:EMPTY_RESOURCE_ASSIGNMENT",
+ "HELIX:INSTANCE_NOT_ENABLED", "HELIX:INSTANCE_NOT_STABLE");
+ Map<String, Object> expectedMap =
+ ImmutableMap.of("stoppable", false, "failedChecks", failedChecks);
+ Assert.assertEquals(actualMap, expectedMap);
System.out.println("End test :" + TestHelper.getTestMethodName());
}
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java
index ad10e82..bb62b13 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java
@@ -83,7 +83,7 @@ public class TestInstanceService {
public void testGetInstanceStoppableCheckWhenHelixOwnCheckFail() throws IOException {
Map<String, Boolean> failedCheck = ImmutableMap.of("FailCheck", false);
InstanceService service =
- new InstanceServiceImpl(_dataAccessor, _configAccessor, _customRestClient, false,
+ new InstanceServiceImpl(_dataAccessor, _configAccessor, _customRestClient, false, false,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
@@ -105,7 +105,7 @@ public class TestInstanceService {
@Test
public void testGetInstanceStoppableCheckWhenCustomInstanceCheckFail() throws IOException {
InstanceService service =
- new InstanceServiceImpl(_dataAccessor, _configAccessor, _customRestClient, false,
+ new InstanceServiceImpl(_dataAccessor, _configAccessor, _customRestClient, false, false,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
@@ -130,7 +130,7 @@ public class TestInstanceService {
@Test
public void testGetInstanceStoppableCheckConnectionRefused() throws IOException {
InstanceService service =
- new InstanceServiceImpl(_dataAccessor, _configAccessor, _customRestClient, false,
+ new InstanceServiceImpl(_dataAccessor, _configAccessor, _customRestClient, false, false,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
@@ -211,23 +211,69 @@ public class TestInstanceService {
// Valid data only from ZK, pass the check
InstanceService instanceServiceReadZK =
- new MockInstanceServiceImpl(zkHelixDataAccessor, _configAccessor, _customRestClient, false);
+ new MockInstanceServiceImpl(zkHelixDataAccessor, _configAccessor, _customRestClient, false,
+ false);
StoppableCheck stoppableCheck =
instanceServiceReadZK.getInstanceStoppableCheck(TEST_CLUSTER, TEST_INSTANCE, jsonContent);
Assert.assertTrue(stoppableCheck.isStoppable());
// Even ZK data is valid. Skip ZK read should fail the test.
InstanceService instanceServiceWithoutReadZK =
- new MockInstanceServiceImpl(zkHelixDataAccessor, _configAccessor, _customRestClient, true);
+ new MockInstanceServiceImpl(zkHelixDataAccessor, _configAccessor, _customRestClient, true,
+ false);
stoppableCheck = instanceServiceWithoutReadZK.getInstanceStoppableCheck(TEST_CLUSTER, TEST_INSTANCE, jsonContent);
Assert.assertFalse(stoppableCheck.isStoppable());
}
+ /*
+ * Tests the stoppable check API when the continueOnFailures option is enabled. After Helix's own
+ * check fails, the subsequent checks should still be performed.
+ */
+ @Test
+ public void testGetStoppableWithAllChecks() throws IOException {
+ String siblingInstance = "instance0.linkedin.com_1236";
+ BaseDataAccessor<ZNRecord> mockAccessor = mock(ZkBaseDataAccessor.class);
+ ZKHelixDataAccessor zkHelixDataAccessor =
+ new ZKHelixDataAccessor(TEST_CLUSTER, InstanceType.ADMINISTRATOR, mockAccessor);
+
+ when(mockAccessor.getChildNames(zkHelixDataAccessor.keyBuilder().liveInstances().getPath(), 2))
+ .thenReturn(Arrays.asList(TEST_INSTANCE, siblingInstance));
+
+ Map<String, Boolean> instanceHealthFailedCheck = ImmutableMap.of("FailCheck", false);
+ InstanceService service =
+ new InstanceServiceImpl(zkHelixDataAccessor, _configAccessor, _customRestClient, true, true,
+ HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
+ @Override
+ protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
+ String instanceName, List<HealthCheck> healthChecks) {
+ return instanceHealthFailedCheck;
+ }
+ };
+
+ when(_customRestClient.getInstanceStoppableCheck(anyString(), anyMap()))
+ .thenReturn(ImmutableMap.of("FailCheck", false));
+ when(_customRestClient.getPartitionStoppableCheck(anyString(), anyList(), anyMap()))
+ .thenReturn(ImmutableMap.of("FailCheck", false));
+
+ StoppableCheck actual = service.getInstanceStoppableCheck(TEST_CLUSTER, TEST_INSTANCE, "");
+ List<String> expectedFailedChecks =
+ Arrays.asList("HELIX:FailCheck", "CUSTOM_INSTANCE_HEALTH_FAILURE:FailCheck");
+
+ Assert.assertEquals(actual.getFailedChecks(), expectedFailedChecks);
+ Assert.assertFalse(actual.isStoppable());
+
+ // Verify the subsequent checks are called
+ verify(_configAccessor, times(1)).getRESTConfig(anyString());
+ verify(_customRestClient, times(1)).getInstanceStoppableCheck(anyString(), anyMap());
+ verify(_customRestClient, times(2))
+ .getPartitionStoppableCheck(anyString(), anyList(), anyMap());
+ }
+
// TODO re-enable the test when partition health checks get decoupled
@Test(enabled = false)
public void testGetInstanceStoppableCheckWhenPartitionsCheckFail() throws IOException {
InstanceService service =
- new InstanceServiceImpl(_dataAccessor, _configAccessor, _customRestClient, false,
+ new InstanceServiceImpl(_dataAccessor, _configAccessor, _customRestClient, false, false,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
@@ -247,16 +293,16 @@ public class TestInstanceService {
verify(_customRestClient, times(1)).getInstanceStoppableCheck(any(), any());
}
- class MockInstanceServiceImpl extends InstanceServiceImpl {
+ private static class MockInstanceServiceImpl extends InstanceServiceImpl {
MockInstanceServiceImpl(ZKHelixDataAccessor dataAccessor, ConfigAccessor configAccessor,
- CustomRestClient customRestClient, boolean skipZKRead) {
- super(dataAccessor, configAccessor, customRestClient, skipZKRead,
+ CustomRestClient customRestClient, boolean skipZKRead, boolean continueOnFailures) {
+ super(dataAccessor, configAccessor, customRestClient, skipZKRead, continueOnFailures,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME);
}
@Override
- protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
- String instanceName, List<InstanceService.HealthCheck> healthChecks) {
+ protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
+ List<InstanceService.HealthCheck> healthChecks) {
return Collections.emptyMap();
}
}