You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2022/02/22 19:31:55 UTC
[helix] 04/04: Add rest endpoint for virtual topology group (#1958)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit de572a6192f4186cc1cabe8ee8ceb38a7d3dc615
Author: Qi (Quincy) Qu <qq...@linkedin.com>
AuthorDate: Wed Feb 16 12:55:56 2022 -0500
Add rest endpoint for virtual topology group (#1958)
---
.../rest/server/resources/AbstractResource.java | 1 +
.../server/resources/helix/ClusterAccessor.java | 25 ++++
.../helix/rest/server/TestClusterAccessor.java | 156 ++++++++++++++++-----
3 files changed, 147 insertions(+), 35 deletions(-)
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
index a709d6a..b7d02a7 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
@@ -59,6 +59,7 @@ public class AbstractResource {
public enum Command {
activate,
addInstanceTag,
+ addVirtualTopologyGroup,
expand,
enable,
disable,
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
index cf2457c..daea45f 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
@@ -70,6 +70,7 @@ import org.apache.helix.rest.common.HttpConstants;
import org.apache.helix.rest.server.json.cluster.ClusterTopology;
import org.apache.helix.rest.server.service.ClusterService;
import org.apache.helix.rest.server.service.ClusterServiceImpl;
+import org.apache.helix.rest.server.service.VirtualTopologyGroupService;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -235,6 +236,21 @@ public class ClusterAccessor extends AbstractHelixResource {
}
break;
+ case addVirtualTopologyGroup:
+ try {
+ addVirtualTopologyGroup(clusterId, content);
+ } catch (JsonProcessingException ex) {
+ LOG.error("Failed to parse json string: {}", content, ex);
+ return badRequest("Invalid payload json body: " + content);
+ } catch (IllegalArgumentException ex) {
+ LOG.error("Illegal input {} for command {}.", content, command, ex);
+ return badRequest(String.format("Illegal input %s for command %s", content, command));
+ } catch (Exception ex) {
+ LOG.error("Failed to add virtual topology group to cluster {}", clusterId, ex);
+ return serverError(ex);
+ }
+ break;
+
case expand:
try {
clusterSetup.expandCluster(clusterId);
@@ -305,6 +321,15 @@ public class ClusterAccessor extends AbstractHelixResource {
return OK();
}
+ private void addVirtualTopologyGroup(String clusterId, String content) throws JsonProcessingException {
+ ClusterService clusterService = new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
+ VirtualTopologyGroupService service = new VirtualTopologyGroupService(
+ getHelixAdmin(), clusterService, getConfigAccessor(), getDataAccssor(clusterId));
+ Map<String, String> customFieldsMap =
+ OBJECT_MAPPER.readValue(content, new TypeReference<HashMap<String, String>>() { });
+ service.addVirtualTopologyGroup(clusterId, customFieldsMap);
+ }
+
@ResponseMetered(name = HttpConstants.READ_REQUEST)
@Timed(name = HttpConstants.READ_REQUEST)
@GET
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
index 3bb3c29..34cfbaf 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
@@ -19,6 +19,7 @@ package org.apache.helix.rest.server;
* under the License.
*/
+import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -45,6 +46,7 @@ import org.apache.helix.api.status.ClusterManagementMode;
import org.apache.helix.api.status.ClusterManagementModeRequest;
import org.apache.helix.cloud.azure.AzureConstants;
import org.apache.helix.cloud.constants.CloudProvider;
+import org.apache.helix.cloud.constants.VirtualTopologyGroupConstants;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.integration.manager.ClusterDistributedController;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -70,10 +72,13 @@ import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
public class TestClusterAccessor extends AbstractTestClass {
+ private static final String VG_CLUSTER = "vgCluster";
+
@BeforeClass
public void beforeClass() {
for (String cluster : _clusters) {
@@ -167,10 +172,7 @@ public class TestClusterAccessor extends AbstractTestClass {
updateClusterConfigFromRest(cluster, configDelta, Command.update);
//get valid cluster topology map
- String topologyMapDef = get(topologyMapUrlBase, null, Response.Status.OK.getStatusCode(), true);
- Map<String, Object> topologyMap =
- OBJECT_MAPPER.readValue(topologyMapDef, new TypeReference<HashMap<String, Object>>() {
- });
+ Map<String, Object> topologyMap = getMapResponseFromRest(topologyMapUrlBase);
Assert.assertEquals(topologyMap.size(), 2);
Assert.assertTrue(topologyMap.get("/helixZoneId:zone0") instanceof List);
List<String> instances = (List<String>) topologyMap.get("/helixZoneId:zone0");
@@ -197,10 +199,7 @@ public class TestClusterAccessor extends AbstractTestClass {
updateClusterConfigFromRest(cluster, configDelta, Command.update);
//get valid cluster fault zone map
- String faultZoneMapDef = get(faultZoneUrlBase, null, Response.Status.OK.getStatusCode(), true);
- Map<String, Object> faultZoneMap =
- OBJECT_MAPPER.readValue(faultZoneMapDef, new TypeReference<HashMap<String, Object>>() {
- });
+ Map<String, Object> faultZoneMap = getMapResponseFromRest(faultZoneUrlBase);
Assert.assertEquals(faultZoneMap.size(), 2);
Assert.assertTrue(faultZoneMap.get("/helixZoneId:zone0") instanceof List);
instances = (List<String>) faultZoneMap.get("/helixZoneId:zone0");
@@ -223,6 +222,108 @@ public class TestClusterAccessor extends AbstractTestClass {
"/instance:TestCluster_1localhost_12927"))));
}
+ @Test(dataProvider = "prepareVirtualTopologyTests", dependsOnMethods = "testGetClusters")
+ public void testAddVirtualTopologyGroup(String requestParam, int numGroups,
+ Map<String, String> instanceToGroup) throws IOException {
+ post("clusters/" + VG_CLUSTER,
+ ImmutableMap.of("command", "addVirtualTopologyGroup"),
+ Entity.entity(requestParam, MediaType.APPLICATION_JSON_TYPE),
+ Response.Status.OK.getStatusCode());
+ Map<String, Object> topology = getMapResponseFromRest(String.format("clusters/%s/topology", VG_CLUSTER));
+ Assert.assertTrue(topology.containsKey("zones"));
+ Assert.assertEquals(((List) topology.get("zones")).size(), numGroups);
+
+ ClusterConfig clusterConfig = getClusterConfigFromRest(VG_CLUSTER);
+ String expectedTopology = "/" + VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE + "/hostname";
+ Assert.assertEquals(clusterConfig.getTopology(), expectedTopology);
+ Assert.assertEquals(clusterConfig.getFaultZoneType(), VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE);
+
+ HelixDataAccessor helixDataAccessor = new ZKHelixDataAccessor(VG_CLUSTER, _baseAccessor);
+ for (Map.Entry<String, String> entry : instanceToGroup.entrySet()) {
+ InstanceConfig instanceConfig =
+ helixDataAccessor.getProperty(helixDataAccessor.keyBuilder().instanceConfig(entry.getKey()));
+ String expectedGroup = entry.getValue();
+ Assert.assertEquals(instanceConfig.getDomainAsMap().get(VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE),
+ expectedGroup);
+ }
+ }
+
+ @Test(dependsOnMethods = "testGetClusters")
+ public void testVirtualTopologyGroupMaintenanceMode() throws JsonProcessingException {
+ setupClusterForVirtualTopology(VG_CLUSTER);
+ String requestParam = "{\"virtualTopologyGroupNumber\":\"7\",\"virtualTopologyGroupName\":\"vgTest\","
+ + "\"autoMaintenanceModeDisabled\":\"true\"}";
+ // expect failure as cluster is not in maintenance mode while autoMaintenanceModeDisabled=true
+ post("clusters/" + VG_CLUSTER,
+ ImmutableMap.of("command", "addVirtualTopologyGroup"),
+ Entity.entity(requestParam, MediaType.APPLICATION_JSON_TYPE),
+ Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
+ // enable maintenance mode and expect success
+ post("clusters/" + VG_CLUSTER,
+ ImmutableMap.of("command", "enableMaintenanceMode"),
+ Entity.entity("virtual group", MediaType.APPLICATION_JSON_TYPE),
+ Response.Status.OK.getStatusCode());
+ post("clusters/" + VG_CLUSTER,
+ ImmutableMap.of("command", "addVirtualTopologyGroup"),
+ Entity.entity(requestParam, MediaType.APPLICATION_JSON_TYPE),
+ Response.Status.OK.getStatusCode());
+
+ Assert.assertTrue(isMaintenanceModeEnabled(VG_CLUSTER));
+ }
+
+ private boolean isMaintenanceModeEnabled(String clusterName) throws JsonProcessingException {
+ String body =
+ get("clusters/" + clusterName + "/maintenance", null, Response.Status.OK.getStatusCode(), true);
+ return OBJECT_MAPPER.readTree(body).get(ClusterAccessor.ClusterProperties.maintenance.name()).booleanValue();
+ }
+
+ @DataProvider
+ public Object[][] prepareVirtualTopologyTests() {
+ setupClusterForVirtualTopology(VG_CLUSTER);
+ String test1 = "{\"virtualTopologyGroupNumber\":\"7\",\"virtualTopologyGroupName\":\"vgTest\"}";
+ String test2 = "{\"virtualTopologyGroupNumber\":\"9\",\"virtualTopologyGroupName\":\"vgTest\"}";
+ return new Object[][] {
+ {test1, 7, ImmutableMap.of(
+ "vgCluster_localhost_12918", "vgTest_0",
+ "vgCluster_localhost_12919", "vgTest_0",
+ "vgCluster_localhost_12925", "vgTest_4",
+ "vgCluster_localhost_12927", "vgTest_6")},
+ {test2, 9, ImmutableMap.of(
+ "vgCluster_localhost_12918", "vgTest_0",
+ "vgCluster_localhost_12919", "vgTest_0",
+ "vgCluster_localhost_12925", "vgTest_6",
+ "vgCluster_localhost_12927", "vgTest_8")},
+ // repeat test1 for deterministic and test for decreasing numGroups
+ {test1, 7, ImmutableMap.of(
+ "vgCluster_localhost_12918", "vgTest_0",
+ "vgCluster_localhost_12919", "vgTest_0",
+ "vgCluster_localhost_12925", "vgTest_4",
+ "vgCluster_localhost_12927", "vgTest_6")}
+ };
+ }
+
+ private void setupClusterForVirtualTopology(String clusterName) {
+ HelixDataAccessor helixDataAccessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
+ ZNRecord record = new ZNRecord("testZnode");
+ record.setBooleanField(CloudConfig.CloudConfigProperty.CLOUD_ENABLED.name(), true);
+ record.setSimpleField(CloudConfig.CloudConfigProperty.CLOUD_ID.name(), "TestCloudID");
+ record.setSimpleField(CloudConfig.CloudConfigProperty.CLOUD_PROVIDER.name(), CloudProvider.AZURE.name());
+ CloudConfig cloudConfig = new CloudConfig.Builder(record).build();
+ _gSetupTool.addCluster(clusterName, true, cloudConfig);
+
+ Set<String> instances = new HashSet<>();
+ for (int i = 0; i < 10; i++) {
+ String instanceName = clusterName + "_localhost_" + (12918 + i);
+ _gSetupTool.addInstanceToCluster(clusterName, instanceName);
+ InstanceConfig instanceConfig =
+ helixDataAccessor.getProperty(helixDataAccessor.keyBuilder().instanceConfig(instanceName));
+ instanceConfig.setDomain("faultDomain=" + i / 2 + ",hostname=" + instanceName);
+ helixDataAccessor.setProperty(helixDataAccessor.keyBuilder().instanceConfig(instanceName), instanceConfig);
+ instances.add(instanceName);
+ }
+ startInstances(clusterName, instances, 10);
+ }
+
@Test(dependsOnMethods = "testGetClusterTopologyAndFaultZoneMap")
public void testAddConfigFields() throws IOException {
System.out.println("Start test :" + TestHelper.getTestMethodName());
@@ -399,19 +500,11 @@ public class TestClusterAccessor extends AbstractTestClass {
Entity.entity(reason, MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode());
// verify is in maintenance mode
- String body =
- get("clusters/" + cluster + "/maintenance", null, Response.Status.OK.getStatusCode(), true);
- JsonNode node = OBJECT_MAPPER.readTree(body);
- boolean maintenance =
- node.get(ClusterAccessor.ClusterProperties.maintenance.name()).booleanValue();
- Assert.assertTrue(maintenance);
+ Assert.assertTrue(isMaintenanceModeEnabled(cluster));
// Check that we could retrieve maintenance signal correctly
- String signal = get("clusters/" + cluster + "/controller/maintenanceSignal", null,
- Response.Status.OK.getStatusCode(), true);
Map<String, Object> maintenanceSignalMap =
- OBJECT_MAPPER.readValue(signal, new TypeReference<HashMap<String, Object>>() {
- });
+ getMapResponseFromRest("clusters/" + cluster + "/controller/maintenanceSignal");
Assert.assertEquals(maintenanceSignalMap.get("TRIGGERED_BY"), "USER");
Assert.assertEquals(maintenanceSignalMap.get("REASON"), reason);
Assert.assertNotNull(maintenanceSignalMap.get("TIMESTAMP"));
@@ -422,10 +515,7 @@ public class TestClusterAccessor extends AbstractTestClass {
Entity.entity("", MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode());
// verify no longer in maintenance mode
- body = get("clusters/" + cluster + "/maintenance", null, Response.Status.OK.getStatusCode(), true);
- node = OBJECT_MAPPER.readTree(body);
- Assert.assertFalse(
- node.get(ClusterAccessor.ClusterProperties.maintenance.name()).booleanValue());
+ Assert.assertFalse(isMaintenanceModeEnabled(cluster));
get("clusters/" + cluster + "/controller/maintenanceSignal", null,
Response.Status.NOT_FOUND.getStatusCode(), false);
@@ -448,11 +538,8 @@ public class TestClusterAccessor extends AbstractTestClass {
Assert.assertNotNull(leader, "Leader name cannot be null!");
// Get the controller leadership history JSON's last entry
- String leadershipHistory = get("clusters/" + cluster + "/controller/history", null,
- Response.Status.OK.getStatusCode(), true);
- Map<String, Object> leadershipHistoryMap =
- OBJECT_MAPPER.readValue(leadershipHistory, new TypeReference<HashMap<String, Object>>() {
- });
+ Map<String, Object> leadershipHistoryMap = getMapResponseFromRest("clusters/" + cluster + "/controller/history");
+
Assert.assertNotNull(leadershipHistoryMap, "Leadership history cannot be null!");
Object leadershipHistoryList =
leadershipHistoryMap.get(AbstractResource.Properties.history.name());
@@ -477,11 +564,8 @@ public class TestClusterAccessor extends AbstractTestClass {
Entity.entity(reason, MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode());
// Get the maintenance history JSON's last entry
- String maintenanceHistory = get("clusters/" + cluster + "/controller/maintenanceHistory", null,
- Response.Status.OK.getStatusCode(), true);
Map<String, Object> maintenanceHistoryMap =
- OBJECT_MAPPER.readValue(maintenanceHistory, new TypeReference<HashMap<String, Object>>() {
- });
+ getMapResponseFromRest("clusters/" + cluster + "/controller/maintenanceHistory");
Object maintenanceHistoryList =
maintenanceHistoryMap.get(ClusterAccessor.ClusterProperties.maintenanceHistory.name());
Assert.assertNotNull(maintenanceHistoryList);
@@ -571,10 +655,7 @@ public class TestClusterAccessor extends AbstractTestClass {
System.out.println("Start test :" + TestHelper.getTestMethodName());
String cluster = "TestCluster_1";
String urlBase = "clusters/TestCluster_1/statemodeldefs/";
- String stateModelDefs =
- get(urlBase, null, Response.Status.OK.getStatusCode(), true);
- Map<String, Object> defMap = OBJECT_MAPPER.readValue(stateModelDefs, new TypeReference<HashMap<String, Object>>() {
- });
+ Map<String, Object> defMap = getMapResponseFromRest(urlBase);
Assert.assertTrue(defMap.size() == 2);
Assert.assertTrue(defMap.get("stateModelDefinitions") instanceof List);
@@ -1427,4 +1508,9 @@ public class TestClusterAccessor extends AbstractTestClass {
Assert.assertEquals(auditLog.getResponseCode(), statusCode);
Assert.assertEquals(auditLog.getResponseEntity(), responseEntity);
}
+
+ private Map<String, Object> getMapResponseFromRest(String uri) throws JsonProcessingException {
+ String response = get(uri, null, Response.Status.OK.getStatusCode(), true);
+ return OBJECT_MAPPER.readValue(response, new TypeReference<HashMap<String, Object>>() { });
+ }
}