You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/05/21 23:57:42 UTC
git commit: [HELIX-94] Add the ability to enable and disable a resource, rb=20401
Repository: helix
Updated Branches:
refs/heads/helix-0.6.x 3146762b2 -> c5a754be0
[HELIX-94] Add the ability to enable and disable a resource, rb=20401
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c5a754be
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c5a754be
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c5a754be
Branch: refs/heads/helix-0.6.x
Commit: c5a754be0e306e4413d9a0528d012b5c4dba8917
Parents: 3146762
Author: zzhang <zz...@apache.org>
Authored: Wed May 21 14:56:47 2014 -0700
Committer: zzhang <zz...@apache.org>
Committed: Wed May 21 14:56:47 2014 -0700
----------------------------------------------------------------------
.../helix/webapp/resources/JsonParameters.java | 4 +
.../webapp/resources/ResourceGroupResource.java | 7 +
.../helix/webapp/TestDisableResource.java | 84 ++++++
.../main/java/org/apache/helix/HelixAdmin.java | 7 +
.../controller/rebalancer/AutoRebalancer.java | 3 +-
.../controller/rebalancer/CustomRebalancer.java | 17 +-
.../rebalancer/SemiAutoRebalancer.java | 3 +-
.../util/ConstraintBasedAssignment.java | 20 +-
.../stages/ExternalViewComputeStage.java | 1 -
.../apache/helix/manager/zk/ZKHelixAdmin.java | 84 ++++--
.../java/org/apache/helix/model/IdealState.java | 20 +-
.../participant/HelixCustomCodeRunner.java | 20 +-
.../org/apache/helix/tools/ClusterSetup.java | 17 +-
.../strategy/TestAutoRebalanceStrategy.java | 2 +-
.../TestDisableCustomCodeRunner.java | 252 +++++++++++++++++
.../helix/integration/TestDisableResource.java | 268 +++++++++++++++++++
.../helix/manager/zk/TestZkHelixAdmin.java | 32 ++-
.../mock/participant/MockMSModelFactory.java | 2 -
.../apache/helix/tools/TestClusterSetup.java | 36 ++-
19 files changed, 819 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/c5a754be/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
index 5f405d8..19ac71a 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
@@ -165,6 +165,10 @@ public class JsonParameters {
if (!_parameterMap.containsKey(ENABLED)) {
throw new HelixException("Missing Json parameters: '" + ENABLED + "'");
}
+ } else if (command.equalsIgnoreCase(ClusterSetup.enableResource)) {
+ if (!_parameterMap.containsKey(ENABLED)) {
+ throw new HelixException("Missing Json parameters: '" + ENABLED + "'");
+ }
} else if (command.equalsIgnoreCase(ClusterSetup.enablePartition)) {
if (!_parameterMap.containsKey(ENABLED)) {
throw new HelixException("Missing Json parameters: '" + ENABLED + "'");
http://git-wip-us.apache.org/repos/asf/helix/blob/c5a754be/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
index 055f64a..6dc721d 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
@@ -114,6 +114,13 @@ public class ResourceGroupResource extends ServerResource {
ClusterSetup setupTool = new ClusterSetup(zkClient);
setupTool.getClusterManagementTool()
.resetResource(clusterName, Arrays.asList(resourceName));
+ } else if (command.equalsIgnoreCase(ClusterSetup.enableResource)) {
+ jsonParameters.verifyCommand(ClusterSetup.enableResource);
+ boolean enabled = Boolean.parseBoolean(jsonParameters.getParameter(JsonParameters.ENABLED));
+ ZkClient zkClient =
+ (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT);
+ ClusterSetup setupTool = new ClusterSetup(zkClient);
+ setupTool.getClusterManagementTool().enableResource(clusterName, resourceName, enabled);
} else {
throw new HelixException("Unsupported command: " + command + ". Should be one of ["
+ ClusterSetup.resetResource + "]");
http://git-wip-us.apache.org/repos/asf/helix/blob/c5a754be/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java
new file mode 100644
index 0000000..9800179
--- /dev/null
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java
@@ -0,0 +1,84 @@
+package org.apache.helix.webapp;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.webapp.resources.JsonParameters;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestDisableResource extends AdminTestBase {
+
+ @Test
+ public void test() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 5;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 10, // partitions per resource
+ n, // number of nodes
+ 3, // replicas
+ "MasterSlave", true); // do rebalance
+
+ String instanceUrl =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups/"
+ + "TestDB0";
+
+ // Disable TestDB0
+ Map<String, String> paramMap = new HashMap<String, String>();
+ paramMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.enableResource);
+ paramMap.put(JsonParameters.ENABLED, Boolean.toString(false));
+ TestHelixAdminScenariosRest.assertSuccessPostOperation(instanceUrl, paramMap, false);
+
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+ Assert.assertFalse(idealState.isEnabled());
+
+ // Re-enable TestDB0
+ paramMap.put(JsonParameters.ENABLED, Boolean.toString(true));
+ TestHelixAdminScenariosRest.assertSuccessPostOperation(instanceUrl, paramMap, false);
+
+ idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+ Assert.assertTrue(idealState.isEnabled());
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/c5a754be/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 496c73c..7bb4c3a 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -170,6 +170,13 @@ public interface HelixAdmin {
void enableInstance(String clusterName, String instanceName, boolean enabled);
/**
+ * Disable or enable a resource
+ * @param clusterName
+ * @param resourceName
+ */
+ void enableResource(String clusterName, String resourceName, boolean enabled);
+
+ /**
* Disable or enable a list of partitions on an instance
* @param enabled
* @param clusterName
http://git-wip-us.apache.org/repos/asf/helix/blob/c5a754be/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index 7041a25..d9b70d4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -236,7 +236,8 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
ConstraintBasedAssignment.getPreferenceList(cache, partition, idealState, stateModelDef);
Map<String, String> bestStateForPartition =
ConstraintBasedAssignment.computeAutoBestStateForPartition(cache, stateModelDef,
- preferenceList, currentStateMap, disabledInstancesForPartition);
+ preferenceList, currentStateMap, disabledInstancesForPartition,
+ idealState.isEnabled());
partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
return partitionMapping;
http://git-wip-us.apache.org/repos/asf/helix/blob/c5a754be/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index fd288c2..0f3fbc4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -77,7 +77,7 @@ public class CustomRebalancer implements Rebalancer, MappingCalculator {
idealState.getInstanceStateMap(partition.getPartitionName());
Map<String, String> bestStateForPartition =
computeCustomizedBestStateForPartition(cache, stateModelDef, idealStateMap,
- currentStateMap, disabledInstancesForPartition);
+ currentStateMap, disabledInstancesForPartition, idealState.isEnabled());
partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
return partitionMapping;
@@ -90,11 +90,13 @@ public class CustomRebalancer implements Rebalancer, MappingCalculator {
* @param idealStateMap
* @param currentStateMap
* @param disabledInstancesForPartition
+ * @param isResourceEnabled
* @return
*/
private Map<String, String> computeCustomizedBestStateForPartition(ClusterDataCache cache,
StateModelDefinition stateModelDef, Map<String, String> idealStateMap,
- Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition) {
+ Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition,
+ boolean isResourceEnabled) {
Map<String, String> instanceStateMap = new HashMap<String, String>();
// if the ideal state is deleted, idealStateMap will be null/empty and
@@ -102,12 +104,12 @@ public class CustomRebalancer implements Rebalancer, MappingCalculator {
if (currentStateMap != null) {
for (String instance : currentStateMap.keySet()) {
if ((idealStateMap == null || !idealStateMap.containsKey(instance))
- && !disabledInstancesForPartition.contains(instance)) {
+ && !disabledInstancesForPartition.contains(instance) && isResourceEnabled) {
// if dropped and not disabled, transit to DROPPED
instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
} else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
- HelixDefinedState.ERROR.toString()))
- && disabledInstancesForPartition.contains(instance)) {
+ HelixDefinedState.ERROR.name()))
+ && (!isResourceEnabled || disabledInstancesForPartition.contains(instance))) {
// if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
instanceStateMap.put(instance, stateModelDef.getInitialState());
}
@@ -125,8 +127,9 @@ public class CustomRebalancer implements Rebalancer, MappingCalculator {
currentStateMap == null || currentStateMap.get(instance) == null
|| !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString());
- if (liveInstancesMap.containsKey(instance) && notInErrorState
- && !disabledInstancesForPartition.contains(instance)) {
+ boolean enabled = !disabledInstancesForPartition.contains(instance) && isResourceEnabled;
+
+ if (liveInstancesMap.containsKey(instance) && notInErrorState && enabled) {
instanceStateMap.put(instance, idealStateMap.get(instance));
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c5a754be/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
index 405c317..86c0438 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
@@ -76,7 +76,8 @@ public class SemiAutoRebalancer implements Rebalancer, MappingCalculator {
ConstraintBasedAssignment.getPreferenceList(cache, partition, idealState, stateModelDef);
Map<String, String> bestStateForPartition =
ConstraintBasedAssignment.computeAutoBestStateForPartition(cache, stateModelDef,
- preferenceList, currentStateMap, disabledInstancesForPartition);
+ preferenceList, currentStateMap, disabledInstancesForPartition,
+ idealState.isEnabled());
partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
return partitionMapping;
http://git-wip-us.apache.org/repos/asf/helix/blob/c5a754be/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index 3fd52f4..bab357b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -43,9 +43,9 @@ import org.apache.log4j.Logger;
public class ConstraintBasedAssignment {
private static Logger logger = Logger.getLogger(ConstraintBasedAssignment.class);
- public static List<String> getPreferenceList(ClusterDataCache cache, Partition resource,
+ public static List<String> getPreferenceList(ClusterDataCache cache, Partition partition,
IdealState idealState, StateModelDefinition stateModelDef) {
- List<String> listField = idealState.getPreferenceList(resource.getPartitionName());
+ List<String> listField = idealState.getPreferenceList(partition.getPartitionName());
if (listField != null && listField.size() == 1
&& StateModelToken.ANY_LIVEINSTANCE.toString().equals(listField.get(0))) {
@@ -66,11 +66,13 @@ public class ConstraintBasedAssignment {
* @param currentStateMap
* : instance->state for each partition
* @param disabledInstancesForPartition
+ * @param isResourceEnabled
* @return
*/
public static Map<String, String> computeAutoBestStateForPartition(ClusterDataCache cache,
StateModelDefinition stateModelDef, List<String> instancePreferenceList,
- Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition) {
+ Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition,
+ boolean isResourceEnabled) {
Map<String, String> instanceStateMap = new HashMap<String, String>();
// if the ideal state is deleted, instancePreferenceList will be empty and
@@ -78,12 +80,12 @@ public class ConstraintBasedAssignment {
if (currentStateMap != null) {
for (String instance : currentStateMap.keySet()) {
if ((instancePreferenceList == null || !instancePreferenceList.contains(instance))
- && !disabledInstancesForPartition.contains(instance)) {
+ && !disabledInstancesForPartition.contains(instance) && isResourceEnabled) {
// if dropped and not disabled, transit to DROPPED
instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
} else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
- HelixDefinedState.ERROR.toString()))
- && disabledInstancesForPartition.contains(instance)) {
+ HelixDefinedState.ERROR.name()))
+ && (disabledInstancesForPartition.contains(instance) || !isResourceEnabled)) {
// if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
instanceStateMap.put(instance, stateModelDef.getInitialState());
}
@@ -106,7 +108,7 @@ public class ConstraintBasedAssignment {
if ("N".equals(num)) {
Set<String> liveAndEnabled = new HashSet<String>(liveInstancesMap.keySet());
liveAndEnabled.removeAll(disabledInstancesForPartition);
- stateCount = liveAndEnabled.size();
+ stateCount = isResourceEnabled ? liveAndEnabled.size() : 0;
} else if ("R".equals(num)) {
stateCount = instancePreferenceList.size();
} else {
@@ -125,8 +127,10 @@ public class ConstraintBasedAssignment {
currentStateMap == null || currentStateMap.get(instanceName) == null
|| !currentStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString());
+ boolean enabled =
+ !disabledInstancesForPartition.contains(instanceName) && isResourceEnabled;
if (liveInstancesMap.containsKey(instanceName) && !assigned[i] && notInErrorState
- && !disabledInstancesForPartition.contains(instanceName)) {
+ && enabled) {
instanceStateMap.put(instanceName, state);
count = count + 1;
assigned[i] = true;
http://git-wip-us.apache.org/repos/asf/helix/blob/c5a754be/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 35ef177..6f69702 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -29,7 +29,6 @@ import java.util.TreeMap;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixManager;
-import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
http://git-wip-us.apache.org/repos/asf/helix/blob/c5a754be/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 70b0387..aa2d617 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.DataUpdater;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants;
@@ -118,7 +119,6 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void dropInstance(String clusterName, InstanceConfig instanceConfig) {
- // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
String instanceConfigsPath =
PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
ConfigScopeProperty.PARTICIPANT.toString());
@@ -145,9 +145,6 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public InstanceConfig getInstanceConfig(String clusterName, String instanceName) {
- // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
-
- // String instanceConfigPath = instanceConfigsPath + "/" + instanceName;
String instanceConfigPath =
PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
ConfigScopeProperty.PARTICIPANT.toString(), instanceName);
@@ -156,7 +153,7 @@ public class ZKHelixAdmin implements HelixAdmin {
+ clusterName);
}
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -170,7 +167,7 @@ public class ZKHelixAdmin implements HelixAdmin {
PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
ConfigScopeProperty.PARTICIPANT.toString(), instanceName);
- ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
if (!baseAccessor.exists(path, 0)) {
throw new HelixException("Cluster " + clusterName + ", instance: " + instanceName
+ ", instance config does not exist");
@@ -192,13 +189,36 @@ public class ZKHelixAdmin implements HelixAdmin {
}
@Override
+ public void enableResource(final String clusterName, final String resourceName,
+ final boolean enabled) {
+ String path = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName, resourceName);
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ if (!baseAccessor.exists(path, 0)) {
+ throw new HelixException("Cluster " + clusterName + ", resource: " + resourceName
+ + ", ideal-state does not exist");
+ }
+ baseAccessor.update(path, new DataUpdater<ZNRecord>() {
+ @Override
+ public ZNRecord update(ZNRecord currentData) {
+ if (currentData == null) {
+ throw new HelixException("Cluster: " + clusterName + ", resource: " + resourceName
+ + ", ideal-state is null");
+ }
+ IdealState idealState = new IdealState(currentData);
+ idealState.enable(enabled);
+ return idealState.getRecord();
+ }
+ }, AccessOption.PERSISTENT);
+ }
+
+ @Override
public void enablePartition(final boolean enabled, final String clusterName,
final String instanceName, final String resourceName, final List<String> partitionNames) {
String path =
PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
ConfigScopeProperty.PARTICIPANT.toString(), instanceName);
- ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
// check instanceConfig exists
if (!baseAccessor.exists(path, 0)) {
@@ -289,7 +309,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void resetPartition(String clusterName, String instanceName, String resourceName,
List<String> partitionNames) {
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -392,7 +412,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void resetInstance(String clusterName, List<String> instanceNames) {
// TODO: not mp-safe
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
@@ -418,7 +438,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void resetResource(String clusterName, List<String> resourceNames) {
// TODO: not mp-safe
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
@@ -496,7 +516,6 @@ public class ZKHelixAdmin implements HelixAdmin {
// IDEAL STATE
_zkClient.createPersistent(HelixUtil.getIdealStatePath(clusterName));
// CONFIGURATIONS
- // _zkClient.createPersistent(HelixUtil.getConfigPath(clusterName));
path =
PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
ConfigScopeProperty.CLUSTER.toString(), clusterName);
@@ -552,7 +571,7 @@ public class ZKHelixAdmin implements HelixAdmin {
List<String> instances = _zkClient.getChildren(memberInstancesPath);
List<String> result = new ArrayList<String>();
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -649,7 +668,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public IdealState getResourceIdealState(String clusterName, String resourceName) {
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -658,7 +677,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void setResourceIdealState(String clusterName, String resourceName, IdealState idealState) {
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -667,7 +686,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public ExternalView getResourceExternalView(String clusterName, String resourceName) {
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
return accessor.getProperty(keyBuilder.externalView(resourceName));
@@ -686,7 +705,7 @@ public class ZKHelixAdmin implements HelixAdmin {
throw new HelixException("State model path " + stateModelPath + " already exists.");
}
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.stateModelDef(stateModel.getId()), stateModel);
@@ -694,7 +713,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void dropResource(String clusterName, String resourceName) {
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -709,7 +728,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public StateModelDefinition getStateModelDef(String clusterName, String stateModelName) {
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -719,7 +738,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void dropCluster(String clusterName) {
logger.info("Deleting cluster " + clusterName);
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -916,15 +935,23 @@ public class ZKHelixAdmin implements HelixAdmin {
int size = (int) file.length();
byte[] bytes = new byte[size];
- DataInputStream dis = new DataInputStream(new FileInputStream(file));
- int read = 0;
- int numRead = 0;
- while (read < bytes.length && (numRead = dis.read(bytes, read, bytes.length - read)) >= 0) {
- read = read + numRead;
+ DataInputStream dis = null;
+ try {
+ dis = new DataInputStream(new FileInputStream(file));
+ int read = 0;
+ int numRead = 0;
+ while (read < bytes.length && (numRead = dis.read(bytes, read, bytes.length - read)) >= 0) {
+ read = read + numRead;
+ }
+ return bytes;
+ } finally {
+ if (dis != null) {
+ dis.close();
+ }
}
- return bytes;
}
+ @Override
public void addStateModelDef(String clusterName, String stateModelDefName,
String stateModelDefFile) throws IOException {
ZNRecord record =
@@ -940,7 +967,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void setConstraint(String clusterName, final ConstraintType constraintType,
final String constraintId, final ConstraintItem constraintItem) {
- ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
Builder keyBuilder = new Builder(clusterName);
String path = keyBuilder.constraint(constraintType.toString()).getPath();
@@ -961,7 +988,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void removeConstraint(String clusterName, final ConstraintType constraintType,
final String constraintId) {
- ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
Builder keyBuilder = new Builder(clusterName);
String path = keyBuilder.constraint(constraintType.toString()).getPath();
@@ -1059,7 +1086,7 @@ public class ZKHelixAdmin implements HelixAdmin {
throw new HelixException("cluster " + clusterName + " instance " + instanceName
+ " is not setup yet");
}
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -1087,6 +1114,7 @@ public class ZKHelixAdmin implements HelixAdmin {
accessor.setProperty(keyBuilder.instanceConfig(instanceName), config);
}
+ @Override
public void close() {
if (_zkClient != null) {
_zkClient.close();
http://git-wip-us.apache.org/repos/asf/helix/blob/c5a754be/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 2ef1569..a209cd9 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -51,7 +51,8 @@ public class IdealState extends HelixProperty {
REBALANCE_TIMER_PERIOD,
MAX_PARTITIONS_PER_INSTANCE,
INSTANCE_GROUP_TAG,
- REBALANCER_CLASS_NAME
+ REBALANCER_CLASS_NAME,
+ HELIX_ENABLED
}
public static final String QUERY_LIST = "PREFERENCE_LIST_QUERYS";
@@ -505,4 +506,21 @@ public class IdealState extends HelixProperty {
}
return rebalanceMode;
}
+
+ /**
+ * Check whether the resource is enabled
+ * The resource is treated as enabled by default (the HELIX_ENABLED field defaults to true)
+ * @return true if enabled; false otherwise
+ */
+ public boolean isEnabled() {
+ return _record.getBooleanField(IdealStateProperty.HELIX_ENABLED.name(), true);
+ }
+
+ /**
+ * Enable or disable the resource
+ * @param enabled true to enable the resource; false to disable it
+ */
+ public void enable(boolean enabled) {
+ _record.setSimpleField(IdealStateProperty.HELIX_ENABLED.name(), Boolean.toString(enabled));
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c5a754be/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
index 6a2490a..819a302 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
@@ -28,6 +28,7 @@ import org.apache.helix.HelixManager;
import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.HelixConstants.StateModelToken;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -48,7 +49,7 @@ import org.apache.log4j.Logger;
* .invoke(_callback)
* .on(ChangeType.LIVE_INSTANCE, ChangeType.IdealState)
* .usingLeaderStandbyModel("someUniqueId")
- * .run()
+ * .start()
* </code>
*/
public class HelixCustomCodeRunner {
@@ -103,6 +104,15 @@ public class HelixCustomCodeRunner {
}
/**
+ * Get resource name for the custom-code runner
+ * Used for retrieving the external view for the custom-code runner resource
+ * @return resource name for the custom-code runner
+ */
+ public String getResourceName() {
+ return _resourceName;
+ }
+
+ /**
* This method will be invoked when there is a change in any subscribed
* notificationTypes
* @throws Exception
@@ -124,10 +134,12 @@ public class HelixCustomCodeRunner {
// manually add ideal state for participant leader using LeaderStandby
// model
- zkClient = new ZkClient(_zkAddr, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
- zkClient.setZkSerializer(new ZNRecordSerializer());
+ zkClient =
+ new ZkClient(_zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
+ ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
HelixDataAccessor accessor =
- new ZKHelixDataAccessor(_manager.getClusterName(), new ZkBaseDataAccessor(zkClient));
+ new ZKHelixDataAccessor(_manager.getClusterName(), new ZkBaseDataAccessor<ZNRecord>(
+ zkClient));
Builder keyBuilder = accessor.keyBuilder();
IdealState idealState = new IdealState(_resourceName);
http://git-wip-us.apache.org/repos/asf/helix/blob/c5a754be/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 10c8633..40bc398 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -98,6 +98,8 @@ public class ClusterSetup {
public static final String addInstanceTag = "addInstanceTag";
public static final String removeInstanceTag = "removeInstanceTag";
+ public static final String enableResource = "enableResource";
+
// Query info (TBD in V2)
public static final String listClusterInfo = "listClusterInfo";
public static final String listInstanceInfo = "listInstanceInfo";
@@ -737,6 +739,11 @@ public class ClusterSetup {
dropResourceOption.setRequired(false);
dropResourceOption.setArgName("clusterName resourceName");
+ Option enableResourceOption =
+ OptionBuilder.withLongOpt(enableResource).withDescription("Enable/disable a resource")
+ .hasArgs(3).isRequired(false).withArgName("clusterName resourceName true/false")
+ .create();
+
Option rebalanceOption =
OptionBuilder.withLongOpt(rebalance).withDescription("Rebalance a resource in a cluster")
.create();
@@ -787,11 +794,11 @@ public class ClusterSetup {
partitionInfoOption.setArgName("clusterName resourceName partitionName");
Option enableInstanceOption =
- OptionBuilder.withLongOpt(enableInstance).withDescription("Enable/disable a Instance")
+ OptionBuilder.withLongOpt(enableInstance).withDescription("Enable/disable an instance")
.create();
enableInstanceOption.setArgs(3);
enableInstanceOption.setRequired(false);
- enableInstanceOption.setArgName("clusterName InstanceName true/false");
+ enableInstanceOption.setArgName("clusterName instanceName true/false");
Option enablePartitionOption =
OptionBuilder.hasArgs().withLongOpt(enablePartition)
@@ -925,6 +932,7 @@ public class ClusterSetup {
group.addOption(dropInstanceOption);
group.addOption(swapInstanceOption);
group.addOption(dropResourceOption);
+ group.addOption(enableResourceOption);
group.addOption(instanceInfoOption);
group.addOption(clusterInfoOption);
group.addOption(resourceInfoOption);
@@ -1224,6 +1232,11 @@ public class ClusterSetup {
setupTool.getClusterManagementTool().enableInstance(clusterName, instanceName, enabled);
return 0;
+ } else if (cmd.hasOption(enableResource)) {
+ String clusterName = cmd.getOptionValues(enableResource)[0];
+ String resourceName = cmd.getOptionValues(enableResource)[1];
+ boolean enabled = Boolean.parseBoolean(cmd.getOptionValues(enableResource)[2].toLowerCase());
+ setupTool.getClusterManagementTool().enableResource(clusterName, resourceName, enabled);
} else if (cmd.hasOption(enablePartition)) {
String[] args = cmd.getOptionValues(enablePartition);
http://git-wip-us.apache.org/repos/asf/helix/blob/c5a754be/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
index c93c51e..985d0c8 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
@@ -230,7 +230,7 @@ public class TestAutoRebalanceStrategy {
Set<String> disabled = Collections.emptySet();
Map<String, String> assignment =
ConstraintBasedAssignment.computeAutoBestStateForPartition(cache, _stateModelDef,
- preferenceList, currentStateMap, disabled);
+ preferenceList, currentStateMap, disabled, true);
mapResult.put(partition, assignment);
}
return mapResult;
http://git-wip-us.apache.org/repos/asf/helix/blob/c5a754be/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
new file mode 100644
index 0000000..3223e48
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
@@ -0,0 +1,252 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.participant.CustomCodeCallbackHandler;
+import org.apache.helix.participant.HelixCustomCodeRunner;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestDisableCustomCodeRunner extends ZkUnitTestBase {
+
+ private static final int N = 2;
+ private static final int PARTITION_NUM = 1;
+
+ class DummyCallback implements CustomCodeCallbackHandler {
+ private final Map<NotificationContext.Type, Boolean> _callbackInvokeMap =
+ new HashMap<NotificationContext.Type, Boolean>();
+
+ @Override
+ public void onCallback(NotificationContext context) {
+ NotificationContext.Type type = context.getType();
+ _callbackInvokeMap.put(type, Boolean.TRUE);
+ }
+
+ public void reset() {
+ _callbackInvokeMap.clear();
+ }
+
+ public boolean isInitTypeInvoked() {
+ return _callbackInvokeMap.containsKey(NotificationContext.Type.INIT);
+ }
+
+ public boolean isCallbackTypeInvoked() {
+ return _callbackInvokeMap.containsKey(NotificationContext.Type.CALLBACK);
+ }
+ }
+
+ @Test
+ public void test() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ PARTITION_NUM, // partitions per resource
+ N, // number of nodes
+ 2, // replicas
+ "MasterSlave", true); // do rebalance
+
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+ controller.syncStart();
+
+ // start participants
+ Map<String, MockParticipantManager> participants =
+ new HashMap<String, MockParticipantManager>();
+ Map<String, HelixCustomCodeRunner> customCodeRunners =
+ new HashMap<String, HelixCustomCodeRunner>();
+ Map<String, DummyCallback> callbacks =
+ new HashMap<String, DummyCallback>();
+ for (int i = 0; i < N; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants
+ .put(instanceName, new MockParticipantManager(ZK_ADDR, clusterName, instanceName));
+
+ customCodeRunners.put(instanceName, new HelixCustomCodeRunner(participants.get(instanceName),
+ ZK_ADDR));
+ callbacks.put(instanceName, new DummyCallback());
+
+ customCodeRunners.get(instanceName).invoke(callbacks.get(instanceName))
+ .on(ChangeType.LIVE_INSTANCE)
+ .usingLeaderStandbyModel("TestParticLeader").start();
+ participants.get(instanceName).syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Make sure callback is registered
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ final HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ final String customCodeRunnerResource =
+ customCodeRunners.get("localhost_12918").getResourceName();
+ ExternalView extView = accessor.getProperty(keyBuilder.externalView(customCodeRunnerResource));
+ Map<String, String> instanceStates = extView.getStateMap(customCodeRunnerResource + "_0");
+ String leader = null;
+ for (String instance : instanceStates.keySet()) {
+ String state = instanceStates.get(instance);
+ if ("LEADER".equals(state)) {
+ leader = instance;
+ break;
+ }
+ }
+ Assert.assertNotNull(leader);
+ for (String instance : callbacks.keySet()) {
+ DummyCallback callback = callbacks.get(instance);
+ if (instance.equals(leader)) {
+ Assert.assertTrue(callback.isInitTypeInvoked());
+ } else {
+ Assert.assertFalse(callback.isInitTypeInvoked());
+ }
+ callback.reset();
+ }
+
+ // Disable custom-code runner resource
+ HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+ admin.enableResource(clusterName, customCodeRunnerResource, false);
+
+ // Verify that states of custom-code runner are all OFFLINE
+ result = TestHelper.verify(new TestHelper.Verifier() {
+
+ @Override
+ public boolean verify() throws Exception {
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ ExternalView extView =
+ accessor.getProperty(keyBuilder.externalView(customCodeRunnerResource));
+ if (extView == null) {
+ return false;
+ }
+ Set<String> partitionSet = extView.getPartitionSet();
+ if (partitionSet == null || partitionSet.size() != PARTITION_NUM) {
+ return false;
+ }
+ for (String partition : partitionSet) {
+ Map<String, String> instanceStates = extView.getStateMap(partition);
+ for (String state : instanceStates.values()) {
+ if (!"OFFLINE".equals(state)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ }, 10 * 1000);
+ Assert.assertTrue(result);
+
+ // Changing live instances should not invoke any custom-code runner while the resource is disabled
+ LiveInstance fakeInstance = new LiveInstance("fakeInstance");
+ fakeInstance.setSessionId("fakeSessionId");
+ fakeInstance.setHelixVersion("0.6");
+ accessor.setProperty(keyBuilder.liveInstance("fakeInstance"), fakeInstance);
+ Thread.sleep(1000);
+
+ for (DummyCallback callback : callbacks.values()) {
+ Assert.assertFalse(callback.isInitTypeInvoked());
+ Assert.assertFalse(callback.isCallbackTypeInvoked());
+ }
+
+ // Remove fake instance
+ accessor.removeProperty(keyBuilder.liveInstance("fakeInstance"));
+
+ // Re-enable custom-code runner
+ admin.enableResource(clusterName, customCodeRunnerResource, true);
+ result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Verify that the custom-code callback is invoked again on the leader
+ extView = accessor.getProperty(keyBuilder.externalView(customCodeRunnerResource));
+ instanceStates = extView.getStateMap(customCodeRunnerResource + "_0");
+ leader = null;
+ for (String instance : instanceStates.keySet()) {
+ String state = instanceStates.get(instance);
+ if ("LEADER".equals(state)) {
+ leader = instance;
+ break;
+ }
+ }
+ Assert.assertNotNull(leader);
+ for (String instance : callbacks.keySet()) {
+ DummyCallback callback = callbacks.get(instance);
+ if (instance.equals(leader)) {
+ Assert.assertTrue(callback.isInitTypeInvoked());
+ } else {
+ Assert.assertFalse(callback.isInitTypeInvoked());
+ }
+ callback.reset();
+ }
+
+ // Adding a fake instance should invoke the custom-code runner on the leader only
+ accessor.setProperty(keyBuilder.liveInstance("fakeInstance"), fakeInstance);
+ Thread.sleep(1000);
+ for (String instance : callbacks.keySet()) {
+ DummyCallback callback = callbacks.get(instance);
+ if (instance.equals(leader)) {
+ Assert.assertTrue(callback.isCallbackTypeInvoked());
+ } else {
+ Assert.assertFalse(callback.isCallbackTypeInvoked());
+ }
+ }
+
+ // Clean up
+ controller.syncStop();
+ for (MockParticipantManager participant : participants.values()) {
+ participant.syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/c5a754be/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java
new file mode 100644
index 0000000..8a0fe5a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java
@@ -0,0 +1,268 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestDisableResource extends ZkUnitTestBase {
+ private static final int N = 2;
+ private static final int PARTITION_NUM = 1;
+
+ @Test
+ public void testDisableResourceInSemiAutoMode() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ PARTITION_NUM, // partitions per resource
+ N, // number of nodes
+ 2, // replicas
+ "MasterSlave", true); // do rebalance
+
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+ controller.syncStart();
+
+ // start participants
+ MockParticipantManager participants[] = new MockParticipantManager[N];
+ for (int i = 0; i < N; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Disable TestDB0
+ enableResource(clusterName, false);
+ checkExternalView(clusterName);
+
+ // Re-enable TestDB0
+ enableResource(clusterName, true);
+ result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+ ZK_ADDR, clusterName));
+ Assert.assertTrue(result);
+
+ // Clean up
+ controller.syncStop();
+ for (int i = 0; i < N; i++) {
+ participants[i].syncStop();
+ }
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testDisableResourceInFullAutoMode() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ PARTITION_NUM, // partitions per resource
+ N, // number of nodes
+ 2, // replicas
+ "MasterSlave", RebalanceMode.FULL_AUTO, true); // do rebalance
+
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+ controller.syncStart();
+
+ // start participants
+ MockParticipantManager participants[] = new MockParticipantManager[N];
+ for (int i = 0; i < N; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // disable TestDB0
+ enableResource(clusterName, false);
+ checkExternalView(clusterName);
+
+ // Re-enable TestDB0
+ enableResource(clusterName, true);
+ result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+ ZK_ADDR, clusterName));
+ Assert.assertTrue(result);
+
+ // Clean up
+ controller.syncStop();
+ for (int i = 0; i < N; i++) {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testDisableResourceInCustomMode() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ PARTITION_NUM, // partitions per resource
+ N, // number of nodes
+ 2, // replicas
+ "MasterSlave", RebalanceMode.CUSTOMIZED, true); // do rebalance
+
+ // set up custom ideal-state
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+ idealState.setPartitionState("TestDB0_0", "localhost_12918", "SLAVE");
+ idealState.setPartitionState("TestDB0_0", "localhost_12919", "SLAVE");
+ accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+ controller.syncStart();
+
+ // start participants
+ MockParticipantManager participants[] = new MockParticipantManager[N];
+ for (int i = 0; i < N; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Disable TestDB0
+ enableResource(clusterName, false);
+ checkExternalView(clusterName);
+
+ // Re-enable TestDB0
+ enableResource(clusterName, true);
+ result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+ ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Clean up
+ controller.syncStop();
+ for (int i = 0; i < N; i++) {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ private void enableResource(String clusterName, boolean enabled) {
+ HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+ admin.enableResource(clusterName, "TestDB0", enabled);
+ }
+
+ /**
+ * Check that all partitions of TestDB0 are in OFFLINE state
+ * @param clusterName name of the cluster whose external view is checked
+ * @throws Exception
+ */
+ private void checkExternalView(String clusterName) throws Exception {
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ final HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+
+ // verify that states of TestDB0 are all OFFLINE
+ boolean result = TestHelper.verify(new TestHelper.Verifier() {
+
+ @Override
+ public boolean verify() throws Exception {
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ ExternalView extView = accessor.getProperty(keyBuilder.externalView("TestDB0"));
+ if (extView == null) {
+ return false;
+ }
+ Set<String> partitionSet = extView.getPartitionSet();
+ if (partitionSet == null || partitionSet.size() != PARTITION_NUM) {
+ return false;
+ }
+ for (String partition : partitionSet) {
+ Map<String, String> instanceStates = extView.getStateMap(partition);
+ for (String state : instanceStates.values()) {
+ if (!"OFFLINE".equals(state)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ }, 10 * 1000);
+ Assert.assertTrue(result);
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/c5a754be/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index 4b3764f..cfdf1a3 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -34,6 +34,7 @@ import org.apache.helix.model.ConfigScope;
import org.apache.helix.model.ConstraintItem;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.ConfigScopeBuilder;
@@ -56,7 +57,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
_gZkClient.deleteRecursive(rootPath);
}
- ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+ HelixAdmin tool = new ZKHelixAdmin(_gZkClient);
tool.addCluster(clusterName, true);
Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient));
tool.addCluster(clusterName, true);
@@ -199,7 +200,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+ HelixAdmin tool = new ZKHelixAdmin(_gZkClient);
tool.addCluster(clusterName, true);
Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
@@ -235,7 +236,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+ HelixAdmin tool = new ZKHelixAdmin(_gZkClient);
tool.addCluster(clusterName, true);
Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
@@ -288,4 +289,29 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
+
+ @Test
+ public void testDisableResource() {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+ admin.addCluster(clusterName, true);
+ Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
+ String resourceName = "TestDB";
+ admin.addStateModelDef(clusterName, "MasterSlave", new StateModelDefinition(
+ StateModelConfigGenerator.generateConfigForMasterSlave()));
+ admin.addResource(clusterName, resourceName, 4, "MasterSlave");
+ admin.enableResource(clusterName, resourceName, false);
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName));
+ Assert.assertFalse(idealState.isEnabled());
+ admin.enableResource(clusterName, resourceName, true);
+ idealState = accessor.getProperty(keyBuilder.idealStates(resourceName));
+ Assert.assertTrue(idealState.isEnabled());
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c5a754be/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
index ff7a455..9325934 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
@@ -19,8 +19,6 @@ package org.apache.helix.mock.participant;
* under the License.
*/
-import java.util.Map;
-
import org.apache.helix.participant.statemachine.StateModelFactory;
// mock master slave state model factory
http://git-wip-us.apache.org/repos/asf/helix/blob/c5a754be/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java b/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
index 7190968..1802c1a 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
@@ -23,6 +23,8 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Date;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathConfig;
@@ -36,10 +38,10 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.util.HelixUtil;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.AssertJUnit;
@@ -440,4 +442,36 @@ public class TestClusterSetup extends ZkUnitTestBase {
}
+ @Test
+ public void testDisableResource() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 10, // partitions per resource
+ 5, // number of nodes
+ 3, // replicas
+ "MasterSlave", true); // do rebalance
+ // disable "TestDB0" resource
+ ClusterSetup.processCommandLineArgs(new String[] {
+ "--zkSvr", ZK_ADDR, "--enableResource", clusterName, "TestDB0", "false"
+ });
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+ Assert.assertFalse(idealState.isEnabled());
+ // enable "TestDB0" resource
+ ClusterSetup.processCommandLineArgs(new String[] {
+ "--zkSvr", ZK_ADDR, "--enableResource", clusterName, "TestDB0", "true"
+ });
+ idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+ Assert.assertTrue(idealState.isEnabled());
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
}