You are viewing a plain-text version of this content; the canonical link to it was not preserved in this copy.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/09/12 17:09:44 UTC

[5/5] helix git commit: [HELIX-633] AutoRebalancer should ignore disabled instances, and all partitions on disabled instances should be dropped in FULL_AUTO rebalance mode

[HELIX-633] AutoRebalancer should ignore disabled instances, and all partitions on disabled instances should be dropped in FULL_AUTO rebalance mode


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/bc0aa76a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/bc0aa76a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/bc0aa76a

Branch: refs/heads/helix-0.6.x
Commit: bc0aa76a9de6243928e53e1a1d01e7502ff8267c
Parents: f5ac8f8
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue May 31 19:17:39 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Sep 12 10:06:33 2016 -0700

----------------------------------------------------------------------
 .../controller/rebalancer/AutoRebalancer.java   |   3 +
 .../util/ConstraintBasedAssignment.java         |  22 +--
 .../controller/stages/ClusterDataCache.java     |  16 +++
 .../TestAutoRebalanceWithDisabledInstance.java  | 142 +++++++++++++++++++
 .../integration/TestStateTransitionTimeout.java |  28 ----
 .../integration/ZkStandAloneCMTestBase.java     |   2 +
 .../mock/participant/MockMSStateModel.java      |  65 +--------
 7 files changed, 180 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index a8d83a2..e47297f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -82,6 +82,9 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
     stateCountMap = stateCount(stateModelDef, liveInstance.size(), Integer.parseInt(replicas));
     List<String> liveNodes = new ArrayList<String>(liveInstance.keySet());
     List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
+    allNodes.removeAll(clusterData.getDisabledInstances());
+    liveNodes.retainAll(allNodes);
+
     Map<String, Map<String, String>> currentMapping =
         currentMapping(currentStateOutput, resourceName, partitions, stateCountMap);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index a520803..9366bcf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -75,24 +75,26 @@ public class ConstraintBasedAssignment {
       boolean isResourceEnabled) {
     Map<String, String> instanceStateMap = new HashMap<String, String>();
 
-    // if the ideal state is deleted, instancePreferenceList will be empty and
-    // we should drop all resources.
     if (currentStateMap != null) {
       for (String instance : currentStateMap.keySet()) {
-        if ((instancePreferenceList == null || !instancePreferenceList.contains(instance))
-            && !disabledInstancesForPartition.contains(instance)) {
-          // if dropped (whether disabled or not), transit to DROPPED
+        if (instancePreferenceList == null || !instancePreferenceList.contains(instance)) {
+          // The partition is dropped from preference list.
+          // Transit to DROPPED no matter the instance is disabled or not.
           instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
-        } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
-            HelixDefinedState.ERROR.name()))
-            && (disabledInstancesForPartition.contains(instance) || !isResourceEnabled)) {
+        } else {
           // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
-          instanceStateMap.put(instance, stateModelDef.getInitialState());
+          if (disabledInstancesForPartition.contains(instance) || !isResourceEnabled) {
+            if (currentStateMap.get(instance) == null || !currentStateMap.get(instance)
+                .equals(HelixDefinedState.ERROR.name())) {
+              instanceStateMap.put(instance, stateModelDef.getInitialState());
+            }
+          }
         }
       }
     }
 
-    // ideal state is deleted
+    // if the ideal state is deleted, instancePreferenceList will be empty and
+    // we should drop all resources.
     if (instancePreferenceList == null) {
       return instanceStateMap;
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index b77ce0d..cb5bda8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -390,6 +390,22 @@ public class ClusterDataCache {
     return disabledInstancesSet;
   }
 
+
+  /**
+   * This method allows one to fetch the set of nodes that are disabled
+   * @return
+   */
+  public Set<String> getDisabledInstances() {
+    Set<String> disabledInstancesSet = new HashSet<String>();
+    for (String instance : _instanceConfigMap.keySet()) {
+      InstanceConfig config = _instanceConfigMap.get(instance);
+      if (config.getInstanceEnabled() == false) {
+        disabledInstancesSet.add(instance);
+      }
+    }
+    return disabledInstancesSet;
+  }
+
   /**
    * Returns the number of replicas for a given resource.
    * @param resourceName

http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java
new file mode 100644
index 0000000..84eca6b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java
@@ -0,0 +1,142 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TestAutoRebalanceWithDisabledInstance extends ZkStandAloneCMTestBase {
+  private static String TEST_DB_2 = "TestDB2";
+
+  @BeforeClass
+  @Override
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB_2, _PARTITIONS, STATE_MODEL,
+        RebalanceMode.FULL_AUTO + "");
+    _setupTool.rebalanceResource(CLUSTER_NAME, TEST_DB_2, _replica);
+
+    Thread.sleep(200);
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+            CLUSTER_NAME));
+    Assert.assertTrue(result);
+  }
+
+  @Test()
+  public void testDisableEnableInstanceAutoRebalance() throws Exception {
+    String disabledInstance = _participants[0].getInstanceName();
+
+    Set<String> assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2,
+        disabledInstance);
+    Assert.assertFalse(assignedPartitions.isEmpty());
+    Set<String> currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2,
+        disabledInstance);
+    Assert.assertFalse(currentPartitions.isEmpty());
+
+    // disable instance
+    _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, disabledInstance, false);
+    Thread.sleep(400);
+    assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2, disabledInstance);
+    Assert.assertTrue(assignedPartitions.isEmpty());
+    currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2, disabledInstance);
+    Assert.assertTrue(currentPartitions.isEmpty());
+
+    //enable instance
+    _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, disabledInstance, true);
+    Thread.sleep(400);
+    assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2, disabledInstance);
+    Assert.assertFalse(assignedPartitions.isEmpty());
+    currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2, disabledInstance);
+    Assert.assertFalse(currentPartitions.isEmpty());
+  }
+
+  @Test()
+  public void testAddDisabledInstanceAutoRebalance() throws Exception {
+    // add disabled instance.
+    String nodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + NODE_NR);
+    _setupTool.addInstanceToCluster(CLUSTER_NAME, nodeName);
+    MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, nodeName);
+    _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, nodeName, false);
+
+    participant.syncStart();
+
+    Thread.sleep(400);
+    Set<String> assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2, nodeName);
+    Assert.assertTrue(assignedPartitions.isEmpty());
+    Set<String> currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2,
+        nodeName);
+    Assert.assertTrue(currentPartitions.isEmpty());
+
+    //enable instance
+    _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, nodeName, true);
+    Thread.sleep(400);
+    assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2, nodeName);
+    Assert.assertFalse(assignedPartitions.isEmpty());
+    currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2, nodeName);
+    Assert.assertFalse(currentPartitions.isEmpty());
+  }
+
+  private Set<String> getPartitionsAssignedtoInstance(String cluster, String dbName, String instance) {
+    HelixAdmin admin = _setupTool.getClusterManagementTool();
+    Set<String> partitionSet = new HashSet<String>();
+    IdealState is = admin.getResourceIdealState(cluster, dbName);
+    for (String partition : is.getRecord().getListFields().keySet()) {
+      List<String> assignments = is.getRecord().getListField(partition);
+      for (String ins : assignments) {
+        if (ins.equals(instance)) {
+          partitionSet.add(partition);
+        }
+      }
+    }
+
+    return partitionSet;
+  }
+
+  private Set<String> getCurrentPartitionsOnInstance(String cluster, String dbName, String instance) {
+    HelixAdmin admin = _setupTool.getClusterManagementTool();
+    Set<String> partitionSet = new HashSet<String>();
+
+    ExternalView ev = admin.getResourceExternalView(cluster, dbName);
+    for (String partition : ev.getRecord().getMapFields().keySet()) {
+      Map<String, String> assignments = ev.getRecord().getMapField(partition);
+      for (String ins : assignments.keySet()) {
+        if (ins.equals(instance)) {
+          partitionSet.add(partition);
+        }
+      }
+    }
+    return partitionSet;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
index 443d484..fb534fd 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
@@ -99,14 +99,6 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
       _sleep = sleep;
     }
 
-    @Override
-    @Transition(to = "SLAVE", from = "OFFLINE")
-    public void onBecomeSlaveFromOffline(Message message, NotificationContext context) {
-      LOG.info("Become SLAVE from OFFLINE");
-
-    }
-
-    @Override
     @Transition(to = "MASTER", from = "SLAVE")
     public void onBecomeMasterFromSlave(Message message, NotificationContext context)
         throws InterruptedException {
@@ -117,26 +109,6 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
     }
 
     @Override
-    @Transition(to = "SLAVE", from = "MASTER")
-    public void onBecomeSlaveFromMaster(Message message, NotificationContext context) {
-      LOG.info("Become SLAVE from MASTER");
-    }
-
-    @Override
-    @Transition(to = "OFFLINE", from = "SLAVE")
-    public void onBecomeOfflineFromSlave(Message message, NotificationContext context) {
-      LOG.info("Become OFFLINE from SLAVE");
-
-    }
-
-    @Override
-    @Transition(to = "DROPPED", from = "OFFLINE")
-    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
-      LOG.info("Become DROPPED from OFFLINE");
-
-    }
-
-    @Override
     public void rollbackOnError(Message message, NotificationContext context,
         StateTransitionError error) {
       _error = error;

http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
index 5d169d5..f694618 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
@@ -91,6 +91,8 @@ public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase {
         ClusterStateVerifier
             .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
 
+    Assert.assertTrue(result);
+
     result =
         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
             CLUSTER_NAME));

http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java
index 61733ba..7d90063 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java
@@ -43,67 +43,12 @@ public class MockMSStateModel extends StateModel {
     _transition = transition;
   }
 
-  // overwrite default error->dropped transition
-  @Transition(to = "DROPPED", from = "ERROR")
-  public void onBecomeDroppedFromError(Message message, NotificationContext context)
+  @Transition(to = "*", from = "*")
+  public void generalTransitionHandle(Message message, NotificationContext context)
       throws InterruptedException {
-    LOG.info("Become DROPPED from ERROR");
-    if (_transition != null) {
-      _transition.doTransition(message, context);
-    }
-  }
-
-  @Transition(to = "SLAVE", from = "OFFLINE")
-  public void onBecomeSlaveFromOffline(Message message, NotificationContext context)
-      throws InterruptedException {
-    LOG.info("Become SLAVE from OFFLINE");
-    if (_transition != null) {
-      _transition.doTransition(message, context);
-
-    }
-  }
-
-  @Transition(to = "MASTER", from = "SLAVE")
-  public void onBecomeMasterFromSlave(Message message, NotificationContext context)
-      throws InterruptedException {
-    LOG.info("Become MASTER from SLAVE");
-    if (_transition != null) {
-      _transition.doTransition(message, context);
-    }
-  }
-
-  @Transition(to = "SLAVE", from = "MASTER")
-  public void onBecomeSlaveFromMaster(Message message, NotificationContext context)
-      throws InterruptedException {
-    LOG.info("Become SLAVE from MASTER");
-    if (_transition != null) {
-      _transition.doTransition(message, context);
-    }
-  }
-
-  @Transition(to = "OFFLINE", from = "SLAVE")
-  public void onBecomeOfflineFromSlave(Message message, NotificationContext context)
-      throws InterruptedException {
-    LOG.info("Become OFFLINE from SLAVE");
-    if (_transition != null) {
-      _transition.doTransition(message, context);
-    }
-  }
-
-  @Transition(to = "DROPPED", from = "OFFLINE")
-  public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
-      throws InterruptedException {
-    LOG.info("Become DROPPED from OFFLINE");
-    if (_transition != null) {
-      _transition.doTransition(message, context);
-    }
-  }
-
-  @Transition(to = "OFFLINE", from = "ERROR")
-  public void onBecomeOfflineFromError(Message message, NotificationContext context)
-      throws InterruptedException {
-    LOG.info("Become OFFLINE from ERROR");
-    // System.err.println("Become OFFLINE from ERROR");
+    LOG.info(String
+        .format("Resource %s partition %s becomes %s from %s", message.getResourceName(),
+            message.getPartitionName(), message.getToState(), message.getFromState()));
     if (_transition != null) {
       _transition.doTransition(message, context);
     }