You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/09/26 20:55:39 UTC

[1/3] helix git commit: Add integration test cases to test crush rebalance strategy for non-rackaware clusters.

Repository: helix
Updated Branches:
  refs/heads/master 1c855ae85 -> 79ebc0469


Add integration test cases to test crush rebalance strategy for non-rackaware clusters.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/de1a27f6
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/de1a27f6
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/de1a27f6

Branch: refs/heads/master
Commit: de1a27f6cf13c190275f2544c6c9d4afe78d5a82
Parents: 1c855ae
Author: Lei Xia <lx...@linkedin.com>
Authored: Mon Sep 25 17:08:04 2017 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Mon Sep 25 17:08:04 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/HelixAdmin.java  |   4 +-
 .../org/apache/helix/model/InstanceConfig.java  |  43 ++++
 .../org/apache/helix/tools/ClusterSetup.java    |  42 +---
 .../TestCrushAutoRebalanceNonRack.java          | 216 +++++++++++++++++++
 4 files changed, 263 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/de1a27f6/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 75cbfcf..8f47ea5 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -445,14 +445,14 @@ public interface HelixAdmin {
 
   /**
    * @param clusterName
-   * @param instanceNames
+   * @param instanceName
    * @param tag
    */
   void addInstanceTag(String clusterName, String instanceName, String tag);
 
   /**
    * @param clusterName
-   * @param instanceNames
+   * @param instanceName
    * @param tag
    */
   void removeInstanceTag(String clusterName, String instanceName, String tag);

http://git-wip-us.apache.org/repos/asf/helix/blob/de1a27f6/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index 6002591..1a80e70 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -452,4 +452,47 @@ public class InstanceConfig extends HelixProperty {
     // HELIX-65: remove check for hostname/port existence
     return true;
   }
+
+  /**
+   * Create an enabled InstanceConfig from an instanceId of the form host:port or host_port (numeric port).
+   * @param instanceId id to parse; if no positive port can be extracted it is used as-is for both the instance name and the host name
+   * @return a new InstanceConfig; when host and port were parsed, the instance name is normalized to host_port and host/port are set
+   */
+  public static InstanceConfig toInstanceConfig(String instanceId) {
+    String host = null;
+    int port = -1;
+    // to maintain backward compatibility we parse string of format host:port
+    // and host_port, where host port must be of type string and int
+    char[] delims = new char[] {
+        ':', '_'
+    };
+    for (char delim : delims) {
+      String regex = String.format("(.*)[%c]([\\d]+)", delim);
+      if (instanceId.matches(regex)) {
+        int lastIndexOf = instanceId.lastIndexOf(delim); // the port is whatever follows the last delimiter
+        try {
+          port = Integer.parseInt(instanceId.substring(lastIndexOf + 1));
+          host = instanceId.substring(0, lastIndexOf);
+        } catch (Exception e) {
+          _logger.warn("Unable to extract host and port from instanceId:" + instanceId);
+        }
+        break; // only the first matching delimiter is tried
+      }
+    }
+    if (host != null && port > 0) {
+      instanceId = host + "_" + port; // normalize host:port to host_port
+    }
+    InstanceConfig config = new InstanceConfig(instanceId);
+    if (host != null && port > 0) {
+      config.setHostName(host);
+      config.setPort(String.valueOf(port));
+
+    }
+
+    config.setInstanceEnabled(true);
+    if (config.getHostName() == null) {
+      config.setHostName(instanceId); // fall back to the raw id when no host was parsed
+    }
+    return config;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/de1a27f6/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index e79410d..03070b2 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -178,46 +178,8 @@ public class ClusterSetup {
     }
   }
 
-  private InstanceConfig toInstanceConfig(String instanceId) {
-    String host = null;
-    int port = -1;
-    // to maintain backward compatibility we parse string of format host:port
-    // and host_port, where host port must be of type string and int
-    char[] delims = new char[] {
-        ':', '_'
-    };
-    for (char delim : delims) {
-      String regex = String.format("(.*)[%c]([\\d]+)", delim);
-      if (instanceId.matches(regex)) {
-        int lastIndexOf = instanceId.lastIndexOf(delim);
-        try {
-          port = Integer.parseInt(instanceId.substring(lastIndexOf + 1));
-          host = instanceId.substring(0, lastIndexOf);
-        } catch (Exception e) {
-          _logger.warn("Unable to extract host and port from instanceId:" + instanceId);
-        }
-        break;
-      }
-    }
-    if (host != null && port > 0) {
-      instanceId = host + "_" + port;
-    }
-    InstanceConfig config = new InstanceConfig(instanceId);
-    if (host != null && port > 0) {
-      config.setHostName(host);
-      config.setPort(String.valueOf(port));
-
-    }
-
-    config.setInstanceEnabled(true);
-    if (config.getHostName() == null) {
-      config.setHostName(instanceId);
-    }
-    return config;
-  }
-
   public void addInstanceToCluster(String clusterName, String instanceId) {
-    InstanceConfig config = toInstanceConfig(instanceId);
+    InstanceConfig config = InstanceConfig.toInstanceConfig(instanceId);
     _admin.addInstance(clusterName, config);
   }
 
@@ -238,7 +200,7 @@ public class ClusterSetup {
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
-    InstanceConfig instanceConfig = toInstanceConfig(instanceId);
+    InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig(instanceId);
     instanceId = instanceConfig.getInstanceName();
 
     // ensure node is stopped

http://git-wip-us.apache.org/repos/asf/helix/blob/de1a27f6/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalanceNonRack.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalanceNonRack.java b/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalanceNonRack.java
new file mode 100644
index 0000000..9672483
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalanceNonRack.java
@@ -0,0 +1,216 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TestCrushAutoRebalanceNonRack extends ZkIntegrationTestBase {
+  final int NUM_NODE = 6;
+  protected static final int START_PORT = 12918;
+  protected static final int _PARTITIONS = 20;
+
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  protected ClusterControllerManager _controller;
+
+  protected ClusterSetup _setupTool = null;
+  List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>();
+  Map<String, String> _nodeToTagMap = new HashMap<String, String>();
+  List<String> _nodes = new ArrayList<String>();
+  List<String> _allDBs = new ArrayList<String>();
+  int _replica = 3;
+
+  @BeforeClass
+  public void beforeClass() throws Exception { // builds the cluster, NUM_NODE tagged instances, participants and a controller
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) { // remove anything already stored under this cluster's path
+      _gZkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(_gZkClient);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+      HelixConfigScope clusterScope =
+          new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
+              .forCluster(CLUSTER_NAME).build();
+
+    Map<String, String> configs = new HashMap<String, String>();
+    configs.put(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name(), "/instance"); // single-level topology: no rack layer
+    configs.put(ClusterConfig.ClusterConfigProperty.FAULT_ZONE_TYPE.name(), "instance");
+    configAccessor.set(clusterScope, configs);
+
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+      _nodes.add(storageNodeName);
+      String tag = "tag-" + i % 2; // alternates between tag-0 and tag-1
+      _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, tag);
+      _nodeToTagMap.put(storageNodeName, tag);
+      HelixConfigScope instanceScope =
+          new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT)
+              .forCluster(CLUSTER_NAME).forParticipant(storageNodeName).build();
+      configAccessor
+          .set(instanceScope, InstanceConfig.InstanceConfigProperty.DOMAIN.name(), "instance=" + storageNodeName);
+    }
+
+    // start dummy participants
+    for (String node : _nodes) {
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+  }
+
+  @DataProvider(name = "rebalanceStrategies")
+  public static String [][] rebalanceStrategies() { // each row: {display name, fully-qualified strategy class name}
+    return new String[][] { {"CrushRebalanceStrategy", CrushRebalanceStrategy.class.getName()}};
+  }
+
+  @Test(dataProvider = "rebalanceStrategies", enabled=true)
+  public void test(String rebalanceStrategyName, String rebalanceStrategyClass) // one FULL_AUTO resource per state model
+      throws Exception {
+    System.out.println("Test " + rebalanceStrategyName);
+    List<String> testDBs = new ArrayList<String>();
+    String[] testModels = { BuiltInStateModelDefinitions.OnlineOffline.name(),
+        BuiltInStateModelDefinitions.MasterSlave.name(),
+        BuiltInStateModelDefinitions.LeaderStandby.name()
+    };
+    int i = 0;
+    for (String stateModel : testModels) {
+      String db = "Test-DB-" + rebalanceStrategyName + "-" + i++;
+      _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel,
+          RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass);
+      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      testDBs.add(db);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300); // NOTE(review): presumably gives the controller time to react before verifying - confirm
+
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    for (String db : testDBs) { // check replica count (and tags, if any) for every resource created above
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateIsolation(is, ev);
+    }
+  }
+
+  @Test(dataProvider = "rebalanceStrategies", enabled=true)
+  public void testWithInstanceTag( // one MasterSlave FULL_AUTO resource per distinct instance tag
+      String rebalanceStrategyName, String rebalanceStrategyClass) throws Exception {
+    List<String> testDBs = new ArrayList<String>();
+    Set<String> tags = new HashSet<String>(_nodeToTagMap.values()); // distinct tags assigned in beforeClass()
+    int i = 0;
+    for (String tag : tags) {
+      String db = "Test-DB-Tag-" + rebalanceStrategyName + "-" + i++;
+      _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS,
+          BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO + "",
+          rebalanceStrategyClass);
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      is.setInstanceGroupTag(tag); // set the resource's instance group tag before rebalancing
+      _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, is);
+      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      testDBs.add(db);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300); // NOTE(review): presumably gives the controller time to react before verifying - confirm
+
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    for (String db : testDBs) { // validateIsolation() also asserts the tag on every hosting instance
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateIsolation(is, ev);
+    }
+  }
+
+  /**
+   * Validate that every partition is hosted on exactly the configured number of distinct instances and, if the resource has an instance group tag, that each hosting instance carries it.
+   */
+  private void validateIsolation(IdealState is, ExternalView ev) {
+    int replica = Integer.valueOf(is.getReplicas());
+    String tag = is.getInstanceGroupTag();
+
+    for (String partition : is.getPartitionSet()) {
+      Map<String, String> assignmentMap = ev.getRecord().getMapField(partition);
+      Set<String> instancesInEV = assignmentMap.keySet();
+      Assert.assertEquals(instancesInEV.size(), replica); // map keys are unique, so size == number of distinct instances
+      for (String instance : instancesInEV) {
+        if (tag != null) { // tag check applies only to resources with an instance group tag
+          InstanceConfig config =
+              _setupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instance);
+          Assert.assertTrue(config.containsTag(tag));
+        }
+      }
+    }
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception { // stops everything started in beforeClass() and deletes the cluster
+    /**
+     * shutdown order: 1) disconnect the controller 2) disconnect participants
+     */
+    _controller.syncStop();
+    for (MockParticipantManager participant : _participants) {
+      participant.syncStop();
+    }
+    _setupTool.deleteCluster(CLUSTER_NAME); // done only after the controller and all participants have stopped
+    System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+  }
+}


[2/3] helix git commit: Add StateTransitionThrottleConfig class to allow client to specify different types of state transition throttle control in cluster level config.

Posted by jx...@apache.org.
Add StateTransitionThrottleConfig class to allow client to specify different types of state transition throttle control in cluster level config.

Introduce StateTransitionThrottleConfig class to allow client to specify different types of state transition throttle control in cluster level config.

More design details at: https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Helix+Partition+Movement+Throttling


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d51f3537
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d51f3537
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d51f3537

Branch: refs/heads/master
Commit: d51f35377ad23ee881503b71b09236a39dba352c
Parents: de1a27f
Author: Lei Xia <lx...@linkedin.com>
Authored: Mon Sep 25 17:12:46 2017 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Mon Sep 25 17:12:46 2017 -0700

----------------------------------------------------------------------
 .../helix/api/config/RebalanceConfig.java       |   4 +-
 .../config/StateTransitionThrottleConfig.java   | 250 +++++++++++++++++++
 .../org/apache/helix/model/ClusterConfig.java   |  52 ++++
 .../org/apache/helix/model/ResourceConfig.java  |   2 +-
 4 files changed, 305 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/d51f3537/helix-core/src/main/java/org/apache/helix/api/config/RebalanceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/RebalanceConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/RebalanceConfig.java
index 31f6d3b..36766fd 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/RebalanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/RebalanceConfig.java
@@ -182,11 +182,11 @@ public class RebalanceConfig {
   }
 
   /**
-   * Generate the simple field map for RebalanceConfig.
+   * Generate the config map for RebalanceConfig.
    *
    * @return
    */
-  public Map<String, String> getSimpleFieldsMap() {
+  public Map<String, String> getConfigsMap() {
     Map<String, String> simpleFieldMap = new HashMap<String, String>();
 
     if (_rebalanceDelay >= 0) {

http://git-wip-us.apache.org/repos/asf/helix/blob/d51f3537/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
new file mode 100644
index 0000000..39bd458
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
@@ -0,0 +1,250 @@
+package org.apache.helix.api.config;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.map.ObjectWriter;
+
+public class StateTransitionThrottleConfig {
+  private static final Logger logger =
+      Logger.getLogger(StateTransitionThrottleConfig.class.getName());
+
+  private enum ConfigProperty {
+    CONFIG_TYPE,
+    REBALANCE_TYPE,
+    THROTTLE_SCOPE
+  }
+
+  public enum ThrottleScope {
+    CLUSTER,
+    RESOURCE,
+    INSTANCE,
+    PARTITION
+  }
+
+  public enum RebalanceType {
+    LOAD_BALANCE,
+    RECOVERY_BALANCE,
+    ANY
+  }
+
+  public static class StateTransitionType {
+    final static String ANY_STATE = "*";
+    final static String FROM_KEY = "from";
+    final static String TO_KEY = "to";
+    String _fromState;
+    String _toState;
+
+    StateTransitionType(String fromState, String toState) {
+      _fromState = fromState;
+      _toState = toState;
+    }
+
+    @Override
+    public String toString() {
+      return FROM_KEY + "." + _fromState + "." + TO_KEY + "." + _toState;
+    }
+
+    public static StateTransitionType parseFromString(String stateTransTypeStr) { // inverse of toString(): "from.<fromState>.to.<toState>"
+      String states[] = stateTransTypeStr.split("\\."); // split() takes a regex; an unescaped "." matches every char and yields an empty array
+      if (states.length < 4 || !states[0].equalsIgnoreCase(FROM_KEY) || !states[2]
+          .equalsIgnoreCase(TO_KEY)) {
+        return null; // not a state-transition key (e.g. REBALANCE_TYPE or THROTTLE_SCOPE)
+      }
+      return new StateTransitionType(states[1], states[3]);
+    }
+  }
+
+  private ThrottleScope _throttleScope;
+  private RebalanceType _rebalanceType;
+  private Map<StateTransitionType, Long> _maxPendingStateTransitionMap;
+
+  public StateTransitionThrottleConfig(RebalanceType rebalanceType, ThrottleScope throttleScope) {
+    _rebalanceType = rebalanceType;
+    _throttleScope = throttleScope;
+    _maxPendingStateTransitionMap = new HashMap<StateTransitionType, Long>();
+  }
+
+  /**
+   * Add a max pending transition limit for transitions from the given from-state to the given to-state.
+   *
+   * @param fromState state the transition starts from
+   * @param toState state the transition ends in
+   * @param maxPendingStateTransition limit value stored for this transition type
+   * @return this config, so calls can be chained
+   */
+  public StateTransitionThrottleConfig addThrottle(String fromState, String toState,
+      long maxPendingStateTransition) {
+    _maxPendingStateTransitionMap
+        .put(new StateTransitionType(fromState, toState), maxPendingStateTransition);
+    return this;
+  }
+
+  /**
+   * Add a max pending transition limit from ANY state to ANY state (both sides stored as "*").
+   *
+   * @param maxPendingStateTransition limit value stored for the any-to-any transition type
+   * @return this config, so calls can be chained
+   */
+  public StateTransitionThrottleConfig addThrottle(long maxPendingStateTransition) {
+    _maxPendingStateTransitionMap
+        .put(new StateTransitionType(StateTransitionType.ANY_STATE, StateTransitionType.ANY_STATE),
+            maxPendingStateTransition);
+    return this;
+  }
+
+  /**
+   * Add a max pending transition limit for a given state transition type.
+   *
+   * @param stateTransitionType transition type used as the map key
+   * @param maxPendingStateTransition limit value stored for this transition type
+   * @return this config, so calls can be chained
+   */
+  public StateTransitionThrottleConfig addThrottle(StateTransitionType stateTransitionType,
+      long maxPendingStateTransition) {
+    _maxPendingStateTransitionMap.put(stateTransitionType, maxPendingStateTransition);
+    return this;
+  }
+
+  /**
+   * Add a max pending transition limit from ANY state ("*") to the specified state.
+   *
+   * @param toState state the transition ends in
+   * @param maxPendingStateTransition limit value stored for this transition type
+   * @return this config, so calls can be chained
+   */
+  public StateTransitionThrottleConfig addThrottleFromAnyState(String toState,
+      long maxPendingStateTransition) {
+    _maxPendingStateTransitionMap
+        .put(new StateTransitionType(StateTransitionType.ANY_STATE, toState),
+            maxPendingStateTransition);
+    return this;
+  }
+
+  /**
+   * Add a max pending transition limit from the given state to ANY state ("*").
+   *
+   * @param fromState state the transition starts from
+   * @param maxPendingStateTransition limit value stored for this transition type
+   * @return this config, so calls can be chained
+   */
+  public StateTransitionThrottleConfig addThrottleToAnyState(String fromState,
+      long maxPendingStateTransition) {
+    _maxPendingStateTransitionMap
+        .put(new StateTransitionType(fromState, StateTransitionType.ANY_STATE),
+            maxPendingStateTransition);
+    return this;
+  }
+
+  private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  /**
+   * Generate the JSON String for StateTransitionThrottleConfig.
+   *
+   * @return Json String for this config.
+   */
+  public String toJSON() {
+    Map<String, String> configsMap = new HashMap<String, String>();
+
+    configsMap.put(ConfigProperty.REBALANCE_TYPE.name(), _rebalanceType.name());
+    configsMap.put(ConfigProperty.THROTTLE_SCOPE.name(), _throttleScope.name());
+
+    for (Map.Entry<StateTransitionType, Long> e : _maxPendingStateTransitionMap.entrySet()) {
+      configsMap.put(e.getKey().toString(), String.valueOf(e.getValue()));
+    }
+
+    String jsonStr = null;
+    try {
+      ObjectWriter objectWriter = OBJECT_MAPPER.writer();
+      jsonStr = objectWriter.writeValueAsString(configsMap);
+    } catch (IOException e) {
+      logger.error("Failed to convert config map to JSON object! " + configsMap);
+    }
+
+    return jsonStr;
+  }
+
+  /**
+   * Instantiate a throttle config from a config JSON string.
+   *
+   * @param configJsonStr
+   * @return StateTransitionThrottleConfig or null if the given configs map is not a valid StateTransitionThrottleConfig.
+   */
+  public static StateTransitionThrottleConfig fromJSON(String configJsonStr) {
+    StateTransitionThrottleConfig throttleConfig = null;
+    try {
+      ObjectReader objectReader = OBJECT_MAPPER.reader(Map.class);
+      Map<String, String> configsMap = objectReader.readValue(configJsonStr);
+      throttleConfig = fromConfigMap(configsMap);
+    } catch (IOException e) {
+      logger.error("Failed to convert JSON string to config map! " + configJsonStr);
+    }
+
+    return throttleConfig;
+  }
+
+
+  /**
+   * Instantiate a throttle config from a config map
+   *
+   * @param configsMap map holding REBALANCE_TYPE, THROTTLE_SCOPE and any number of "from.X.to.Y" entries with numeric values
+   * @return StateTransitionThrottleConfig or null if the given configs map is null or not a valid StateTransitionThrottleConfig.
+   */
+  public static StateTransitionThrottleConfig fromConfigMap(Map<String, String> configsMap) {
+    if (configsMap == null || !configsMap.containsKey(ConfigProperty.REBALANCE_TYPE.name()) ||
+        !configsMap.containsKey(ConfigProperty.THROTTLE_SCOPE.name())) {
+      // not a valid StateTransitionThrottleConfig
+      return null;
+    }
+
+    StateTransitionThrottleConfig config;
+    try {
+      RebalanceType rebalanceType =
+          RebalanceType.valueOf(configsMap.get(ConfigProperty.REBALANCE_TYPE.name()));
+      ThrottleScope throttleScope =
+          ThrottleScope.valueOf(configsMap.get(ConfigProperty.THROTTLE_SCOPE.name()));
+      config = new StateTransitionThrottleConfig(rebalanceType, throttleScope);
+    } catch (IllegalArgumentException ex) {
+      return null; // unknown rebalance type or throttle scope name
+    }
+
+    for (Map.Entry<String, String> entry : configsMap.entrySet()) {
+      StateTransitionType transitionType = StateTransitionType.parseFromString(entry.getKey());
+      if (transitionType != null) { // keys that are not transition types are skipped
+        try {
+          long value = Long.parseLong(entry.getValue());
+          config.addThrottle(transitionType, value);
+        } catch (NumberFormatException ex) {
+          // ignore the config item with invalid number.
+          logger.warn(String.format("Invalid config entry, key=%s, value=%s", entry.getKey(),
+              entry.getValue()));
+        }
+      }
+    }
+
+    return config;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/d51f3537/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 6aaa2b7..f679b3f 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -19,8 +19,13 @@ package org.apache.helix.model;
  * under the License.
  */
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.config.StateTransitionThrottleConfig;
+
 
 /**
  * Cluster configurations
@@ -36,6 +41,7 @@ public class ClusterConfig extends HelixProperty {
     FAULT_ZONE_TYPE, // the type in which isolation should be applied on when Helix places the replicas from same partition.
     DELAY_REBALANCE_DISABLED,  // enabled the delayed rebalaning in case node goes offline.
     DELAY_REBALANCE_TIME,     // delayed time in ms that the delay time Helix should hold until rebalancing.
+    STATE_TRANSITION_THROTTLE_CONFIGS,
     BATCH_STATE_TRANSITION_MAX_THREADS,
     MAX_CONCURRENT_TASK_PER_INSTANCE
   }
@@ -142,6 +148,52 @@ public class ClusterConfig extends HelixProperty {
     return false;
   }
 
+  /**
+   * Get the list of StateTransitionThrottleConfig entries set for this cluster.
+   * Stored entries that StateTransitionThrottleConfig.fromJSON() turns into null are skipped.
+   * @return the parsed throttle configs, or an empty list if none are set
+   */
+  public List<StateTransitionThrottleConfig> getStateTransitionThrottleConfigs() {
+    List<String> configs =
+        _record.getListField(ClusterConfigProperty.STATE_TRANSITION_THROTTLE_CONFIGS.name());
+    if (configs == null || configs.isEmpty()) {
+      return Collections.emptyList();
+    }
+    List<StateTransitionThrottleConfig> throttleConfigs =
+        new ArrayList<StateTransitionThrottleConfig>();
+    for (String configstr : configs) { // each list item is one config serialized as a JSON string
+      StateTransitionThrottleConfig throttleConfig =
+          StateTransitionThrottleConfig.fromJSON(configstr);
+      if (throttleConfig != null) {
+        throttleConfigs.add(throttleConfig);
+      }
+    }
+
+    return throttleConfigs;
+  }
+
+  /**
+   * Set the StateTransitionThrottleConfig list for this cluster, stored as one JSON string per config.
+   * Configs whose toJSON() returns null are dropped; if nothing remains, the record is left unchanged.
+   * @param throttleConfigs configs to store; must not be null
+   */
+  public void setStateTransitionThrottleConfigs(
+      List<StateTransitionThrottleConfig> throttleConfigs) {
+    List<String> configStrs = new ArrayList<String>();
+
+    for (StateTransitionThrottleConfig throttleConfig : throttleConfigs) {
+      String configStr = throttleConfig.toJSON();
+      if (configStr != null) {
+        configStrs.add(configStr);
+      }
+    }
+
+    if (!configStrs.isEmpty()) { // NOTE(review): an empty list therefore cannot clear previously stored configs
+      _record
+          .setListField(ClusterConfigProperty.STATE_TRANSITION_THROTTLE_CONFIGS.name(), configStrs);
+    }
+  }
+
   @Override
   public int hashCode() {
     return getId().hashCode();

http://git-wip-us.apache.org/repos/asf/helix/blob/d51f3537/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
index b195623..0cb8f2a 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
@@ -136,7 +136,7 @@ public class ResourceConfig extends HelixProperty {
     }
 
     if (rebalanceConfig != null) {
-      putSimpleConfigs(rebalanceConfig.getSimpleFieldsMap());
+      putSimpleConfigs(rebalanceConfig.getConfigsMap());
     }
   }
 


[3/3] helix git commit: Add protective check for ZKHelixAdmin.dropInstance()

Posted by jx...@apache.org.
Add protective check for ZKHelixAdmin.dropInstance()

Dropping an instance while it is still alive could result in an inconsistent state and should not be allowed. Add a protective check that throws an exception when this happens.
Add a test for this check: start a process to make the instance alive, try dropping the instance (fails as expected), then stop the process and drop the instance as usual.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/79ebc046
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/79ebc046
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/79ebc046

Branch: refs/heads/master
Commit: 79ebc0469da73a7a1b2d8c73a42efc4001d06088
Parents: d51f353
Author: Weihan Kong <wk...@linkedin.com>
Authored: Mon Sep 25 17:59:00 2017 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Mon Sep 25 17:59:00 2017 -0700

----------------------------------------------------------------------
 .../apache/helix/manager/zk/ZKHelixAdmin.java   |  19 +--
 .../helix/manager/zk/TestZkHelixAdmin.java      | 122 +++++++++----------
 2 files changed, 68 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/79ebc046/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 351c10e..19a4193 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -121,24 +121,27 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public void dropInstance(String clusterName, InstanceConfig instanceConfig) {
-    String instanceConfigsPath =
-        PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName,
-            ConfigScopeProperty.PARTICIPANT.toString());
-    String nodeId = instanceConfig.getId();
-    String instanceConfigPath = instanceConfigsPath + "/" + nodeId;
-    String instancePath = PropertyPathBuilder.instance(clusterName, nodeId);
+    String instanceName = instanceConfig.getInstanceName();
 
+    String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
     if (!_zkClient.exists(instanceConfigPath)) {
-      throw new HelixException("Node " + nodeId + " does not exist in config for cluster "
+      throw new HelixException("Node " + instanceName + " does not exist in config for cluster "
           + clusterName);
     }
 
+    String instancePath = PropertyPathBuilder.instance(clusterName, instanceName);
     if (!_zkClient.exists(instancePath)) {
-      throw new HelixException("Node " + nodeId + " does not exist in instances for cluster "
+      throw new HelixException("Node " + instanceName + " does not exist in instances for cluster "
           + clusterName);
     }
 
+    String liveInstancePath = PropertyPathBuilder.liveInstance(clusterName, instanceName);
+    if (_zkClient.exists(liveInstancePath)) {
+      throw new HelixException("Node " + instanceName + " is still alive for cluster " + clusterName + ", can't drop.");
+    }
+
     // delete config path
+    String instanceConfigsPath = PropertyPathBuilder.instanceConfig(clusterName);
     ZKUtil.dropChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord());
 
     // delete instance path

http://git-wip-us.apache.org/repos/asf/helix/blob/79ebc046/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index a431171..236a5b2 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -29,12 +29,16 @@ import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
@@ -47,6 +51,7 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.ConstraintItemBuilder;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.tools.StateModelConfigGenerator;
 import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
@@ -54,8 +59,8 @@ import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
 public class TestZkHelixAdmin extends ZkUnitTestBase {
-  @Test()
   public void testZkHelixAdmin() {
+    //TODO refactor this test into small test cases and use @before annotations
     System.out.println("START testZkHelixAdmin at " + new Date(System.currentTimeMillis()));
 
     final String clusterName = getShortClassName();
@@ -85,19 +90,12 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
       // OK
     }
 
-    String hostname = "host1";
-    String port = "9999";
-    String instanceName = hostname + "_" + port;
-    InstanceConfig config = new InstanceConfig(instanceName);
-    config.setHostName(hostname);
-    config.setPort(port);
-    List<String> dummyList = new ArrayList<String>();
-    dummyList.add("foo");
-    dummyList.add("bar");
-    config.getRecord().setListField("dummy", dummyList);
+    InstanceConfig config = new InstanceConfig("host1_9999");
+    config.setHostName("host1");
+    config.setPort("9999");
     tool.addInstance(clusterName, config);
-    tool.enableInstance(clusterName, instanceName, true);
-    String path = PropertyPathBuilder.getPath(PropertyType.INSTANCES, clusterName, instanceName);
+    tool.enableInstance(clusterName, "host1_9999", true);
+    String path = PropertyPathBuilder.instance(clusterName, "host1_9999");
     AssertJUnit.assertTrue(_gZkClient.exists(path));
 
     try {
@@ -106,43 +104,32 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
     } catch (HelixException e) {
       // OK
     }
-    config = tool.getInstanceConfig(clusterName, instanceName);
-    AssertJUnit.assertEquals(config.getId(), instanceName);
+    config = tool.getInstanceConfig(clusterName, "host1_9999");
+    AssertJUnit.assertEquals(config.getId(), "host1_9999");
 
-    // test setInstanceConfig()
-    config = tool.getInstanceConfig(clusterName, instanceName);
-    config.setHostName("host2");
+    // test: should not drop instance when it is still alive
+    HelixManager manager = initializeHelixManager(clusterName, config.getInstanceName(), ZK_ADDR, "id1");
     try {
-      // different host
-      tool.setInstanceConfig(clusterName, instanceName, config);
-      Assert.fail("should fail if hostname is different from the current one");
-    } catch (HelixException e) {
-      // OK
+      manager.connect();
+    } catch (Exception e) {
+      Assert.fail("HelixManager failed connecting");
     }
 
-    config = tool.getInstanceConfig(clusterName, instanceName);
-    config.setPort("7777");
     try {
-      // different port
-      tool.setInstanceConfig(clusterName, instanceName, config);
-      Assert.fail("should fail if port is different from the current one");
+      tool.dropInstance(clusterName, config);
+      Assert.fail("should fail if an instance is still alive");
     } catch (HelixException e) {
       // OK
     }
 
-    dummyList.remove("bar");
-    dummyList.add("baz");
-    config = tool.getInstanceConfig(clusterName, instanceName);
-    config.getRecord().setListField("dummy", dummyList);
-    AssertJUnit.assertTrue(tool.setInstanceConfig(clusterName, "host1_9999", config));
-    config = tool.getInstanceConfig(clusterName, "host1_9999");
-    dummyList = config.getRecord().getListField("dummy");
-    AssertJUnit.assertTrue(dummyList.contains("foo"));
-    AssertJUnit.assertTrue(dummyList.contains("baz"));
-    AssertJUnit.assertFalse(dummyList.contains("bar"));
-    AssertJUnit.assertEquals(2, dummyList.size());
+    try {
+      manager.disconnect();
+    } catch (Exception e) {
+      Assert.fail("HelixManager failed disconnecting");
+    }
+
+    tool.dropInstance(clusterName, config); // correctly drop the instance
 
-    tool.dropInstance(clusterName, config);
     try {
       tool.getInstanceConfig(clusterName, "host1_9999");
       Assert.fail("should fail if get a non-existent instance");
@@ -164,7 +151,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
     ZNRecord stateModelRecord = new ZNRecord("id1");
     try {
       tool.addStateModelDef(clusterName, "id1", new StateModelDefinition(stateModelRecord));
-      path = PropertyPathBuilder.getPath(PropertyType.STATEMODELDEFS, clusterName, "id1");
+      path = PropertyPathBuilder.stateModelDef(clusterName, "id1");
       AssertJUnit.assertTrue(_gZkClient.exists(path));
       Assert.fail("should fail");
     } catch (HelixException e) {
@@ -233,6 +220,18 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
     System.out.println("END testZkHelixAdmin at " + new Date(System.currentTimeMillis()));
   }
 
+  private HelixManager initializeHelixManager(String clusterName, String instanceName, String zkAddress,
+      String stateModelName) {
+    HelixManager manager =
+        HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddress);
+
+    MasterSlaveStateModelFactory stateModelFactory = new MasterSlaveStateModelFactory(instanceName);
+
+    StateMachineEngine stateMach = manager.getStateMachineEngine();
+    stateMach.registerStateModelFactory(stateModelName, stateModelFactory);
+    return manager;
+  }
+
   // drop resource should drop corresponding resource-level config also
   @Test
   public void testDropResource() {
@@ -246,13 +245,13 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
     tool.addCluster(clusterName, true);
     Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
 
-    tool.addStateModelDef(clusterName, "MasterSlave", new StateModelDefinition(
-        StateModelConfigGenerator.generateConfigForMasterSlave()));
+    tool.addStateModelDef(clusterName, "MasterSlave", new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave()));
     tool.addResource(clusterName, "test-db", 4, "MasterSlave");
     Map<String, String> resourceConfig = new HashMap<String, String>();
     resourceConfig.put("key1", "value1");
-    tool.setConfig(new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE)
-        .forCluster(clusterName).forResource("test-db").build(), resourceConfig);
+    tool.setConfig(new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE).forCluster(clusterName)
+        .forResource("test-db")
+        .build(), resourceConfig);
 
     PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
     Assert.assertTrue(_gZkClient.exists(keyBuilder.idealStates("test-db").getPath()),
@@ -283,8 +282,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
     Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
 
     // test admin.getMessageConstraints()
-    ClusterConstraints constraints =
-        tool.getConstraints(clusterName, ConstraintType.MESSAGE_CONSTRAINT);
+    ClusterConstraints constraints = tool.getConstraints(clusterName, ConstraintType.MESSAGE_CONSTRAINT);
     Assert.assertNull(constraints, "message-constraint should NOT exist for cluster: " + className);
 
     // remove non-exist constraint
@@ -299,14 +297,11 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
     ConstraintItemBuilder builder = new ConstraintItemBuilder();
     builder.addConstraintAttribute(ConstraintAttribute.RESOURCE.toString(), "MyDB")
         .addConstraintAttribute(ConstraintAttribute.CONSTRAINT_VALUE.toString(), "1");
-    tool.setConstraint(clusterName, ConstraintType.MESSAGE_CONSTRAINT, "constraint1",
-        builder.build());
+    tool.setConstraint(clusterName, ConstraintType.MESSAGE_CONSTRAINT, "constraint1", builder.build());
 
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
-    constraints =
-        accessor.getProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()));
+    constraints = accessor.getProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()));
     Assert.assertNotNull(constraints, "message-constraint should exist");
     ConstraintItem item = constraints.getConstraintItem("constraint1");
     Assert.assertNotNull(item, "message-constraint for constraint1 should exist");
@@ -323,8 +318,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
 
     // remove a exist message-constraint
     tool.removeConstraint(clusterName, ConstraintType.MESSAGE_CONSTRAINT, "constraint1");
-    constraints =
-        accessor.getProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()));
+    constraints = accessor.getProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()));
     Assert.assertNotNull(constraints, "message-constraint should exist");
     item = constraints.getConstraintItem("constraint1");
     Assert.assertNull(item, "message-constraint for constraint1 should NOT exist");
@@ -342,8 +336,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
     admin.addCluster(clusterName, true);
     Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
     String resourceName = "TestDB";
-    admin.addStateModelDef(clusterName, "MasterSlave", new StateModelDefinition(
-        StateModelConfigGenerator.generateConfigForMasterSlave()));
+    admin.addStateModelDef(clusterName, "MasterSlave", new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave()));
     admin.addResource(clusterName, resourceName, 4, "MasterSlave");
     admin.enableResource(clusterName, resourceName, false);
     BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
@@ -408,6 +401,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
     AssertJUnit.assertEquals(allResources.size(), 4);
     AssertJUnit.assertEquals(resourcesWithTag.size(), 2);
   }
+
   @Test
   public void testEnableDisablePartitions() {
     String className = TestHelper.getTestClassName();
@@ -441,20 +435,18 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
     String instanceName = "TestInstanceLegacy";
     String testResourcePrefix = "TestResourceLegacy";
     ZNRecord record = new ZNRecord(instanceName);
-    List<String> disabledPartitions =
-        new ArrayList<String>(Arrays.asList(new String[] { "1", "2", "3" }));
-    record.setListField(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_PARTITION.name(),
-        disabledPartitions);
+    List<String> disabledPartitions = new ArrayList<String>(Arrays.asList(new String[]{"1", "2", "3"}));
+    record.setListField(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_PARTITION.name(), disabledPartitions);
     InstanceConfig instanceConfig = new InstanceConfig(record);
     instanceConfig.setInstanceEnabledForPartition(testResourcePrefix, "2", false);
     Assert.assertEquals(instanceConfig.getDisabledPartitions(testResourcePrefix).size(), 3);
     Assert.assertEquals(instanceConfig.getRecord()
-            .getListField(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_PARTITION.name()).size(),
-        3);
+        .getListField(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_PARTITION.name())
+        .size(), 3);
     instanceConfig.setInstanceEnabledForPartition(testResourcePrefix, "2", true);
     Assert.assertEquals(instanceConfig.getDisabledPartitions(testResourcePrefix).size(), 2);
     Assert.assertEquals(instanceConfig.getRecord()
-            .getListField(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_PARTITION.name()).size(),
-        2);
+        .getListField(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_PARTITION.name())
+        .size(), 2);
   }
 }