You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/05/30 20:38:40 UTC

[helix] branch master updated: Waged - Hard constraint - Fix for n - n+1 issue. (#2493)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cf39e19e Waged - Hard constraint - Fix for n - n+1 issue. (#2493)
6cf39e19e is described below

commit 6cf39e19e162707766bedbfb6ab566930d3471d8
Author: Komal Desai <98...@users.noreply.github.com>
AuthorDate: Tue May 30 13:38:34 2023 -0700

    Waged - Hard constraint - Fix for n - n+1 issue. (#2493)
    
    * Waged - Hard constraint - Fix for n - n+1 issue.
    
    The WAGED constraint-based algorithm has a few corner cases where the hard constraint is not satisfied.
    Details are documented at Wiki: https://github.com/apache/helix/wiki/WAGED-rebalancer-Hard-Constraint-Scope-Expansion
    
    Google doc: https://docs.google.com/document/d/1uxCqSU0xriUTdZIyxVoEvPMvywRgHCANatUlR0YQojk/edit#
    
    The original proposal was to throttle the state transition (ST) messages in the intermediate state calculation stage.
    The alternate proposal is to do it in BestPossibleCalcStage(), but after the actual calculations are done.
    This is done later because we don't want to cause a ping-pong effect as part of the Baseline or
    Partial pipeline calculations.
    
    This change introduces a data provider, InstanceCapacityDataProvider (implemented by WagedInstanceCapacity), which takes pending messages on the instances into consideration.
    
    Part-2 will introduce the change where this will be used in DelayedAutoRebalancer.
    
    
    ---------
    
    Co-authored-by: Komal Desai <kd...@kdesai-mn1.linkedin.biz>
---
 .../InstanceCapacityDataProvider.java              |  59 ++++++++
 .../rebalancer/waged/WagedInstanceCapacity.java    | 160 +++++++++++++++++++++
 .../controller/stages/CurrentStateOutput.java      |   5 +
 .../rebalancer/waged/TestWagedRebalancer.java      |  36 +++++
 4 files changed, 260 insertions(+)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/InstanceCapacityDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/InstanceCapacityDataProvider.java
new file mode 100644
index 000000000..ce550de33
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/InstanceCapacityDataProvider.java
@@ -0,0 +1,59 @@
+package org.apache.helix.controller.dataproviders;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+/**
+ * An interface that provides per-instance capacity data.
+ * Implementations take into account the capacity used by assigned partitions and by pending
+ * state transitions, and expose the remaining "headroom" of each instance. The data is dynamic:
+ * it changes as capacity is consumed through {@link #reduceAvailableInstanceCapacity}.
+ *
+ * The actual implementation will be stateful.
+ */
+public interface InstanceCapacityDataProvider {
+
+  /**
+   * Get the remaining (available) capacity of an instance.
+   * Instance capacity and partition weight are both represented as capacity-key to value maps.
+   *
+   * @param instanceName - instance name to query
+   * @return the available headroom of the instance for every defined capacity key.
+   *         Implementations may return null for an instance they do not know about.
+   */
+  Map<String, Integer> getInstanceAvailableCapacity(String instanceName);
+
+  /**
+   * Check whether a partition with the given weight can be placed on the instance, i.e. whether
+   * the instance has enough remaining capacity for the partition's weight.
+   *
+   * @param instanceName - instance name
+   * @param partitionCapacity - partition weight expressed as a capacity map.
+   * @return true if the partition can be placed, false otherwise.
+   */
+  boolean isInstanceCapacityAvailable(String instanceName, Map<String, Integer> partitionCapacity);
+
+  /**
+   * Reduce the available capacity of the instance by the given partition weight.
+   *
+   * @param instanceName - instance name
+   * @param partitionCapacity - partition weight expressed as a capacity map.
+   * @return true if the instance capacity was successfully reduced, false otherwise.
+   */
+  boolean reduceAvailableInstanceCapacity(String instanceName, Map<String, Integer> partitionCapacity);
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
new file mode 100644
index 000000000..02c869a5a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
@@ -0,0 +1,160 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.controller.dataproviders.InstanceCapacityDataProvider;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * WAGED implementation of {@link InstanceCapacityDataProvider}.
+ *
+ * Starts from the configured capacity of every instance that has an InstanceConfig in the cluster
+ * data cache, and decrements it in place as partition weights are consumed, either through
+ * {@link #processPendingMessages} or {@link #reduceAvailableInstanceCapacity}.
+ *
+ * This class is not thread-safe: the capacity maps are plain {@link HashMap}s mutated in place.
+ */
+public class WagedInstanceCapacity implements InstanceCapacityDataProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(WagedInstanceCapacity.class);
+
+  // Available capacity per instance: instance name -> (capacity key -> remaining value).
+  // The inner maps are private copies because they are decremented in place.
+  private final Map<String, Map<String, Integer>> _instanceCapacityMap;
+  private final ResourceControllerDataProvider _cache;
+
+  /**
+   * Initialize the available capacity of every instance to its full configured capacity.
+   *
+   * @param clusterData - controller data cache. Its cluster config and instance configs are read
+   *                      here; its resource configs are read later to resolve partition weights.
+   */
+  public WagedInstanceCapacity(ResourceControllerDataProvider clusterData) {
+    _cache = clusterData;
+    _instanceCapacityMap = new HashMap<>();
+
+    ClusterConfig clusterConfig = _cache.getClusterConfig();
+    for (InstanceConfig instanceConfig : _cache.getInstanceConfigMap().values()) {
+      Map<String, Integer> instanceCapacity =
+        WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig, instanceConfig);
+      // Copy so that in-place decrements never alias a map owned elsewhere (or an immutable one).
+      _instanceCapacityMap.put(instanceConfig.getInstanceName(), new HashMap<>(instanceCapacity));
+    }
+  }
+
+  /**
+   * Create a default partition weight map.
+   * Partitions whose real weight is unknown (e.g. the resource has no ResourceConfig, or its
+   * partition capacity map cannot be parsed) may still be placed on the same instances as WAGED
+   * partitions. They get a weight of 0 for every capacity key, so they neither consume nor
+   * release capacity.
+   *
+   * @return a mutable map with the capacity keys of one instance, all set to 0; an empty map when
+   *         no instance capacity is known.
+   */
+  private Map<String, Integer> createDefaultParticipantWeight() {
+    Map<String, Integer> partCapacity = new HashMap<>();
+    if (_instanceCapacityMap.isEmpty()) {
+      return partCapacity;
+    }
+    // Use the capacity keys of the first instance as the template. The weight must be 0, not a
+    // negative sentinel: subtracting a negative weight would *increase* the available capacity.
+    for (String key : _instanceCapacityMap.values().iterator().next().keySet()) {
+      partCapacity.put(key, 0);
+    }
+    return partCapacity;
+  }
+
+  /**
+   * Reduce the available capacity of instances by the weight of every partition that has a
+   * pending message targeting that instance.
+   *
+   * @param currentState - Current state output whose pending messages are keyed by
+   *                       resource -> partition -> instance.
+   */
+  public void processPendingMessages(CurrentStateOutput currentState) {
+    Map<String, Map<Partition, Map<String, Message>>> pendingMsgs = currentState.getPendingMessages();
+
+    for (Map.Entry<String, Map<Partition, Map<String, Message>>> resourceEntry
+        : pendingMsgs.entrySet()) {
+      String resource = resourceEntry.getKey();
+
+      for (Map.Entry<Partition, Map<String, Message>> partitionEntry
+          : resourceEntry.getValue().entrySet()) {
+        String partitionName = partitionEntry.getKey().getPartitionName();
+        Map<String, Integer> partCapacity = getPartitionCapacity(resource, partitionName);
+
+        // TODO(review): every pending message is charged the full partition weight, including
+        // transitions that may release capacity (e.g. to OFFLINE/DROPPED). Confirm this is intended.
+        for (String instance : partitionEntry.getValue().keySet()) {
+          if (!reduceAvailableInstanceCapacity(instance, partCapacity)) {
+            LOG.debug("Could not reserve capacity on instance {} for pending message of {}:{}",
+                instance, resource, partitionName);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Get the partition weight for the given resource and partition name.
+   * The weight is resolved from the resource's ResourceConfig and the cluster config via
+   * {@link WagedValidationUtil#validateAndGetPartitionCapacity}. An all-zero weight is used when
+   * the resource has no ResourceConfig or its partition capacity map cannot be parsed.
+   */
+  private Map<String, Integer> getPartitionCapacity(String resource, String partition) {
+    ResourceConfig resourceConfig = _cache.getResourceConfig(resource);
+    if (resourceConfig == null) {
+      // NOTE(review): presumably a resource not managed by WAGED; its weight is unknown.
+      return createDefaultParticipantWeight();
+    }
+
+    // Parse the entire capacityMap from ResourceConfig
+    Map<String, Map<String, Integer>> capacityMap;
+    try {
+      capacityMap = resourceConfig.getPartitionCapacityMap();
+    } catch (IOException ex) {
+      LOG.warn("Failed to parse the partition capacity map of resource {}; using zero weight.",
+          resource, ex);
+      return createDefaultParticipantWeight();
+    }
+    return WagedValidationUtil.validateAndGetPartitionCapacity(partition, resourceConfig,
+        capacityMap, _cache.getClusterConfig());
+  }
+
+  /**
+   * Get the remaining capacity of an instance.
+   * Capacity and weight are both represented as Key-Value.
+   *
+   * @param instanceName - instance name to query
+   * @return a read-only view of the instance's remaining capacity per attribute, or null if the
+   *         instance had no InstanceConfig when this provider was built.
+   */
+  @Override
+  @Nullable
+  public Map<String, Integer> getInstanceAvailableCapacity(String instanceName) {
+    Map<String, Integer> capacity = _instanceCapacityMap.get(instanceName);
+    // Read-only view: capacity may only change through reduceAvailableInstanceCapacity.
+    return capacity == null ? null : Collections.unmodifiableMap(capacity);
+  }
+
+  /**
+   * A partition fits when, for every capacity key of the instance, its non-zero weight does not
+   * exceed the remaining capacity. Keys of the partition that the instance does not define are
+   * ignored. Unknown instances have no capacity.
+   */
+  @Override
+  public boolean isInstanceCapacityAvailable(String instance, Map<String, Integer> partitionCapacity) {
+    Map<String, Integer> instanceCapacity = _instanceCapacityMap.get(instance);
+    if (instanceCapacity == null) {
+      return false;
+    }
+    for (Map.Entry<String, Integer> entry : instanceCapacity.entrySet()) {
+      int partCapacity = partitionCapacity.getOrDefault(entry.getKey(), 0);
+      if (partCapacity != 0 && entry.getValue() < partCapacity) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Subtract the partition weight from the instance's remaining capacity.
+   * The reduction is all-or-nothing: when the instance is unknown or lacks capacity for any key,
+   * nothing is changed and false is returned.
+   */
+  @Override
+  public boolean reduceAvailableInstanceCapacity(String instance, Map<String, Integer> partitionCapacity) {
+    // Validate every key before mutating, so a failed reduction never leaves a partial update.
+    if (!isInstanceCapacityAvailable(instance, partitionCapacity)) {
+      return false;
+    }
+    Map<String, Integer> instanceCapacity = _instanceCapacityMap.get(instance);
+    for (Map.Entry<String, Integer> entry : instanceCapacity.entrySet()) {
+      int partCapacity = partitionCapacity.getOrDefault(entry.getKey(), 0);
+      if (partCapacity != 0) {
+        entry.setValue(entry.getValue() - partCapacity);
+      }
+    }
+    return true;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
index 0ef47c8e7..0a85dc7b7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
@@ -460,4 +460,9 @@ public class CurrentStateOutput {
 
     return currentStateAssignment;
   }
+
+
+  /**
+   * Expose all pending messages, keyed by resource name, then partition, then instance name.
+   *
+   * Only the outermost map is wrapped as unmodifiable. The nested maps are not wrapped, so they
+   * still reflect (and allow changes to) this object's internal state.
+   *
+   * @return an unmodifiable view of the pending message map.
+   */
+  public Map<String, Map<Partition, Map<String, Message>>> getPendingMessages() {
+    final Map<String, Map<Partition, Map<String, Message>>> pendingView =
+        Collections.unmodifiableMap(_pendingMessageMap);
+    return pendingView;
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 60cc4bc5f..2836af745 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -865,4 +865,40 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     Map<String, Integer> weights2 = Map.of("item1", 5, "item2", 10, "item3", 0);
     Assert.assertEquals(dataProvider.getPartitionWeights("Resource2", "Partition2"), weights2);
   }
+
+  @Test
+  public void testInstanceCapacityProvider() throws IOException, HelixRebalanceException {
+    // WagedInstanceCapacity only reads the cluster config, instance configs and resource configs.
+    ResourceControllerDataProvider clusterData = setupClusterDataCache();
+    WagedInstanceCapacity provider = new WagedInstanceCapacity(clusterData);
+
+    // Every test instance starts with the full capacity configured by the test cluster model.
+    Map<String, Integer> fullCapacity = Map.of("item1", 20, "item2", 40, "item3", 30);
+    Assert.assertEquals(provider.getInstanceAvailableCapacity("testInstanceId"), fullCapacity);
+    Assert.assertEquals(provider.getInstanceAvailableCapacity("testInstanceId1"), fullCapacity);
+    Assert.assertEquals(provider.getInstanceAvailableCapacity("testInstanceId2"), fullCapacity);
+
+    // A partition fits only when each non-zero weight is within the remaining capacity.
+    Assert.assertTrue(provider.isInstanceCapacityAvailable("testInstanceId", Map.of("item1", 20)));
+    Assert.assertFalse(provider.isInstanceCapacityAvailable("testInstanceId", Map.of("item1", 21)));
+
+    // A successful reduction is applied to the targeted instance only.
+    Assert.assertTrue(provider.reduceAvailableInstanceCapacity("testInstanceId",
+        Map.of("item1", 5, "item2", 10, "item3", 0)));
+    Assert.assertEquals(provider.getInstanceAvailableCapacity("testInstanceId"),
+        Map.of("item1", 15, "item2", 30, "item3", 30));
+    Assert.assertEquals(provider.getInstanceAvailableCapacity("testInstanceId1"), fullCapacity);
+  }
 }