You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2021/02/03 19:43:58 UTC
[helix] branch throttle updated: Per Replica Throttle -- 1st:
Skeleton implementation with output message same as input (#1620)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch throttle
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/throttle by this push:
new a25e9af Per Replica Throttle -- 1st: Skeleton implementation with output message same as input (#1620)
a25e9af is described below
commit a25e9af99e1121ae164bd1e9ffdafbbfc18c09b1
Author: kaisun2000 <52...@users.noreply.github.com>
AuthorDate: Wed Feb 3 11:43:50 2021 -0800
Per Replica Throttle -- 1st: Skeleton implementation with output message same as input (#1620)
Per-replica throttling replaces the intermediate stage, which is partition based. The finer granularity avoids boosting replicas unnecessarily in a recovery partition.
This is the first part of per-replica throttling: the skeleton only, which outputs the input messages unchanged.
Co-authored-by: Kai Sun <ks...@ksun-mn1.linkedin.biz>
---
.../helix/controller/stages/AttributeName.java | 8 +-
.../controller/stages/PerReplicaThrottleStage.java | 273 +++++++++++++++++++++
.../helix/monitoring/mbeans/ResourceMonitor.java | 3 +-
.../stages/TestPerReplicaThrottleStage.java | 119 +++++++++
4 files changed, 401 insertions(+), 2 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index d9ca40b..ec4f36d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -50,5 +50,11 @@ public enum AttributeName {
// This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
JOBS_WITHOUT_CONFIG,
// This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
- TO_BE_PURGED_JOBS_MAP
+ TO_BE_PURGED_JOBS_MAP,
+
+ // This attribute denotes the messages output from the Per Replica Throttle stage
+ PER_REPLICA_THROTTLE_OUTPUT_MESSAGES,
+
+ // This attribute denotes the targeted partition state mapping from the Per Replica Throttle stage
+ PER_REPLICA_THROTTLE_RETRACED_STATES
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
new file mode 100644
index 0000000..9297a4d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
@@ -0,0 +1,273 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.LogUtil;
+import org.apache.helix.controller.common.ResourcesStateMap;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PerReplicaThrottleStage extends AbstractBaseStage {
+ private static final Logger logger =
+ LoggerFactory.getLogger(PerReplicaThrottleStage.class.getName());
+
+  /**
+   * Runs the per-replica throttle stage for one cluster event.
+   *
+   * <p>Reads the current state, the selected messages, the resources to rebalance and the
+   * controller data provider from the event, computes the messages to emit, and publishes them
+   * under {@code PER_REPLICA_THROTTLE_OUTPUT_MESSAGES}. The retraced partition states are
+   * published under {@code PER_REPLICA_THROTTLE_RETRACED_STATES}.
+   *
+   * @param event the cluster event that carries the input attributes and receives the outputs
+   * @throws StageException if any of the required input attributes is missing from the event
+   */
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    _eventId = event.getEventId();
+
+    CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE.name());
+
+    MessageOutput selectedMessages = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    LogUtil.logDebug(logger, _eventId, String.format("selectedMessages is: %s", selectedMessages));
+
+    Map<String, Resource> resourceToRebalance =
+        event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
+    ResourceControllerDataProvider cache =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
+
+    if (currentStateOutput == null || selectedMessages == null || resourceToRebalance == null
+        || cache == null) {
+      // Each label names the attribute whose value is printed next to it, so the message points
+      // at the attribute that is actually missing.
+      throw new StageException(String.format("Missing attributes in event: %s. "
+              + "Requires CURRENT_STATE (%s) |MESSAGES_SELECTED (%s) "
+              + "|RESOURCES_TO_REBALANCE (%s) |DataCache (%s)",
+          event, currentStateOutput, selectedMessages, resourceToRebalance, cache));
+    }
+
+    // Filled in by compute() as an out-parameter, one entry per processed resource.
+    ResourcesStateMap retracedResourceStateMap = new ResourcesStateMap();
+    MessageOutput output =
+        compute(event, resourceToRebalance, selectedMessages, retracedResourceStateMap);
+
+    event.addAttribute(AttributeName.PER_REPLICA_THROTTLE_OUTPUT_MESSAGES.name(), output);
+    LogUtil.logDebug(logger, _eventId,
+        String.format("retraceResourceStateMap is: %s", retracedResourceStateMap));
+    event.addAttribute(AttributeName.PER_REPLICA_THROTTLE_RETRACED_STATES.name(),
+        retracedResourceStateMap);
+
+    //TODO: enter maintenance mode logic in next PR
+  }
+
+  /**
+   * Wraps every resource name of {@code resourceMap} in a {@link ResourcePriority} and returns
+   * them sorted by that class's natural ordering, which places larger priorities first.
+   *
+   * @param resourceMap resources keyed by name; only the keys are used
+   * @param dataCache data provider the priorities are read from
+   * @return a new, sorted list with one entry per resource name
+   */
+  private List<ResourcePriority> getResourcePriorityList(Map<String, Resource> resourceMap,
+      ResourceControllerDataProvider dataCache) {
+    List<ResourcePriority> sortedByPriority = new ArrayList<>();
+    for (String name : resourceMap.keySet()) {
+      sortedByPriority.add(new ResourcePriority(name, dataCache));
+    }
+    Collections.sort(sortedByPriority);
+    return sortedByPriority;
+  }
+
+  /**
+   * Go through each resource, and based on messageSelected and currentState, compute
+   * messageOutput while maintaining throttling constraints (for example, ensure that the number
+   * of possible pending state transitions does NOT go over the set threshold).
+   *
+   * <p>Resources without a best possible state are skipped. Resources whose computation throws a
+   * {@link HelixException} are logged and left out of both outputs.
+   *
+   * @param event the cluster event; the controller data provider and the best possible state
+   *          are read from it
+   * @param resourceMap the resources to process, keyed by resource name
+   * @param selectedMessage the messages selected for each resource partition
+   * @param retracedResourceStateMap out-parameter; receives the retraced partition states of
+   *          every resource that was processed successfully
+   * @return the messages to emit for all processed resources
+   */
+  private MessageOutput compute(ClusterEvent event, Map<String, Resource> resourceMap,
+      MessageOutput selectedMessage, ResourcesStateMap retracedResourceStateMap) {
+    MessageOutput output = new MessageOutput();
+
+    ResourceControllerDataProvider dataCache =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
+    // Nothing in the loop below changes the event, so read the attribute once. process() does
+    // not validate it, hence the null guard inside the loop.
+    BestPossibleStateOutput bestPossibleStateOutput =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+
+    StateTransitionThrottleController throttleController =
+        new StateTransitionThrottleController(resourceMap.keySet(), dataCache.getClusterConfig(),
+            dataCache.getLiveInstances().keySet());
+
+    List<ResourcePriority> prioritizedResourceList =
+        getResourcePriorityList(resourceMap, dataCache);
+
+    // Collected here but not consumed yet; see the monitoring TODO at the end of this method.
+    List<String> failedResources = new ArrayList<>();
+
+    // Priority is applied by looping over the resources in order of decreasing priority, so
+    // that higher-priority resources are processed first.
+    for (ResourcePriority resourcePriority : prioritizedResourceList) {
+      String resourceName = resourcePriority.getResourceName();
+
+      if (bestPossibleStateOutput == null
+          || !bestPossibleStateOutput.containsResource(resourceName)) {
+        LogUtil.logInfo(logger, _eventId, String.format(
+            "Skip calculating per replica state for resource %s because the best possible state is not available.",
+            resourceName));
+        continue;
+      }
+
+      Resource resource = resourceMap.get(resourceName);
+      IdealState idealState = dataCache.getIdealState(resourceName);
+      if (idealState == null) {
+        // If IdealState is null, use an empty one
+        LogUtil.logInfo(logger, _eventId, String
+            .format("IdealState for resource %s does not exist; resource may not exist anymore",
+                resourceName));
+        idealState = new IdealState(resourceName);
+        idealState.setStateModelDefRef(resource.getStateModelDefRef());
+      }
+
+      Map<Partition, Map<String, String>> retracedPartitionsState = new HashMap<>();
+      try {
+        // Gather the selected messages of this resource, partition by partition.
+        Map<Partition, List<Message>> partitionMsgMap = new HashMap<>();
+        for (Partition partition : resource.getPartitions()) {
+          List<Message> msgList = selectedMessage.getMessages(resourceName, partition);
+          partitionMsgMap.put(partition, msgList);
+        }
+        MessageOutput resourceMsgOut =
+            throttlePerReplicaMessages(idealState, partitionMsgMap, bestPossibleStateOutput,
+                throttleController, retracedPartitionsState);
+        // Merge the per-resource result into the overall output.
+        for (Partition partition : resource.getPartitions()) {
+          List<Message> msgList = resourceMsgOut.getMessages(resourceName, partition);
+          output.addMessages(resourceName, partition, msgList);
+        }
+        retracedResourceStateMap.setState(resourceName, retracedPartitionsState);
+      } catch (HelixException ex) {
+        LogUtil.logInfo(logger, _eventId,
+            "Failed to calculate per replica partition states for resource " + resourceName, ex);
+        failedResources.add(resourceName);
+      }
+    }
+
+    // TODO: add monitoring in next PR.
+    return output;
+  }
+
+  /**
+   * Apply per-replica throttling logic and filter out excessive recovery and load messages for a
+   * given resource, and reconstruct the retraced partition states for that resource.
+   *
+   * <p>In this skeleton no message is filtered yet. When throttling is disabled or the resource
+   * is not in FULL_AUTO mode, the best possible partition states are copied into
+   * {@code retracedPartitionsStateMap} and the selected messages are passed through as they are.
+   * Otherwise every non-null message list is copied into the output and the retraced state map
+   * is left untouched.
+   *
+   * @param idealState ideal state of the resource; supplies the resource name and rebalance mode
+   * @param selectedResourceMessages the selected messages of the resource, keyed by partition
+   * @param bestPossibleStateOutput best possible states, used on the pass-through path
+   * @param throttleController tells whether throttling is enabled
+   * @param retracedPartitionsStateMap out-parameter for the retraced partition states
+   * @return the messages to emit for the partitions of the resource
+   */
+  private MessageOutput throttlePerReplicaMessages(IdealState idealState,
+      Map<Partition, List<Message>> selectedResourceMessages,
+      BestPossibleStateOutput bestPossibleStateOutput,
+      StateTransitionThrottleController throttleController,
+      Map<Partition, Map<String, String>> retracedPartitionsStateMap) {
+    MessageOutput output = new MessageOutput();
+    String resourceName = idealState.getResourceName();
+    LogUtil.logInfo(logger, _eventId, String.format("Processing resource: %s", resourceName));
+
+    // TODO: expand per-replica-throttling beyond FULL_AUTO
+    boolean throttlingApplies = throttleController.isThrottleEnabled()
+        && IdealState.RebalanceMode.FULL_AUTO.equals(idealState.getRebalanceMode());
+    if (!throttlingApplies) {
+      // Pass-through path: retraced states are the best possible states.
+      retracedPartitionsStateMap
+          .putAll(bestPossibleStateOutput.getPartitionStateMap(resourceName).getStateMap());
+      for (Map.Entry<Partition, List<Message>> entry : selectedResourceMessages.entrySet()) {
+        output.addMessages(resourceName, entry.getKey(), entry.getValue());
+      }
+      return output;
+    }
+
+    // TODO: later PRs
+    // Step 1: charge existing pending messages and update retraced state map.
+    // Step 2: classify all the messages into recovery message list and load message list
+    // Step 3: sorts recovery message list and applies throttling
+    // Step 4: sorts load message list and applies throttling
+    // Step 5: construct output
+    for (Map.Entry<Partition, List<Message>> entry : selectedResourceMessages.entrySet()) {
+      List<Message> partitionMessages = entry.getValue();
+      if (partitionMessages != null) {
+        List<Message> finalPartitionMessages = new ArrayList<>();
+        for (Message message : partitionMessages) {
+          // TODO: next PR messages exclusion
+          finalPartitionMessages.add(message);
+        }
+        output.addMessages(resourceName, entry.getKey(), finalPartitionMessages);
+      }
+    }
+    // Step 6: constructs all retraced partition state map for the resource
+    // TODO: next PR
+    // Step 7: emit metrics
+    // TODO: next PR
+    return output;
+  }
+
+ // ------------------ utilities ---------------------------
+
+  /**
+   * POJO that maps resource name to its priority represented by an integer.
+   *
+   * <p>The natural ordering sorts larger priorities first. It is not consistent with
+   * {@code equals}, which this class does not override.
+   */
+  private static class ResourcePriority implements Comparable<ResourcePriority> {
+    private final String _resourceName;
+    private int _priority;
+
+    /**
+     * Resolves the priority of {@code resourceName} from the cluster's resource priority field.
+     *
+     * @param resourceName name of the resource
+     * @param dataCache provider of the cluster config, resource configs and ideal states
+     */
+    ResourcePriority(String resourceName, ResourceControllerDataProvider dataCache) {
+      // Resource level prioritization based on the numerical (sortable) priority field.
+      // If the resource priority field is null/not set, the resource will be treated as lowest
+      // priority.
+      _priority = Integer.MIN_VALUE;
+      _resourceName = resourceName;
+      String priorityField = dataCache.getClusterConfig().getResourcePriorityField();
+      if (priorityField == null) {
+        return;
+      }
+
+      // Will take the priority from ResourceConfig first.
+      // If ResourceConfig does not exist or does not have this field, try to load it from the
+      // resource's IdealState. Otherwise, keep it at the lowest priority.
+      // A value that is present in ResourceConfig but not parsable does NOT fall back to the
+      // IdealState; the resource simply stays at the lowest priority.
+      ResourceConfig config = dataCache.getResourceConfig(resourceName);
+      String configPriority = config == null ? null : config.getSimpleConfig(priorityField);
+      if (configPriority != null) {
+        setPriority(configPriority);
+        return;
+      }
+
+      IdealState idealState = dataCache.getIdealState(resourceName);
+      String idealStatePriority =
+          idealState == null ? null : idealState.getRecord().getSimpleField(priorityField);
+      if (idealStatePriority != null) {
+        setPriority(idealStatePriority);
+      }
+    }
+
+    @Override
+    public int compareTo(ResourcePriority resourcePriority) {
+      // make sure larger _priority is in front of small _priority at sort time
+      return Integer.compare(resourcePriority._priority, _priority);
+    }
+
+    public String getResourceName() {
+      return _resourceName;
+    }
+
+    /**
+     * Parses {@code priority} as an integer and stores it. If the value cannot be parsed, a
+     * warning is logged and the current priority is kept.
+     *
+     * @param priority textual priority value
+     */
+    public void setPriority(String priority) {
+      try {
+        _priority = Integer.parseInt(priority);
+      } catch (NumberFormatException e) {
+        // Integer.parseInt reports every malformed input, including null, this way.
+        logger.warn(
+            String.format("Invalid priority field %s for resource %s", priority, _resourceName));
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index b659f9c..903cf2d 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -47,7 +47,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
UNKNOWN,
NORMAL,
BEST_POSSIBLE_STATE_CAL_FAILED,
- INTERMEDIATE_STATE_CAL_FAILED
+ INTERMEDIATE_STATE_CAL_FAILED,
+ PER_REPLICA_THROTTLE_CAL_FAILED
}
private static final String GAUGE_METRIC_SUFFIX = "Gauge";
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestPerReplicaThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestPerReplicaThrottleStage.java
new file mode 100644
index 0000000..37ceb02
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestPerReplicaThrottleStage.java
@@ -0,0 +1,119 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestPerReplicaThrottleStage extends BaseStageTest {
+
+  /**
+   * Prepares the fixture shared by the tests in this class through the inherited setup helpers:
+   * FULL_AUTO "MasterSlave" ideal states for the given resources, the state model, and
+   * {@code nReplica} live instances.
+   *
+   * @param resources names of the resources to create ideal states for
+   * @param nPartition number of partitions per resource
+   * @param nReplica number of replicas; also passed as the first argument of
+   *          {@code setupIdealState} and as the number of live instances
+   */
+  private void preSetup(String[] resources, int nPartition, int nReplica) {
+    final IdealState.RebalanceMode rebalanceMode = IdealState.RebalanceMode.FULL_AUTO;
+    final String stateModelName = "MasterSlave";
+
+    setupIdealState(nReplica, resources, nPartition, nReplica, rebalanceMode, stateModelName,
+        null, null, 2);
+    setupStateModel();
+    setupLiveInstances(nReplica);
+  }
+
+  /**
+   * Null case: make sure the messages pass through the stage without any throttle.
+   *
+   * <p>One resource with one partition and three replicas is set up with a single
+   * OFFLINE-to-SLAVE message. After the stage runs, exactly that message must be present in the
+   * stage output, and the retraced states attribute must have been published.
+   */
+  @Test
+  public void testNoThrottleMessagePass() {
+    String resourcePrefix = "resource";
+    int nResource = 1;
+    int nPartition = 1;
+    int nReplica = 3;
+    String[] resources = new String[nResource];
+    for (int i = 0; i < nResource; i++) {
+      resources[i] = resourcePrefix + "-" + i;
+    }
+
+    preSetup(resources, nPartition, nReplica);
+
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
+        getResourceMap(resources, nPartition, "MasterSlave"));
+
+    // setup current state; setup message output; setup best possible
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    MessageOutput messageOutput = new MessageOutput();
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+    for (String resource : resources) {
+      for (int p = 0; p < nPartition; p++) {
+        Partition partition = new Partition(resource + "_" + p);
+        currentStateOutput.setCurrentState(resource, partition, HOSTNAME_PREFIX + 0, "OFFLINE");
+        currentStateOutput.setCurrentState(resource, partition, HOSTNAME_PREFIX + 1, "SLAVE");
+        currentStateOutput.setCurrentState(resource, partition, HOSTNAME_PREFIX + 2, "OFFLINE");
+        Message msg = new Message(Message.MessageType.STATE_TRANSITION, "001");
+        msg.setToState("SLAVE");
+        msg.setFromState("OFFLINE");
+        msg.setTgtName(HOSTNAME_PREFIX + 2);
+        messageOutput.addMessage(resource, partition, msg);
+        bestPossibleStateOutput.setState(resource, partition, HOSTNAME_PREFIX + 0, "MASTER");
+        bestPossibleStateOutput.setState(resource, partition, HOSTNAME_PREFIX + 1, "SLAVE");
+        bestPossibleStateOutput.setState(resource, partition, HOSTNAME_PREFIX + 2, "SLAVE");
+        List<String> list =
+            Arrays.asList(HOSTNAME_PREFIX + 0, HOSTNAME_PREFIX + 1, HOSTNAME_PREFIX + 2);
+        bestPossibleStateOutput.setPreferenceList(resource, partition.getPartitionName(), list);
+      }
+    }
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageOutput);
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+    event.addAttribute(AttributeName.ControllerDataProvider.name(),
+        new ResourceControllerDataProvider());
+
+    runStage(event, new ReadClusterDataStage());
+    runStage(event, new PerReplicaThrottleStage());
+
+    MessageOutput output =
+        event.getAttribute(AttributeName.PER_REPLICA_THROTTLE_OUTPUT_MESSAGES.name());
+    Partition partition = new Partition(resources[0] + "_0");
+    List<Message> msgs = output.getMessages(resources[0], partition);
+    // assertEquals (actual, expected) reports both values on failure, unlike assertTrue.
+    Assert.assertEquals(msgs.size(), 1);
+    Message msg = msgs.get(0);
+    Assert.assertEquals(msg.getId(), "001");
+
+    // The stage publishes the retraced states unconditionally alongside the messages.
+    Object retracedStates =
+        event.getAttribute(AttributeName.PER_REPLICA_THROTTLE_RETRACED_STATES.name());
+    Assert.assertNotNull(retracedStates);
+  }
+
+  /**
+   * Builds one {@link Resource} per given name, each using the given state model definition and
+   * holding partitions named {@code <resource>_<index>} for indexes 0 to {@code partitions - 1}.
+   *
+   * @param resources resource names
+   * @param partitions number of partitions to add to every resource
+   * @param stateModel state model definition reference set on every resource
+   * @return the created resources keyed by resource name
+   */
+  protected Map<String, Resource> getResourceMap(String[] resources, int partitions,
+      String stateModel) {
+    Map<String, Resource> resourcesByName = new HashMap<>();
+
+    for (String resourceName : resources) {
+      Resource resource = new Resource(resourceName);
+      resource.setStateModelDefRef(stateModel);
+      for (int index = 0; index < partitions; index++) {
+        resource.addPartition(resourceName + "_" + index);
+      }
+      resourcesByName.put(resourceName, resource);
+    }
+
+    return resourcesByName;
+  }
+}
+