You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2023/07/25 21:43:33 UTC

[pinot] branch master updated: Instance retag validation check api (#11077)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d55a46047 Instance retag validation check api (#11077)
9d55a46047 is described below

commit 9d55a460470d76937b55ac8c4b814a6873e5239f
Author: Shounak kulkarni <sh...@gmail.com>
AuthorDate: Wed Jul 26 03:13:27 2023 +0530

    Instance retag validation check api (#11077)
    
    * API to validate safety of retag operations
    
    * code cleanup and proper error handling
    
    * checkstyle fixes
    
    * handle realtime and offline server tags separately
    
    * get all table configs at once
    
    * logic refactor
    
    * fixes
    
    * test cases
    
    * broker tenant min requirement behaviour fix
    
    * Added docs and comments to improve readability
---
 .../pinot/common/utils/config/TagNameUtils.java    |   7 +
 .../api/resources/InstanceTagUpdateRequest.java    |  50 +++++++
 .../api/resources/OperationValidationResponse.java |  16 ++-
 .../resources/PinotInstanceRestletResource.java    | 149 +++++++++++++++++++++
 .../helix/core/PinotHelixResourceManager.java      |  36 ++++-
 .../api/PinotInstanceRestletResourceTest.java      | 125 +++++++++++++++--
 .../utils/builder/ControllerRequestURLBuilder.java |   4 +
 7 files changed, 373 insertions(+), 14 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TagNameUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TagNameUtils.java
index 15ed490dc8..28a2e30d47 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TagNameUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TagNameUtils.java
@@ -102,6 +102,13 @@ public class TagNameUtils {
     return getTagForTenant(tenantName, REALTIME_SERVER_TAG_SUFFIX);
   }
 
+  /**
+   * Returns the server tag name for the given tenant and the given table type.
+   */
+  public static String getServerTagForTenant(@Nullable String tenantName, TableType type) {
+    return getTagForTenant(tenantName, String.format("_%s", type));
+  }
+
   private static String getTagForTenant(@Nullable String tenantName, String tagSuffix) {
     if (tenantName == null) {
       return DEFAULT_TENANT_NAME + tagSuffix;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/InstanceTagUpdateRequest.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/InstanceTagUpdateRequest.java
new file mode 100644
index 0000000000..c320387a4d
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/InstanceTagUpdateRequest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.List;
+
+
+@ApiModel
+public class InstanceTagUpdateRequest {
+  @JsonProperty("instanceName")
+  @ApiModelProperty(example = "Server_a.b.com_20000")
+  private String _instanceName;
+  @JsonProperty("newTags")
+  private List<String> _newTags;
+
+  public String getInstanceName() {
+    return _instanceName;
+  }
+
+  public void setInstanceName(String instanceName) {
+    _instanceName = instanceName;
+  }
+
+  public List<String> getNewTags() {
+    return _newTags;
+  }
+
+  public void setNewTags(List<String> newTags) {
+    _newTags = newTags;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/OperationValidationResponse.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/OperationValidationResponse.java
index 43897ccdc0..6b27e38e63 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/OperationValidationResponse.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/OperationValidationResponse.java
@@ -58,14 +58,24 @@ public class OperationValidationResponse {
     return this;
   }
 
+  public OperationValidationResponse putAllIssues(List<ErrorWrapper> issues) {
+    _issues.addAll(issues);
+    return this;
+  }
+
   public String getIssueMessage(int index) {
     return _issues.get(index).getMessage();
   }
 
   public static class ErrorWrapper {
+    @JsonProperty("code")
     ErrorCode _code;
+    @JsonProperty("message")
     String _message;
 
+    public ErrorWrapper() {
+    }
+
     public ErrorWrapper(ErrorCode code, String... args) {
       _code = code;
       _message = String.format(code._description, args);
@@ -82,7 +92,11 @@ public class OperationValidationResponse {
 
   public enum ErrorCode {
     IS_ALIVE("Instance %s is still live"),
-    CONTAINS_RESOURCE("Instance %s exists in ideal state for %s");
+    CONTAINS_RESOURCE("Instance %s exists in ideal state for %s"),
+    MINIMUM_INSTANCE_UNSATISFIED(
+        "Tenant '%s' will not satisfy minimum '%s' requirement if tag '%s' is removed from %s instance '%s'."),
+    ALREADY_DEFICIENT_TENANT("Tenant '%s' is low on '%s' instances by %s even after allocating instance %s"),
+    UNRECOGNISED_TAG_TYPE("The tag '%s' does not follow the suffix convention of either broker or server");
 
     public final String _description;
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
index dacd4a5e51..06ca9ec92f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.api.resources;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Sets;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiKeyAuthDefinition;
 import io.swagger.annotations.ApiOperation;
@@ -29,8 +30,12 @@ import io.swagger.annotations.ApiResponses;
 import io.swagger.annotations.Authorization;
 import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 import javax.inject.Inject;
 import javax.ws.rs.ClientErrorException;
@@ -49,6 +54,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.utils.config.InstanceUtils;
+import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.controller.api.access.AccessType;
 import org.apache.pinot.controller.api.access.Authenticate;
 import org.apache.pinot.controller.api.exception.ControllerApplicationException;
@@ -458,4 +464,147 @@ public class PinotInstanceRestletResource {
           Response.Status.INTERNAL_SERVER_ERROR, e);
     }
   }
+
+  /**
+   * Endpoint to validate the safety of instance tag update requests.
+   * This is to ensure that any instance tag update operation that user wants to perform is safe and does not create any
+   * side effect on the cluster and disturb the cluster consistency.
+   * This operation does not perform any changes to the cluster, but surfaces the possible issues which might occur upon
+   * applying the intended changes.
+   * @param requests list if instance tag update requests
+   * @return list of {@link OperationValidationResponse} which denotes the validity of each request along with listing
+   * the issues if any.
+   */
+  @POST
+  @Path("/instances/updateTags/validate")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Check if it's safe to update the tags of the given instances. If not list all the reasons.")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 500, message = "Internal error")
+  })
+  public List<OperationValidationResponse> instanceTagUpdateSafetyCheck(List<InstanceTagUpdateRequest> requests) {
+    LOGGER.info("Performing safety check on tag update request received for instances: {}",
+        requests.stream().map(InstanceTagUpdateRequest::getInstanceName).collect(Collectors.toList()));
+    Map<String, Integer> tagMinServerMap = _pinotHelixResourceManager.minimumInstancesRequiredForTags();
+    Map<String, Integer> tagToInstanceCountMap = getUpdatedTagToInstanceCountMap(requests);
+    Map<String, Integer> tagDeficiency = computeTagDeficiency(tagToInstanceCountMap, tagMinServerMap);
+
+    Map<String, List<OperationValidationResponse.ErrorWrapper>> responseMap = new HashMap<>(requests.size());
+    List<OperationValidationResponse.ErrorWrapper> tenantIssues = new ArrayList<>();
+    requests.forEach(request -> responseMap.put(request.getInstanceName(), new ArrayList<>()));
+    for (InstanceTagUpdateRequest request : requests) {
+      String name = request.getInstanceName();
+      Set<String> oldTags;
+      try {
+        oldTags = new HashSet<>(_pinotHelixResourceManager.getTagsForInstance(name));
+      } catch (NullPointerException exception) {
+        throw new ControllerApplicationException(LOGGER,
+            String.format("Instance %s is not a valid instance name.", name), Response.Status.PRECONDITION_FAILED);
+      }
+      Set<String> newTags = new HashSet<>(request.getNewTags());
+      // tags removed from instance
+      for (String tag : Sets.difference(oldTags, newTags)) {
+        Integer deficiency = tagDeficiency.get(tag);
+        if (deficiency != null && deficiency > 0) {
+          String tenant = TagNameUtils.getTenantFromTag(tag);
+          String tagType = getInstanceTypeFromTag(tag);
+          responseMap.get(name).add(new OperationValidationResponse.ErrorWrapper(
+              OperationValidationResponse.ErrorCode.MINIMUM_INSTANCE_UNSATISFIED, tenant, tagType, tag, tagType, name));
+          tagDeficiency.put(tag, deficiency - 1);
+        }
+      }
+      // newly added tags to instance
+      for (String tag : newTags) {
+        String tagType = getInstanceTypeFromTag(tag);
+        if (tagType == null && (name.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)
+            || name.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE))) {
+          responseMap.get(name).add(new OperationValidationResponse.ErrorWrapper(
+              OperationValidationResponse.ErrorCode.UNRECOGNISED_TAG_TYPE, tag));
+          continue;
+        }
+        Integer deficiency = tagDeficiency.get(tag);
+        if (deficiency != null && deficiency > 0) {
+          tenantIssues.add(new OperationValidationResponse.ErrorWrapper(
+              OperationValidationResponse.ErrorCode.ALREADY_DEFICIENT_TENANT, TagNameUtils.getTenantFromTag(tag),
+              tagType, deficiency.toString(), name));
+        }
+      }
+    }
+
+    // consolidate all the issues based on instances
+    List<OperationValidationResponse> response = new ArrayList<>(requests.size());
+    responseMap.forEach((instance, issueList) -> response.add(issueList.isEmpty()
+        ? new OperationValidationResponse().setInstanceName(instance).setSafe(true)
+        : new OperationValidationResponse().putAllIssues(issueList).setInstanceName(instance).setSafe(false)));
+    // separate entry to group all the deficient tenant issues as it's not related to any instance
+    if (!tenantIssues.isEmpty()) {
+      response.add(new OperationValidationResponse().putAllIssues(tenantIssues).setSafe(false));
+    }
+    return response;
+  }
+
+  private String getInstanceTypeFromTag(String tag) {
+    if (TagNameUtils.isServerTag(tag)) {
+      return "server";
+    } else if (TagNameUtils.isBrokerTag(tag)) {
+      return "broker";
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Compute the number of deficient instances for each tag.
+   * The utility accepts two maps
+   * - map of tags and count of their intended tagged instances
+   * - map of tags and their minimum number of instance requirements
+   * And then compares these two maps to return a map of tags and the number of their deficient instances.
+   *
+   * @param tagToInstanceCountMap tags and count of their intended tagged instances
+   * @param tagToMinInstanceCountMap tags and their minimum number of instance requirements
+   * @return tags and the number of their deficient instances
+   */
+  private Map<String, Integer> computeTagDeficiency(Map<String, Integer> tagToInstanceCountMap,
+      Map<String, Integer> tagToMinInstanceCountMap) {
+    Map<String, Integer> tagDeficiency = new HashMap<>();
+    Map<String, Integer> tagToInstanceCountMapCopy = new HashMap<>(tagToInstanceCountMap);
+    // compute deficiency for each of the minimum instance requirement entry
+    tagToMinInstanceCountMap.forEach((tag, minInstances) -> {
+      Integer updatedInstances = tagToInstanceCountMapCopy.remove(tag);
+      // if tag is not present in the provided map its considered as if tag is not assigned to any instance
+      // hence deficiency = minimum instance requirement.
+      tagDeficiency.put(tag, minInstances - (updatedInstances != null ? updatedInstances : 0));
+    });
+    // tags for which minimum instance requirement is not specified are assumed to have no deficiency (deficiency = 0)
+    tagToInstanceCountMapCopy.forEach((tag, updatedInstances) -> tagDeficiency.put(tag, 0));
+    return tagDeficiency;
+  }
+
+  /**
+   * Utility to fetch the existing tags and count of their respective tagged instances and then apply the changes based
+   * on the provided list of {@link InstanceTagUpdateRequest} to get the updated map of tags and count of their
+   * respective tagged instances
+   * @param requests list of {@link InstanceTagUpdateRequest}
+   * @return map of tags and updated count of their respective tagged instances
+   */
+  private Map<String, Integer> getUpdatedTagToInstanceCountMap(List<InstanceTagUpdateRequest> requests) {
+    Map<String, Integer> updatedTagInstanceMap = new HashMap<>();
+    Set<String> visitedInstances = new HashSet<>();
+    // build the map of tags and their respective instance counts from the given tag update request list
+    requests.forEach(instance -> {
+      instance.getNewTags().forEach(tag ->
+          updatedTagInstanceMap.put(tag, updatedTagInstanceMap.getOrDefault(tag, 0) + 1));
+      visitedInstances.add(instance.getInstanceName());
+    });
+    // add the instance counts to tags for the rest of the instances apart from the ones mentioned in requests
+    _pinotHelixResourceManager.getAllInstances().forEach(instance -> {
+      if (!visitedInstances.contains(instance)) {
+        _pinotHelixResourceManager.getTagsForInstance(instance).forEach(tag ->
+            updatedTagInstanceMap.put(tag, updatedTagInstanceMap.getOrDefault(tag, 0) + 1));
+        visitedInstances.add(instance);
+      }
+    });
+    return updatedTagInstanceMap;
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index fdbca20433..346a19a1d6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1225,7 +1225,7 @@ public class PinotHelixResourceManager {
     return tenantSet;
   }
 
-  private List<String> getTagsForInstance(String instanceName) {
+  public List<String> getTagsForInstance(String instanceName) {
     InstanceConfig config = _helixDataAccessor.getProperty(_keyBuilder.instanceConfig(instanceName));
     return config.getTags();
   }
@@ -3023,6 +3023,15 @@ public class PinotHelixResourceManager {
     return ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
   }
 
+  /**
+   * Get all table configs.
+   *
+   * @return List of table configs. Empty list in case of tables configs does not exist.
+   */
+  public List<TableConfig> getAllTableConfigs() {
+    return ZKMetadataProvider.getAllTableConfigs(_propertyStore);
+  }
+
   /**
    * Get the offline table config for the given table name.
    *
@@ -4128,6 +4137,31 @@ public class PinotHelixResourceManager {
     return new PeriodicTaskInvocationResponse(periodicTaskRequestId, messageCount > 0);
   }
 
+  /**
+   * Construct a map of all the tags and their respective minimum instance requirements.
+   * The minimum instance requirement is computed by
+   * - for BROKER tenant tag set it to 1 if it hosts any table else set it to 0
+   * - for SERVER tenant tag iterate over all the tables of that tenant and find the maximum table replication.
+   * - for rest of the tags just set it to 0
+   * @return map of tags and their minimum instance requirements
+   */
+  public Map<String, Integer> minimumInstancesRequiredForTags() {
+    Map<String, Integer> tagMinInstanceMap = new HashMap<>();
+    for (InstanceConfig instanceConfig : getAllHelixInstanceConfigs()) {
+      for (String tag : instanceConfig.getTags()) {
+        tagMinInstanceMap.put(tag, 0);
+      }
+    }
+    for (TableConfig tableConfig : getAllTableConfigs()) {
+      String tag = TagNameUtils.getServerTagForTenant(tableConfig.getTenantConfig().getServer(),
+          tableConfig.getTableType());
+      tagMinInstanceMap.put(tag, Math.max(tagMinInstanceMap.getOrDefault(tag, 0), tableConfig.getReplication()));
+      String brokerTag = TagNameUtils.getBrokerTagForTenant(tableConfig.getTenantConfig().getBroker());
+      tagMinInstanceMap.put(brokerTag, 1);
+    }
+    return tagMinInstanceMap;
+  }
+
   /*
    * Uncomment and use for testing on a real cluster
   public static void main(String[] args) throws Exception {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
index 9b84c134eb..3bf0e2c70d 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
@@ -19,24 +19,34 @@
 package org.apache.pinot.controller.api;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import javax.annotation.Nullable;
+import org.apache.pinot.controller.api.resources.InstanceTagUpdateRequest;
+import org.apache.pinot.controller.api.resources.OperationValidationResponse;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.spi.config.instance.Instance;
 import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
@@ -47,22 +57,24 @@ import static org.testng.Assert.assertTrue;
  */
 public class PinotInstanceRestletResourceTest extends ControllerTest {
 
+  private ControllerRequestURLBuilder _urlBuilder = null;
+
   @BeforeClass
   public void setUp()
       throws Exception {
     DEFAULT_INSTANCE.setupSharedStateAndValidate();
+    _urlBuilder = DEFAULT_INSTANCE.getControllerRequestURLBuilder();
   }
 
   @Test
   public void testInstanceListingAndCreation()
       throws Exception {
-    ControllerRequestURLBuilder requestURLBuilder = DEFAULT_INSTANCE.getControllerRequestURLBuilder();
-    String listInstancesUrl = requestURLBuilder.forInstanceList();
+    String listInstancesUrl = _urlBuilder.forInstanceList();
     int expectedNumInstances = 1 + DEFAULT_NUM_BROKER_INSTANCES + DEFAULT_NUM_SERVER_INSTANCES;
     checkNumInstances(listInstancesUrl, expectedNumInstances);
 
     // Create untagged broker and server instances
-    String createInstanceUrl = requestURLBuilder.forInstanceCreate();
+    String createInstanceUrl = _urlBuilder.forInstanceCreate();
     Instance brokerInstance1 = new Instance("1.2.3.4", 1234, InstanceType.BROKER, null, null, 0, 0, 0, 0, false);
     sendPostRequest(createInstanceUrl, brokerInstance1.toJsonString());
     Instance serverInstance1 =
@@ -110,14 +122,14 @@ public class PinotInstanceRestletResourceTest extends ControllerTest {
         new Instance("1.2.3.4", 1234, InstanceType.BROKER, Collections.singletonList(newBrokerTag), null, 0, 0, 0, 0,
             false);
     String brokerInstanceId = "Broker_1.2.3.4_1234";
-    String brokerInstanceUrl = requestURLBuilder.forInstance(brokerInstanceId);
+    String brokerInstanceUrl = _urlBuilder.forInstance(brokerInstanceId);
     sendPutRequest(brokerInstanceUrl, newBrokerInstance.toJsonString());
     String newServerTag = "new-server-tag";
     Instance newServerInstance =
         new Instance("1.2.3.4", 2345, InstanceType.SERVER, Collections.singletonList(newServerTag), null, 28090, 28091,
             28092, 28093, true);
     String serverInstanceId = "Server_1.2.3.4_2345";
-    String serverInstanceUrl = requestURLBuilder.forInstance(serverInstanceId);
+    String serverInstanceUrl = _urlBuilder.forInstance(serverInstanceId);
     sendPutRequest(serverInstanceUrl, newServerInstance.toJsonString());
 
     checkInstanceInfo(brokerInstanceId, "1.2.3.4", 1234, new String[]{newBrokerTag}, null, -1, -1, -1, -1, false);
@@ -126,9 +138,9 @@ public class PinotInstanceRestletResourceTest extends ControllerTest {
 
     // Test Instance updateTags API
     String brokerInstanceUpdateTagsUrl =
-        requestURLBuilder.forInstanceUpdateTags(brokerInstanceId, Lists.newArrayList("tag_BROKER", "newTag_BROKER"));
+        _urlBuilder.forInstanceUpdateTags(brokerInstanceId, Lists.newArrayList("tag_BROKER", "newTag_BROKER"));
     sendPutRequest(brokerInstanceUpdateTagsUrl);
-    String serverInstanceUpdateTagsUrl = requestURLBuilder.forInstanceUpdateTags(serverInstanceId,
+    String serverInstanceUpdateTagsUrl = _urlBuilder.forInstanceUpdateTags(serverInstanceId,
         Lists.newArrayList("tag_REALTIME", "newTag_OFFLINE", "newTag_REALTIME"));
     sendPutRequest(serverInstanceUpdateTagsUrl);
     checkInstanceInfo(brokerInstanceId, "1.2.3.4", 1234, new String[]{"tag_BROKER", "newTag_BROKER"}, null, -1, -1, -1,
@@ -137,10 +149,10 @@ public class PinotInstanceRestletResourceTest extends ControllerTest {
         new String[]{"tag_REALTIME", "newTag_OFFLINE", "newTag_REALTIME"}, null, 28090, 28091, 28092, 28093, true);
 
     // Test DELETE instance API
-    sendDeleteRequest(requestURLBuilder.forInstance("Broker_1.2.3.4_1234"));
-    sendDeleteRequest(requestURLBuilder.forInstance("Server_1.2.3.4_2345"));
-    sendDeleteRequest(requestURLBuilder.forInstance("Broker_2.3.4.5_1234"));
-    sendDeleteRequest(requestURLBuilder.forInstance("Server_2.3.4.5_2345"));
+    sendDeleteRequest(_urlBuilder.forInstance("Broker_1.2.3.4_1234"));
+    sendDeleteRequest(_urlBuilder.forInstance("Server_1.2.3.4_2345"));
+    sendDeleteRequest(_urlBuilder.forInstance("Broker_2.3.4.5_1234"));
+    sendDeleteRequest(_urlBuilder.forInstance("Server_2.3.4.5_2345"));
     checkNumInstances(listInstancesUrl, expectedNumInstances);
   }
 
@@ -163,7 +175,7 @@ public class PinotInstanceRestletResourceTest extends ControllerTest {
       boolean queriesDisabled)
       throws Exception {
     JsonNode response = JsonUtils.stringToJsonNode(
-        ControllerTest.sendGetRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forInstance(instanceName)));
+        ControllerTest.sendGetRequest(_urlBuilder.forInstance(instanceName)));
     assertEquals(response.get("instanceName").asText(), instanceName);
     assertEquals(response.get("hostName").asText(), hostName);
     assertTrue(response.get("enabled").asBoolean());
@@ -193,6 +205,95 @@ public class PinotInstanceRestletResourceTest extends ControllerTest {
     }
   }
 
+  @Test
+  public void instanceRetagHappyPathTest()
+      throws IOException {
+    Map<String, List<String>> currentInstanceTagsMap = getCurrentInstanceTagsMap();
+    List<InstanceTagUpdateRequest> request = new ArrayList<>();
+    currentInstanceTagsMap.forEach((instance, tags) -> {
+      if (instance.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)
+          || instance.startsWith(Helix.PREFIX_OF_BROKER_INSTANCE)) {
+        InstanceTagUpdateRequest payload = new InstanceTagUpdateRequest();
+        payload.setInstanceName(instance);
+        payload.setNewTags(tags);
+        request.add(payload);
+      }
+    });
+    List<OperationValidationResponse> response = Arrays.asList(new ObjectMapper().readValue(
+        sendPostRequest(_urlBuilder.forUpdateTagsValidation(), JsonUtils.objectToString(request)),
+        OperationValidationResponse[].class));
+    assertNotNull(response);
+    response.forEach(item -> assertTrue(item.isSafe()));
+  }
+
+  @Test
+  public void instanceRetagServerDeficiencyTest()
+      throws Exception {
+    String tableName = "testTable";
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName)
+        .setNumReplicas(2).build();
+    // create table with replication as 2 so that DefaultTenant has a minimum server requirement as 2.
+    DEFAULT_INSTANCE.addTableConfig(tableConfig);
+    Map<String, List<String>> currentInstanceTagsMap = getCurrentInstanceTagsMap();
+    List<InstanceTagUpdateRequest> request = new ArrayList<>();
+    currentInstanceTagsMap.forEach((instance, tags) -> {
+      if (instance.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)
+          || instance.startsWith(Helix.PREFIX_OF_BROKER_INSTANCE)) {
+        InstanceTagUpdateRequest payload = new InstanceTagUpdateRequest();
+        payload.setInstanceName(instance);
+        payload.setNewTags(Lists.newArrayList());
+        request.add(payload);
+      }
+    });
+    List<OperationValidationResponse> response = Arrays.asList(new ObjectMapper().readValue(
+        sendPostRequest(_urlBuilder.forUpdateTagsValidation(), JsonUtils.objectToString(request)),
+        OperationValidationResponse[].class));
+    assertNotNull(response);
+
+    int deficientServers = 2;
+    int deficientBrokers = 1;
+    for (OperationValidationResponse item : response) {
+      String instanceName = item.getInstanceName();
+      boolean validity = item.isSafe();
+      if (!validity) {
+        List<OperationValidationResponse.ErrorWrapper> issues = item.getIssues();
+        assertEquals(issues.size(), 1);
+        assertEquals(issues.get(0).getCode(), OperationValidationResponse.ErrorCode.MINIMUM_INSTANCE_UNSATISFIED);
+        if (instanceName.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) {
+          deficientServers--;
+        } else if (instanceName.startsWith(Helix.PREFIX_OF_BROKER_INSTANCE)) {
+          deficientBrokers--;
+        }
+      }
+    }
+    assertEquals(deficientServers, 0);
+    assertEquals(deficientBrokers, 0);
+    DEFAULT_INSTANCE.dropOfflineTable(tableName);
+  }
+
+  private Map<String, List<String>> getCurrentInstanceTagsMap()
+      throws IOException {
+    String listInstancesUrl = _urlBuilder.forInstanceList();
+    JsonNode response = JsonUtils.stringToJsonNode(sendGetRequest(listInstancesUrl));
+    JsonNode instances = response.get("instances");
+    Map<String, List<String>> map = new HashMap<>(instances.size());
+    for (int i = 0; i < instances.size(); i++) {
+      String instance = instances.get(i).asText();
+      map.put(instance, getInstanceTags(instance));
+    }
+    return map;
+  }
+
+  private List<String> getInstanceTags(String instance)
+      throws IOException {
+    String getInstancesUrl = _urlBuilder.forInstance(instance);
+    List<String> tags = new ArrayList<>();
+    for (JsonNode tag : JsonUtils.stringToJsonNode(sendGetRequest(getInstancesUrl)).get("tags")) {
+      tags.add(tag.asText());
+    }
+    return tags;
+  }
+
   @AfterClass
   public void tearDown()
       throws Exception {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 394d11e47e..fe04bc3e19 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -542,6 +542,10 @@ public class ControllerRequestURLBuilder {
     return StringUtil.join("/", _baseUrl, "tables", tableName, "pauseStatus");
   }
 
+  public String forUpdateTagsValidation() {
+    return String.format("%s/instances/updateTags/validate", _baseUrl);
+  }
+
   private static String encode(String s) {
     try {
       return URLEncoder.encode(s, "UTF-8");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org