You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/09/26 20:55:40 UTC

[2/3] helix git commit: Add StateTransitionThrottleConfig class to allow client to specify different types of state transition throttle control in cluster level config.

Add StateTransitionThrottleConfig class to allow client to specify different types of state transition throttle control in cluster level config.

Introduce StateTransitionThrottleConfig class to allow client to specify different types of state transition throttle control in cluster level config.

More design details at: https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Helix+Partition+Movement+Throttling


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d51f3537
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d51f3537
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d51f3537

Branch: refs/heads/master
Commit: d51f35377ad23ee881503b71b09236a39dba352c
Parents: de1a27f
Author: Lei Xia <lx...@linkedin.com>
Authored: Mon Sep 25 17:12:46 2017 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Mon Sep 25 17:12:46 2017 -0700

----------------------------------------------------------------------
 .../helix/api/config/RebalanceConfig.java       |   4 +-
 .../config/StateTransitionThrottleConfig.java   | 250 +++++++++++++++++++
 .../org/apache/helix/model/ClusterConfig.java   |  52 ++++
 .../org/apache/helix/model/ResourceConfig.java  |   2 +-
 4 files changed, 305 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/d51f3537/helix-core/src/main/java/org/apache/helix/api/config/RebalanceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/RebalanceConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/RebalanceConfig.java
index 31f6d3b..36766fd 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/RebalanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/RebalanceConfig.java
@@ -182,11 +182,11 @@ public class RebalanceConfig {
   }
 
   /**
-   * Generate the simple field map for RebalanceConfig.
+   * Generate the config map for RebalanceConfig.
    *
    * @return
    */
-  public Map<String, String> getSimpleFieldsMap() {
+  public Map<String, String> getConfigsMap() {
     Map<String, String> simpleFieldMap = new HashMap<String, String>();
 
     if (_rebalanceDelay >= 0) {

http://git-wip-us.apache.org/repos/asf/helix/blob/d51f3537/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
new file mode 100644
index 0000000..39bd458
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
@@ -0,0 +1,250 @@
+package org.apache.helix.api.config;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.map.ObjectWriter;
+
+/** State transition throttle settings that can be converted to and from a JSON string. */
+public class StateTransitionThrottleConfig {
+  private static final Logger logger =
+      Logger.getLogger(StateTransitionThrottleConfig.class.getName());
+
+  private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  /** Reserved (non-transition) keys of the serialized config map. */
+  private enum ConfigProperty { CONFIG_TYPE, REBALANCE_TYPE, THROTTLE_SCOPE }
+
+  /** Scope of a throttle config; serialized under the THROTTLE_SCOPE key. */
+  public enum ThrottleScope { CLUSTER, RESOURCE, INSTANCE, PARTITION }
+
+  /** Rebalance type of a throttle config; serialized under the REBALANCE_TYPE key. */
+  public enum RebalanceType { LOAD_BALANCE, RECOVERY_BALANCE, ANY }
+
+  /**
+   * A (from state, to state) pair keying one throttle entry. Its string form is
+   * {@code from.<fromState>.to.<toState>}, where {@code *} stands for any state.
+   */
+  public static class StateTransitionType {
+    final static String ANY_STATE = "*";
+    final static String FROM_KEY = "from";
+    final static String TO_KEY = "to";
+    String _fromState;
+    String _toState;
+
+    StateTransitionType(String fromState, String toState) {
+      _fromState = fromState;
+      _toState = toState;
+    }
+
+    @Override
+    public String toString() {
+      return FROM_KEY + "." + _fromState + "." + TO_KEY + "." + _toState;
+    }
+
+    /** Equality follows the serialized key, so equal transitions share one map entry. */
+    @Override
+    public boolean equals(Object obj) {
+      return obj instanceof StateTransitionType && toString().equals(obj.toString());
+    }
+
+    @Override
+    public int hashCode() {
+      return toString().hashCode();
+    }
+
+    /**
+     * Parse the form produced by {@link #toString()}.
+     *
+     * @param stateTransTypeStr string of the form {@code from.<fromState>.to.<toState>}
+     * @return the parsed type, or null if the string is null or not in that form
+     */
+    public static StateTransitionType parseFromString(String stateTransTypeStr) {
+      if (stateTransTypeStr == null) {
+        return null;
+      }
+      // split() takes a regex: an unescaped "." matches every character and yields no tokens.
+      String[] states = stateTransTypeStr.split("\\.");
+      if (states.length < 4 || !states[0].equalsIgnoreCase(FROM_KEY) || !states[2]
+          .equalsIgnoreCase(TO_KEY)) {
+        return null;
+      }
+      return new StateTransitionType(states[1], states[3]);
+    }
+  }
+
+  private ThrottleScope _throttleScope;
+  private RebalanceType _rebalanceType;
+  private Map<StateTransitionType, Long> _maxPendingStateTransitionMap;
+
+  /**
+   * Create a config with no throttle entries.
+   *
+   * @param rebalanceType rebalance type recorded in this config
+   * @param throttleScope throttle scope recorded in this config
+   */
+  public StateTransitionThrottleConfig(RebalanceType rebalanceType, ThrottleScope throttleScope) {
+    _rebalanceType = rebalanceType;
+    _throttleScope = throttleScope;
+    _maxPendingStateTransitionMap = new HashMap<StateTransitionType, Long>();
+  }
+
+  /**
+   * Add a max pending transition from given from state to the specified to state.
+   *
+   * @param fromState state the transition starts from
+   * @param toState state the transition ends in
+   * @param maxPendingStateTransition max pending transition value to record
+   * @return this config, to allow call chaining
+   */
+  public StateTransitionThrottleConfig addThrottle(String fromState, String toState,
+      long maxPendingStateTransition) {
+    return addThrottle(new StateTransitionType(fromState, toState), maxPendingStateTransition);
+  }
+
+  /**
+   * Add a max pending transition from ANY state to ANY state.
+   *
+   * @param maxPendingStateTransition max pending transition value to record
+   * @return this config, to allow call chaining
+   */
+  public StateTransitionThrottleConfig addThrottle(long maxPendingStateTransition) {
+    return addThrottle(StateTransitionType.ANY_STATE, StateTransitionType.ANY_STATE,
+        maxPendingStateTransition);
+  }
+
+  /**
+   * Add a max pending transition for a given state transition type.
+   *
+   * @param stateTransitionType transition type to throttle; replaces any equal existing entry
+   * @param maxPendingStateTransition max pending transition value to record
+   * @return this config, to allow call chaining
+   */
+  public StateTransitionThrottleConfig addThrottle(StateTransitionType stateTransitionType,
+      long maxPendingStateTransition) {
+    _maxPendingStateTransitionMap.put(stateTransitionType, maxPendingStateTransition);
+    return this;
+  }
+
+  /**
+   * Add a max pending transition from ANY state to the specified state.
+   *
+   * @param toState state the transition ends in
+   * @param maxPendingStateTransition max pending transition value to record
+   * @return this config, to allow call chaining
+   */
+  public StateTransitionThrottleConfig addThrottleFromAnyState(String toState,
+      long maxPendingStateTransition) {
+    return addThrottle(StateTransitionType.ANY_STATE, toState, maxPendingStateTransition);
+  }
+
+  /**
+   * Add a max pending transition from given state to ANY state.
+   *
+   * @param fromState state the transition starts from
+   * @param maxPendingStateTransition max pending transition value to record
+   * @return this config, to allow call chaining
+   */
+  public StateTransitionThrottleConfig addThrottleToAnyState(String fromState,
+      long maxPendingStateTransition) {
+    return addThrottle(fromState, StateTransitionType.ANY_STATE, maxPendingStateTransition);
+  }
+
+  /**
+   * Generate the JSON String for StateTransitionThrottleConfig.
+   *
+   * @return Json String for this config, or null if the conversion fails.
+   */
+  public String toJSON() {
+    Map<String, String> configsMap = new HashMap<String, String>();
+    configsMap.put(ConfigProperty.REBALANCE_TYPE.name(), _rebalanceType.name());
+    configsMap.put(ConfigProperty.THROTTLE_SCOPE.name(), _throttleScope.name());
+    for (Map.Entry<StateTransitionType, Long> e : _maxPendingStateTransitionMap.entrySet()) {
+      configsMap.put(e.getKey().toString(), String.valueOf(e.getValue()));
+    }
+    try {
+      return OBJECT_MAPPER.writer().writeValueAsString(configsMap);
+    } catch (IOException e) {
+      logger.error("Failed to convert config map to JSON object! " + configsMap, e);
+      return null;
+    }
+  }
+
+  /**
+   * Instantiate a throttle config from a config JSON string.
+   *
+   * @param configJsonStr JSON string such as the one generated by toJSON()
+   * @return StateTransitionThrottleConfig, or null if the string is null, cannot be read, or is not a valid config.
+   */
+  public static StateTransitionThrottleConfig fromJSON(String configJsonStr) {
+    if (configJsonStr == null) {
+      return null;
+    }
+    try {
+      Map<String, String> configsMap = OBJECT_MAPPER.reader(Map.class).readValue(configJsonStr);
+      return fromConfigMap(configsMap);
+    } catch (IOException e) {
+      logger.error("Failed to convert JSON string to config map! " + configJsonStr, e);
+      return null;
+    }
+  }
+
+  /**
+   * Instantiate a throttle config from a config map
+   *
+   * @param configsMap map holding REBALANCE_TYPE, THROTTLE_SCOPE and transition-type entries
+   * @return StateTransitionThrottleConfig or null if the given configs map is not a valid StateTransitionThrottleConfig.
+   */
+  public static StateTransitionThrottleConfig fromConfigMap(Map<String, String> configsMap) {
+    if (configsMap == null || !configsMap.containsKey(ConfigProperty.REBALANCE_TYPE.name())
+        || !configsMap.containsKey(ConfigProperty.THROTTLE_SCOPE.name())) {
+      // not a valid StateTransitionThrottleConfig
+      return null;
+    }
+
+    StateTransitionThrottleConfig config;
+    try {
+      config = new StateTransitionThrottleConfig(
+          RebalanceType.valueOf(configsMap.get(ConfigProperty.REBALANCE_TYPE.name())),
+          ThrottleScope.valueOf(configsMap.get(ConfigProperty.THROTTLE_SCOPE.name())));
+    } catch (IllegalArgumentException ex) {
+      return null;
+    }
+    for (Map.Entry<String, String> entry : configsMap.entrySet()) {
+      StateTransitionType transitionType = StateTransitionType.parseFromString(entry.getKey());
+      if (transitionType != null) {
+        try {
+          config.addThrottle(transitionType, Long.parseLong(entry.getValue()));
+        } catch (NumberFormatException ex) {
+          // ignore the config item with invalid number.
+          logger.warn(String.format("Invalid config entry, key=%s, value=%s", entry.getKey(),
+              entry.getValue()));
+        }
+      }
+    }
+    return config;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/d51f3537/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 6aaa2b7..f679b3f 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -19,8 +19,13 @@ package org.apache.helix.model;
  * under the License.
  */
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.config.StateTransitionThrottleConfig;
+
 
 /**
  * Cluster configurations
@@ -36,6 +41,7 @@ public class ClusterConfig extends HelixProperty {
     FAULT_ZONE_TYPE, // the type in which isolation should be applied on when Helix places the replicas from same partition.
     DELAY_REBALANCE_DISABLED,  // enabled the delayed rebalaning in case node goes offline.
     DELAY_REBALANCE_TIME,     // delayed time in ms that the delay time Helix should hold until rebalancing.
+    STATE_TRANSITION_THROTTLE_CONFIGS,
     BATCH_STATE_TRANSITION_MAX_THREADS,
     MAX_CONCURRENT_TASK_PER_INSTANCE
   }
@@ -142,6 +148,52 @@ public class ClusterConfig extends HelixProperty {
     return false;
   }
 
+  /**
+   * Get the list of StateTransitionThrottleConfig set for this cluster.
+   *
+   * @return the parsed configs (unparsable entries are skipped); an empty list if none is set
+   */
+  public List<StateTransitionThrottleConfig> getStateTransitionThrottleConfigs() {
+    String fieldName = ClusterConfigProperty.STATE_TRANSITION_THROTTLE_CONFIGS.name();
+    List<String> jsonConfigs = _record.getListField(fieldName);
+    // Nothing stored for this cluster: return the immutable empty list.
+    if (jsonConfigs == null || jsonConfigs.isEmpty()) {
+      return Collections.emptyList();
+    }
+    List<StateTransitionThrottleConfig> parsed = new ArrayList<StateTransitionThrottleConfig>();
+    for (String json : jsonConfigs) {
+      StateTransitionThrottleConfig config = StateTransitionThrottleConfig.fromJSON(json);
+      // Entries for which fromJSON returns null are left out of the result.
+      if (config == null) {
+        continue;
+      }
+      parsed.add(config);
+    }
+    return parsed;
+  }
+
+  /**
+   * Set the StateTransitionThrottleConfigs for this cluster.
+   *
+   * @param throttleConfigs configs to store as JSON strings; nothing is written if none serializes
+   */
+  public void setStateTransitionThrottleConfigs(
+      List<StateTransitionThrottleConfig> throttleConfigs) {
+    List<String> serialized = new ArrayList<String>();
+    for (StateTransitionThrottleConfig config : throttleConfigs) {
+      String json = config.toJSON();
+      if (json != null) {
+        serialized.add(json);
+      }
+    }
+    // Write the list field only when something serialized; otherwise leave the record as is.
+    if (serialized.isEmpty()) {
+      return;
+    }
+    String fieldName = ClusterConfigProperty.STATE_TRANSITION_THROTTLE_CONFIGS.name();
+    _record.setListField(fieldName, serialized);
+  }
+
   @Override
   public int hashCode() {
     return getId().hashCode();

http://git-wip-us.apache.org/repos/asf/helix/blob/d51f3537/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
index b195623..0cb8f2a 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
@@ -136,7 +136,7 @@ public class ResourceConfig extends HelixProperty {
     }
 
     if (rebalanceConfig != null) {
-      putSimpleConfigs(rebalanceConfig.getSimpleFieldsMap());
+      putSimpleConfigs(rebalanceConfig.getConfigsMap());
     }
   }