Posted to commits@pinot.apache.org by "snleee (via GitHub)" <gi...@apache.org> on 2023/07/21 08:59:25 UTC

[GitHub] [pinot] snleee commented on a diff in pull request #11077: Instance retag validation check api

snleee commented on code in PR #11077:
URL: https://github.com/apache/pinot/pull/11077#discussion_r1270389415


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java:
##########
@@ -416,4 +422,112 @@ public List<OperationValidationResponse> instanceDropSafetyCheck(
           Response.Status.INTERNAL_SERVER_ERROR, e);
     }
   }
+
+  @POST
+  @Path("/instances/updateTags/validate")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Check if it's safe to update the tags of the given instances. If not list all the reasons.")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 500, message = "Internal error")
+  })
+  public List<OperationValidationResponse> instanceTagUpdateSafetyCheck(List<InstanceTagUpdateRequest> instances) {
+    LOGGER.info("Performing safety check on tag update request received for instances: {}",
+        instances.stream().map(InstanceTagUpdateRequest::getInstanceName).collect(Collectors.toList()));
+    Map<String, Integer> tagMinServerMap = _pinotHelixResourceManager.minimumInstancesRequiredForTags();
+    Map<String, Integer> tagDeficiency = computeTagDeficiency(instances, tagMinServerMap);
+
+    Map<String, List<OperationValidationResponse.ErrorWrapper>> responseMap = new HashMap<>(instances.size());
+    List<OperationValidationResponse.ErrorWrapper> tenantIssues = new ArrayList<>();
+    instances.forEach(instance -> responseMap.put(instance.getInstanceName(), new ArrayList<>()));
+    for (InstanceTagUpdateRequest instance : instances) {
+      String name = instance.getInstanceName();
+      Set<String> oldTags;
+      try {
+        oldTags = new HashSet<>(_pinotHelixResourceManager.getTagsForInstance(name));
+      } catch (NullPointerException exception) {
+        throw new ControllerApplicationException(LOGGER,
+            String.format("Instance %s is not a valid instance name.", name), Response.Status.PRECONDITION_FAILED);
+      }
+      Set<String> newTags = new HashSet<>(instance.getNewTags());
+      // tags removed from instance
+      for (String tag : Sets.difference(oldTags, newTags)) {

Review Comment:
   Does this mean we only support replacing the entire tag list (`List<String> tags`), and do not support adding an extra tag to the existing tag list?
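
   To illustrate the concern, here is a minimal sketch of the same set difference the loop above relies on, written with plain `java.util` sets instead of Guava. The class name, method name, and tag values are hypothetical and not part of this PR. If a caller sends only the tag they want to add, every existing tag lands in the "removed" set:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch, not code from this PR: shows how a full-replacement
// diff treats a request that lists only the tag to be added.
final class TagDiffSketch {
  // Tags present on the instance today but absent from the requested list.
  static Set<String> removedTags(Set<String> oldTags, Set<String> newTags) {
    Set<String> removed = new HashSet<>(oldTags);
    removed.removeAll(newTags);
    return removed;
  }

  public static void main(String[] args) {
    // Tag values are made up for illustration.
    Set<String> oldTags = new HashSet<>(Arrays.asList("tenantA_OFFLINE", "tenantA_REALTIME"));
    // The caller intends to add one tag but sends only that tag.
    Set<String> newTags = new HashSet<>(Arrays.asList("tenantB_OFFLINE"));
    // Both existing tags are reported as removed.
    System.out.println(removedTags(oldTags, newTags));
  }
}
```

   Under that reading, a caller who wants to append a tag would have to resend the existing tags along with the new one.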



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java:
##########
@@ -416,4 +422,112 @@ public List<OperationValidationResponse> instanceDropSafetyCheck(
           Response.Status.INTERNAL_SERVER_ERROR, e);
     }
   }
+
+  @POST
+  @Path("/instances/updateTags/validate")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Check if it's safe to update the tags of the given instances. If not list all the reasons.")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 500, message = "Internal error")
+  })
+  public List<OperationValidationResponse> instanceTagUpdateSafetyCheck(List<InstanceTagUpdateRequest> instances) {

Review Comment:
   It would be great if we could document what we are validating here.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java:
##########
@@ -416,4 +422,112 @@ public List<OperationValidationResponse> instanceDropSafetyCheck(
           Response.Status.INTERNAL_SERVER_ERROR, e);
     }
   }
+
+  @POST
+  @Path("/instances/updateTags/validate")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Check if it's safe to update the tags of the given instances. If not list all the reasons.")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 500, message = "Internal error")
+  })
+  public List<OperationValidationResponse> instanceTagUpdateSafetyCheck(List<InstanceTagUpdateRequest> instances) {
+    LOGGER.info("Performing safety check on tag update request received for instances: {}",
+        instances.stream().map(InstanceTagUpdateRequest::getInstanceName).collect(Collectors.toList()));
+    Map<String, Integer> tagMinServerMap = _pinotHelixResourceManager.minimumInstancesRequiredForTags();
+    Map<String, Integer> tagDeficiency = computeTagDeficiency(instances, tagMinServerMap);
+
+    Map<String, List<OperationValidationResponse.ErrorWrapper>> responseMap = new HashMap<>(instances.size());
+    List<OperationValidationResponse.ErrorWrapper> tenantIssues = new ArrayList<>();
+    instances.forEach(instance -> responseMap.put(instance.getInstanceName(), new ArrayList<>()));
+    for (InstanceTagUpdateRequest instance : instances) {
+      String name = instance.getInstanceName();
+      Set<String> oldTags;
+      try {
+        oldTags = new HashSet<>(_pinotHelixResourceManager.getTagsForInstance(name));
+      } catch (NullPointerException exception) {
+        throw new ControllerApplicationException(LOGGER,
+            String.format("Instance %s is not a valid instance name.", name), Response.Status.PRECONDITION_FAILED);
+      }
+      Set<String> newTags = new HashSet<>(instance.getNewTags());
+      // tags removed from instance
+      for (String tag : Sets.difference(oldTags, newTags)) {
+        Integer deficiency = tagDeficiency.get(tag);
+        if (deficiency != null && deficiency > 0) {
+          String tenant = TagNameUtils.getTenantFromTag(tag);
+          String tagType = getInstanceTypeFromTag(tag);
+          responseMap.get(name).add(new OperationValidationResponse.ErrorWrapper(
+              OperationValidationResponse.ErrorCode.MINIMUM_INSTANCE_UNSATISFIED, tenant, tagType, tag, tagType, name));
+          tagDeficiency.put(tag, deficiency - 1);
+        }
+      }
+      // newly added tags to instance
+      for (String tag : newTags) {
+        String tagType = getInstanceTypeFromTag(tag);
+        if (tagType == null && (name.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)
+            || name.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE))) {
+          responseMap.get(name).add(new OperationValidationResponse.ErrorWrapper(
+              OperationValidationResponse.ErrorCode.UNRECOGNISED_TAG_TYPE, tag));
+          continue;
+        }
+        Integer deficiency = tagDeficiency.get(tag);
+        if (deficiency != null && deficiency > 0) {
+          tenantIssues.add(new OperationValidationResponse.ErrorWrapper(
+              OperationValidationResponse.ErrorCode.ALREADY_DEFICIENT_TENANT, TagNameUtils.getTenantFromTag(tag),
+              tagType, deficiency.toString(), name));
+        }
+      }
+    }
+
+    // consolidate all the issues based on instances
+    List<OperationValidationResponse> response = new ArrayList<>(instances.size());
+    responseMap.forEach((instance, issueList) -> response.add(issueList.isEmpty()
+        ? new OperationValidationResponse().setInstanceName(instance).setSafe(true)
+        : new OperationValidationResponse().putAllIssues(issueList).setInstanceName(instance).setSafe(false)));
+    // separate entry to group all the deficient tenant issues as it's not related to any instance
+    if (!tenantIssues.isEmpty()) {
+      response.add(new OperationValidationResponse().putAllIssues(tenantIssues).setSafe(false));
+    }
+    return response;
+  }
+
+  private String getInstanceTypeFromTag(String tag) {
+    if (TagNameUtils.isServerTag(tag)) {
+      return "server";
+    } else if (TagNameUtils.isBrokerTag(tag)) {
+      return "broker";
+    } else {
+      return null;
+    }
+  }
+
+  private Map<String, Integer> computeTagDeficiency(List<InstanceTagUpdateRequest> instances,

Review Comment:
   `instances` -> `requests`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java:
##########
@@ -416,4 +422,112 @@ public List<OperationValidationResponse> instanceDropSafetyCheck(
           Response.Status.INTERNAL_SERVER_ERROR, e);
     }
   }
+
+  @POST
+  @Path("/instances/updateTags/validate")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Check if it's safe to update the tags of the given instances. If not list all the reasons.")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 500, message = "Internal error")
+  })
+  public List<OperationValidationResponse> instanceTagUpdateSafetyCheck(List<InstanceTagUpdateRequest> instances) {

Review Comment:
   `instances` -> `requests`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java:
##########
@@ -416,4 +422,112 @@ public List<OperationValidationResponse> instanceDropSafetyCheck(
           Response.Status.INTERNAL_SERVER_ERROR, e);
     }
   }
+
+  @POST
+  @Path("/instances/updateTags/validate")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Check if it's safe to update the tags of the given instances. If not list all the reasons.")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 500, message = "Internal error")
+  })
+  public List<OperationValidationResponse> instanceTagUpdateSafetyCheck(List<InstanceTagUpdateRequest> instances) {
+    LOGGER.info("Performing safety check on tag update request received for instances: {}",
+        instances.stream().map(InstanceTagUpdateRequest::getInstanceName).collect(Collectors.toList()));
+    Map<String, Integer> tagMinServerMap = _pinotHelixResourceManager.minimumInstancesRequiredForTags();

Review Comment:
   Why don't we compute minInstances only for tags that show up in the request?
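
   As a rough sketch of what I mean, assuming the map returned above is keyed by tag name: collect the tags touched by the request (old and new tags of each instance) and keep only those entries. `RequestedTagFilterSketch` and `restrictToRequestedTags` are hypothetical names. Ideally the minimums for unrelated tags would not be computed at all, but whether `minimumInstancesRequiredForTags()` can take such a filter is not visible in this diff, so the sketch only filters after the fact:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not code from this PR.
final class RequestedTagFilterSketch {
  // Keeps only the minimum-instance entries whose tag appears in the request.
  static Map<String, Integer> restrictToRequestedTags(Map<String, Integer> minInstancesByTag,
      Collection<Set<String>> tagSetsInRequest) {
    // Union of every tag mentioned by any instance in the request.
    Set<String> requested = new HashSet<>();
    for (Set<String> tags : tagSetsInRequest) {
      requested.addAll(tags);
    }
    Map<String, Integer> filtered = new HashMap<>();
    for (Map.Entry<String, Integer> entry : minInstancesByTag.entrySet()) {
      if (requested.contains(entry.getKey())) {
        filtered.put(entry.getKey(), entry.getValue());
      }
    }
    return filtered;
  }

  public static void main(String[] args) {
    // Tag values are made up for illustration.
    Map<String, Integer> minByTag = new HashMap<>();
    minByTag.put("tenantA_OFFLINE", 2);
    minByTag.put("tenantC_OFFLINE", 3);
    Set<String> touched = new HashSet<>(Arrays.asList("tenantA_OFFLINE", "tenantB_OFFLINE"));
    // Only the tenantA_OFFLINE entry survives the filter.
    System.out.println(restrictToRequestedTags(minByTag, Arrays.asList(touched)));
  }
}
```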



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java:
##########
@@ -416,4 +422,112 @@ public List<OperationValidationResponse> instanceDropSafetyCheck(
           Response.Status.INTERNAL_SERVER_ERROR, e);
     }
   }
+
+  @POST
+  @Path("/instances/updateTags/validate")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Check if it's safe to update the tags of the given instances. If not list all the reasons.")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 500, message = "Internal error")
+  })
+  public List<OperationValidationResponse> instanceTagUpdateSafetyCheck(List<InstanceTagUpdateRequest> instances) {
+    LOGGER.info("Performing safety check on tag update request received for instances: {}",
+        instances.stream().map(InstanceTagUpdateRequest::getInstanceName).collect(Collectors.toList()));
+    Map<String, Integer> tagMinServerMap = _pinotHelixResourceManager.minimumInstancesRequiredForTags();
+    Map<String, Integer> tagDeficiency = computeTagDeficiency(instances, tagMinServerMap);
+
+    Map<String, List<OperationValidationResponse.ErrorWrapper>> responseMap = new HashMap<>(instances.size());
+    List<OperationValidationResponse.ErrorWrapper> tenantIssues = new ArrayList<>();
+    instances.forEach(instance -> responseMap.put(instance.getInstanceName(), new ArrayList<>()));
+    for (InstanceTagUpdateRequest instance : instances) {
+      String name = instance.getInstanceName();
+      Set<String> oldTags;
+      try {
+        oldTags = new HashSet<>(_pinotHelixResourceManager.getTagsForInstance(name));
+      } catch (NullPointerException exception) {
+        throw new ControllerApplicationException(LOGGER,
+            String.format("Instance %s is not a valid instance name.", name), Response.Status.PRECONDITION_FAILED);
+      }
+      Set<String> newTags = new HashSet<>(instance.getNewTags());
+      // tags removed from instance
+      for (String tag : Sets.difference(oldTags, newTags)) {
+        Integer deficiency = tagDeficiency.get(tag);
+        if (deficiency != null && deficiency > 0) {
+          String tenant = TagNameUtils.getTenantFromTag(tag);
+          String tagType = getInstanceTypeFromTag(tag);
+          responseMap.get(name).add(new OperationValidationResponse.ErrorWrapper(
+              OperationValidationResponse.ErrorCode.MINIMUM_INSTANCE_UNSATISFIED, tenant, tagType, tag, tagType, name));
+          tagDeficiency.put(tag, deficiency - 1);
+        }
+      }
+      // newly added tags to instance
+      for (String tag : newTags) {
+        String tagType = getInstanceTypeFromTag(tag);
+        if (tagType == null && (name.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)
+            || name.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE))) {
+          responseMap.get(name).add(new OperationValidationResponse.ErrorWrapper(
+              OperationValidationResponse.ErrorCode.UNRECOGNISED_TAG_TYPE, tag));
+          continue;
+        }
+        Integer deficiency = tagDeficiency.get(tag);
+        if (deficiency != null && deficiency > 0) {
+          tenantIssues.add(new OperationValidationResponse.ErrorWrapper(
+              OperationValidationResponse.ErrorCode.ALREADY_DEFICIENT_TENANT, TagNameUtils.getTenantFromTag(tag),
+              tagType, deficiency.toString(), name));
+        }
+      }
+    }
+
+    // consolidate all the issues based on instances
+    List<OperationValidationResponse> response = new ArrayList<>(instances.size());
+    responseMap.forEach((instance, issueList) -> response.add(issueList.isEmpty()
+        ? new OperationValidationResponse().setInstanceName(instance).setSafe(true)
+        : new OperationValidationResponse().putAllIssues(issueList).setInstanceName(instance).setSafe(false)));
+    // separate entry to group all the deficient tenant issues as it's not related to any instance
+    if (!tenantIssues.isEmpty()) {
+      response.add(new OperationValidationResponse().putAllIssues(tenantIssues).setSafe(false));
+    }
+    return response;
+  }
+
+  private String getInstanceTypeFromTag(String tag) {
+    if (TagNameUtils.isServerTag(tag)) {
+      return "server";
+    } else if (TagNameUtils.isBrokerTag(tag)) {
+      return "broker";
+    } else {
+      return null;
+    }
+  }
+
+  private Map<String, Integer> computeTagDeficiency(List<InstanceTagUpdateRequest> instances,

Review Comment:
   The logic in `computeTagDeficiency()` and `getUpdatedTagInstanceMap()` is hard to follow because of the heavy use of streams. Can you add documentation on what we are trying to compute here? Also, can we review the logic and see whether some parts can be simplified?
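
   For reference, this is roughly how I would expect a loop-based version to read. It is only a sketch under my own assumption about the intent, since the method bodies are not shown in this hunk: the deficiency of a tag is taken to be the minimum number of instances it requires minus the number of instances that would carry it after the update, recorded only when positive. All names and signatures below are hypothetical:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not code from this PR. Assumes
// deficiency = min required - instances carrying the tag after the update.
final class TagDeficiencySketch {
  static Map<String, Integer> computeDeficiency(Map<String, Set<String>> currentTagsByInstance,
      Map<String, Set<String>> requestedTagsByInstance, Map<String, Integer> minInstancesByTag) {
    // Step 1: apply the requested tag lists on top of a copy of the current state.
    Map<String, Set<String>> updated = new HashMap<>(currentTagsByInstance);
    updated.putAll(requestedTagsByInstance);

    // Step 2: count how many instances carry each tag after the update.
    Map<String, Integer> countByTag = new HashMap<>();
    for (Set<String> tags : updated.values()) {
      for (String tag : tags) {
        countByTag.merge(tag, 1, Integer::sum);
      }
    }

    // Step 3: record a shortfall only for tags that end up below their minimum.
    Map<String, Integer> deficiency = new HashMap<>();
    for (Map.Entry<String, Integer> entry : minInstancesByTag.entrySet()) {
      int shortfall = entry.getValue() - countByTag.getOrDefault(entry.getKey(), 0);
      if (shortfall > 0) {
        deficiency.put(entry.getKey(), shortfall);
      }
    }
    return deficiency;
  }

  public static void main(String[] args) {
    // Instance names and tag values are made up for illustration.
    Map<String, Set<String>> current = new HashMap<>();
    current.put("Server_1", new HashSet<>(Arrays.asList("tenantA_OFFLINE")));
    current.put("Server_2", new HashSet<>(Arrays.asList("tenantA_OFFLINE")));
    Map<String, Set<String>> requested = new HashMap<>();
    requested.put("Server_1", new HashSet<>(Arrays.asList("tenantB_OFFLINE")));
    Map<String, Integer> minByTag = new HashMap<>();
    minByTag.put("tenantA_OFFLINE", 2);
    minByTag.put("tenantB_OFFLINE", 1);
    // prints {tenantA_OFFLINE=1}
    System.out.println(computeDeficiency(current, requested, minByTag));
  }
}
```

   Three short, commented steps like these would also double as the documentation asked for above.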



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java:
##########
@@ -416,4 +422,112 @@ public List<OperationValidationResponse> instanceDropSafetyCheck(
           Response.Status.INTERNAL_SERVER_ERROR, e);
     }
   }
+
+  @POST
+  @Path("/instances/updateTags/validate")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Check if it's safe to update the tags of the given instances. If not list all the reasons.")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 500, message = "Internal error")
+  })
+  public List<OperationValidationResponse> instanceTagUpdateSafetyCheck(List<InstanceTagUpdateRequest> instances) {
+    LOGGER.info("Performing safety check on tag update request received for instances: {}",
+        instances.stream().map(InstanceTagUpdateRequest::getInstanceName).collect(Collectors.toList()));
+    Map<String, Integer> tagMinServerMap = _pinotHelixResourceManager.minimumInstancesRequiredForTags();
+    Map<String, Integer> tagDeficiency = computeTagDeficiency(instances, tagMinServerMap);
+
+    Map<String, List<OperationValidationResponse.ErrorWrapper>> responseMap = new HashMap<>(instances.size());
+    List<OperationValidationResponse.ErrorWrapper> tenantIssues = new ArrayList<>();
+    instances.forEach(instance -> responseMap.put(instance.getInstanceName(), new ArrayList<>()));
+    for (InstanceTagUpdateRequest instance : instances) {
+      String name = instance.getInstanceName();
+      Set<String> oldTags;
+      try {
+        oldTags = new HashSet<>(_pinotHelixResourceManager.getTagsForInstance(name));
+      } catch (NullPointerException exception) {
+        throw new ControllerApplicationException(LOGGER,
+            String.format("Instance %s is not a valid instance name.", name), Response.Status.PRECONDITION_FAILED);
+      }
+      Set<String> newTags = new HashSet<>(instance.getNewTags());
+      // tags removed from instance
+      for (String tag : Sets.difference(oldTags, newTags)) {
+        Integer deficiency = tagDeficiency.get(tag);
+        if (deficiency != null && deficiency > 0) {
+          String tenant = TagNameUtils.getTenantFromTag(tag);
+          String tagType = getInstanceTypeFromTag(tag);
+          responseMap.get(name).add(new OperationValidationResponse.ErrorWrapper(
+              OperationValidationResponse.ErrorCode.MINIMUM_INSTANCE_UNSATISFIED, tenant, tagType, tag, tagType, name));
+          tagDeficiency.put(tag, deficiency - 1);
+        }
+      }
+      // newly added tags to instance
+      for (String tag : newTags) {
+        String tagType = getInstanceTypeFromTag(tag);
+        if (tagType == null && (name.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)
+            || name.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE))) {
+          responseMap.get(name).add(new OperationValidationResponse.ErrorWrapper(
+              OperationValidationResponse.ErrorCode.UNRECOGNISED_TAG_TYPE, tag));
+          continue;
+        }
+        Integer deficiency = tagDeficiency.get(tag);
+        if (deficiency != null && deficiency > 0) {
+          tenantIssues.add(new OperationValidationResponse.ErrorWrapper(
+              OperationValidationResponse.ErrorCode.ALREADY_DEFICIENT_TENANT, TagNameUtils.getTenantFromTag(tag),

Review Comment:
   What's the difference between `ALREADY_DEFICIENT_TENANT` and `MINIMUM_INSTANCE_UNSATISFIED`?
   
   Aren't they essentially the same error?
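
   As far as I can tell from the hunk above, the only distinction is which loop raises the error. The sketch below restates that branching in isolation; it is a simplification with hypothetical names that leaves out the `UNRECOGNISED_TAG_TYPE` check and the deficiency decrement:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not code from this PR: restates which branch of the
// hunk above reports which error for a tag with a positive deficiency.
final class TagIssueSketch {
  enum Issue { MINIMUM_INSTANCE_UNSATISFIED, ALREADY_DEFICIENT_TENANT, NONE }

  static Issue classify(String tag, Set<String> oldTags, Set<String> newTags,
      Map<String, Integer> deficiencyByTag) {
    Integer deficiency = deficiencyByTag.get(tag);
    if (deficiency == null || deficiency <= 0) {
      return Issue.NONE;
    }
    // Tag is being taken off the instance: reported against that instance.
    if (oldTags.contains(tag) && !newTags.contains(tag)) {
      return Issue.MINIMUM_INSTANCE_UNSATISFIED;
    }
    // Tag is in the requested list: reported in the separate tenant-level entry.
    if (newTags.contains(tag)) {
      return Issue.ALREADY_DEFICIENT_TENANT;
    }
    return Issue.NONE;
  }

  public static void main(String[] args) {
    // Tag values are made up for illustration.
    Set<String> oldTags = new HashSet<>(Arrays.asList("tenantA_OFFLINE"));
    Set<String> newTags = new HashSet<>(Arrays.asList("tenantB_OFFLINE"));
    Map<String, Integer> deficiency = new HashMap<>();
    deficiency.put("tenantA_OFFLINE", 1);
    // prints MINIMUM_INSTANCE_UNSATISFIED
    System.out.println(classify("tenantA_OFFLINE", oldTags, newTags, deficiency));
  }
}
```

   Note that the second loop in the hunk iterates over all of `newTags`, so a tag the instance already had and keeps would also take the `ALREADY_DEFICIENT_TENANT` path.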



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org
