You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2021/02/03 19:43:58 UTC

[helix] branch throttle updated: Per Replica Throttle -- 1st: Skeleton implementation with output message same as input (#1620)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch throttle
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/throttle by this push:
     new a25e9af  Per Replica Throttle -- 1st: Skeleton implementation with output message same as input (#1620)
a25e9af is described below

commit a25e9af99e1121ae164bd1e9ffdafbbfc18c09b1
Author: kaisun2000 <52...@users.noreply.github.com>
AuthorDate: Wed Feb 3 11:43:50 2021 -0800

    Per Replica Throttle -- 1st: Skeleton implementation with output message same as input (#1620)
    
    Per replica throttling replaces the intermediate stage, which is partition based. The finer granularity avoids boosting unnecessary replicas in a recovery partition.
    
    This first part of per replica throttling is only a skeleton: it outputs the input messages unchanged.
    
    Co-authored-by: Kai Sun <ks...@ksun-mn1.linkedin.biz>
---
 .../helix/controller/stages/AttributeName.java     |   8 +-
 .../controller/stages/PerReplicaThrottleStage.java | 273 +++++++++++++++++++++
 .../helix/monitoring/mbeans/ResourceMonitor.java   |   3 +-
 .../stages/TestPerReplicaThrottleStage.java        | 119 +++++++++
 4 files changed, 401 insertions(+), 2 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index d9ca40b..ec4f36d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -50,5 +50,11 @@ public enum AttributeName {
   // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
   JOBS_WITHOUT_CONFIG,
   // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
-  TO_BE_PURGED_JOBS_MAP
+  TO_BE_PURGED_JOBS_MAP,
+
+  // This attribute denotes the messages output from Per Replica Throttle stage
+  PER_REPLICA_THROTTLE_OUTPUT_MESSAGES,
+
+  // This attribute denotes the targeted partition state mapping from Per Replica Throttle stage
+  PER_REPLICA_THROTTLE_RETRACED_STATES
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
new file mode 100644
index 0000000..9297a4d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
@@ -0,0 +1,302 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.LogUtil;
+import org.apache.helix.controller.common.ResourcesStateMap;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Controller pipeline stage that applies per-replica state transition throttling to the messages
+ * picked by the message selection stage.
+ *
+ * <p>Reads CURRENT_STATE, MESSAGES_SELECTED, BEST_POSSIBLE_STATE, RESOURCES_TO_REBALANCE and
+ * ControllerDataProvider from the event, and writes PER_REPLICA_THROTTLE_OUTPUT_MESSAGES and
+ * PER_REPLICA_THROTTLE_RETRACED_STATES. This skeleton version passes messages through unchanged.
+ */
+public class PerReplicaThrottleStage extends AbstractBaseStage {
+  private static final Logger logger =
+      LoggerFactory.getLogger(PerReplicaThrottleStage.class.getName());
+
+  /**
+   * Validates the required event attributes, computes the throttled messages and the retraced
+   * partition states, and stores both back on the event.
+   * @param event the cluster event being processed
+   * @throws StageException if any required attribute is missing from the event
+   */
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    _eventId = event.getEventId();
+
+    CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE.name());
+
+    MessageOutput selectedMessages = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    LogUtil.logDebug(logger, _eventId, String.format("selectedMessages is: %s", selectedMessages));
+
+    BestPossibleStateOutput bestPossibleStateOutput =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+    Map<String, Resource> resourceToRebalance =
+        event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
+    ResourceControllerDataProvider cache =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
+
+    // compute() dereferences BEST_POSSIBLE_STATE for every resource, so require it up front
+    // rather than failing later with a NullPointerException.
+    if (currentStateOutput == null || selectedMessages == null || bestPossibleStateOutput == null
+        || resourceToRebalance == null || cache == null) {
+      throw new StageException(String.format("Missing attributes in event: %s. "
+              + "Requires CURRENT_STATE (%s) |MESSAGES_SELECTED (%s) |BEST_POSSIBLE_STATE (%s) "
+              + "|RESOURCES (%s) |DataCache (%s)", event, currentStateOutput, selectedMessages,
+          bestPossibleStateOutput, resourceToRebalance, cache));
+    }
+
+    ResourcesStateMap retracedResourceStateMap = new ResourcesStateMap();
+    MessageOutput output = compute(cache, resourceToRebalance, selectedMessages,
+        bestPossibleStateOutput, retracedResourceStateMap);
+
+    event.addAttribute(AttributeName.PER_REPLICA_THROTTLE_OUTPUT_MESSAGES.name(), output);
+    LogUtil.logDebug(logger, _eventId,
+        String.format("retraceResourceStateMap is: %s", retracedResourceStateMap));
+    event.addAttribute(AttributeName.PER_REPLICA_THROTTLE_RETRACED_STATES.name(),
+        retracedResourceStateMap);
+
+    //TODO: enter maintenance mode logic in next PR
+  }
+
+  /**
+   * Builds the list of resources sorted by descending priority (see {@link ResourcePriority}).
+   * @param resourceMap resources to rebalance, keyed by resource name
+   * @param dataCache controller data provider used to look up the priority field
+   * @return a new mutable list, highest priority first
+   */
+  private List<ResourcePriority> getResourcePriorityList(Map<String, Resource> resourceMap,
+      ResourceControllerDataProvider dataCache) {
+    List<ResourcePriority> prioritizedResourceList = new ArrayList<>(resourceMap.size());
+    for (String resourceName : resourceMap.keySet()) {
+      prioritizedResourceList.add(new ResourcePriority(resourceName, dataCache));
+    }
+    Collections.sort(prioritizedResourceList);
+    return prioritizedResourceList;
+  }
+
+  /**
+   * Go through each resource, and based on messageSelected and currentState, compute
+   * messageOutput while maintaining throttling constraints (for example, ensure that the number
+   * of possible pending state transitions does NOT go over the set threshold).
+   * @param dataCache controller data provider for this pipeline run
+   * @param resourceMap resources to rebalance, keyed by resource name
+   * @param selectedMessage messages chosen by the message selection stage
+   * @param bestPossibleStateOutput best possible states computed earlier in the pipeline
+   * @param retracedResourceStateMap out: receives the retraced partition states per resource
+   * @return the messages allowed through, per resource and partition
+   */
+  private MessageOutput compute(ResourceControllerDataProvider dataCache,
+      Map<String, Resource> resourceMap, MessageOutput selectedMessage,
+      BestPossibleStateOutput bestPossibleStateOutput, ResourcesStateMap retracedResourceStateMap) {
+    MessageOutput output = new MessageOutput();
+
+    StateTransitionThrottleController throttleController =
+        new StateTransitionThrottleController(resourceMap.keySet(), dataCache.getClusterConfig(),
+            dataCache.getLiveInstances().keySet());
+
+    List<ResourcePriority> prioritizedResourceList =
+        getResourcePriorityList(resourceMap, dataCache);
+
+    // Resources whose computation failed; to be reported once monitoring is added.
+    List<String> failedResources = new ArrayList<>();
+
+    // Visit resources in decreasing priority order. One throttle controller is shared by all
+    // resources, so (once throttling is implemented) higher priority ones get quota first.
+    for (ResourcePriority resourcePriority : prioritizedResourceList) {
+      String resourceName = resourcePriority.getResourceName();
+
+      if (!bestPossibleStateOutput.containsResource(resourceName)) {
+        LogUtil.logInfo(logger, _eventId, String.format(
+            "Skip calculating per replica state for resource %s because the best possible state is not available.",
+            resourceName));
+        continue;
+      }
+
+      Resource resource = resourceMap.get(resourceName);
+      IdealState idealState = dataCache.getIdealState(resourceName);
+      if (idealState == null) {
+        // If IdealState is null, use an empty one
+        LogUtil.logInfo(logger, _eventId, String
+            .format("IdealState for resource %s does not exist; resource may not exist anymore",
+                resourceName));
+        idealState = new IdealState(resourceName);
+        idealState.setStateModelDefRef(resource.getStateModelDefRef());
+      }
+
+      Map<Partition, Map<String, String>> retracedPartitionsState = new HashMap<>();
+      try {
+        Map<Partition, List<Message>> partitionMsgMap = new HashMap<>();
+        for (Partition partition : resource.getPartitions()) {
+          partitionMsgMap.put(partition, selectedMessage.getMessages(resourceName, partition));
+        }
+        MessageOutput resourceMsgOut =
+            throttlePerReplicaMessages(idealState, partitionMsgMap, bestPossibleStateOutput,
+                throttleController, retracedPartitionsState);
+        for (Partition partition : resource.getPartitions()) {
+          output.addMessages(resourceName, partition,
+              resourceMsgOut.getMessages(resourceName, partition));
+        }
+        retracedResourceStateMap.setState(resourceName, retracedPartitionsState);
+      } catch (HelixException ex) {
+        LogUtil.logInfo(logger, _eventId,
+            "Failed to calculate per replica partition states for resource " + resourceName, ex);
+        failedResources.add(resourceName);
+      }
+    }
+
+    // TODO: add monitoring in next PR.
+    return output;
+  }
+
+  /**
+   * Apply per-replica throttling logic and filter out excessive recovery and load messages for a
+   * given resource, and reconstruct the retraced partition states for that resource based on
+   * pending and targeted messages.
+   * <p>Throttling is currently skipped (messages pass through and the best possible state is
+   * used as the retraced state) when throttling is disabled or the resource is not FULL_AUTO.
+   * @param idealState ideal state of the resource
+   * @param selectedResourceMessages selected messages of the resource, keyed by partition
+   * @param bestPossibleStateOutput best possible states computed earlier in the pipeline
+   * @param throttleController throttle controller shared by all resources in this run
+   * @param retracedPartitionsStateMap out: receives the retraced partition states
+   * @return messages for partitions of the resource
+   */
+  private MessageOutput throttlePerReplicaMessages(IdealState idealState,
+      Map<Partition, List<Message>> selectedResourceMessages,
+      BestPossibleStateOutput bestPossibleStateOutput,
+      StateTransitionThrottleController throttleController,
+      Map<Partition, Map<String, String>> retracedPartitionsStateMap) {
+    MessageOutput output = new MessageOutput();
+    String resourceName = idealState.getResourceName();
+    LogUtil.logDebug(logger, _eventId, String.format("Processing resource: %s", resourceName));
+
+    // TODO: expand per-replica-throttling beyond FULL_AUTO
+    if (!throttleController.isThrottleEnabled() || !IdealState.RebalanceMode.FULL_AUTO
+        .equals(idealState.getRebalanceMode())) {
+      retracedPartitionsStateMap
+          .putAll(bestPossibleStateOutput.getPartitionStateMap(resourceName).getStateMap());
+      for (Partition partition : selectedResourceMessages.keySet()) {
+        output.addMessages(resourceName, partition, selectedResourceMessages.get(partition));
+      }
+      return output;
+    }
+
+    // TODO: later PRs
+    // Step 1: charge existing pending messages and update retraced state map.
+    // Step 2: classify all the messages into recovery message list and load message list
+    // Step 3: sorts recovery message list and applies throttling
+    // Step 4: sorts load message list and applies throttling
+    // Step 5: construct output
+    for (Partition partition : selectedResourceMessages.keySet()) {
+      List<Message> partitionMessages = selectedResourceMessages.get(partition);
+      if (partitionMessages == null) {
+        continue;
+      }
+      List<Message> finalPartitionMessages = new ArrayList<>();
+      for (Message message : partitionMessages) {
+        // TODO: next PR messages exclusion
+        finalPartitionMessages.add(message);
+      }
+      output.addMessages(resourceName, partition, finalPartitionMessages);
+    }
+    // Step 6: constructs all retraced partition state map for the resource
+    // TODO: next PR
+    // Step 7: emit metrics
+    // TODO: next PR
+    return output;
+  }
+
+  // ------------------ utilities ---------------------------
+
+  /**
+   * POJO that maps resource name to its priority represented by an integer. Natural ordering
+   * puts higher priorities first.
+   */
+  private static class ResourcePriority implements Comparable<ResourcePriority> {
+    private final String _resourceName;
+    private int _priority;
+
+    ResourcePriority(String resourceName, ResourceControllerDataProvider dataCache) {
+      // Resource level prioritization based on the numerical (sortable) priority field.
+      // If the resource priority field is null/not set, the resource will be treated as lowest
+      // priority.
+      _priority = Integer.MIN_VALUE;
+      _resourceName = resourceName;
+      String priorityField = dataCache.getClusterConfig().getResourcePriorityField();
+      if (priorityField != null) {
+        // Take the priority from ResourceConfig first. If the ResourceConfig does not exist or
+        // does not have this field, try the resource's IdealState. Otherwise keep it at the
+        // lowest priority.
+        ResourceConfig config = dataCache.getResourceConfig(resourceName);
+        IdealState idealState = dataCache.getIdealState(resourceName);
+        if (config != null && config.getSimpleConfig(priorityField) != null) {
+          setPriority(config.getSimpleConfig(priorityField));
+        } else if (idealState != null
+            && idealState.getRecord().getSimpleField(priorityField) != null) {
+          setPriority(idealState.getRecord().getSimpleField(priorityField));
+        }
+      }
+    }
+
+    @Override
+    public int compareTo(ResourcePriority resourcePriority) {
+      // make sure larger _priority is in front of small _priority at sort time
+      return Integer.compare(resourcePriority._priority, _priority);
+    }
+
+    public String getResourceName() {
+      return _resourceName;
+    }
+
+    /**
+     * Parses and stores the priority; an unparsable value is logged and the current priority is
+     * kept.
+     */
+    private void setPriority(String priority) {
+      try {
+        _priority = Integer.parseInt(priority);
+      } catch (NumberFormatException e) {
+        logger.warn("Invalid priority field {} for resource {}", priority, _resourceName);
+      }
+    }
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index b659f9c..903cf2d 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -47,7 +47,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     UNKNOWN,
     NORMAL,
     BEST_POSSIBLE_STATE_CAL_FAILED,
-    INTERMEDIATE_STATE_CAL_FAILED
+    INTERMEDIATE_STATE_CAL_FAILED,
+    PER_REPLICA_THROTTLE_CAL_FAILED
   }
 
   private static final String GAUGE_METRIC_SUFFIX = "Gauge";
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestPerReplicaThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestPerReplicaThrottleStage.java
new file mode 100644
index 0000000..37ceb02
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestPerReplicaThrottleStage.java
@@ -0,0 +1,128 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestPerReplicaThrottleStage extends BaseStageTest {
+
+  /**
+   * Sets up FULL_AUTO "MasterSlave" ideal states for the given resources, the state model
+   * definitions and the live instances used by the test.
+   */
+  private void preSetup(String[] resources, int nPartition, int nReplica) {
+    setupIdealState(nReplica, resources, nPartition, nReplica, IdealState.RebalanceMode.FULL_AUTO,
+        "MasterSlave", null, null, 2);
+    setupStateModel();
+    setupLiveInstances(nReplica);
+  }
+
+  // null case, make sure the messages would pass without any throttle
+  @Test
+  public void testNoThrottleMessagePass() {
+    String resourcePrefix = "resource";
+    int nResource = 1;
+    int nPartition = 1;
+    int nReplica = 3;
+    String[] resources = new String[nResource];
+    for (int i = 0; i < nResource; i++) {
+      resources[i] = resourcePrefix + "-" + i;
+    }
+
+    preSetup(resources, nPartition, nReplica);
+
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
+        getResourceMap(resources, nPartition, "MasterSlave"));
+
+    // setup current state; setup message output; setup best possible
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    MessageOutput messageOutput = new MessageOutput();
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+    for (String resource : resources) {
+      for (int p = 0; p < nPartition; p++) {
+        Partition partition = new Partition(resource + "_" + p);
+        currentStateOutput.setCurrentState(resource, partition, HOSTNAME_PREFIX + 0, "OFFLINE");
+        currentStateOutput.setCurrentState(resource, partition, HOSTNAME_PREFIX + 1, "SLAVE");
+        currentStateOutput.setCurrentState(resource, partition, HOSTNAME_PREFIX + 2, "OFFLINE");
+        Message msg = new Message(Message.MessageType.STATE_TRANSITION, "001");
+        msg.setToState("SLAVE");
+        msg.setFromState("OFFLINE");
+        msg.setTgtName(HOSTNAME_PREFIX + 2);
+        messageOutput.addMessage(resource, partition, msg);
+        bestPossibleStateOutput.setState(resource, partition, HOSTNAME_PREFIX + 0, "MASTER");
+        bestPossibleStateOutput.setState(resource, partition, HOSTNAME_PREFIX + 1, "SLAVE");
+        bestPossibleStateOutput.setState(resource, partition, HOSTNAME_PREFIX + 2, "SLAVE");
+        List<String> list =
+            Arrays.asList(HOSTNAME_PREFIX + 0, HOSTNAME_PREFIX + 1, HOSTNAME_PREFIX + 2);
+        bestPossibleStateOutput.setPreferenceList(resource, partition.getPartitionName(), list);
+      }
+    }
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageOutput);
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+    event.addAttribute(AttributeName.ControllerDataProvider.name(),
+        new ResourceControllerDataProvider());
+
+    runStage(event, new ReadClusterDataStage());
+    runStage(event, new PerReplicaThrottleStage());
+
+    MessageOutput output =
+        event.getAttribute(AttributeName.PER_REPLICA_THROTTLE_OUTPUT_MESSAGES.name());
+    Assert.assertNotNull(output);
+    Assert.assertNotNull(
+        event.getAttribute(AttributeName.PER_REPLICA_THROTTLE_RETRACED_STATES.name()));
+    Partition partition = new Partition(resources[0] + "_0");
+    List<Message> msgs = output.getMessages(resources[0], partition);
+    Assert.assertEquals(msgs.size(), 1);
+    Assert.assertEquals(msgs.get(0).getId(), "001");
+  }
+
+  /**
+   * Builds one Resource per name with the given state model and partitions named
+   * {@code <resource>_<index>}.
+   */
+  protected Map<String, Resource> getResourceMap(String[] resources, int partitions,
+      String stateModel) {
+    Map<String, Resource> resourceMap = new HashMap<>();
+
+    for (String r : resources) {
+      Resource testResource = new Resource(r);
+      testResource.setStateModelDefRef(stateModel);
+      for (int i = 0; i < partitions; i++) {
+        testResource.addPartition(r + "_" + i);
+      }
+      resourceMap.put(r, testResource);
+    }
+
+    return resourceMap;
+  }
+}