You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2023/07/25 21:43:33 UTC
[pinot] branch master updated: Instance retag validation check api (#11077)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9d55a46047 Instance retag validation check api (#11077)
9d55a46047 is described below
commit 9d55a460470d76937b55ac8c4b814a6873e5239f
Author: Shounak kulkarni <sh...@gmail.com>
AuthorDate: Wed Jul 26 03:13:27 2023 +0530
Instance retag validation check api (#11077)
* API to validate safety of retag operations
* code cleanup and proper error handling
* checkstyle fixes
* handle realtime and offline server tags separately
* get all table configs at once
* logic refactor
* fixes
* test cases
* broker tenant min requirement behaviour fix
* Added docs and comments to improve readability
---
.../pinot/common/utils/config/TagNameUtils.java | 7 +
.../api/resources/InstanceTagUpdateRequest.java | 50 +++++++
.../api/resources/OperationValidationResponse.java | 16 ++-
.../resources/PinotInstanceRestletResource.java | 149 +++++++++++++++++++++
.../helix/core/PinotHelixResourceManager.java | 36 ++++-
.../api/PinotInstanceRestletResourceTest.java | 125 +++++++++++++++--
.../utils/builder/ControllerRequestURLBuilder.java | 4 +
7 files changed, 373 insertions(+), 14 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TagNameUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TagNameUtils.java
index 15ed490dc8..28a2e30d47 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TagNameUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TagNameUtils.java
@@ -102,6 +102,13 @@ public class TagNameUtils {
return getTagForTenant(tenantName, REALTIME_SERVER_TAG_SUFFIX);
}
+ /**
+ * Returns the server tag name for the given tenant and the given table type.
+ */
+ public static String getServerTagForTenant(@Nullable String tenantName, TableType type) {
+ return getTagForTenant(tenantName, String.format("_%s", type));
+ }
+
private static String getTagForTenant(@Nullable String tenantName, String tagSuffix) {
if (tenantName == null) {
return DEFAULT_TENANT_NAME + tagSuffix;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/InstanceTagUpdateRequest.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/InstanceTagUpdateRequest.java
new file mode 100644
index 0000000000..c320387a4d
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/InstanceTagUpdateRequest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.List;
+
+
+@ApiModel
+public class InstanceTagUpdateRequest {
+ @JsonProperty("instanceName")
+ @ApiModelProperty(example = "Server_a.b.com_20000")
+ private String _instanceName;
+ @JsonProperty("newTags")
+ private List<String> _newTags;
+
+ public String getInstanceName() {
+ return _instanceName;
+ }
+
+ public void setInstanceName(String instanceName) {
+ _instanceName = instanceName;
+ }
+
+ public List<String> getNewTags() {
+ return _newTags;
+ }
+
+ public void setNewTags(List<String> newTags) {
+ _newTags = newTags;
+ }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/OperationValidationResponse.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/OperationValidationResponse.java
index 43897ccdc0..6b27e38e63 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/OperationValidationResponse.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/OperationValidationResponse.java
@@ -58,14 +58,24 @@ public class OperationValidationResponse {
return this;
}
+ public OperationValidationResponse putAllIssues(List<ErrorWrapper> issues) {
+ _issues.addAll(issues);
+ return this;
+ }
+
public String getIssueMessage(int index) {
return _issues.get(index).getMessage();
}
public static class ErrorWrapper {
+ @JsonProperty("code")
ErrorCode _code;
+ @JsonProperty("message")
String _message;
+ public ErrorWrapper() {
+ }
+
public ErrorWrapper(ErrorCode code, String... args) {
_code = code;
_message = String.format(code._description, args);
@@ -82,7 +92,11 @@ public class OperationValidationResponse {
public enum ErrorCode {
IS_ALIVE("Instance %s is still live"),
- CONTAINS_RESOURCE("Instance %s exists in ideal state for %s");
+ CONTAINS_RESOURCE("Instance %s exists in ideal state for %s"),
+ MINIMUM_INSTANCE_UNSATISFIED(
+ "Tenant '%s' will not satisfy minimum '%s' requirement if tag '%s' is removed from %s instance '%s'."),
+ ALREADY_DEFICIENT_TENANT("Tenant '%s' is low on '%s' instances by %s even after allocating instance %s"),
+ UNRECOGNISED_TAG_TYPE("The tag '%s' does not follow the suffix convention of either broker or server");
public final String _description;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
index dacd4a5e51..06ca9ec92f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.api.resources;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Sets;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiKeyAuthDefinition;
import io.swagger.annotations.ApiOperation;
@@ -29,8 +30,12 @@ import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.ws.rs.ClientErrorException;
@@ -49,6 +54,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.utils.config.InstanceUtils;
+import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.api.access.Authenticate;
import org.apache.pinot.controller.api.exception.ControllerApplicationException;
@@ -458,4 +464,147 @@ public class PinotInstanceRestletResource {
Response.Status.INTERNAL_SERVER_ERROR, e);
}
}
+
+ /**
+ * Endpoint to validate the safety of instance tag update requests.
+ * This is to ensure that any instance tag update operation that user wants to perform is safe and does not create any
+ * side effect on the cluster and disturb the cluster consistency.
+ * This operation does not perform any changes to the cluster, but surfaces the possible issues which might occur upon
+ * applying the intended changes.
+ * @param requests list if instance tag update requests
+ * @return list of {@link OperationValidationResponse} which denotes the validity of each request along with listing
+ * the issues if any.
+ */
+ @POST
+ @Path("/instances/updateTags/validate")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Check if it's safe to update the tags of the given instances. If not list all the reasons.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success"),
+ @ApiResponse(code = 500, message = "Internal error")
+ })
+ public List<OperationValidationResponse> instanceTagUpdateSafetyCheck(List<InstanceTagUpdateRequest> requests) {
+ LOGGER.info("Performing safety check on tag update request received for instances: {}",
+ requests.stream().map(InstanceTagUpdateRequest::getInstanceName).collect(Collectors.toList()));
+ Map<String, Integer> tagMinServerMap = _pinotHelixResourceManager.minimumInstancesRequiredForTags();
+ Map<String, Integer> tagToInstanceCountMap = getUpdatedTagToInstanceCountMap(requests);
+ Map<String, Integer> tagDeficiency = computeTagDeficiency(tagToInstanceCountMap, tagMinServerMap);
+
+ Map<String, List<OperationValidationResponse.ErrorWrapper>> responseMap = new HashMap<>(requests.size());
+ List<OperationValidationResponse.ErrorWrapper> tenantIssues = new ArrayList<>();
+ requests.forEach(request -> responseMap.put(request.getInstanceName(), new ArrayList<>()));
+ for (InstanceTagUpdateRequest request : requests) {
+ String name = request.getInstanceName();
+ Set<String> oldTags;
+ try {
+ oldTags = new HashSet<>(_pinotHelixResourceManager.getTagsForInstance(name));
+ } catch (NullPointerException exception) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Instance %s is not a valid instance name.", name), Response.Status.PRECONDITION_FAILED);
+ }
+ Set<String> newTags = new HashSet<>(request.getNewTags());
+ // tags removed from instance
+ for (String tag : Sets.difference(oldTags, newTags)) {
+ Integer deficiency = tagDeficiency.get(tag);
+ if (deficiency != null && deficiency > 0) {
+ String tenant = TagNameUtils.getTenantFromTag(tag);
+ String tagType = getInstanceTypeFromTag(tag);
+ responseMap.get(name).add(new OperationValidationResponse.ErrorWrapper(
+ OperationValidationResponse.ErrorCode.MINIMUM_INSTANCE_UNSATISFIED, tenant, tagType, tag, tagType, name));
+ tagDeficiency.put(tag, deficiency - 1);
+ }
+ }
+ // newly added tags to instance
+ for (String tag : newTags) {
+ String tagType = getInstanceTypeFromTag(tag);
+ if (tagType == null && (name.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)
+ || name.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE))) {
+ responseMap.get(name).add(new OperationValidationResponse.ErrorWrapper(
+ OperationValidationResponse.ErrorCode.UNRECOGNISED_TAG_TYPE, tag));
+ continue;
+ }
+ Integer deficiency = tagDeficiency.get(tag);
+ if (deficiency != null && deficiency > 0) {
+ tenantIssues.add(new OperationValidationResponse.ErrorWrapper(
+ OperationValidationResponse.ErrorCode.ALREADY_DEFICIENT_TENANT, TagNameUtils.getTenantFromTag(tag),
+ tagType, deficiency.toString(), name));
+ }
+ }
+ }
+
+ // consolidate all the issues based on instances
+ List<OperationValidationResponse> response = new ArrayList<>(requests.size());
+ responseMap.forEach((instance, issueList) -> response.add(issueList.isEmpty()
+ ? new OperationValidationResponse().setInstanceName(instance).setSafe(true)
+ : new OperationValidationResponse().putAllIssues(issueList).setInstanceName(instance).setSafe(false)));
+ // separate entry to group all the deficient tenant issues as it's not related to any instance
+ if (!tenantIssues.isEmpty()) {
+ response.add(new OperationValidationResponse().putAllIssues(tenantIssues).setSafe(false));
+ }
+ return response;
+ }
+
+ private String getInstanceTypeFromTag(String tag) {
+ if (TagNameUtils.isServerTag(tag)) {
+ return "server";
+ } else if (TagNameUtils.isBrokerTag(tag)) {
+ return "broker";
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Compute the number of deficient instances for each tag.
+ * The utility accepts two maps
+ * - map of tags and count of their intended tagged instances
+ * - map of tags and their minimum number of instance requirements
+ * And then compares these two maps to return a map of tags and the number of their deficient instances.
+ *
+ * @param tagToInstanceCountMap tags and count of their intended tagged instances
+ * @param tagToMinInstanceCountMap tags and their minimum number of instance requirements
+ * @return tags and the number of their deficient instances
+ */
+ private Map<String, Integer> computeTagDeficiency(Map<String, Integer> tagToInstanceCountMap,
+ Map<String, Integer> tagToMinInstanceCountMap) {
+ Map<String, Integer> tagDeficiency = new HashMap<>();
+ Map<String, Integer> tagToInstanceCountMapCopy = new HashMap<>(tagToInstanceCountMap);
+ // compute deficiency for each of the minimum instance requirement entry
+ tagToMinInstanceCountMap.forEach((tag, minInstances) -> {
+ Integer updatedInstances = tagToInstanceCountMapCopy.remove(tag);
+ // if tag is not present in the provided map its considered as if tag is not assigned to any instance
+ // hence deficiency = minimum instance requirement.
+ tagDeficiency.put(tag, minInstances - (updatedInstances != null ? updatedInstances : 0));
+ });
+ // tags for which minimum instance requirement is not specified are assumed to have no deficiency (deficiency = 0)
+ tagToInstanceCountMapCopy.forEach((tag, updatedInstances) -> tagDeficiency.put(tag, 0));
+ return tagDeficiency;
+ }
+
+ /**
+ * Utility to fetch the existing tags and count of their respective tagged instances and then apply the changes based
+ * on the provided list of {@link InstanceTagUpdateRequest} to get the updated map of tags and count of their
+ * respective tagged instances
+ * @param requests list of {@link InstanceTagUpdateRequest}
+ * @return map of tags and updated count of their respective tagged instances
+ */
+ private Map<String, Integer> getUpdatedTagToInstanceCountMap(List<InstanceTagUpdateRequest> requests) {
+ Map<String, Integer> updatedTagInstanceMap = new HashMap<>();
+ Set<String> visitedInstances = new HashSet<>();
+ // build the map of tags and their respective instance counts from the given tag update request list
+ requests.forEach(instance -> {
+ instance.getNewTags().forEach(tag ->
+ updatedTagInstanceMap.put(tag, updatedTagInstanceMap.getOrDefault(tag, 0) + 1));
+ visitedInstances.add(instance.getInstanceName());
+ });
+ // add the instance counts to tags for the rest of the instances apart from the ones mentioned in requests
+ _pinotHelixResourceManager.getAllInstances().forEach(instance -> {
+ if (!visitedInstances.contains(instance)) {
+ _pinotHelixResourceManager.getTagsForInstance(instance).forEach(tag ->
+ updatedTagInstanceMap.put(tag, updatedTagInstanceMap.getOrDefault(tag, 0) + 1));
+ visitedInstances.add(instance);
+ }
+ });
+ return updatedTagInstanceMap;
+ }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index fdbca20433..346a19a1d6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1225,7 +1225,7 @@ public class PinotHelixResourceManager {
return tenantSet;
}
- private List<String> getTagsForInstance(String instanceName) {
+ public List<String> getTagsForInstance(String instanceName) {
InstanceConfig config = _helixDataAccessor.getProperty(_keyBuilder.instanceConfig(instanceName));
return config.getTags();
}
@@ -3023,6 +3023,15 @@ public class PinotHelixResourceManager {
return ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
}
+ /**
+ * Get all table configs.
+ *
+ * @return List of table configs. Empty list in case of tables configs does not exist.
+ */
+ public List<TableConfig> getAllTableConfigs() {
+ return ZKMetadataProvider.getAllTableConfigs(_propertyStore);
+ }
+
/**
* Get the offline table config for the given table name.
*
@@ -4128,6 +4137,31 @@ public class PinotHelixResourceManager {
return new PeriodicTaskInvocationResponse(periodicTaskRequestId, messageCount > 0);
}
+ /**
+ * Construct a map of all the tags and their respective minimum instance requirements.
+ * The minimum instance requirement is computed by
+ * - for BROKER tenant tag set it to 1 if it hosts any table else set it to 0
+ * - for SERVER tenant tag iterate over all the tables of that tenant and find the maximum table replication.
+ * - for rest of the tags just set it to 0
+ * @return map of tags and their minimum instance requirements
+ */
+ public Map<String, Integer> minimumInstancesRequiredForTags() {
+ Map<String, Integer> tagMinInstanceMap = new HashMap<>();
+ for (InstanceConfig instanceConfig : getAllHelixInstanceConfigs()) {
+ for (String tag : instanceConfig.getTags()) {
+ tagMinInstanceMap.put(tag, 0);
+ }
+ }
+ for (TableConfig tableConfig : getAllTableConfigs()) {
+ String tag = TagNameUtils.getServerTagForTenant(tableConfig.getTenantConfig().getServer(),
+ tableConfig.getTableType());
+ tagMinInstanceMap.put(tag, Math.max(tagMinInstanceMap.getOrDefault(tag, 0), tableConfig.getReplication()));
+ String brokerTag = TagNameUtils.getBrokerTagForTenant(tableConfig.getTenantConfig().getBroker());
+ tagMinInstanceMap.put(brokerTag, 1);
+ }
+ return tagMinInstanceMap;
+ }
+
/*
* Uncomment and use for testing on a real cluster
public static void main(String[] args) throws Exception {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
index 9b84c134eb..3bf0e2c70d 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
@@ -19,24 +19,34 @@
package org.apache.pinot.controller.api;
import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import javax.annotation.Nullable;
+import org.apache.pinot.controller.api.resources.InstanceTagUpdateRequest;
+import org.apache.pinot.controller.api.resources.OperationValidationResponse;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.spi.config.instance.Instance;
import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
@@ -47,22 +57,24 @@ import static org.testng.Assert.assertTrue;
*/
public class PinotInstanceRestletResourceTest extends ControllerTest {
+ private ControllerRequestURLBuilder _urlBuilder = null;
+
@BeforeClass
public void setUp()
throws Exception {
DEFAULT_INSTANCE.setupSharedStateAndValidate();
+ _urlBuilder = DEFAULT_INSTANCE.getControllerRequestURLBuilder();
}
@Test
public void testInstanceListingAndCreation()
throws Exception {
- ControllerRequestURLBuilder requestURLBuilder = DEFAULT_INSTANCE.getControllerRequestURLBuilder();
- String listInstancesUrl = requestURLBuilder.forInstanceList();
+ String listInstancesUrl = _urlBuilder.forInstanceList();
int expectedNumInstances = 1 + DEFAULT_NUM_BROKER_INSTANCES + DEFAULT_NUM_SERVER_INSTANCES;
checkNumInstances(listInstancesUrl, expectedNumInstances);
// Create untagged broker and server instances
- String createInstanceUrl = requestURLBuilder.forInstanceCreate();
+ String createInstanceUrl = _urlBuilder.forInstanceCreate();
Instance brokerInstance1 = new Instance("1.2.3.4", 1234, InstanceType.BROKER, null, null, 0, 0, 0, 0, false);
sendPostRequest(createInstanceUrl, brokerInstance1.toJsonString());
Instance serverInstance1 =
@@ -110,14 +122,14 @@ public class PinotInstanceRestletResourceTest extends ControllerTest {
new Instance("1.2.3.4", 1234, InstanceType.BROKER, Collections.singletonList(newBrokerTag), null, 0, 0, 0, 0,
false);
String brokerInstanceId = "Broker_1.2.3.4_1234";
- String brokerInstanceUrl = requestURLBuilder.forInstance(brokerInstanceId);
+ String brokerInstanceUrl = _urlBuilder.forInstance(brokerInstanceId);
sendPutRequest(brokerInstanceUrl, newBrokerInstance.toJsonString());
String newServerTag = "new-server-tag";
Instance newServerInstance =
new Instance("1.2.3.4", 2345, InstanceType.SERVER, Collections.singletonList(newServerTag), null, 28090, 28091,
28092, 28093, true);
String serverInstanceId = "Server_1.2.3.4_2345";
- String serverInstanceUrl = requestURLBuilder.forInstance(serverInstanceId);
+ String serverInstanceUrl = _urlBuilder.forInstance(serverInstanceId);
sendPutRequest(serverInstanceUrl, newServerInstance.toJsonString());
checkInstanceInfo(brokerInstanceId, "1.2.3.4", 1234, new String[]{newBrokerTag}, null, -1, -1, -1, -1, false);
@@ -126,9 +138,9 @@ public class PinotInstanceRestletResourceTest extends ControllerTest {
// Test Instance updateTags API
String brokerInstanceUpdateTagsUrl =
- requestURLBuilder.forInstanceUpdateTags(brokerInstanceId, Lists.newArrayList("tag_BROKER", "newTag_BROKER"));
+ _urlBuilder.forInstanceUpdateTags(brokerInstanceId, Lists.newArrayList("tag_BROKER", "newTag_BROKER"));
sendPutRequest(brokerInstanceUpdateTagsUrl);
- String serverInstanceUpdateTagsUrl = requestURLBuilder.forInstanceUpdateTags(serverInstanceId,
+ String serverInstanceUpdateTagsUrl = _urlBuilder.forInstanceUpdateTags(serverInstanceId,
Lists.newArrayList("tag_REALTIME", "newTag_OFFLINE", "newTag_REALTIME"));
sendPutRequest(serverInstanceUpdateTagsUrl);
checkInstanceInfo(brokerInstanceId, "1.2.3.4", 1234, new String[]{"tag_BROKER", "newTag_BROKER"}, null, -1, -1, -1,
@@ -137,10 +149,10 @@ public class PinotInstanceRestletResourceTest extends ControllerTest {
new String[]{"tag_REALTIME", "newTag_OFFLINE", "newTag_REALTIME"}, null, 28090, 28091, 28092, 28093, true);
// Test DELETE instance API
- sendDeleteRequest(requestURLBuilder.forInstance("Broker_1.2.3.4_1234"));
- sendDeleteRequest(requestURLBuilder.forInstance("Server_1.2.3.4_2345"));
- sendDeleteRequest(requestURLBuilder.forInstance("Broker_2.3.4.5_1234"));
- sendDeleteRequest(requestURLBuilder.forInstance("Server_2.3.4.5_2345"));
+ sendDeleteRequest(_urlBuilder.forInstance("Broker_1.2.3.4_1234"));
+ sendDeleteRequest(_urlBuilder.forInstance("Server_1.2.3.4_2345"));
+ sendDeleteRequest(_urlBuilder.forInstance("Broker_2.3.4.5_1234"));
+ sendDeleteRequest(_urlBuilder.forInstance("Server_2.3.4.5_2345"));
checkNumInstances(listInstancesUrl, expectedNumInstances);
}
@@ -163,7 +175,7 @@ public class PinotInstanceRestletResourceTest extends ControllerTest {
boolean queriesDisabled)
throws Exception {
JsonNode response = JsonUtils.stringToJsonNode(
- ControllerTest.sendGetRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forInstance(instanceName)));
+ ControllerTest.sendGetRequest(_urlBuilder.forInstance(instanceName)));
assertEquals(response.get("instanceName").asText(), instanceName);
assertEquals(response.get("hostName").asText(), hostName);
assertTrue(response.get("enabled").asBoolean());
@@ -193,6 +205,95 @@ public class PinotInstanceRestletResourceTest extends ControllerTest {
}
}
+ @Test
+ public void instanceRetagHappyPathTest()
+ throws IOException {
+ Map<String, List<String>> currentInstanceTagsMap = getCurrentInstanceTagsMap();
+ List<InstanceTagUpdateRequest> request = new ArrayList<>();
+ currentInstanceTagsMap.forEach((instance, tags) -> {
+ if (instance.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)
+ || instance.startsWith(Helix.PREFIX_OF_BROKER_INSTANCE)) {
+ InstanceTagUpdateRequest payload = new InstanceTagUpdateRequest();
+ payload.setInstanceName(instance);
+ payload.setNewTags(tags);
+ request.add(payload);
+ }
+ });
+ List<OperationValidationResponse> response = Arrays.asList(new ObjectMapper().readValue(
+ sendPostRequest(_urlBuilder.forUpdateTagsValidation(), JsonUtils.objectToString(request)),
+ OperationValidationResponse[].class));
+ assertNotNull(response);
+ response.forEach(item -> assertTrue(item.isSafe()));
+ }
+
+ @Test
+ public void instanceRetagServerDeficiencyTest()
+ throws Exception {
+ String tableName = "testTable";
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName)
+ .setNumReplicas(2).build();
+ // create table with replication as 2 so that DefaultTenant has a minimum server requirement as 2.
+ DEFAULT_INSTANCE.addTableConfig(tableConfig);
+ Map<String, List<String>> currentInstanceTagsMap = getCurrentInstanceTagsMap();
+ List<InstanceTagUpdateRequest> request = new ArrayList<>();
+ currentInstanceTagsMap.forEach((instance, tags) -> {
+ if (instance.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)
+ || instance.startsWith(Helix.PREFIX_OF_BROKER_INSTANCE)) {
+ InstanceTagUpdateRequest payload = new InstanceTagUpdateRequest();
+ payload.setInstanceName(instance);
+ payload.setNewTags(Lists.newArrayList());
+ request.add(payload);
+ }
+ });
+ List<OperationValidationResponse> response = Arrays.asList(new ObjectMapper().readValue(
+ sendPostRequest(_urlBuilder.forUpdateTagsValidation(), JsonUtils.objectToString(request)),
+ OperationValidationResponse[].class));
+ assertNotNull(response);
+
+ int deficientServers = 2;
+ int deficientBrokers = 1;
+ for (OperationValidationResponse item : response) {
+ String instanceName = item.getInstanceName();
+ boolean validity = item.isSafe();
+ if (!validity) {
+ List<OperationValidationResponse.ErrorWrapper> issues = item.getIssues();
+ assertEquals(issues.size(), 1);
+ assertEquals(issues.get(0).getCode(), OperationValidationResponse.ErrorCode.MINIMUM_INSTANCE_UNSATISFIED);
+ if (instanceName.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) {
+ deficientServers--;
+ } else if (instanceName.startsWith(Helix.PREFIX_OF_BROKER_INSTANCE)) {
+ deficientBrokers--;
+ }
+ }
+ }
+ assertEquals(deficientServers, 0);
+ assertEquals(deficientBrokers, 0);
+ DEFAULT_INSTANCE.dropOfflineTable(tableName);
+ }
+
+ private Map<String, List<String>> getCurrentInstanceTagsMap()
+ throws IOException {
+ String listInstancesUrl = _urlBuilder.forInstanceList();
+ JsonNode response = JsonUtils.stringToJsonNode(sendGetRequest(listInstancesUrl));
+ JsonNode instances = response.get("instances");
+ Map<String, List<String>> map = new HashMap<>(instances.size());
+ for (int i = 0; i < instances.size(); i++) {
+ String instance = instances.get(i).asText();
+ map.put(instance, getInstanceTags(instance));
+ }
+ return map;
+ }
+
+ private List<String> getInstanceTags(String instance)
+ throws IOException {
+ String getInstancesUrl = _urlBuilder.forInstance(instance);
+ List<String> tags = new ArrayList<>();
+ for (JsonNode tag : JsonUtils.stringToJsonNode(sendGetRequest(getInstancesUrl)).get("tags")) {
+ tags.add(tag.asText());
+ }
+ return tags;
+ }
+
@AfterClass
public void tearDown()
throws Exception {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 394d11e47e..fe04bc3e19 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -542,6 +542,10 @@ public class ControllerRequestURLBuilder {
return StringUtil.join("/", _baseUrl, "tables", tableName, "pauseStatus");
}
+ public String forUpdateTagsValidation() {
+ return String.format("%s/instances/updateTags/validate", _baseUrl);
+ }
+
private static String encode(String s) {
try {
return URLEncoder.encode(s, "UTF-8");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org