You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zh...@apache.org on 2022/02/16 17:56:10 UTC

[helix] branch helix-virtual-group updated: Add rest endpoint for virtual topology group (#1958)

This is an automated email from the ASF dual-hosted git repository.

zhangmeng pushed a commit to branch helix-virtual-group
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/helix-virtual-group by this push:
     new 35ca4cd  Add rest endpoint for virtual topology group (#1958)
35ca4cd is described below

commit 35ca4cd97574ce48fd8ad52801dd8d64065ee791
Author: Qi (Quincy) Qu <qq...@linkedin.com>
AuthorDate: Wed Feb 16 12:55:56 2022 -0500

    Add rest endpoint for virtual topology group (#1958)
---
 .../rest/server/resources/AbstractResource.java    |   1 +
 .../server/resources/helix/ClusterAccessor.java    |  25 ++++
 .../helix/rest/server/TestClusterAccessor.java     | 156 ++++++++++++++++-----
 3 files changed, 147 insertions(+), 35 deletions(-)

diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
index a709d6a..b7d02a7 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
@@ -59,6 +59,7 @@ public class AbstractResource {
   public enum Command {
     activate,
     addInstanceTag,
+    addVirtualTopologyGroup,
     expand,
     enable,
     disable,
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
index cf2457c..daea45f 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
@@ -70,6 +70,7 @@ import org.apache.helix.rest.common.HttpConstants;
 import org.apache.helix.rest.server.json.cluster.ClusterTopology;
 import org.apache.helix.rest.server.service.ClusterService;
 import org.apache.helix.rest.server.service.ClusterServiceImpl;
+import org.apache.helix.rest.server.service.VirtualTopologyGroupService;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -235,6 +236,21 @@ public class ClusterAccessor extends AbstractHelixResource {
         }
         break;
 
+      case addVirtualTopologyGroup:
+        try {
+          addVirtualTopologyGroup(clusterId, content);
+        } catch (JsonProcessingException ex) {
+          LOG.error("Failed to parse json string: {}", content, ex);
+          return badRequest("Invalid payload json body: " + content);
+        } catch (IllegalArgumentException ex) {
+          LOG.error("Illegal input {} for command {}.", content, command, ex);
+          return badRequest(String.format("Illegal input %s for command %s", content, command));
+        } catch (Exception ex) {
+          LOG.error("Failed to add virtual topology group to cluster {}", clusterId, ex);
+          return serverError(ex);
+        }
+        break;
+
       case expand:
         try {
           clusterSetup.expandCluster(clusterId);
@@ -305,6 +321,15 @@ public class ClusterAccessor extends AbstractHelixResource {
     return OK();
   }
 
+  private void addVirtualTopologyGroup(String clusterId, String content) throws JsonProcessingException { // helper behind the addVirtualTopologyGroup command
+    ClusterService clusterService = new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
+    VirtualTopologyGroupService service = new VirtualTopologyGroupService( // built per call from this resource's admin, config accessor and data accessor
+        getHelixAdmin(), clusterService, getConfigAccessor(), getDataAccssor(clusterId));
+    Map<String, String> customFieldsMap = // payload is read as a JSON object of string keys to string values; a parse failure propagates as JsonProcessingException
+        OBJECT_MAPPER.readValue(content, new TypeReference<HashMap<String, String>>() { });
+    service.addVirtualTopologyGroup(clusterId, customFieldsMap); // the parsed fields are handed to the service unchanged
+  }
+
   @ResponseMetered(name = HttpConstants.READ_REQUEST)
   @Timed(name = HttpConstants.READ_REQUEST)
   @GET
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
index 3bb3c29..34cfbaf 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
@@ -19,6 +19,7 @@ package org.apache.helix.rest.server;
  * under the License.
  */
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -45,6 +46,7 @@ import org.apache.helix.api.status.ClusterManagementMode;
 import org.apache.helix.api.status.ClusterManagementModeRequest;
 import org.apache.helix.cloud.azure.AzureConstants;
 import org.apache.helix.cloud.constants.CloudProvider;
+import org.apache.helix.cloud.constants.VirtualTopologyGroupConstants;
 import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.integration.manager.ClusterDistributedController;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -70,10 +72,13 @@ import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 public class TestClusterAccessor extends AbstractTestClass {
 
+  private static final String VG_CLUSTER = "vgCluster";
+
   @BeforeClass
   public void beforeClass() {
     for (String cluster : _clusters) {
@@ -167,10 +172,7 @@ public class TestClusterAccessor extends AbstractTestClass {
     updateClusterConfigFromRest(cluster, configDelta, Command.update);
 
     //get valid cluster topology map
-    String topologyMapDef = get(topologyMapUrlBase, null, Response.Status.OK.getStatusCode(), true);
-    Map<String, Object> topologyMap =
-        OBJECT_MAPPER.readValue(topologyMapDef, new TypeReference<HashMap<String, Object>>() {
-        });
+    Map<String, Object> topologyMap = getMapResponseFromRest(topologyMapUrlBase);
     Assert.assertEquals(topologyMap.size(), 2);
     Assert.assertTrue(topologyMap.get("/helixZoneId:zone0") instanceof List);
     List<String> instances = (List<String>) topologyMap.get("/helixZoneId:zone0");
@@ -197,10 +199,7 @@ public class TestClusterAccessor extends AbstractTestClass {
     updateClusterConfigFromRest(cluster, configDelta, Command.update);
 
     //get valid cluster fault zone map
-    String faultZoneMapDef = get(faultZoneUrlBase, null, Response.Status.OK.getStatusCode(), true);
-    Map<String, Object> faultZoneMap =
-        OBJECT_MAPPER.readValue(faultZoneMapDef, new TypeReference<HashMap<String, Object>>() {
-        });
+    Map<String, Object> faultZoneMap = getMapResponseFromRest(faultZoneUrlBase);
     Assert.assertEquals(faultZoneMap.size(), 2);
     Assert.assertTrue(faultZoneMap.get("/helixZoneId:zone0") instanceof List);
     instances = (List<String>) faultZoneMap.get("/helixZoneId:zone0");
@@ -223,6 +222,108 @@ public class TestClusterAccessor extends AbstractTestClass {
             "/instance:TestCluster_1localhost_12927"))));
   }
 
+  @Test(dataProvider = "prepareVirtualTopologyTests", dependsOnMethods = "testGetClusters") // runs once per row returned by prepareVirtualTopologyTests
+  public void testAddVirtualTopologyGroup(String requestParam, int numGroups,
+      Map<String, String> instanceToGroup) throws IOException {
+    post("clusters/" + VG_CLUSTER, // send the addVirtualTopologyGroup command with the JSON payload; HTTP 200 expected
+        ImmutableMap.of("command", "addVirtualTopologyGroup"),
+        Entity.entity(requestParam, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.OK.getStatusCode());
+    Map<String, Object> topology = getMapResponseFromRest(String.format("clusters/%s/topology", VG_CLUSTER));
+    Assert.assertTrue(topology.containsKey("zones"));
+    Assert.assertEquals(((List) topology.get("zones")).size(), numGroups); // the number of zones in the topology response must equal numGroups
+    // The cluster config must report the virtual fault zone type both as the topology root and as the fault zone type.
+    ClusterConfig clusterConfig = getClusterConfigFromRest(VG_CLUSTER);
+    String expectedTopology = "/" + VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE + "/hostname";
+    Assert.assertEquals(clusterConfig.getTopology(), expectedTopology);
+    Assert.assertEquals(clusterConfig.getFaultZoneType(), VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE);
+    // For every instance listed in instanceToGroup, its domain must map the virtual fault zone type to the expected group.
+    HelixDataAccessor helixDataAccessor = new ZKHelixDataAccessor(VG_CLUSTER, _baseAccessor);
+    for (Map.Entry<String, String> entry : instanceToGroup.entrySet()) {
+      InstanceConfig instanceConfig =
+          helixDataAccessor.getProperty(helixDataAccessor.keyBuilder().instanceConfig(entry.getKey()));
+      String expectedGroup = entry.getValue();
+      Assert.assertEquals(instanceConfig.getDomainAsMap().get(VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE),
+          expectedGroup);
+    }
+  }
+
+  @Test(dependsOnMethods = "testGetClusters") // checks the command's outcome with autoMaintenanceModeDisabled=true, before and after enabling maintenance mode
+  public void testVirtualTopologyGroupMaintenanceMode() throws JsonProcessingException {
+    setupClusterForVirtualTopology(VG_CLUSTER);
+    String requestParam = "{\"virtualTopologyGroupNumber\":\"7\",\"virtualTopologyGroupName\":\"vgTest\","
+        + "\"autoMaintenanceModeDisabled\":\"true\"}";
+    // Expect failure (HTTP 500): the cluster is not in maintenance mode while autoMaintenanceModeDisabled=true.
+    post("clusters/" + VG_CLUSTER,
+        ImmutableMap.of("command", "addVirtualTopologyGroup"),
+        Entity.entity(requestParam, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
+    // Enable maintenance mode, then expect the same request to succeed.
+    post("clusters/" + VG_CLUSTER,
+        ImmutableMap.of("command", "enableMaintenanceMode"),
+        Entity.entity("virtual group", MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.OK.getStatusCode());
+    post("clusters/" + VG_CLUSTER,
+        ImmutableMap.of("command", "addVirtualTopologyGroup"),
+        Entity.entity(requestParam, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.OK.getStatusCode());
+    // The cluster must still report maintenance mode after the command has succeeded.
+    Assert.assertTrue(isMaintenanceModeEnabled(VG_CLUSTER));
+  }
+
+  private boolean isMaintenanceModeEnabled(String clusterName) throws JsonProcessingException { // reads the cluster's maintenance flag through the REST endpoint
+    String body = // GET clusters/{clusterName}/maintenance, expecting HTTP 200
+        get("clusters/" + clusterName + "/maintenance", null, Response.Status.OK.getStatusCode(), true);
+    return OBJECT_MAPPER.readTree(body).get(ClusterAccessor.ClusterProperties.maintenance.name()).booleanValue(); // boolean value of the "maintenance" field in the JSON body
+  }
+
+  @DataProvider // rows: {request JSON, expected number of groups, expected instance-name -> group-name pairs}
+  public Object[][] prepareVirtualTopologyTests() {
+    setupClusterForVirtualTopology(VG_CLUSTER); // side effect: the cluster is created and populated before the rows are returned
+    String test1 = "{\"virtualTopologyGroupNumber\":\"7\",\"virtualTopologyGroupName\":\"vgTest\"}";
+    String test2 = "{\"virtualTopologyGroupNumber\":\"9\",\"virtualTopologyGroupName\":\"vgTest\"}";
+    return new Object[][] {
+        {test1, 7, ImmutableMap.of(
+            "vgCluster_localhost_12918", "vgTest_0",
+            "vgCluster_localhost_12919", "vgTest_0",
+            "vgCluster_localhost_12925", "vgTest_4",
+            "vgCluster_localhost_12927", "vgTest_6")},
+        {test2, 9, ImmutableMap.of(
+            "vgCluster_localhost_12918", "vgTest_0",
+            "vgCluster_localhost_12919", "vgTest_0",
+            "vgCluster_localhost_12925", "vgTest_6",
+            "vgCluster_localhost_12927", "vgTest_8")},
+        // Repeat test1 to verify that the assignment is deterministic and to cover decreasing numGroups (9 -> 7).
+        {test1, 7, ImmutableMap.of(
+            "vgCluster_localhost_12918", "vgTest_0",
+            "vgCluster_localhost_12919", "vgTest_0",
+            "vgCluster_localhost_12925", "vgTest_4",
+            "vgCluster_localhost_12927", "vgTest_6")}
+    };
+  }
+
+  private void setupClusterForVirtualTopology(String clusterName) { // creates a cloud-enabled cluster with 10 instances for the virtual topology tests
+    HelixDataAccessor helixDataAccessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
+    ZNRecord record = new ZNRecord("testZnode");
+    record.setBooleanField(CloudConfig.CloudConfigProperty.CLOUD_ENABLED.name(), true);
+    record.setSimpleField(CloudConfig.CloudConfigProperty.CLOUD_ID.name(), "TestCloudID");
+    record.setSimpleField(CloudConfig.CloudConfigProperty.CLOUD_PROVIDER.name(), CloudProvider.AZURE.name());
+    CloudConfig cloudConfig = new CloudConfig.Builder(record).build();
+    _gSetupTool.addCluster(clusterName, true, cloudConfig); // NOTE(review): meaning of the boolean argument is not visible in this diff - confirm
+    // Add instances on ports 12918..12927; i / 2 is integer division, so each faultDomain value 0..4 is shared by two instances.
+    Set<String> instances = new HashSet<>();
+    for (int i = 0; i < 10; i++) {
+      String instanceName = clusterName + "_localhost_" + (12918 + i);
+      _gSetupTool.addInstanceToCluster(clusterName, instanceName);
+      InstanceConfig instanceConfig =
+          helixDataAccessor.getProperty(helixDataAccessor.keyBuilder().instanceConfig(instanceName));
+      instanceConfig.setDomain("faultDomain=" + i / 2 + ",hostname=" + instanceName);
+      helixDataAccessor.setProperty(helixDataAccessor.keyBuilder().instanceConfig(instanceName), instanceConfig); // write the updated domain back
+      instances.add(instanceName);
+    }
+    startInstances(clusterName, instances, 10);
+  }
+
   @Test(dependsOnMethods = "testGetClusterTopologyAndFaultZoneMap")
   public void testAddConfigFields() throws IOException {
     System.out.println("Start test :" + TestHelper.getTestMethodName());
@@ -399,19 +500,11 @@ public class TestClusterAccessor extends AbstractTestClass {
         Entity.entity(reason, MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode());
 
     // verify is in maintenance mode
-    String body =
-        get("clusters/" + cluster + "/maintenance", null, Response.Status.OK.getStatusCode(), true);
-    JsonNode node = OBJECT_MAPPER.readTree(body);
-    boolean maintenance =
-        node.get(ClusterAccessor.ClusterProperties.maintenance.name()).booleanValue();
-    Assert.assertTrue(maintenance);
+    Assert.assertTrue(isMaintenanceModeEnabled(cluster));
 
     // Check that we could retrieve maintenance signal correctly
-    String signal = get("clusters/" + cluster + "/controller/maintenanceSignal", null,
-        Response.Status.OK.getStatusCode(), true);
     Map<String, Object> maintenanceSignalMap =
-        OBJECT_MAPPER.readValue(signal, new TypeReference<HashMap<String, Object>>() {
-        });
+        getMapResponseFromRest("clusters/" + cluster + "/controller/maintenanceSignal");
     Assert.assertEquals(maintenanceSignalMap.get("TRIGGERED_BY"), "USER");
     Assert.assertEquals(maintenanceSignalMap.get("REASON"), reason);
     Assert.assertNotNull(maintenanceSignalMap.get("TIMESTAMP"));
@@ -422,10 +515,7 @@ public class TestClusterAccessor extends AbstractTestClass {
         Entity.entity("", MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode());
 
     // verify no longer in maintenance mode
-    body = get("clusters/" + cluster + "/maintenance", null, Response.Status.OK.getStatusCode(), true);
-    node = OBJECT_MAPPER.readTree(body);
-    Assert.assertFalse(
-        node.get(ClusterAccessor.ClusterProperties.maintenance.name()).booleanValue());
+    Assert.assertFalse(isMaintenanceModeEnabled(cluster));
 
     get("clusters/" + cluster + "/controller/maintenanceSignal", null,
         Response.Status.NOT_FOUND.getStatusCode(), false);
@@ -448,11 +538,8 @@ public class TestClusterAccessor extends AbstractTestClass {
     Assert.assertNotNull(leader, "Leader name cannot be null!");
 
     // Get the controller leadership history JSON's last entry
-    String leadershipHistory = get("clusters/" + cluster + "/controller/history", null,
-        Response.Status.OK.getStatusCode(), true);
-    Map<String, Object> leadershipHistoryMap =
-        OBJECT_MAPPER.readValue(leadershipHistory, new TypeReference<HashMap<String, Object>>() {
-        });
+    Map<String, Object> leadershipHistoryMap = getMapResponseFromRest("clusters/" + cluster + "/controller/history");
+
     Assert.assertNotNull(leadershipHistoryMap, "Leadership history cannot be null!");
     Object leadershipHistoryList =
         leadershipHistoryMap.get(AbstractResource.Properties.history.name());
@@ -477,11 +564,8 @@ public class TestClusterAccessor extends AbstractTestClass {
         Entity.entity(reason, MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode());
 
     // Get the maintenance history JSON's last entry
-    String maintenanceHistory = get("clusters/" + cluster + "/controller/maintenanceHistory", null,
-        Response.Status.OK.getStatusCode(), true);
     Map<String, Object> maintenanceHistoryMap =
-        OBJECT_MAPPER.readValue(maintenanceHistory, new TypeReference<HashMap<String, Object>>() {
-        });
+        getMapResponseFromRest("clusters/" + cluster + "/controller/maintenanceHistory");
     Object maintenanceHistoryList =
         maintenanceHistoryMap.get(ClusterAccessor.ClusterProperties.maintenanceHistory.name());
     Assert.assertNotNull(maintenanceHistoryList);
@@ -571,10 +655,7 @@ public class TestClusterAccessor extends AbstractTestClass {
     System.out.println("Start test :" + TestHelper.getTestMethodName());
     String cluster = "TestCluster_1";
     String urlBase = "clusters/TestCluster_1/statemodeldefs/";
-    String stateModelDefs =
-        get(urlBase, null, Response.Status.OK.getStatusCode(), true);
-    Map<String, Object> defMap = OBJECT_MAPPER.readValue(stateModelDefs, new TypeReference<HashMap<String, Object>>() {
-    });
+    Map<String, Object> defMap = getMapResponseFromRest(urlBase);
 
     Assert.assertTrue(defMap.size() == 2);
     Assert.assertTrue(defMap.get("stateModelDefinitions") instanceof List);
@@ -1427,4 +1508,9 @@ public class TestClusterAccessor extends AbstractTestClass {
     Assert.assertEquals(auditLog.getResponseCode(), statusCode);
     Assert.assertEquals(auditLog.getResponseEntity(), responseEntity);
   }
+
+  private Map<String, Object> getMapResponseFromRest(String uri) throws JsonProcessingException { // GETs uri and parses the JSON body into a map
+    String response = get(uri, null, Response.Status.OK.getStatusCode(), true); // HTTP 200 is passed as the expected status
+    return OBJECT_MAPPER.readValue(response, new TypeReference<HashMap<String, Object>>() { }); // the body is deserialized as a JSON object
+  }
 }