You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/11 19:13:20 UTC
[6/7] git commit: [HELIX-94] Add the ability to enable and disable a resource, rb=20401
[HELIX-94] Add the ability to enable and disable a resource, rb=20401
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c1af744a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c1af744a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c1af744a
Branch: refs/heads/master
Commit: c1af744af6088ad98fe2a8a5074f2c56199b6f82
Parents: 96aef71
Author: zzhang <zz...@apache.org>
Authored: Wed May 21 14:56:47 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Jul 11 10:04:37 2014 -0700
----------------------------------------------------------------------
.../helix/webapp/resources/JsonParameters.java | 4 +
.../webapp/resources/ResourceGroupResource.java | 7 +
.../helix/webapp/TestDisableResource.java | 84 ++++++
.../main/java/org/apache/helix/HelixAdmin.java | 7 +
.../controller/rebalancer/CustomRebalancer.java | 5 +-
.../rebalancer/FallbackRebalancer.java | 4 +-
.../rebalancer/FullAutoRebalancer.java | 5 +-
.../rebalancer/SemiAutoRebalancer.java | 5 +-
.../config/SemiAutoRebalancerConfig.java | 2 +-
.../util/ConstraintBasedAssignment.java | 26 +-
.../stages/BestPossibleStateCalcStage.java | 4 +-
.../apache/helix/manager/zk/ZKHelixAdmin.java | 84 ++++--
.../java/org/apache/helix/model/IdealState.java | 21 +-
.../participant/HelixCustomCodeRunner.java | 16 +-
.../org/apache/helix/tools/ClusterSetup.java | 17 +-
.../strategy/TestAutoRebalanceStrategy.java | 2 +-
.../strategy/TestNewAutoRebalanceStrategy.java | 2 +-
.../TestDisableCustomCodeRunner.java | 252 +++++++++++++++++
.../helix/integration/TestDisableResource.java | 267 +++++++++++++++++++
.../helix/manager/zk/TestZkHelixAdmin.java | 34 ++-
.../apache/helix/tools/TestClusterSetup.java | 39 ++-
21 files changed, 826 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
index 5f405d8..19ac71a 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
@@ -165,6 +165,10 @@ public class JsonParameters {
if (!_parameterMap.containsKey(ENABLED)) {
throw new HelixException("Missing Json parameters: '" + ENABLED + "'");
}
+ } else if (command.equalsIgnoreCase(ClusterSetup.enableResource)) {
+ if (!_parameterMap.containsKey(ENABLED)) {
+ throw new HelixException("Missing Json parameters: '" + ENABLED + "'");
+ }
} else if (command.equalsIgnoreCase(ClusterSetup.enablePartition)) {
if (!_parameterMap.containsKey(ENABLED)) {
throw new HelixException("Missing Json parameters: '" + ENABLED + "'");
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
index 055f64a..6dc721d 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
@@ -114,6 +114,13 @@ public class ResourceGroupResource extends ServerResource {
ClusterSetup setupTool = new ClusterSetup(zkClient);
setupTool.getClusterManagementTool()
.resetResource(clusterName, Arrays.asList(resourceName));
+ } else if (command.equalsIgnoreCase(ClusterSetup.enableResource)) {
+ jsonParameters.verifyCommand(ClusterSetup.enableResource);
+ boolean enabled = Boolean.parseBoolean(jsonParameters.getParameter(JsonParameters.ENABLED));
+ ZkClient zkClient =
+ (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT);
+ ClusterSetup setupTool = new ClusterSetup(zkClient);
+ setupTool.getClusterManagementTool().enableResource(clusterName, resourceName, enabled);
} else {
throw new HelixException("Unsupported command: " + command + ". Should be one of ["
+ ClusterSetup.resetResource + "]");
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java
new file mode 100644
index 0000000..9800179
--- /dev/null
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java
@@ -0,0 +1,84 @@
+package org.apache.helix.webapp;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.webapp.resources.JsonParameters;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestDisableResource extends AdminTestBase {
+
+ @Test
+ public void test() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 5;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 10, // partitions per resource
+ n, // number of nodes
+ 3, // replicas
+ "MasterSlave", true); // do rebalance
+
+ String instanceUrl =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups/"
+ + "TestDB0";
+
+ // Disable TestDB0
+ Map<String, String> paramMap = new HashMap<String, String>();
+ paramMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.enableResource);
+ paramMap.put(JsonParameters.ENABLED, Boolean.toString(false));
+ TestHelixAdminScenariosRest.assertSuccessPostOperation(instanceUrl, paramMap, false);
+
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+ Assert.assertFalse(idealState.isEnabled());
+
+ // Re-enable TestDB0
+ paramMap.put(JsonParameters.ENABLED, Boolean.toString(true));
+ TestHelixAdminScenariosRest.assertSuccessPostOperation(instanceUrl, paramMap, false);
+
+ idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+ Assert.assertTrue(idealState.isEnabled());
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 892bada..dce3893 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -177,6 +177,13 @@ public interface HelixAdmin {
void enableInstance(String clusterName, String instanceName, boolean enabled);
/**
+ * Disable or enable a resource
+ * @param clusterName
+ * @param resourceName
+ */
+ void enableResource(String clusterName, String resourceName, boolean enabled);
+
+ /**
* Disable or enable a list of partitions on an instance
* @param enabled
* @param clusterName
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index 6629bec..4d5c373 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -14,6 +14,7 @@ import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -51,6 +52,8 @@ public class CustomRebalancer implements HelixRebalancer {
ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
CustomRebalancerConfig config =
BasicRebalancerConfig.convert(rebalancerConfig, CustomRebalancerConfig.class);
+ IdealState idealState = cluster.getResource(rebalancerConfig.getResourceId()).getIdealState();
+ boolean isEnabled = (idealState != null) ? idealState.isEnabled() : true;
StateModelDefinition stateModelDef =
cluster.getStateModelMap().get(config.getStateModelDefId());
if (LOG.isDebugEnabled()) {
@@ -65,7 +68,7 @@ public class CustomRebalancer implements HelixRebalancer {
Map<ParticipantId, State> bestStateForPartition =
ConstraintBasedAssignment.computeCustomizedBestStateForPartition(cluster
.getLiveParticipantMap().keySet(), stateModelDef, config.getPreferenceMap(partition),
- currentStateMap, disabledInstancesForPartition);
+ currentStateMap, disabledInstancesForPartition, isEnabled);
partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
return partitionMapping;
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
index 3aa41d7..e00e57c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
@@ -160,7 +160,7 @@ public class FallbackRebalancer implements HelixRebalancer {
ConstraintBasedAssignment.computeCustomizedBestStateForPartition(cluster
.getLiveParticipantMap().keySet(), stateModelDef, newIdealState
.getParticipantStateMap(partitionId), currentState.getCurrentStateMap(resourceId,
- partitionId), disabledParticipants);
+ partitionId), disabledParticipants, idealState.isEnabled());
assignment.addReplicaMap(partitionId, replicaMap);
}
} else {
@@ -176,7 +176,7 @@ public class FallbackRebalancer implements HelixRebalancer {
ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, cluster
.getLiveParticipantMap().keySet(), stateModelDef, newIdealState
.getPreferenceList(partitionId), currentState.getCurrentStateMap(resourceId,
- partitionId), disabledParticipants);
+ partitionId), disabledParticipants, idealState.isEnabled());
assignment.addReplicaMap(partitionId, replicaMap);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
index 13616b3..4bf030b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
@@ -25,6 +25,7 @@ import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -67,6 +68,8 @@ public class FullAutoRebalancer implements HelixRebalancer {
ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
FullAutoRebalancerConfig config =
BasicRebalancerConfig.convert(rebalancerConfig, FullAutoRebalancerConfig.class);
+ IdealState idealState = cluster.getResource(rebalancerConfig.getResourceId()).getIdealState();
+ boolean isEnabled = (idealState != null) ? idealState.isEnabled() : true;
StateModelDefinition stateModelDef =
cluster.getStateModelMap().get(config.getStateModelDefId());
// Compute a preference list based on the current ideal state
@@ -176,7 +179,7 @@ public class FullAutoRebalancer implements HelixRebalancer {
ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
liveParticipants.keySet(), stateModelDef, preferenceList,
currentState.getCurrentStateMap(config.getResourceId(), partition),
- disabledParticipantsForPartition);
+ disabledParticipantsForPartition, isEnabled);
partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
return partitionMapping;
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
index 51ca463..07f6337 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
@@ -15,6 +15,7 @@ import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -55,6 +56,8 @@ public class SemiAutoRebalancer implements HelixRebalancer {
ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
SemiAutoRebalancerConfig config =
BasicRebalancerConfig.convert(rebalancerConfig, SemiAutoRebalancerConfig.class);
+ IdealState idealState = cluster.getResource(rebalancerConfig.getResourceId()).getIdealState();
+ boolean isEnabled = (idealState != null) ? idealState.isEnabled() : true;
StateModelDefinition stateModelDef =
cluster.getStateModelMap().get(config.getStateModelDefId());
if (LOG.isDebugEnabled()) {
@@ -77,7 +80,7 @@ public class SemiAutoRebalancer implements HelixRebalancer {
Map<ParticipantId, State> bestStateForPartition =
ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, cluster
.getLiveParticipantMap().keySet(), stateModelDef, preferenceList, currentStateMap,
- disabledInstancesForPartition);
+ disabledInstancesForPartition, isEnabled);
partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
return partitionMapping;
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
index 727c3df..60e30f4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
@@ -116,7 +116,7 @@ public final class SemiAutoRebalancerConfig extends PartitionedRebalancerConfig
Map<ParticipantId, State> emptyCurrentState = Collections.emptyMap();
Map<ParticipantId, State> initialMap =
ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, participantSet,
- stateModelDef, preferenceList, emptyCurrentState, disabledParticipants);
+ stateModelDef, preferenceList, emptyCurrentState, disabledParticipants, true);
currentMapping.put(partitionId, initialMap);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index 7951784..addd652 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -117,23 +117,24 @@ public class ConstraintBasedAssignment {
* @param participants participants selected to serve the partition
* @param disabledParticipants participants that have been disabled for this partition
* @param initialState the initial state of the resource state model
+ * @param isEnabled true if resource is enabled, false otherwise
* @return map of participant id to state of dropped and disabled partitions
*/
public static Map<ParticipantId, State> dropAndDisablePartitions(
Map<ParticipantId, State> currentStateMap, Collection<ParticipantId> participants,
- Set<ParticipantId> disabledParticipants, State initialState) {
+ Set<ParticipantId> disabledParticipants, boolean isEnabled, State initialState) {
Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
// if the resource is deleted, instancePreferenceList will be empty and
// we should drop all resources.
if (currentStateMap != null) {
for (ParticipantId participantId : currentStateMap.keySet()) {
if ((participants == null || !participants.contains(participantId))
- && !disabledParticipants.contains(participantId)) {
+ && !disabledParticipants.contains(participantId) && isEnabled) {
// if dropped and not disabled, transit to DROPPED
participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
} else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
participantId).equals(State.from(HelixDefinedState.ERROR)))
- && disabledParticipants.contains(participantId)) {
+ && (disabledParticipants.contains(participantId) || !isEnabled)) {
// if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
participantStateMap.put(participantId, initialState);
}
@@ -151,16 +152,18 @@ public class ConstraintBasedAssignment {
* @param currentStateMap
* : participant->state for each partition
* @param disabledParticipantsForPartition
+ * @param isEnabled true if resource is enabled, false if disabled
* @return
*/
public static Map<ParticipantId, State> computeAutoBestStateForPartition(
Map<State, String> upperBounds, Set<ParticipantId> liveParticipantSet,
StateModelDefinition stateModelDef, List<ParticipantId> participantPreferenceList,
- Map<ParticipantId, State> currentStateMap, Set<ParticipantId> disabledParticipantsForPartition) {
+ Map<ParticipantId, State> currentStateMap,
+ Set<ParticipantId> disabledParticipantsForPartition, boolean isEnabled) {
// drop and disable participants if necessary
Map<ParticipantId, State> participantStateMap =
dropAndDisablePartitions(currentStateMap, participantPreferenceList,
- disabledParticipantsForPartition, stateModelDef.getTypedInitialState());
+ disabledParticipantsForPartition, isEnabled, stateModelDef.getTypedInitialState());
// resource is deleted
if (participantPreferenceList == null) {
@@ -176,7 +179,7 @@ public class ConstraintBasedAssignment {
if ("N".equals(num)) {
Set<ParticipantId> liveAndEnabled = new HashSet<ParticipantId>(liveParticipantSet);
liveAndEnabled.removeAll(disabledParticipantsForPartition);
- stateCount = liveAndEnabled.size();
+ stateCount = isEnabled ? liveAndEnabled.size() : 0;
} else if ("R".equals(num)) {
stateCount = participantPreferenceList.size();
} else {
@@ -198,7 +201,7 @@ public class ConstraintBasedAssignment {
.equals(State.from(HelixDefinedState.ERROR));
if (liveParticipantSet.contains(participantId) && !assigned[i] && notInErrorState
- && !disabledParticipantsForPartition.contains(participantId)) {
+ && !disabledParticipantsForPartition.contains(participantId) && isEnabled) {
participantStateMap.put(participantId, state);
count = count + 1;
assigned[i] = true;
@@ -268,12 +271,13 @@ public class ConstraintBasedAssignment {
* @param preferenceMap
* @param currentStateMap
* @param disabledParticipantsForPartition
+ * @param isEnabled
* @return
*/
public static Map<ParticipantId, State> computeCustomizedBestStateForPartition(
Set<ParticipantId> liveParticipantSet, StateModelDefinition stateModelDef,
Map<ParticipantId, State> preferenceMap, Map<ParticipantId, State> currentStateMap,
- Set<ParticipantId> disabledParticipantsForPartition) {
+ Set<ParticipantId> disabledParticipantsForPartition, boolean isEnabled) {
Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
// if the resource is deleted, idealStateMap will be null/empty and
@@ -281,12 +285,12 @@ public class ConstraintBasedAssignment {
if (currentStateMap != null) {
for (ParticipantId participantId : currentStateMap.keySet()) {
if ((preferenceMap == null || !preferenceMap.containsKey(participantId))
- && !disabledParticipantsForPartition.contains(participantId)) {
+ && !disabledParticipantsForPartition.contains(participantId) && isEnabled) {
// if dropped and not disabled, transit to DROPPED
participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
} else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
participantId).equals(State.from(HelixDefinedState.ERROR)))
- && disabledParticipantsForPartition.contains(participantId)) {
+ && (disabledParticipantsForPartition.contains(participantId) || !isEnabled)) {
// if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
}
@@ -304,7 +308,7 @@ public class ConstraintBasedAssignment {
|| !currentStateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR));
if (liveParticipantSet.contains(participantId) && notInErrorState
- && !disabledParticipantsForPartition.contains(participantId)) {
+ && !disabledParticipantsForPartition.contains(participantId) && isEnabled) {
participantStateMap.put(participantId, preferenceMap.get(participantId));
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 2f93b7f..8c34f6b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -120,7 +120,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
partitionMapping.addReplicaMap(partitionId, ConstraintBasedAssignment
.computeAutoBestStateForPartition(upperBounds, cluster.getLiveParticipantMap().keySet(),
stateModelDef, null, currentStateOutput.getCurrentStateMap(resourceId, partitionId),
- disabledParticipantsForPartition));
+ disabledParticipantsForPartition, true));
}
return partitionMapping;
}
@@ -163,7 +163,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
Set<ParticipantId> participants = participantStateMap.keySet();
Map<ParticipantId, State> droppedAndDisabledMap =
ConstraintBasedAssignment.dropAndDisablePartitions(currentStateMap, participants,
- disabledParticipants, initialState);
+ disabledParticipants, true, initialState);
// don't map error participants
for (ParticipantId participantId : errorParticipants) {
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 6dc5541..da0c80c 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.DataUpdater;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants;
@@ -130,7 +131,6 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void dropInstance(String clusterName, InstanceConfig instanceConfig) {
- // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
String instanceConfigsPath =
PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
ConfigScopeProperty.PARTICIPANT.toString());
@@ -157,9 +157,6 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public InstanceConfig getInstanceConfig(String clusterName, String instanceName) {
- // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
-
- // String instanceConfigPath = instanceConfigsPath + "/" + instanceName;
String instanceConfigPath =
PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
ConfigScopeProperty.PARTICIPANT.toString(), instanceName);
@@ -168,7 +165,7 @@ public class ZKHelixAdmin implements HelixAdmin {
+ clusterName);
}
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -182,7 +179,7 @@ public class ZKHelixAdmin implements HelixAdmin {
PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
ConfigScopeProperty.PARTICIPANT.toString(), instanceName);
- ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
if (!baseAccessor.exists(path, 0)) {
throw new HelixException("Cluster " + clusterName + ", instance: " + instanceName
+ ", instance config does not exist");
@@ -204,13 +201,36 @@ public class ZKHelixAdmin implements HelixAdmin {
}
@Override
+ public void enableResource(final String clusterName, final String resourceName,
+ final boolean enabled) {
+ String path = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName, resourceName);
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ if (!baseAccessor.exists(path, 0)) {
+ throw new HelixException("Cluster " + clusterName + ", resource: " + resourceName
+ + ", ideal-state does not exist");
+ }
+ baseAccessor.update(path, new DataUpdater<ZNRecord>() {
+ @Override
+ public ZNRecord update(ZNRecord currentData) {
+ if (currentData == null) {
+ throw new HelixException("Cluster: " + clusterName + ", resource: " + resourceName
+ + ", ideal-state is null");
+ }
+ IdealState idealState = new IdealState(currentData);
+ idealState.enable(enabled);
+ return idealState.getRecord();
+ }
+ }, AccessOption.PERSISTENT);
+ }
+
+ @Override
public void enablePartition(final boolean enabled, final String clusterName,
final String instanceName, final String resourceName, final List<String> partitionNames) {
String path =
PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
ConfigScopeProperty.PARTICIPANT.toString(), instanceName);
- ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
// check instanceConfig exists
if (!baseAccessor.exists(path, 0)) {
@@ -299,7 +319,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void resetPartition(String clusterName, String instanceName, String resourceName,
List<String> partitionNames) {
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -404,7 +424,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void resetInstance(String clusterName, List<String> instanceNames) {
// TODO: not mp-safe
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
@@ -430,7 +450,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void resetResource(String clusterName, List<String> resourceNames) {
// TODO: not mp-safe
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
@@ -508,7 +528,6 @@ public class ZKHelixAdmin implements HelixAdmin {
// IDEAL STATE
_zkClient.createPersistent(HelixUtil.getIdealStatePath(clusterName));
// CONFIGURATIONS
- // _zkClient.createPersistent(HelixUtil.getConfigPath(clusterName));
path =
PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
ConfigScopeProperty.CLUSTER.toString(), clusterName);
@@ -564,7 +583,7 @@ public class ZKHelixAdmin implements HelixAdmin {
List<String> instances = _zkClient.getChildren(memberInstancesPath);
List<String> result = new ArrayList<String>();
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -662,7 +681,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public IdealState getResourceIdealState(String clusterName, String resourceName) {
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -671,7 +690,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void setResourceIdealState(String clusterName, String resourceName, IdealState idealState) {
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -680,7 +699,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public ExternalView getResourceExternalView(String clusterName, String resourceName) {
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
return accessor.getProperty(keyBuilder.externalView(resourceName));
@@ -699,7 +718,7 @@ public class ZKHelixAdmin implements HelixAdmin {
throw new HelixException("State model path " + stateModelPath + " already exists.");
}
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.stateModelDef(stateModel.getId()), stateModel);
@@ -707,7 +726,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void dropResource(String clusterName, String resourceName) {
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -722,7 +741,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public StateModelDefinition getStateModelDef(String clusterName, String stateModelName) {
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -805,7 +824,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void dropCluster(String clusterName) {
logger.info("Deleting cluster " + clusterName);
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -1067,15 +1086,23 @@ public class ZKHelixAdmin implements HelixAdmin {
int size = (int) file.length();
byte[] bytes = new byte[size];
- DataInputStream dis = new DataInputStream(new FileInputStream(file));
- int read = 0;
- int numRead = 0;
- while (read < bytes.length && (numRead = dis.read(bytes, read, bytes.length - read)) >= 0) {
- read = read + numRead;
+ DataInputStream dis = null;
+ try {
+ dis = new DataInputStream(new FileInputStream(file));
+ int read = 0;
+ int numRead = 0;
+ while (read < bytes.length && (numRead = dis.read(bytes, read, bytes.length - read)) >= 0) {
+ read = read + numRead;
+ }
+ return bytes;
+ } finally {
+ if (dis != null) {
+ dis.close();
+ }
}
- return bytes;
}
+ @Override
public void addStateModelDef(String clusterName, String stateModelDefName,
String stateModelDefFile) throws IOException {
ZNRecord record =
@@ -1091,7 +1118,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void setConstraint(String clusterName, final ConstraintType constraintType,
final String constraintId, final ConstraintItem constraintItem) {
- ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
Builder keyBuilder = new Builder(clusterName);
String path = keyBuilder.constraint(constraintType.toString()).getPath();
@@ -1112,7 +1139,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void removeConstraint(String clusterName, final ConstraintType constraintType,
final String constraintId) {
- ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
Builder keyBuilder = new Builder(clusterName);
String path = keyBuilder.constraint(constraintType.toString()).getPath();
@@ -1210,7 +1237,7 @@ public class ZKHelixAdmin implements HelixAdmin {
throw new HelixException("cluster " + clusterName + " instance " + instanceName
+ " is not setup yet");
}
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -1238,6 +1265,7 @@ public class ZKHelixAdmin implements HelixAdmin {
accessor.setProperty(keyBuilder.instanceConfig(instanceName), config);
}
+ @Override
public void close() {
if (_zkClient != null) {
_zkClient.close();
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index ed17c5b..dc42a52 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -81,7 +81,8 @@ public class IdealState extends HelixProperty {
MAX_PARTITIONS_PER_INSTANCE,
INSTANCE_GROUP_TAG,
REBALANCER_CLASS_NAME,
- REBALANCER_CONFIG_NAME
+ REBALANCER_CONFIG_NAME,
+ HELIX_ENABLED
}
public static final String QUERY_LIST = "PREFERENCE_LIST_QUERYS";
@@ -821,6 +822,7 @@ public class IdealState extends HelixProperty {
}
/**
+ *
* Get the non-Helix simple fields from this property and add them to a UserConfig
* @param userConfig the user config to update
*/
@@ -950,4 +952,21 @@ public class IdealState extends HelixProperty {
Map<PartitionId, Map<ParticipantId, State>> participantStateMaps) {
return ResourceAssignment.stringMapsFromReplicaMaps(participantStateMaps);
}
+
+ /**
+ * Check whether this resource is enabled, as recorded in the HELIX_ENABLED field
+ * The lookup passes true as its default value, so by default the resource is enabled
+ * @return true if enabled; false otherwise
+ */
+ public boolean isEnabled() {
+ return _record.getBooleanField(IdealStateProperty.HELIX_ENABLED.name(), true);
+ }
+
+ /**
+ * Enable or disable the resource by storing the flag in the HELIX_ENABLED simple field
+ * @param enabled true to enable the resource; false to disable it
+ */
+ public void enable(boolean enabled) {
+ _record.setSimpleField(IdealStateProperty.HELIX_ENABLED.name(), Boolean.toString(enabled));
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
index bddd8d1..2f169cc 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
@@ -51,7 +51,7 @@ import org.apache.log4j.Logger;
* .invoke(_callback)
* .on(ChangeType.LIVE_INSTANCE, ChangeType.IdealState)
* .usingLeaderStandbyModel("someUniqueId")
- * .run()
+ * .start()
* </code>
*/
public class HelixCustomCodeRunner {
@@ -106,6 +106,15 @@ public class HelixCustomCodeRunner {
}
/**
+ * Get the name of the resource associated with this custom-code runner
+ * Used for retrieving the external view for the custom-code runner resource
+ * @return resource name for the custom-code runner
+ */
+ public String getResourceName() {
+ return _resourceName;
+ }
+
+ /**
* This method will be invoked when there is a change in any subscribed
* notificationTypes
* @throws Exception
@@ -127,8 +136,9 @@ public class HelixCustomCodeRunner {
// manually add ideal state for participant leader using LeaderStandby
// model
- zkClient = new ZkClient(_zkAddr, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
- zkClient.setZkSerializer(new ZNRecordSerializer());
+ zkClient =
+ new ZkClient(_zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
+ ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
HelixDataAccessor accessor =
new ZKHelixDataAccessor(_manager.getClusterName(), new ZkBaseDataAccessor<ZNRecord>(
zkClient));
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index d97a853..4479f97 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -98,6 +98,8 @@ public class ClusterSetup {
public static final String addInstanceTag = "addInstanceTag";
public static final String removeInstanceTag = "removeInstanceTag";
+ public static final String enableResource = "enableResource";
+
// Query info (TBD in V2)
public static final String listClusterInfo = "listClusterInfo";
public static final String listInstanceInfo = "listInstanceInfo";
@@ -745,6 +747,11 @@ public class ClusterSetup {
dropResourceOption.setRequired(false);
dropResourceOption.setArgName("clusterName resourceName");
+ Option enableResourceOption =
+ OptionBuilder.withLongOpt(enableResource).withDescription("Enable/disable a resource")
+ .hasArgs(3).isRequired(false).withArgName("clusterName resourceName true/false")
+ .create();
+
Option rebalanceOption =
OptionBuilder.withLongOpt(rebalance).withDescription("Rebalance a resource in a cluster")
.create();
@@ -795,11 +802,11 @@ public class ClusterSetup {
partitionInfoOption.setArgName("clusterName resourceName partitionName");
Option enableInstanceOption =
- OptionBuilder.withLongOpt(enableInstance).withDescription("Enable/disable a Instance")
+ OptionBuilder.withLongOpt(enableInstance).withDescription("Enable/disable an instance")
.create();
enableInstanceOption.setArgs(3);
enableInstanceOption.setRequired(false);
- enableInstanceOption.setArgName("clusterName InstanceName true/false");
+ enableInstanceOption.setArgName("clusterName instanceName true/false");
Option enablePartitionOption =
OptionBuilder.hasArgs().withLongOpt(enablePartition)
@@ -955,6 +962,7 @@ public class ClusterSetup {
group.addOption(dropInstanceOption);
group.addOption(swapInstanceOption);
group.addOption(dropResourceOption);
+ group.addOption(enableResourceOption);
group.addOption(instanceInfoOption);
group.addOption(clusterInfoOption);
group.addOption(resourceInfoOption);
@@ -1258,6 +1266,11 @@ public class ClusterSetup {
setupTool.getClusterManagementTool().enableInstance(clusterName, instanceName, enabled);
return 0;
+ } else if (cmd.hasOption(enableResource)) {
+ String clusterName = cmd.getOptionValues(enableResource)[0];
+ String resourceName = cmd.getOptionValues(enableResource)[1];
+ boolean enabled = Boolean.parseBoolean(cmd.getOptionValues(enableResource)[2].toLowerCase());
+ setupTool.getClusterManagementTool().enableResource(clusterName, resourceName, enabled);
} else if (cmd.hasOption(enablePartition)) {
String[] args = cmd.getOptionValues(enablePartition);
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
index 7c74035..1322b40 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
@@ -239,7 +239,7 @@ public class TestAutoRebalanceStrategy {
Map<ParticipantId, State> assignment =
ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
liveParticipantSet, _stateModelDef, preferenceList, currentStateMap,
- disabledParticipantsForPartition);
+ disabledParticipantsForPartition, true);
mapResult.put(partition, IdealState.stringMapFromParticipantStateMap(assignment));
}
return mapResult;
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
index 2c26d5d..c8ec90a 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
@@ -265,7 +265,7 @@ public class TestNewAutoRebalanceStrategy {
Map<ParticipantId, State> assignment =
ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
liveParticipantMap.keySet(), _stateModelDef, participantPreferenceList, replicaMap,
- disabledParticipantsForPartition);
+ disabledParticipantsForPartition, true);
mapResult.put(partitionId, assignment);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
new file mode 100644
index 0000000..3223e48
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
@@ -0,0 +1,252 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.participant.CustomCodeCallbackHandler;
+import org.apache.helix.participant.HelixCustomCodeRunner;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test: disabling the resource behind a {@link HelixCustomCodeRunner} must take all
+ * of its replicas OFFLINE and silence the custom-code callbacks; re-enabling it must elect a
+ * leader again and resume the callbacks on that leader only.
+ */
+public class TestDisableCustomCodeRunner extends ZkUnitTestBase {
+
+  private static final int N = 2;
+  private static final int PARTITION_NUM = 1;
+  private static final String FAKE_INSTANCE = "fakeInstance";
+
+  /**
+   * Callback that remembers which notification types it has been invoked with.
+   * All methods are synchronized: the test only observes invocations after waiting for them,
+   * so the recording presumably happens on a different thread than the one asserting.
+   */
+  class DummyCallback implements CustomCodeCallbackHandler {
+    private final Map<NotificationContext.Type, Boolean> _callbackInvokeMap =
+        new HashMap<NotificationContext.Type, Boolean>();
+
+    @Override
+    public synchronized void onCallback(NotificationContext context) {
+      NotificationContext.Type type = context.getType();
+      _callbackInvokeMap.put(type, Boolean.TRUE);
+    }
+
+    /** Forget every invocation recorded so far. */
+    public synchronized void reset() {
+      _callbackInvokeMap.clear();
+    }
+
+    /** @return true if the callback was invoked with type INIT since the last reset */
+    public synchronized boolean isInitTypeInvoked() {
+      return _callbackInvokeMap.containsKey(NotificationContext.Type.INIT);
+    }
+
+    /** @return true if the callback was invoked with type CALLBACK since the last reset */
+    public synchronized boolean isCallbackTypeInvoked() {
+      return _callbackInvokeMap.containsKey(NotificationContext.Type.CALLBACK);
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        PARTITION_NUM, // partitions per resource
+        N, // number of nodes
+        2, // replicas
+        "MasterSlave", true); // do rebalance
+
+    // Created up front so the cleanup in the finally block can use it as well
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    final HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    Map<String, MockParticipantManager> participants =
+        new HashMap<String, MockParticipantManager>();
+    Map<String, HelixCustomCodeRunner> customCodeRunners =
+        new HashMap<String, HelixCustomCodeRunner>();
+    Map<String, DummyCallback> callbacks = new HashMap<String, DummyCallback>();
+
+    try {
+      // start participants
+      for (int i = 0; i < N; i++) {
+        String instanceName = "localhost_" + (12918 + i);
+
+        MockParticipantManager participant =
+            new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+        HelixCustomCodeRunner runner = new HelixCustomCodeRunner(participant, ZK_ADDR);
+        DummyCallback callback = new DummyCallback();
+        customCodeRunners.put(instanceName, runner);
+        callbacks.put(instanceName, callback);
+
+        runner.invoke(callback).on(ChangeType.LIVE_INSTANCE)
+            .usingLeaderStandbyModel("TestParticLeader").start();
+        participant.syncStart();
+
+        // Registered only once started, so cleanup never stops a participant that is not running
+        participants.put(instanceName, participant);
+      }
+
+      boolean result =
+          ClusterStateVerifier
+              .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                  clusterName));
+      Assert.assertTrue(result);
+
+      // Make sure callback is registered
+      final String customCodeRunnerResource =
+          customCodeRunners.get("localhost_12918").getResourceName();
+      String leader = findLeader(accessor, customCodeRunnerResource);
+      assertInitInvokedOnLeaderOnly(callbacks, leader);
+
+      // Disable custom-code runner resource
+      HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+      admin.enableResource(clusterName, customCodeRunnerResource, false);
+
+      // Verify that states of custom-code runner are all OFFLINE
+      result = TestHelper.verify(new TestHelper.Verifier() {
+
+        @Override
+        public boolean verify() throws Exception {
+          PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+          ExternalView extView =
+              accessor.getProperty(keyBuilder.externalView(customCodeRunnerResource));
+          if (extView == null) {
+            return false;
+          }
+          Set<String> partitionSet = extView.getPartitionSet();
+          if (partitionSet == null || partitionSet.size() != PARTITION_NUM) {
+            return false;
+          }
+          for (String partition : partitionSet) {
+            Map<String, String> instanceStates = extView.getStateMap(partition);
+            for (String state : instanceStates.values()) {
+              if (!"OFFLINE".equals(state)) {
+                return false;
+              }
+            }
+          }
+          return true;
+        }
+      }, 10 * 1000);
+      Assert.assertTrue(result);
+
+      // Change live-instance should not invoke any custom-code runner
+      LiveInstance fakeInstance = new LiveInstance(FAKE_INSTANCE);
+      fakeInstance.setSessionId("fakeSessionId");
+      fakeInstance.setHelixVersion("0.6");
+      accessor.setProperty(keyBuilder.liveInstance(FAKE_INSTANCE), fakeInstance);
+      // Negative check: give a (wrongly) delivered callback time to arrive before asserting
+      Thread.sleep(1000);
+
+      for (DummyCallback callback : callbacks.values()) {
+        Assert.assertFalse(callback.isInitTypeInvoked());
+        Assert.assertFalse(callback.isCallbackTypeInvoked());
+      }
+
+      // Remove fake instance
+      accessor.removeProperty(keyBuilder.liveInstance(FAKE_INSTANCE));
+
+      // Re-enable custom-code runner
+      admin.enableResource(clusterName, customCodeRunnerResource, true);
+      result =
+          ClusterStateVerifier
+              .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                  clusterName));
+      Assert.assertTrue(result);
+
+      // Verify that custom-invoke is invoked again
+      leader = findLeader(accessor, customCodeRunnerResource);
+      assertInitInvokedOnLeaderOnly(callbacks, leader);
+
+      // Add a fake instance should invoke custom-code runner
+      accessor.setProperty(keyBuilder.liveInstance(FAKE_INSTANCE), fakeInstance);
+      Thread.sleep(1000);
+      for (Map.Entry<String, DummyCallback> entry : callbacks.entrySet()) {
+        DummyCallback callback = entry.getValue();
+        if (entry.getKey().equals(leader)) {
+          Assert.assertTrue(callback.isCallbackTypeInvoked());
+        } else {
+          Assert.assertFalse(callback.isCallbackTypeInvoked());
+        }
+      }
+    } finally {
+      // Clean up even when an assertion failed, so no manager keeps running into later tests
+      controller.syncStop();
+      for (MockParticipantManager participant : participants.values()) {
+        participant.syncStop();
+      }
+      // The fake live instance is re-added at the end of the test; do not leave it behind
+      accessor.removeProperty(keyBuilder.liveInstance(FAKE_INSTANCE));
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Look up the instance that holds the LEADER state for partition "&lt;resource&gt;_0" in the
+   * external view of the given resource; fails the test if there is none.
+   * @param accessor accessor used to read the external view
+   * @param resourceName name of the custom-code runner resource
+   * @return name of the leader instance, never null
+   */
+  private String findLeader(HelixDataAccessor accessor, String resourceName) {
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    ExternalView extView = accessor.getProperty(keyBuilder.externalView(resourceName));
+    Assert.assertNotNull(extView, "No external view for " + resourceName);
+    Map<String, String> instanceStates = extView.getStateMap(resourceName + "_0");
+    Assert.assertNotNull(instanceStates, "No state map for " + resourceName + "_0");
+
+    String leader = null;
+    for (Map.Entry<String, String> entry : instanceStates.entrySet()) {
+      if ("LEADER".equals(entry.getValue())) {
+        leader = entry.getKey();
+        break;
+      }
+    }
+    Assert.assertNotNull(leader);
+    return leader;
+  }
+
+  /**
+   * Assert that only the leader's callback has seen an INIT notification, then reset every
+   * callback so that later assertions start from a clean slate.
+   * @param callbacks callbacks keyed by instance name
+   * @param leader name of the current leader instance
+   */
+  private void assertInitInvokedOnLeaderOnly(Map<String, DummyCallback> callbacks, String leader) {
+    for (Map.Entry<String, DummyCallback> entry : callbacks.entrySet()) {
+      DummyCallback callback = entry.getValue();
+      if (entry.getKey().equals(leader)) {
+        Assert.assertTrue(callback.isInitTypeInvoked());
+      } else {
+        Assert.assertFalse(callback.isInitTypeInvoked());
+      }
+      callback.reset();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java
new file mode 100644
index 0000000..ea8974d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java
@@ -0,0 +1,267 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Integration tests: disabling a resource must drive all of its replicas to OFFLINE, and
+ * re-enabling it must bring the cluster back to its best-possible state, in SEMI_AUTO,
+ * FULL_AUTO and CUSTOMIZED rebalance modes.
+ */
+public class TestDisableResource extends ZkUnitTestBase {
+  private static final int N = 2;
+  private static final int PARTITION_NUM = 1;
+  private static final String RESOURCE_NAME = "TestDB0";
+
+  @Test
+  public void testDisableResourceInSemiAutoMode() throws Exception {
+    // Names are resolved here rather than in a shared helper, in case TestHelper derives them
+    // from the calling frame
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        PARTITION_NUM, // partitions per resource
+        N, // number of nodes
+        2, // replicas
+        "MasterSlave", true); // do rebalance
+
+    verifyDisableThenEnable(clusterName);
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testDisableResourceInFullAutoMode() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        PARTITION_NUM, // partitions per resource
+        N, // number of nodes
+        2, // replicas
+        "MasterSlave", RebalanceMode.FULL_AUTO, true); // do rebalance
+
+    verifyDisableThenEnable(clusterName);
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testDisableResourceInCustomMode() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        PARTITION_NUM, // partitions per resource
+        N, // number of nodes
+        2, // replicas
+        "MasterSlave", RebalanceMode.CUSTOMIZED, true); // do rebalance
+
+    // set up custom ideal-state before the controller starts
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates(RESOURCE_NAME));
+    idealState.setPartitionState(RESOURCE_NAME + "_0", "localhost_12918", "SLAVE");
+    idealState.setPartitionState(RESOURCE_NAME + "_0", "localhost_12919", "SLAVE");
+    accessor.setProperty(keyBuilder.idealStates(RESOURCE_NAME), idealState);
+
+    verifyDisableThenEnable(clusterName);
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Shared scenario for every rebalance mode: start a controller and N participants, wait for
+   * the cluster to converge, disable the resource and expect all replicas OFFLINE, then
+   * re-enable it and expect the cluster to converge again. The controller and all started
+   * participants are stopped even if an assertion fails.
+   * @param clusterName name of an already set-up cluster
+   * @throws Exception if starting, verifying or stopping the cluster fails
+   */
+  private void verifyDisableThenEnable(String clusterName) throws Exception {
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    MockParticipantManager[] participants = new MockParticipantManager[N];
+    try {
+      // start participants
+      for (int i = 0; i < N; i++) {
+        String instanceName = "localhost_" + (12918 + i);
+
+        MockParticipantManager participant =
+            new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+        participant.syncStart();
+        // Stored only once started, so cleanup never stops a participant that is not running
+        participants[i] = participant;
+      }
+
+      boolean result =
+          ClusterStateVerifier
+              .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                  clusterName));
+      Assert.assertTrue(result);
+
+      // Disable TestDB0
+      enableResource(clusterName, false);
+      checkExternalView(clusterName);
+
+      // Re-enable TestDB0
+      enableResource(clusterName, true);
+      result =
+          ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+              ZK_ADDR, clusterName));
+      Assert.assertTrue(result);
+    } finally {
+      // Clean up
+      controller.syncStop();
+      for (MockParticipantManager participant : participants) {
+        if (participant != null) {
+          participant.syncStop();
+        }
+      }
+    }
+  }
+
+  /**
+   * Enable or disable TestDB0 through the admin API
+   * @param clusterName cluster that owns the resource
+   * @param enabled true to enable the resource; false to disable it
+   */
+  private void enableResource(String clusterName, boolean enabled) {
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.enableResource(clusterName, RESOURCE_NAME, enabled);
+  }
+
+  /**
+   * Check all partitions of TestDB0 are in OFFLINE state, polling for up to 10 seconds
+   * @param clusterName cluster whose external view is inspected
+   * @throws Exception if the verification itself fails
+   */
+  private void checkExternalView(String clusterName) throws Exception {
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    final HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+
+    // verify that states of TestDB0 are all OFFLINE
+    boolean result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+        ExternalView extView = accessor.getProperty(keyBuilder.externalView(RESOURCE_NAME));
+        if (extView == null) {
+          return false;
+        }
+        Set<String> partitionSet = extView.getPartitionSet();
+        if (partitionSet == null || partitionSet.size() != PARTITION_NUM) {
+          return false;
+        }
+        for (String partition : partitionSet) {
+          Map<String, String> instanceStates = extView.getStateMap(partition);
+          for (String state : instanceStates.values()) {
+            if (!"OFFLINE".equals(state)) {
+              return false;
+            }
+          }
+        }
+        return true;
+      }
+    }, 10 * 1000);
+    Assert.assertTrue(result);
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index 89a947f..d3925be 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -25,6 +25,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
@@ -41,6 +43,7 @@ import org.apache.helix.model.ConstraintItem;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.ConstraintItemBuilder;
@@ -62,7 +65,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
_gZkClient.deleteRecursive(rootPath);
}
- ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+ HelixAdmin tool = new ZKHelixAdmin(_gZkClient);
tool.addCluster(clusterName, true);
Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient));
tool.addCluster(clusterName, true);
@@ -205,7 +208,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+ HelixAdmin tool = new ZKHelixAdmin(_gZkClient);
tool.addCluster(clusterName, true);
Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
@@ -241,7 +244,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+ HelixAdmin tool = new ZKHelixAdmin(_gZkClient);
tool.addCluster(clusterName, true);
Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
@@ -294,4 +297,29 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
+
+ /**
+ * Verifies that {@code HelixAdmin.enableResource} toggles the enabled flag reported by the
+ * resource's ideal state: false after disabling, true after enabling again.
+ */
+ @Test
+ public void testDisableResource() {
+ String clusterName = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ // create the cluster and register the resource under test
+ HelixAdmin helixAdmin = new ZKHelixAdmin(_gZkClient);
+ helixAdmin.addCluster(clusterName, true);
+ Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
+ final String resourceName = "TestDB";
+ StateModelDefinition masterSlaveDef =
+ new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+ helixAdmin.addStateModelDef(clusterName, "MasterSlave", masterSlaveDef);
+ helixAdmin.addResource(clusterName, resourceName, 4, "MasterSlave");
+
+ // disable the resource, then read its ideal state back
+ helixAdmin.enableResource(clusterName, resourceName, false);
+ BaseDataAccessor<ZNRecord> zkAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(clusterName, zkAccessor);
+ PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName);
+ IdealState current = dataAccessor.getProperty(idealStateKey);
+ Assert.assertFalse(current.isEnabled());
+
+ // enable it again and re-read
+ helixAdmin.enableResource(clusterName, resourceName, true);
+ current = dataAccessor.getProperty(idealStateKey);
+ Assert.assertTrue(current.isEnabled());
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java b/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
index d3d6736..a528a20 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
@@ -23,7 +23,10 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Date;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
+import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
@@ -36,8 +39,8 @@ import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.LiveInstance;
-import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
@@ -46,8 +49,6 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class TestClusterSetup extends ZkUnitTestBase {
- private static Logger LOG = Logger.getLogger(TestClusterSetup.class);
-
protected static final String CLUSTER_NAME = "TestClusterSetup";
protected static final String TEST_DB = "TestDB";
protected static final String INSTANCE_PREFIX = "instance_";
@@ -437,4 +438,36 @@ public class TestClusterSetup extends ZkUnitTestBase {
}
+ /**
+ * Runs the {@code --enableResource} command through {@code ClusterSetup.processCommandLineArgs}
+ * and checks that the ideal state of "TestDB0" reports disabled, then enabled again.
+ * @throws Exception propagated from cluster setup or command-line processing
+ */
+ @Test
+ public void testDisableResource() throws Exception {
+ String clusterName = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
+ final String resourceName = "TestDB0";
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 10, // partitions per resource
+ 5, // number of nodes
+ 3, // replicas
+ "MasterSlave", true); // do rebalance
+
+ // disable the resource via the command line
+ String[] disableArgs = {
+ "--zkSvr", ZK_ADDR, "--enableResource", clusterName, resourceName, "false"
+ };
+ ClusterSetup.processCommandLineArgs(disableArgs);
+
+ BaseDataAccessor<ZNRecord> zkAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(clusterName, zkAccessor);
+ PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName);
+ IdealState current = dataAccessor.getProperty(idealStateKey);
+ Assert.assertFalse(current.isEnabled());
+
+ // enable the resource via the command line and re-read its ideal state
+ String[] enableArgs = {
+ "--zkSvr", ZK_ADDR, "--enableResource", clusterName, resourceName, "true"
+ };
+ ClusterSetup.processCommandLineArgs(enableArgs);
+ current = dataAccessor.getProperty(idealStateKey);
+ Assert.assertTrue(current.isEnabled());
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
}