You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2015/07/16 23:31:23 UTC
helix git commit: [HELIX-599] Support creating/maintaining/routing
resources with same names in different instance groups.
Repository: helix
Updated Branches:
refs/heads/helix-0.6.x 2775e1566 -> a23beb7cf
[HELIX-599] Support creating/maintaining/routing resources with same names in different instance groups.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a23beb7c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a23beb7c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a23beb7c
Branch: refs/heads/helix-0.6.x
Commit: a23beb7cf79a3f1da104a55477c7eddb594fa68b
Parents: 2775e15
Author: Lei Xia <lx...@linkedin.com>
Authored: Mon May 11 10:54:27 2015 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Thu Jul 9 22:37:54 2015 -0700
----------------------------------------------------------------------
.../stages/MessageGenerationPhase.java | 23 +-
.../stages/ResourceComputationStage.java | 8 +
.../apache/helix/manager/zk/ZKHelixAdmin.java | 7 +
.../org/apache/helix/model/ExternalView.java | 36 ++
.../java/org/apache/helix/model/IdealState.java | 45 +-
.../java/org/apache/helix/model/Message.java | 38 ++
.../java/org/apache/helix/model/Resource.java | 35 ++
.../helix/spectator/RoutingTableProvider.java | 304 ++++++++++--
.../org/apache/helix/tools/ClusterSetup.java | 82 +++-
.../integration/TestResourceGroupEndtoEnd.java | 465 +++++++++++++++++++
10 files changed, 989 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index bc3c739..2e919f8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -70,7 +70,6 @@ public class MessageGenerationPhase extends AbstractBaseStage {
for (String resourceName : resourceMap.keySet()) {
Resource resource = resourceMap.get(resourceName);
- int bucketSize = resource.getBucketSize();
StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
@@ -125,9 +124,8 @@ public class MessageGenerationPhase extends AbstractBaseStage {
} else {
Message message =
- createMessage(manager, resourceName, partition.getPartitionName(), instanceName,
- currentState, nextState, sessionIdMap.get(instanceName), stateModelDef.getId(),
- resource.getStateModelFactoryname(), bucketSize);
+ createMessage(manager, resource, partition.getPartitionName(), instanceName,
+ currentState, nextState, sessionIdMap.get(instanceName), stateModelDef.getId());
IdealState idealState = cache.getIdealState(resourceName);
if (idealState != null
@@ -188,23 +186,30 @@ public class MessageGenerationPhase extends AbstractBaseStage {
event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
}
- private Message createMessage(HelixManager manager, String resourceName, String partitionName,
+ private Message createMessage(HelixManager manager, Resource resource, String partitionName,
String instanceName, String currentState, String nextState, String sessionId,
- String stateModelDefName, String stateModelFactoryName, int bucketSize) {
+ String stateModelDefName) {
String uuid = UUID.randomUUID().toString();
Message message = new Message(MessageType.STATE_TRANSITION, uuid);
message.setSrcName(manager.getInstanceName());
message.setTgtName(instanceName);
message.setMsgState(MessageState.NEW);
message.setPartitionName(partitionName);
- message.setResourceName(resourceName);
+ message.setResourceName(resource.getResourceName());
message.setFromState(currentState);
message.setToState(nextState);
message.setTgtSessionId(sessionId);
message.setSrcSessionId(manager.getSessionId());
message.setStateModelDef(stateModelDefName);
- message.setStateModelFactoryName(stateModelFactoryName);
- message.setBucketSize(bucketSize);
+ message.setStateModelFactoryName(resource.getStateModelFactoryname());
+ message.setBucketSize(resource.getBucketSize());
+
+ if (resource.getResourceGroupName() != null) {
+ message.setResourceGroupName(resource.getResourceGroupName());
+ }
+ if (resource.getResourceTag() != null) {
+ message.setResourceTag(resource.getResourceTag());
+ }
return message;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 5676098..bde2904 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -62,6 +62,8 @@ public class ResourceComputationStage extends AbstractBaseStage {
resource.setStateModelFactoryName(idealState.getStateModelFactoryName());
resource.setBucketSize(idealState.getBucketSize());
resource.setBatchMessageMode(idealState.getBatchMessageMode());
+ resource.setResourceGroupName(idealState.getResourceGroupName());
+ resource.setResourceTag(idealState.getInstanceGroupTag());
}
for (String partition : partitionSet) {
@@ -102,6 +104,12 @@ public class ResourceComputationStage extends AbstractBaseStage {
resource.setStateModelFactoryName(currentState.getStateModelFactoryName());
resource.setBucketSize(currentState.getBucketSize());
resource.setBatchMessageMode(currentState.getBatchMessageMode());
+
+ IdealState idealState = idealStates.get(resourceName);
+ if (idealState != null) {
+ resource.setResourceGroupName(idealState.getResourceGroupName());
+ resource.setResourceTag(idealState.getInstanceGroupTag());
+ }
}
if (currentState.getStateModelDefRef() == null) {
http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index ecf84f8..e97ac9b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -404,6 +404,13 @@ public class ZKHelixAdmin implements HelixAdmin {
message.setToState(stateModel.getInitialState());
message.setStateModelFactoryName(idealState.getStateModelFactoryName());
+ if (idealState.getResourceGroupName() != null) {
+ message.setResourceGroupName(idealState.getResourceGroupName());
+ }
+ if (idealState.getInstanceGroupTag() != null) {
+ message.setResourceTag(idealState.getInstanceGroupTag());
+ }
+
resetMessages.add(message);
messageKeys.add(keyBuilder.message(instanceName, message.getId()));
}
http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ExternalView.java b/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
index d5f1afc..7b201b0 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
@@ -31,6 +31,16 @@ import org.apache.helix.ZNRecord;
* of current states for the partitions in a resource
*/
public class ExternalView extends HelixProperty {
+
+ /**
+ * Properties that are persisted and are queryable for an external view
+ */
+ public enum ExternalViewProperty {
+ INSTANCE_GROUP_TAG,
+ RESOURCE_GROUP_NAME,
+ GROUP_ROUTING_ENABLED
+ }
+
/**
* Instantiate an external view with the resource it corresponds to
* @param resource the name of the resource
@@ -95,6 +105,32 @@ public class ExternalView extends HelixProperty {
return _record.getId();
}
+ /**
+ * Get the resource group name
+ *
+ * @return the name of the resource group this resource belongs to.
+ */
+ public String getResourceGroupName() {
+ return _record.getSimpleField(ExternalViewProperty.RESOURCE_GROUP_NAME.toString());
+ }
+
+ /**
+ * Check whether the group routing is enabled for this resource.
+ *
+ * @return true if the group routing enabled for this resource; false otherwise
+ */
+ public boolean isGroupRoutingEnabled() {
+ return _record.getBooleanField(ExternalViewProperty.GROUP_ROUTING_ENABLED.name(), false);
+ }
+
+ /**
+ * Check for a group tag of this resource
+ * @return the group tag, or null if none is present
+ */
+ public String getInstanceGroupTag() {
+ return _record.getSimpleField(ExternalViewProperty.INSTANCE_GROUP_TAG.toString());
+ }
+
@Override
public boolean isValid() {
return true;
http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index bc31e1e..d2744ac 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -55,7 +55,9 @@ public class IdealState extends HelixProperty {
MAX_PARTITIONS_PER_INSTANCE,
INSTANCE_GROUP_TAG,
REBALANCER_CLASS_NAME,
- HELIX_ENABLED
+ HELIX_ENABLED,
+ RESOURCE_GROUP_NAME,
+ GROUP_ROUTING_ENABLED
}
public static final String QUERY_LIST = "PREFERENCE_LIST_QUERYS";
@@ -113,7 +115,7 @@ public class IdealState extends HelixProperty {
}
/**
- * Get the rebalance mode of the ideal state
+ * Set the rebalance mode of the ideal state
* @param mode {@link IdealStateModeProperty}
*/
@Deprecated
@@ -124,7 +126,7 @@ public class IdealState extends HelixProperty {
}
/**
- * Get the rebalance mode of the resource
+ * Set the rebalance mode of the resource
* @param rebalancerType
*/
public void setRebalanceMode(RebalanceMode rebalancerType) {
@@ -160,6 +162,43 @@ public class IdealState extends HelixProperty {
}
/**
+ * Set the resource group name
+ * @param resourceGroupName
+ */
+ public void setResourceGroupName(String resourceGroupName) {
+ _record.setSimpleField(IdealStateProperty.RESOURCE_GROUP_NAME.toString(), resourceGroupName);
+ }
+
+ /**
+ * Get the resource group name
+ *
+ * @return
+ */
+ public String getResourceGroupName() {
+ return _record.getSimpleField(IdealStateProperty.RESOURCE_GROUP_NAME.toString());
+ }
+
+ /**
+ * Get if the resource group routing feature is enabled or not
+ * By default, it's disabled
+ *
+ * @return true if enabled; false otherwise
+ */
+ public boolean isResourceGroupEnabled() {
+ return _record.getBooleanField(IdealStateProperty.GROUP_ROUTING_ENABLED.name(), false);
+ }
+
+ /**
+ * Enable/Disable the aggregated routing on resource group.
+ *
+ * @param enabled
+ */
+ public void enableGroupRouting(boolean enabled) {
+ _record.setSimpleField(IdealStateProperty.GROUP_ROUTING_ENABLED.name(),
+ Boolean.toString(enabled));
+ }
+
+ /**
* Set the maximum number of partitions of this resource that an instance can serve
* @param max the maximum number of partitions supported
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 937a28e..9fed87b 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -64,6 +64,8 @@ public class Message extends HelixProperty {
MSG_STATE,
PARTITION_NAME,
RESOURCE_NAME,
+ RESOURCE_GROUP_NAME,
+ RESOURCE_TAG,
FROM_STATE,
TO_STATE,
STATE_MODEL_DEF,
@@ -397,6 +399,42 @@ public class Message extends HelixProperty {
}
/**
+ * Set the resource group associated with this message
+ *
+ * @param resourceGroupName resource group name to set
+ */
+ public void setResourceGroupName(String resourceGroupName) {
+ _record.setSimpleField(Attributes.RESOURCE_GROUP_NAME.toString(), resourceGroupName);
+ }
+
+ /**
+ * Get the resource group name associated with this message
+ *
+ * @return resource group name
+ */
+ public String getResourceGroupName() {
+ return _record.getSimpleField(Attributes.RESOURCE_GROUP_NAME.toString());
+ }
+
+ /**
+ * Set the resource tag associated with this message
+ *
+ * @param resourceTag resource tag to set
+ */
+ public void setResourceTag(String resourceTag) {
+ _record.setSimpleField(Attributes.RESOURCE_TAG.toString(), resourceTag);
+ }
+
+ /**
+ * Get the resource tag associated with this message
+ *
+ * @return resource tag
+ */
+ public String getResourceTag() {
+ return _record.getSimpleField(Attributes.RESOURCE_TAG.toString());
+ }
+
+ /**
* Get the resource partition associated with this message
* @return partition name
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/model/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Resource.java b/helix-core/src/main/java/org/apache/helix/model/Resource.java
index 1544514..7a22686 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Resource.java
@@ -38,6 +38,8 @@ public class Resource {
private String _stateModelFactoryName;
private int _bucketSize = 0;
private boolean _batchMessageMode = false;
+ private String _resourceGroupName;
+ private String _resourceTag;
/**
* Instantiate a resource by its name
@@ -149,6 +151,39 @@ public class Resource {
return _batchMessageMode;
}
+ /**
+ * Get the resource tag assigned to this resource
+ *
+ * @return the name of the tag
+ */
+ public String getResourceTag() {
+ return _resourceTag;
+ }
+
+ /**
+ * Set the resource tag
+ * @param resourceTag
+ */
+ public void setResourceTag(String resourceTag) {
+ _resourceTag = resourceTag;
+ }
+
+ /**
+ * Get resource group name
+ * @return the resource group name
+ */
+ public String getResourceGroupName() {
+ return _resourceGroupName;
+ }
+
+ /**
+ * Set resource group name
+ * @param resourceGroupName
+ */
+ public void setResourceGroupName(String resourceGroupName) {
+ _resourceGroupName = resourceGroupName;
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index 9bba660..bd2b44d 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -50,6 +51,9 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC
/**
* returns the instances for {resource,partition} pair that are in a specific
* {state}
+ *
+ * This method will be deprecated, please use the
+ * {@link #getInstancesForResource(String, String, String)} getInstancesForResource} method.
* @param resourceName
* -
* @param partitionName
@@ -57,6 +61,19 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC
* @return empty list if there is no instance in a given state
*/
public List<InstanceConfig> getInstances(String resourceName, String partitionName, String state) {
+ return getInstancesForResource(resourceName, partitionName, state);
+ }
+
+ /**
+ * returns the instances for {resource,partition} pair that are in a specific
+ * {state}
+ * @param resourceName
+ * -
+ * @param partitionName
+ * @param state
+ * @return empty list if there is no instance in a given state
+ */
+ public List<InstanceConfig> getInstancesForResource(String resourceName, String partitionName, String state) {
List<InstanceConfig> instanceList = null;
RoutingTable _routingTable = _routingTableRef.get();
ResourceInfo resourceInfo = _routingTable.get(resourceName);
@@ -73,15 +90,93 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC
}
/**
+ * returns the instances for {resource group,partition} pair in all resources belongs to the given
+ * resource group that are in a specific {state}.
+ *
+ * The return results aggregate all partition states from all the resources in the given resource
+ * group.
+ *
+ * @param resourceGroupName
+ * @param partitionName
+ * @param state
+ *
+ * @return empty list if there is no instance in a given state
+ */
+ public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
+ String partitionName, String state) {
+ List<InstanceConfig> instanceList = null;
+ RoutingTable _routingTable = _routingTableRef.get();
+ ResourceGroupInfo resourceGroupInfo = _routingTable.getResourceGroup(resourceGroupName);
+ if (resourceGroupInfo != null) {
+ PartitionInfo keyInfo = resourceGroupInfo.get(partitionName);
+ if (keyInfo != null) {
+ instanceList = keyInfo.get(state);
+ }
+ }
+ if (instanceList == null) {
+ instanceList = Collections.emptyList();
+ }
+ return instanceList;
+ }
+
+ /**
+ * returns the instances for {resource group,partition} pair contains any of the given tags
+ * that are in a specific {state}.
+ *
+ * Find all resources belongs to the given resource group that have any of the given resource tags
+ * and return the aggregated partition states from all these resources.
+ *
+ * @param resourceGroupName
+ * @param partitionName
+ * @param state
+ * @param resourceTags
+ *
+ * @return empty list if there is no instance in a given state
+ */
+ public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String partitionName,
+ String state, List<String> resourceTags) {
+ RoutingTable _routingTable = _routingTableRef.get();
+ ResourceGroupInfo resourceGroupInfo = _routingTable.getResourceGroup(resourceGroupName);
+ List<InstanceConfig> instanceList = null;
+ if (resourceGroupInfo != null) {
+ instanceList = new ArrayList<InstanceConfig>();
+ for (String tag : resourceTags) {
+ PartitionInfo keyInfo = resourceGroupInfo.get(partitionName, tag);
+ if (keyInfo != null && keyInfo.containsState(state)) {
+ instanceList.addAll(keyInfo.get(state));
+ }
+ }
+ }
+ if (instanceList == null) {
+ return Collections.emptyList();
+ }
+
+ return instanceList;
+ }
+
+ /**
* returns all instances for {resource} that are in a specific {state}
- * @param resource
+ *
+ * This method will be deprecated, please use the
+ * {@link #getInstancesForResource(String, String) getInstancesForResource} method.
+ * @param resourceName
* @param state
* @return empty list if there is no instance in a given state
*/
- public Set<InstanceConfig> getInstances(String resource, String state) {
+ public Set<InstanceConfig> getInstances(String resourceName, String state) {
+ return getInstancesForResource(resourceName, state);
+ }
+
+ /**
+ * returns all instances for {resource} that are in a specific {state}.
+ * @param resourceName
+ * @param state
+ * @return empty list if there is no instance in a given state
+ */
+ public Set<InstanceConfig> getInstancesForResource(String resourceName, String state) {
Set<InstanceConfig> instanceSet = null;
RoutingTable routingTable = _routingTableRef.get();
- ResourceInfo resourceInfo = routingTable.get(resource);
+ ResourceInfo resourceInfo = routingTable.get(resourceName);
if (resourceInfo != null) {
instanceSet = resourceInfo.getInstances(state);
}
@@ -91,6 +186,56 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC
return instanceSet;
}
+ /**
+ * returns all instances for all resources in {resource group} that are in a specific {state}
+ *
+ * @param resourceGroupName
+ * @param state
+ *
+ * @return empty list if there is no instance in a given state
+ */
+ public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state) {
+ Set<InstanceConfig> instanceSet = null;
+ RoutingTable _routingTable = _routingTableRef.get();
+ ResourceGroupInfo resourceGroupInfo = _routingTable.getResourceGroup(resourceGroupName);
+ if (resourceGroupInfo != null) {
+ instanceSet = resourceGroupInfo.getInstances(state);
+ }
+ if (instanceSet == null) {
+ instanceSet = Collections.emptySet();
+ }
+ return instanceSet;
+ }
+
+ /**
+ * returns all instances for resources contains any given tags in {resource group} that are in a
+ * specific {state}
+ *
+ * @param resourceGroupName
+ * @param state
+ *
+ * @return empty list if there is no instance in a given state
+ */
+ public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state,
+ List<String> resourceTags) {
+ Set<InstanceConfig> instanceSet = null;
+ RoutingTable _routingTable = _routingTableRef.get();
+ ResourceGroupInfo resourceGroupInfo = _routingTable.getResourceGroup(resourceGroupName);
+ if (resourceGroupInfo != null) {
+ instanceSet = new HashSet<InstanceConfig>();
+ for (String tag : resourceTags) {
+ Set<InstanceConfig> instances = resourceGroupInfo.getInstances(state, tag);
+ if (instances != null) {
+ instanceSet.addAll(resourceGroupInfo.getInstances(state, tag));
+ }
+ }
+ }
+ if (instanceSet == null) {
+ return Collections.emptySet();
+ }
+ return instanceSet;
+ }
+
@Override
public void onExternalViewChange(List<ExternalView> externalViewList,
NotificationContext changeContext) {
@@ -139,12 +284,16 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC
String currentState = stateMap.get(instanceName);
if (instanceConfigMap.containsKey(instanceName)) {
InstanceConfig instanceConfig = instanceConfigMap.get(instanceName);
- newRoutingTable.addEntry(resourceName, partitionName, currentState, instanceConfig);
+ if (extView.isGroupRoutingEnabled()) {
+ newRoutingTable.addEntry(resourceName, extView.getResourceGroupName(),
+ extView.getInstanceGroupTag(), partitionName, currentState, instanceConfig);
+ } else {
+ newRoutingTable.addEntry(resourceName, partitionName, currentState, instanceConfig);
+ }
} else {
logger.error("Invalid instance name." + instanceName
+ " .Not found in /cluster/configs/. instanceName: ");
}
-
}
}
}
@@ -153,10 +302,15 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC
}
class RoutingTable {
- private final HashMap<String, ResourceInfo> resourceInfoMap;
+ // mapping a resourceName to the ResourceInfo
+ private final Map<String, ResourceInfo> resourceInfoMap;
+
+ // mapping a resource group name to a resourceGroupInfo
+ private final Map<String, ResourceGroupInfo> resourceGroupInfoMap;
public RoutingTable() {
- resourceInfoMap = new HashMap<String, RoutingTableProvider.ResourceInfo>();
+ resourceInfoMap = new HashMap<String, ResourceInfo>();
+ resourceGroupInfoMap = new HashMap<String, ResourceGroupInfo>();
}
public void addEntry(String resourceName, String partitionName, String state,
@@ -166,20 +320,64 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC
}
ResourceInfo resourceInfo = resourceInfoMap.get(resourceName);
resourceInfo.addEntry(partitionName, state, config);
+ }
+
+ /**
+ * add an entry with a resource with resourceGrouping enabled.
+ */
+ public void addEntry(String resourceName, String resourceGroupName, String resourceTag,
+ String partitionName, String state, InstanceConfig config) {
+ addEntry(resourceName, partitionName, state, config);
+ if (!resourceGroupInfoMap.containsKey(resourceGroupName)) {
+ resourceGroupInfoMap.put(resourceGroupName, new ResourceGroupInfo());
+ }
+
+ ResourceGroupInfo resourceGroupInfo = resourceGroupInfoMap.get(resourceGroupName);
+ resourceGroupInfo.addEntry(resourceTag, partitionName, state, config);
}
ResourceInfo get(String resourceName) {
return resourceInfoMap.get(resourceName);
}
+ ResourceGroupInfo getResourceGroup(String resourceGroupName) {
+ return resourceGroupInfoMap.get(resourceGroupName);
+ }
}
+ private static Comparator<InstanceConfig> INSTANCE_CONFIG_COMPARATOR =
+ new Comparator<InstanceConfig>() {
+ @Override
+ public int compare(InstanceConfig o1, InstanceConfig o2) {
+ if (o1 == o2) {
+ return 0;
+ }
+ if (o1 == null) {
+ return -1;
+ }
+ if (o2 == null) {
+ return 1;
+ }
+
+ int compareTo = o1.getHostName().compareTo(o2.getHostName());
+ if (compareTo == 0) {
+ return o1.getPort().compareTo(o2.getPort());
+ } else {
+ return compareTo;
+ }
+
+ }
+ };
+
+ /**
+ * Class to store instances, partitions and their states for each resource.
+ */
class ResourceInfo {
// store PartitionInfo for each partition
- HashMap<String, PartitionInfo> partitionInfoMap;
+ Map<String, PartitionInfo> partitionInfoMap;
// stores the Set of Instances in a given state
- HashMap<String, Set<InstanceConfig>> stateInfoMap;
+ Map<String, Set<InstanceConfig>> stateInfoMap;
public ResourceInfo() {
partitionInfoMap = new HashMap<String, RoutingTableProvider.PartitionInfo>();
@@ -189,30 +387,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC
public void addEntry(String stateUnitKey, String state, InstanceConfig config) {
// add
if (!stateInfoMap.containsKey(state)) {
- Comparator<InstanceConfig> comparator = new Comparator<InstanceConfig>() {
-
- @Override
- public int compare(InstanceConfig o1, InstanceConfig o2) {
- if (o1 == o2) {
- return 0;
- }
- if (o1 == null) {
- return -1;
- }
- if (o2 == null) {
- return 1;
- }
-
- int compareTo = o1.getHostName().compareTo(o2.getHostName());
- if (compareTo == 0) {
- return o1.getPort().compareTo(o2.getPort());
- } else {
- return compareTo;
- }
-
- }
- };
- stateInfoMap.put(state, new TreeSet<InstanceConfig>(comparator));
+ stateInfoMap.put(state, new TreeSet<InstanceConfig>(INSTANCE_CONFIG_COMPARATOR));
}
Set<InstanceConfig> set = stateInfoMap.get(state);
set.add(config);
@@ -222,7 +397,6 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC
}
PartitionInfo stateUnitKeyInfo = partitionInfoMap.get(stateUnitKey);
stateUnitKeyInfo.addEntry(state, config);
-
}
public Set<InstanceConfig> getInstances(String state) {
@@ -235,8 +409,64 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC
}
}
+ /**
+ * Class to store instances, partitions and their states for each resource group.
+ */
+ class ResourceGroupInfo {
+ // aggregated partitions and instances info for all resources in the resource group.
+ ResourceInfo aggregatedResourceInfo;
+
+ // <ResourceTag, ResourceInfo> maps resource tag to the resource with the tag
+ // in this resource group.
+ // Each ResourceInfo saves only partitions and instances for that resource.
+ Map<String, ResourceInfo> tagToResourceMap;
+
+ public ResourceGroupInfo() {
+ aggregatedResourceInfo = new ResourceInfo();
+ tagToResourceMap = new HashMap<String, ResourceInfo>();
+ }
+
+ public void addEntry(String resourceTag, String stateUnitKey, String state, InstanceConfig config) {
+ // add the new entry to the aggregated resource info
+ aggregatedResourceInfo.addEntry(stateUnitKey, state, config);
+
+ // add the entry to the resourceInfo with given tag
+ if (!tagToResourceMap.containsKey(resourceTag)) {
+ tagToResourceMap.put(resourceTag, new ResourceInfo());
+ }
+ ResourceInfo resourceInfo = tagToResourceMap.get(resourceTag);
+ resourceInfo.addEntry(stateUnitKey, state, config);
+ }
+
+ public Set<InstanceConfig> getInstances(String state) {
+ return aggregatedResourceInfo.getInstances(state);
+ }
+
+ public Set<InstanceConfig> getInstances(String state, String resourceTag) {
+ ResourceInfo resourceInfo = tagToResourceMap.get(resourceTag);
+ if (resourceInfo != null) {
+ return resourceInfo.getInstances(state);
+ }
+
+ return null;
+ }
+
+ PartitionInfo get(String stateUnitKey) {
+ return aggregatedResourceInfo.get(stateUnitKey);
+ }
+
+ PartitionInfo get(String stateUnitKey, String resourceTag) {
+ ResourceInfo resourceInfo = tagToResourceMap.get(resourceTag);
+ if (resourceInfo == null) {
+ return null;
+ }
+
+ return resourceInfo.get(stateUnitKey);
+ }
+ }
+
class PartitionInfo {
- HashMap<String, List<InstanceConfig>> stateInfoMap;
+ Map<String, List<InstanceConfig>> stateInfoMap;
public PartitionInfo() {
stateInfoMap = new HashMap<String, List<InstanceConfig>>();
@@ -253,5 +483,9 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC
List<InstanceConfig> get(String state) {
return stateInfoMap.get(state);
}
+
+ boolean containsState(String state) {
+ return stateInfoMap.containsKey(state);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 37c4915..9d411bb 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -37,6 +37,7 @@ import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants;
import org.apache.helix.HelixConstants.StateModelToken;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey.Builder;
@@ -57,6 +58,7 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.OnlineOfflineSMD;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.ConstraintItemBuilder;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
@@ -220,6 +222,10 @@ public class ClusterSetup {
_admin.addInstance(clusterName, config);
}
+ public void addInstanceTag(String clusterName, String instanceName, String tag) {
+ _admin.addInstanceTag(clusterName, instanceName, tag);
+ }
+
public void dropInstancesFromCluster(String clusterName, String[] instanceInfoArray) {
for (String instanceInfo : instanceInfoArray) {
if (instanceInfo.length() > 0) {
@@ -340,6 +346,10 @@ public class ClusterSetup {
_admin.addStateModelDef(clusterName, stateModelDef, record, overwritePrevious);
}
+ public void addResourceToCluster(String clusterName, String resourceName, IdealState idealState) {
+ _admin.addResource(clusterName, resourceName, idealState);
+ }
+
public void addResourceToCluster(String clusterName, String resourceName, int numResources,
String stateModelRef) {
addResourceToCluster(clusterName, resourceName, numResources, stateModelRef,
@@ -363,6 +373,50 @@ public class ClusterSetup {
bucketSize, maxPartitionsPerInstance);
}
+
+ /**
+ * Get the mangled IdealState name if resourceGroup/resourceTag is enable.
+ */
+ public static String genIdealStateNameWithResourceTag(String resourceName, String resourceTag) {
+ return resourceName + "$" + resourceTag;
+ }
+
+ /**
+ * Create an IdealState for a resource that belongs to a resource group We use
+ * "resourceGroupName$resourceInstanceTag" as the IdealState znode name to differetiate different
+ * resources from the same resourceGroup.
+ */
+ public IdealState createIdealStateForResourceGroup(String resourceGroupName,
+ String resourceTag, int numPartition, int replica, String rebalanceMode, String stateModelDefName) {
+ String idealStateId = genIdealStateNameWithResourceTag(resourceGroupName, resourceTag);
+ IdealState idealState = new IdealState(idealStateId);
+ idealState.setNumPartitions(numPartition);
+ idealState.setStateModelDefRef(stateModelDefName);
+ IdealState.RebalanceMode mode =
+ idealState.rebalanceModeFromString(rebalanceMode, IdealState.RebalanceMode.SEMI_AUTO);
+ idealState.setRebalanceMode(mode);
+ idealState.setReplicas("" + replica);
+ idealState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
+ idealState.setResourceGroupName(resourceGroupName);
+ idealState.setInstanceGroupTag(resourceTag);
+ idealState.enableGroupRouting(true);
+
+ return idealState;
+ }
+
+ /**
+ * Enable or disable a resource within a resource group associated with a given resource tag
+ *
+ * @param clusterName
+ * @param resourceName
+ * @param resourceTag
+ */
+ public void enableResource(String clusterName, String resourceName, String resourceTag,
+ boolean enabled) {
+ String idealStateId = genIdealStateNameWithResourceTag(resourceName, resourceTag);
+ _admin.enableResource(clusterName, idealStateId, enabled);
+ }
+
public void dropResourceFromCluster(String clusterName, String resourceName) {
_admin.dropResource(clusterName, resourceName);
}
@@ -448,7 +502,7 @@ public class ClusterSetup {
/**
* set configs
* @param type config-scope type, e.g. CLUSTER, RESOURCE, etc.
- * @param scopesStr scopeArgsCsv csv-formatted scope-args, e.g myCluster,testDB
+ * @param scopeArgsCsv scopeArgsCsv csv-formatted scope-args, e.g myCluster,testDB
* @param keyValuePairs csv-formatted key-value pairs. e.g. k1=v1,k2=v2
*/
public void setConfig(ConfigScopeProperty type, String scopeArgsCsv, String keyValuePairs) {
@@ -463,7 +517,7 @@ public class ClusterSetup {
/**
* remove configs
* @param type config-scope type, e.g. CLUSTER, RESOURCE, etc.
- * @param scopesStr scopeArgsCsv csv-formatted scope-args, e.g myCluster,testDB
+ * @param scopeArgsCsv scopeArgsCsv csv-formatted scope-args, e.g myCluster,testDB
* @param keysCsv csv-formatted keys. e.g. k1,k2
*/
public void removeConfig(ConfigScopeProperty type, String scopeArgsCsv, String keysCsv) {
@@ -616,7 +670,7 @@ public class ClusterSetup {
.create();
listInstancesOption.setArgs(1);
listInstancesOption.setRequired(false);
- listInstancesOption.setArgName("clusterName");
+ listInstancesOption.setArgName("clusterName <-tag tagName>");
Option addClusterOption =
OptionBuilder.withLongOpt(addCluster).withDescription("Add a new cluster").create();
@@ -747,7 +801,8 @@ public class ClusterSetup {
Option enableResourceOption =
OptionBuilder.withLongOpt(enableResource).withDescription("Enable/disable a resource")
- .hasArgs(3).isRequired(false).withArgName("clusterName resourceName true/false")
+ .hasArgs(3).isRequired(false)
+ .withArgName("clusterName resourceName true/false <-tag resourceTag>")
.create();
Option rebalanceOption =
@@ -1146,9 +1201,17 @@ public class ClusterSetup {
return 0;
} else if (cmd.hasOption(listInstances)) {
String clusterName = cmd.getOptionValue(listInstances);
- List<String> instances =
- setupTool.getClusterManagementTool().getInstancesInCluster(clusterName);
+ List<String> instances;
+ if (cmd.hasOption(tag)) {
+ String instanceTag = cmd.getOptionValues(tag)[0];
+ instances = setupTool.getClusterManagementTool()
+ .getInstancesInClusterWithTag(clusterName, instanceTag);
+ } else {
+ instances =
+ setupTool.getClusterManagementTool().getInstancesInCluster(clusterName);
+ }
+
System.out.println("Instances in cluster " + clusterName + ":");
for (String instanceName : instances) {
System.out.println(instanceName);
@@ -1251,7 +1314,12 @@ public class ClusterSetup {
String clusterName = cmd.getOptionValues(enableResource)[0];
String resourceName = cmd.getOptionValues(enableResource)[1];
boolean enabled = Boolean.parseBoolean(cmd.getOptionValues(enableResource)[2].toLowerCase());
- setupTool.getClusterManagementTool().enableResource(clusterName, resourceName, enabled);
+ if (cmd.hasOption(tag)) {
+ String resourceTag = cmd.getOptionValues(tag)[0];
+ setupTool.enableResource(clusterName, resourceName, resourceTag, enabled);
+ } else {
+ setupTool.getClusterManagementTool().enableResource(clusterName, resourceName, enabled);
+ }
} else if (cmd.hasOption(enablePartition)) {
String[] args = cmd.getOptionValues(enablePartition);
http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
new file mode 100644
index 0000000..3466b2f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
@@ -0,0 +1,465 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.manager.ZkTestManager;
+import org.apache.helix.manager.zk.CallbackHandler;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.mock.participant.DummyProcess;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.OnlineOfflineSMD;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.log4j.Logger;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class TestResourceGroupEndtoEnd extends ZkIntegrationTestBase {
+
+ protected static final int GROUP_NODE_NR = 5;
+ protected static final int START_PORT = 12918;
+ protected static final String STATE_MODEL = "OnlineOffline";
+ protected static final String TEST_DB = "TestDB";
+ protected static final int PARTITIONS = 20;
+ protected static final int INSTANCE_GROUP_NR = 4;
+ protected static final int TOTAL_NODE_NR = GROUP_NODE_NR * INSTANCE_GROUP_NR;
+
+ protected final String CLASS_NAME = getShortClassName();
+ protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+ protected TestParticipantManager[] _participants = new TestParticipantManager[TOTAL_NODE_NR];
+ protected ClusterControllerManager _controller;
+ protected RoutingTableProvider _routingTableProvider;
+ private HelixAdmin _admin;
+ HelixManager _spectator;
+
+ int _replica = 3;
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ _admin = new ZKHelixAdmin(_gZkClient);
+
+ // setup storage cluster
+ _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+ List<String> instanceGroupTags = new ArrayList<String>();
+ for (int i = 0; i < INSTANCE_GROUP_NR; i++) {
+ String groupTag = "cluster_" + i;
+ addInstanceGroup(CLUSTER_NAME, groupTag, GROUP_NODE_NR);
+ instanceGroupTags.add(groupTag);
+ }
+
+ for (String tag : instanceGroupTags) {
+ List<String> instances = _admin.getInstancesInClusterWithTag(CLUSTER_NAME, tag);
+ IdealState idealState =
+ createIdealState(TEST_DB, tag, instances, PARTITIONS, _replica,
+ IdealState.RebalanceMode.CUSTOMIZED.toString());
+ _gSetupTool.addResourceToCluster(CLUSTER_NAME, idealState.getResourceName(), idealState);
+ }
+
+ // start dummy participants
+ int i = 0;
+ for (String group : instanceGroupTags) {
+ List<String> instances = _admin.getInstancesInClusterWithTag(CLUSTER_NAME, group);
+ for (String instance : instances) {
+ _participants[i] =
+ new TestParticipantManager(ZK_ADDR, CLUSTER_NAME, TEST_DB, group, instance);
+ _participants[i].syncStart();
+ i++;
+ }
+ }
+
+ // start controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ CLUSTER_NAME));
+ Assert.assertTrue(result);
+
+ // start speculator
+ _routingTableProvider = new RoutingTableProvider();
+ _spectator =
+ HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "spectator", InstanceType.SPECTATOR,
+ ZK_ADDR);
+ _spectator.connect();
+ _spectator.addExternalViewChangeListener(_routingTableProvider);
+ Thread.sleep(1000);
+ }
+
+ @AfterClass
+ public void afterClass() {
+ // stop participants
+ for (int i = 0; i < TOTAL_NODE_NR; i++) {
+ _participants[i].syncStop();
+ }
+
+ _controller.syncStop();
+ _spectator.disconnect();
+ }
+
+ public IdealState createIdealState(String resourceGroupName, String instanceGroupTag,
+ List<String> instanceNames, int numPartition, int replica, String rebalanceMode) {
+ IdealState is =
+ _gSetupTool.createIdealStateForResourceGroup(resourceGroupName, instanceGroupTag,
+ numPartition, replica, rebalanceMode, "OnlineOffline");
+
+ // setup initial partition->instance mapping.
+ int nodeIdx = 0;
+ int numNode = instanceNames.size();
+ assert (numNode >= replica);
+ for (int i = 0; i < numPartition; i++) {
+ String partitionName = resourceGroupName + "_" + i;
+ for (int j = 0; j < replica; j++) {
+ is.setPartitionState(partitionName, instanceNames.get((nodeIdx + j) % numNode),
+ OnlineOfflineSMD.States.ONLINE.toString());
+ }
+ nodeIdx++;
+ }
+
+ return is;
+ }
+
+ private void addInstanceGroup(String clusterName, String instanceTag, int numInstance) {
+ List<String> instances = new ArrayList<String>();
+ for (int i = 0; i < numInstance; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + instanceTag + "_" + (START_PORT + i);
+ instances.add(storageNodeName);
+ _gSetupTool.addInstanceToCluster(clusterName, storageNodeName);
+ _gSetupTool.addInstanceTag(clusterName, storageNodeName, instanceTag);
+ }
+ }
+
+ @Test
+ public void testRoutingTable() throws Exception {
+ // Verify routing table works
+ Set<InstanceConfig> allOnlineNodes =
+ _routingTableProvider.getInstancesForResourceGroup(TEST_DB, "ONLINE");
+ Assert.assertEquals(allOnlineNodes.size(), TOTAL_NODE_NR);
+
+ List<InstanceConfig> onlinePartitions =
+ _routingTableProvider.getInstancesForResourceGroup(TEST_DB, TEST_DB + "_0", "ONLINE");
+ Assert.assertEquals(onlinePartitions.size(), INSTANCE_GROUP_NR * _replica);
+
+ Set<InstanceConfig> selectedNodes = _routingTableProvider
+ .getInstancesForResourceGroup(TEST_DB, "ONLINE", Arrays.asList("cluster_2", "cluster_3"));
+ Assert.assertEquals(selectedNodes.size(), GROUP_NODE_NR * 2);
+
+ List<InstanceConfig> selectedPartition = _routingTableProvider
+ .getInstancesForResourceGroup(TEST_DB, TEST_DB + "_0", "ONLINE",
+ Arrays.asList("cluster_2", "cluster_3"));
+ Assert.assertEquals(selectedPartition.size(), _replica * 2);
+ }
+
+ @Test(dependsOnMethods = { "testRoutingTable" })
+ public void testEnableDisableClusters() throws InterruptedException {
+ // disable a resource
+ _gSetupTool.enableResource(CLUSTER_NAME, TEST_DB, "cluster_2", false);
+
+ Thread.sleep(500);
+
+ Set<InstanceConfig> selectedNodes = _routingTableProvider
+ .getInstancesForResourceGroup(TEST_DB, "ONLINE", Arrays.asList("cluster_2", "cluster_3"));
+ Assert.assertEquals(selectedNodes.size(), GROUP_NODE_NR * 1);
+
+ List<InstanceConfig> selectedPartition = _routingTableProvider
+ .getInstancesForResourceGroup(TEST_DB, TEST_DB + "_0", "ONLINE",
+ Arrays.asList("cluster_2", "cluster_3"));
+ Assert.assertEquals(selectedPartition.size(), _replica * 1);
+
+ // enable a resource
+ _gSetupTool.enableResource(CLUSTER_NAME, TEST_DB, "cluster_2", true);
+ Thread.sleep(500);
+
+ selectedNodes = _routingTableProvider
+ .getInstancesForResourceGroup(TEST_DB, "ONLINE", Arrays.asList("cluster_2", "cluster_3"));
+ Assert.assertEquals(selectedNodes.size(), GROUP_NODE_NR * 2);
+
+ selectedPartition = _routingTableProvider
+ .getInstancesForResourceGroup(TEST_DB, TEST_DB + "_0", "ONLINE",
+ Arrays.asList("cluster_2", "cluster_3"));
+ Assert.assertEquals(selectedPartition.size(), _replica * 2);
+ }
+
+ public static class MockProcess {
+ private static final Logger logger = Logger.getLogger(DummyProcess.class);
+ // public static final String rootNamespace = "rootNamespace";
+
+ private final String _zkConnectString;
+ private final String _clusterName;
+ private final String _instanceName;
+ private final String _resourceName;
+ private final String _resourceTag;
+ // private StateMachineEngine genericStateMachineHandler;
+
+ private int _transDelayInMs = 0;
+ private final String _clusterMangerType;
+
+ public MockProcess(String zkConnectString, String clusterName, String resourceName,
+ String instanceName, String resourceTag,
+ String clusterMangerType, int delay) {
+ _zkConnectString = zkConnectString;
+ _clusterName = clusterName;
+ _resourceName = resourceName;
+ _resourceTag = resourceTag;
+ _instanceName = instanceName;
+ _clusterMangerType = clusterMangerType;
+ _transDelayInMs = delay > 0 ? delay : 0;
+ }
+
+ static void sleep(long transDelay) {
+ try {
+ if (transDelay > 0) {
+ Thread.sleep(transDelay);
+ }
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ public HelixManager start() throws Exception {
+ HelixManager manager = null;
+ // zk cluster manager
+ if (_clusterMangerType.equalsIgnoreCase("zk")) {
+ manager =
+ HelixManagerFactory.getZKHelixManager(_clusterName, _instanceName,
+ InstanceType.PARTICIPANT, _zkConnectString);
+ } else {
+ throw new IllegalArgumentException("Unsupported cluster manager type:" + _clusterMangerType);
+ }
+
+ MockOnlineOfflineStateModelFactory stateModelFactory2 =
+ new MockOnlineOfflineStateModelFactory(_transDelayInMs, _resourceName, _resourceTag,
+ _instanceName);
+ // genericStateMachineHandler = new StateMachineEngine();
+ StateMachineEngine stateMach = manager.getStateMachineEngine();
+ stateMach.registerStateModelFactory("OnlineOffline", stateModelFactory2);
+
+ manager.connect();
+ //manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(), genericStateMachineHandler);
+ return manager;
+ }
+
+ public static class MockOnlineOfflineStateModelFactory extends
+ StateModelFactory<MockOnlineOfflineStateModel> {
+ int _delay;
+ String _instanceName;
+ String _resourceName;
+ String _resourceTag;
+
+ public MockOnlineOfflineStateModelFactory(int delay, String resourceName, String resourceTag,
+ String instanceName) {
+ _delay = delay;
+ _instanceName = instanceName;
+ _resourceName = resourceName;
+ _resourceTag = resourceTag;
+ }
+
+ @Override
+ public MockOnlineOfflineStateModel createNewStateModel(String resourceName, String stateUnitKey) {
+ MockOnlineOfflineStateModel model = new MockOnlineOfflineStateModel();
+ model.setDelay(_delay);
+ model.setInstanceName(_instanceName);
+ model.setResourceName(_resourceName);
+ model.setResourceTag(_resourceTag);
+ return model;
+ }
+ }
+
+ public static class MockOnlineOfflineStateModel extends StateModel {
+ int _transDelay = 0;
+ String _instanceName;
+ String _resourceName;
+ String _resourceTag;
+
+ public void setDelay(int delay) {
+ _transDelay = delay > 0 ? delay : 0;
+ }
+
+ public void setInstanceName(String instanceName) {_instanceName = instanceName;}
+
+ public void setResourceTag(String resourceTag) {
+ _resourceTag = resourceTag;
+ }
+
+ public void setResourceName(String resourceName) {
+ _resourceName = resourceName;
+ }
+
+ public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+ String db = message.getPartitionName();
+ String instanceName = context.getManager().getInstanceName();
+ MockProcess.sleep(_transDelay);
+
+ logger.info("MockStateModel.onBecomeOnlineFromOffline(), instance:" + instanceName + ", db:"
+ + db);
+
+ logger.info(
+ "MockStateModel.onBecomeOnlineFromOffline(), resource " + message.getResourceName()
+ + ", partition"
+ + message.getPartitionName());
+
+ verifyMessage(message);
+ }
+
+ public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+ MockProcess.sleep(_transDelay);
+
+ logger.info(
+ "MockStateModel.onBecomeOfflineFromOnline(), resource " + message.getResourceName()
+ + ", partition"
+ + message.getPartitionName() + ", targetName: " + message.getTgtName());
+
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ verifyMessage(message);
+ }
+
+ public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+ MockProcess.sleep(_transDelay);
+
+ logger.info(
+ "MockStateModel.onBecomeDroppedFromOffline(), resource " + message.getResourceName()
+ + ", partition"
+ + message.getPartitionName());
+
+ verifyMessage(message);
+ }
+
+ private void verifyMessage(Message message) {
+ assert _instanceName.equals(message.getTgtName());
+ assert _resourceName.equals(message.getResourceGroupName());
+ assert _resourceTag.equals(message.getResourceTag());
+ }
+ }
+ }
+
+ public static class TestParticipantManager extends ZKHelixManager implements Runnable, ZkTestManager {
+ private static Logger LOG = Logger.getLogger(TestParticipantManager.class);
+
+ private final CountDownLatch _startCountDown = new CountDownLatch(1);
+ private final CountDownLatch _stopCountDown = new CountDownLatch(1);
+ private final CountDownLatch _waitStopCompleteCountDown = new CountDownLatch(1);
+
+ private String _instanceGroup;
+ private String _resourceName;
+
+ public TestParticipantManager(String zkAddr, String clusterName, String resourceName,
+ String instanceGroup, String instanceName) {
+ super(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr);
+ _instanceGroup = instanceGroup;
+ _resourceName = resourceName;
+ }
+
+ public void syncStop() {
+ _stopCountDown.countDown();
+ try {
+ _waitStopCompleteCountDown.await();
+ } catch (InterruptedException e) {
+ LOG.error("exception in syncStop participant-manager", e);
+ }
+ }
+
+ public void syncStart() {
+ try {
+ new Thread(this).start();
+ _startCountDown.await();
+ } catch (InterruptedException e) {
+ LOG.error("exception in syncStart participant-manager", e);
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ StateMachineEngine stateMach = getStateMachineEngine();
+ MockProcess.MockOnlineOfflineStateModelFactory
+ ofModelFactory =
+ new MockProcess.MockOnlineOfflineStateModelFactory(10, _resourceName, _instanceGroup,
+ getInstanceName());
+ stateMach.registerStateModelFactory("OnlineOffline", ofModelFactory);
+
+ connect();
+ _startCountDown.countDown();
+
+ _stopCountDown.await();
+ } catch (InterruptedException e) {
+ String msg =
+ "participant: " + getInstanceName() + ", " + Thread.currentThread().getName()
+ + " is interrupted";
+ LOG.info(msg);
+ } catch (Exception e) {
+ LOG.error("exception running participant-manager", e);
+ } finally {
+ _startCountDown.countDown();
+
+ disconnect();
+ _waitStopCompleteCountDown.countDown();
+ }
+ }
+
+ @Override
+ public ZkClient getZkClient() {
+ return _zkclient;
+ }
+
+ @Override
+ public List<CallbackHandler> getHandlers() {
+ return _handlers;
+ }
+ }
+}