You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/09/12 17:09:44 UTC
[5/5] helix git commit: [HELIX-633] AutoRebalancer should ignore
disabled instance and all partitions on disabled instances should be dropped
in FULL_AUTO rebalance mode
[HELIX-633] AutoRebalancer should ignore disabled instance and all partitions on disabled instances should be dropped in FULL_AUTO rebalance mode
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/bc0aa76a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/bc0aa76a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/bc0aa76a
Branch: refs/heads/helix-0.6.x
Commit: bc0aa76a9de6243928e53e1a1d01e7502ff8267c
Parents: f5ac8f8
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue May 31 19:17:39 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Sep 12 10:06:33 2016 -0700
----------------------------------------------------------------------
.../controller/rebalancer/AutoRebalancer.java | 3 +
.../util/ConstraintBasedAssignment.java | 22 +--
.../controller/stages/ClusterDataCache.java | 16 +++
.../TestAutoRebalanceWithDisabledInstance.java | 142 +++++++++++++++++++
.../integration/TestStateTransitionTimeout.java | 28 ----
.../integration/ZkStandAloneCMTestBase.java | 2 +
.../mock/participant/MockMSStateModel.java | 65 +--------
7 files changed, 180 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index a8d83a2..e47297f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -82,6 +82,9 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
stateCountMap = stateCount(stateModelDef, liveInstance.size(), Integer.parseInt(replicas));
List<String> liveNodes = new ArrayList<String>(liveInstance.keySet());
List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
+ allNodes.removeAll(clusterData.getDisabledInstances());
+ liveNodes.retainAll(allNodes);
+
Map<String, Map<String, String>> currentMapping =
currentMapping(currentStateOutput, resourceName, partitions, stateCountMap);
http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index a520803..9366bcf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -75,24 +75,26 @@ public class ConstraintBasedAssignment {
boolean isResourceEnabled) {
Map<String, String> instanceStateMap = new HashMap<String, String>();
- // if the ideal state is deleted, instancePreferenceList will be empty and
- // we should drop all resources.
if (currentStateMap != null) {
for (String instance : currentStateMap.keySet()) {
- if ((instancePreferenceList == null || !instancePreferenceList.contains(instance))
- && !disabledInstancesForPartition.contains(instance)) {
- // if dropped (whether disabled or not), transit to DROPPED
+ if (instancePreferenceList == null || !instancePreferenceList.contains(instance)) {
+ // The instance is not in the partition's preference list (or the list is null).
+ // Transition to DROPPED regardless of whether the instance is disabled.
instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
- } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
- HelixDefinedState.ERROR.name()))
- && (disabledInstancesForPartition.contains(instance) || !isResourceEnabled)) {
+ } else {
// if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
- instanceStateMap.put(instance, stateModelDef.getInitialState());
+ if (disabledInstancesForPartition.contains(instance) || !isResourceEnabled) {
+ if (currentStateMap.get(instance) == null || !currentStateMap.get(instance)
+ .equals(HelixDefinedState.ERROR.name())) {
+ instanceStateMap.put(instance, stateModelDef.getInitialState());
+ }
+ }
}
}
}
- // ideal state is deleted
+ // If the ideal state is deleted, instancePreferenceList will be null and
+ // we should drop all resources.
if (instancePreferenceList == null) {
return instanceStateMap;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index b77ce0d..cb5bda8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -390,6 +390,22 @@ public class ClusterDataCache {
return disabledInstancesSet;
}
+
+ /**
+ * Returns the set of instances whose InstanceConfig marks them as disabled.
+ * @return the disabled instances (keys of the instance config map); empty if none
+ */
+ public Set<String> getDisabledInstances() {
+ Set<String> disabledInstancesSet = new HashSet<String>();
+ for (String instance : _instanceConfigMap.keySet()) {
+ InstanceConfig config = _instanceConfigMap.get(instance);
+ if (config.getInstanceEnabled() == false) {
+ disabledInstancesSet.add(instance);
+ }
+ }
+ return disabledInstancesSet;
+ }
+
/**
* Returns the number of replicas for a given resource.
* @param resourceName
http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java
new file mode 100644
index 0000000..84eca6b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java
@@ -0,0 +1,142 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TestAutoRebalanceWithDisabledInstance extends ZkStandAloneCMTestBase {
+ private static String TEST_DB_2 = "TestDB2";
+
+ @BeforeClass
+ @Override
+ public void beforeClass() throws Exception {
+ super.beforeClass();
+ _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB_2, _PARTITIONS, STATE_MODEL,
+ RebalanceMode.FULL_AUTO + "");
+ _setupTool.rebalanceResource(CLUSTER_NAME, TEST_DB_2, _replica);
+
+ Thread.sleep(200);
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ CLUSTER_NAME));
+ Assert.assertTrue(result);
+ }
+
+ @Test()
+ public void testDisableEnableInstanceAutoRebalance() throws Exception {
+ String disabledInstance = _participants[0].getInstanceName();
+
+ Set<String> assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2,
+ disabledInstance);
+ Assert.assertFalse(assignedPartitions.isEmpty());
+ Set<String> currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2,
+ disabledInstance);
+ Assert.assertFalse(currentPartitions.isEmpty());
+
+ // disable instance
+ _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, disabledInstance, false);
+ Thread.sleep(400);
+ assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2, disabledInstance);
+ Assert.assertTrue(assignedPartitions.isEmpty());
+ currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2, disabledInstance);
+ Assert.assertTrue(currentPartitions.isEmpty());
+
+ //enable instance
+ _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, disabledInstance, true);
+ Thread.sleep(400);
+ assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2, disabledInstance);
+ Assert.assertFalse(assignedPartitions.isEmpty());
+ currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2, disabledInstance);
+ Assert.assertFalse(currentPartitions.isEmpty());
+ }
+
+ @Test()
+ public void testAddDisabledInstanceAutoRebalance() throws Exception {
+ // add disabled instance.
+ String nodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + NODE_NR);
+ _setupTool.addInstanceToCluster(CLUSTER_NAME, nodeName);
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, nodeName);
+ _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, nodeName, false);
+
+ participant.syncStart();
+
+ Thread.sleep(400);
+ Set<String> assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2, nodeName);
+ Assert.assertTrue(assignedPartitions.isEmpty());
+ Set<String> currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2,
+ nodeName);
+ Assert.assertTrue(currentPartitions.isEmpty());
+
+ //enable instance
+ _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, nodeName, true);
+ Thread.sleep(400);
+ assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2, nodeName);
+ Assert.assertFalse(assignedPartitions.isEmpty());
+ currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2, nodeName);
+ Assert.assertFalse(currentPartitions.isEmpty());
+ }
+
+ private Set<String> getPartitionsAssignedtoInstance(String cluster, String dbName, String instance) {
+ HelixAdmin admin = _setupTool.getClusterManagementTool();
+ Set<String> partitionSet = new HashSet<String>();
+ IdealState is = admin.getResourceIdealState(cluster, dbName);
+ for (String partition : is.getRecord().getListFields().keySet()) {
+ List<String> assignments = is.getRecord().getListField(partition);
+ for (String ins : assignments) {
+ if (ins.equals(instance)) {
+ partitionSet.add(partition);
+ }
+ }
+ }
+
+ return partitionSet;
+ }
+
+ private Set<String> getCurrentPartitionsOnInstance(String cluster, String dbName, String instance) {
+ HelixAdmin admin = _setupTool.getClusterManagementTool();
+ Set<String> partitionSet = new HashSet<String>();
+
+ ExternalView ev = admin.getResourceExternalView(cluster, dbName);
+ for (String partition : ev.getRecord().getMapFields().keySet()) {
+ Map<String, String> assignments = ev.getRecord().getMapField(partition);
+ for (String ins : assignments.keySet()) {
+ if (ins.equals(instance)) {
+ partitionSet.add(partition);
+ }
+ }
+ }
+ return partitionSet;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
index 443d484..fb534fd 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
@@ -99,14 +99,6 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
_sleep = sleep;
}
- @Override
- @Transition(to = "SLAVE", from = "OFFLINE")
- public void onBecomeSlaveFromOffline(Message message, NotificationContext context) {
- LOG.info("Become SLAVE from OFFLINE");
-
- }
-
- @Override
@Transition(to = "MASTER", from = "SLAVE")
public void onBecomeMasterFromSlave(Message message, NotificationContext context)
throws InterruptedException {
@@ -117,26 +109,6 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
}
@Override
- @Transition(to = "SLAVE", from = "MASTER")
- public void onBecomeSlaveFromMaster(Message message, NotificationContext context) {
- LOG.info("Become SLAVE from MASTER");
- }
-
- @Override
- @Transition(to = "OFFLINE", from = "SLAVE")
- public void onBecomeOfflineFromSlave(Message message, NotificationContext context) {
- LOG.info("Become OFFLINE from SLAVE");
-
- }
-
- @Override
- @Transition(to = "DROPPED", from = "OFFLINE")
- public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
- LOG.info("Become DROPPED from OFFLINE");
-
- }
-
- @Override
public void rollbackOnError(Message message, NotificationContext context,
StateTransitionError error) {
_error = error;
http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
index 5d169d5..f694618 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
@@ -91,6 +91,8 @@ public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase {
ClusterStateVerifier
.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(result);
+
result =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
CLUSTER_NAME));
http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java
index 61733ba..7d90063 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java
@@ -43,67 +43,12 @@ public class MockMSStateModel extends StateModel {
_transition = transition;
}
- // overwrite default error->dropped transition
- @Transition(to = "DROPPED", from = "ERROR")
- public void onBecomeDroppedFromError(Message message, NotificationContext context)
+ @Transition(to = "*", from = "*")
+ public void generalTransitionHandle(Message message, NotificationContext context)
throws InterruptedException {
- LOG.info("Become DROPPED from ERROR");
- if (_transition != null) {
- _transition.doTransition(message, context);
- }
- }
-
- @Transition(to = "SLAVE", from = "OFFLINE")
- public void onBecomeSlaveFromOffline(Message message, NotificationContext context)
- throws InterruptedException {
- LOG.info("Become SLAVE from OFFLINE");
- if (_transition != null) {
- _transition.doTransition(message, context);
-
- }
- }
-
- @Transition(to = "MASTER", from = "SLAVE")
- public void onBecomeMasterFromSlave(Message message, NotificationContext context)
- throws InterruptedException {
- LOG.info("Become MASTER from SLAVE");
- if (_transition != null) {
- _transition.doTransition(message, context);
- }
- }
-
- @Transition(to = "SLAVE", from = "MASTER")
- public void onBecomeSlaveFromMaster(Message message, NotificationContext context)
- throws InterruptedException {
- LOG.info("Become SLAVE from MASTER");
- if (_transition != null) {
- _transition.doTransition(message, context);
- }
- }
-
- @Transition(to = "OFFLINE", from = "SLAVE")
- public void onBecomeOfflineFromSlave(Message message, NotificationContext context)
- throws InterruptedException {
- LOG.info("Become OFFLINE from SLAVE");
- if (_transition != null) {
- _transition.doTransition(message, context);
- }
- }
-
- @Transition(to = "DROPPED", from = "OFFLINE")
- public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
- throws InterruptedException {
- LOG.info("Become DROPPED from OFFLINE");
- if (_transition != null) {
- _transition.doTransition(message, context);
- }
- }
-
- @Transition(to = "OFFLINE", from = "ERROR")
- public void onBecomeOfflineFromError(Message message, NotificationContext context)
- throws InterruptedException {
- LOG.info("Become OFFLINE from ERROR");
- // System.err.println("Become OFFLINE from ERROR");
+ LOG.info(String
+ .format("Resource %s partition %s becomes %s from %s", message.getResourceName(),
+ message.getPartitionName(), message.getToState(), message.getFromState()));
if (_transition != null) {
_transition.doTransition(message, context);
}