You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2015/07/16 23:31:23 UTC

helix git commit: [HELIX-599] Support creating/maintaining/routing resources with same names in different instance groups.

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 2775e1566 -> a23beb7cf


[HELIX-599] Support creating/maintaining/routing resources with same names in different instance groups.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a23beb7c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a23beb7c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a23beb7c

Branch: refs/heads/helix-0.6.x
Commit: a23beb7cf79a3f1da104a55477c7eddb594fa68b
Parents: 2775e15
Author: Lei Xia <lx...@linkedin.com>
Authored: Mon May 11 10:54:27 2015 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Thu Jul 9 22:37:54 2015 -0700

----------------------------------------------------------------------
 .../stages/MessageGenerationPhase.java          |  23 +-
 .../stages/ResourceComputationStage.java        |   8 +
 .../apache/helix/manager/zk/ZKHelixAdmin.java   |   7 +
 .../org/apache/helix/model/ExternalView.java    |  36 ++
 .../java/org/apache/helix/model/IdealState.java |  45 +-
 .../java/org/apache/helix/model/Message.java    |  38 ++
 .../java/org/apache/helix/model/Resource.java   |  35 ++
 .../helix/spectator/RoutingTableProvider.java   | 304 ++++++++++--
 .../org/apache/helix/tools/ClusterSetup.java    |  82 +++-
 .../integration/TestResourceGroupEndtoEnd.java  | 465 +++++++++++++++++++
 10 files changed, 989 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index bc3c739..2e919f8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -70,7 +70,6 @@ public class MessageGenerationPhase extends AbstractBaseStage {
 
     for (String resourceName : resourceMap.keySet()) {
       Resource resource = resourceMap.get(resourceName);
-      int bucketSize = resource.getBucketSize();
 
       StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
 
@@ -125,9 +124,8 @@ public class MessageGenerationPhase extends AbstractBaseStage {
           } else {
 
             Message message =
-                createMessage(manager, resourceName, partition.getPartitionName(), instanceName,
-                    currentState, nextState, sessionIdMap.get(instanceName), stateModelDef.getId(),
-                    resource.getStateModelFactoryname(), bucketSize);
+                createMessage(manager, resource, partition.getPartitionName(), instanceName,
+                    currentState, nextState, sessionIdMap.get(instanceName), stateModelDef.getId());
 
             IdealState idealState = cache.getIdealState(resourceName);
             if (idealState != null
@@ -188,23 +186,30 @@ public class MessageGenerationPhase extends AbstractBaseStage {
     event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
   }
 
-  private Message createMessage(HelixManager manager, String resourceName, String partitionName,
+  private Message createMessage(HelixManager manager, Resource resource, String partitionName,
       String instanceName, String currentState, String nextState, String sessionId,
-      String stateModelDefName, String stateModelFactoryName, int bucketSize) {
+      String stateModelDefName) {
     String uuid = UUID.randomUUID().toString();
     Message message = new Message(MessageType.STATE_TRANSITION, uuid);
     message.setSrcName(manager.getInstanceName());
     message.setTgtName(instanceName);
     message.setMsgState(MessageState.NEW);
     message.setPartitionName(partitionName);
-    message.setResourceName(resourceName);
+    message.setResourceName(resource.getResourceName());
     message.setFromState(currentState);
     message.setToState(nextState);
     message.setTgtSessionId(sessionId);
     message.setSrcSessionId(manager.getSessionId());
     message.setStateModelDef(stateModelDefName);
-    message.setStateModelFactoryName(stateModelFactoryName);
-    message.setBucketSize(bucketSize);
+    message.setStateModelFactoryName(resource.getStateModelFactoryname());
+    message.setBucketSize(resource.getBucketSize());
+
+    if (resource.getResourceGroupName() != null) {
+      message.setResourceGroupName(resource.getResourceGroupName());
+    }
+    if (resource.getResourceTag() != null) {
+      message.setResourceTag(resource.getResourceTag());
+    }
 
     return message;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 5676098..bde2904 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -62,6 +62,8 @@ public class ResourceComputationStage extends AbstractBaseStage {
           resource.setStateModelFactoryName(idealState.getStateModelFactoryName());
           resource.setBucketSize(idealState.getBucketSize());
           resource.setBatchMessageMode(idealState.getBatchMessageMode());
+          resource.setResourceGroupName(idealState.getResourceGroupName());
+          resource.setResourceTag(idealState.getInstanceGroupTag());
         }
 
         for (String partition : partitionSet) {
@@ -102,6 +104,12 @@ public class ResourceComputationStage extends AbstractBaseStage {
             resource.setStateModelFactoryName(currentState.getStateModelFactoryName());
             resource.setBucketSize(currentState.getBucketSize());
             resource.setBatchMessageMode(currentState.getBatchMessageMode());
+
+            IdealState idealState = idealStates.get(resourceName);
+            if (idealState != null) {
+              resource.setResourceGroupName(idealState.getResourceGroupName());
+              resource.setResourceTag(idealState.getInstanceGroupTag());
+            }
           }
 
           if (currentState.getStateModelDefRef() == null) {

http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index ecf84f8..e97ac9b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -404,6 +404,13 @@ public class ZKHelixAdmin implements HelixAdmin {
       message.setToState(stateModel.getInitialState());
       message.setStateModelFactoryName(idealState.getStateModelFactoryName());
 
+      if (idealState.getResourceGroupName() != null) {
+        message.setResourceGroupName(idealState.getResourceGroupName());
+      }
+      if (idealState.getInstanceGroupTag() != null) {
+        message.setResourceTag(idealState.getInstanceGroupTag());
+      }
+
       resetMessages.add(message);
       messageKeys.add(keyBuilder.message(instanceName, message.getId()));
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ExternalView.java b/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
index d5f1afc..7b201b0 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
@@ -31,6 +31,16 @@ import org.apache.helix.ZNRecord;
  * of current states for the partitions in a resource
  */
 public class ExternalView extends HelixProperty {
+
+  /**
+   * Properties that are persisted and are queryable for an external view
+   */
+  public enum ExternalViewProperty {
+    INSTANCE_GROUP_TAG,
+    RESOURCE_GROUP_NAME,
+    GROUP_ROUTING_ENABLED
+  }
+
   /**
    * Instantiate an external view with the resource it corresponds to
    * @param resource the name of the resource
@@ -95,6 +105,32 @@ public class ExternalView extends HelixProperty {
     return _record.getId();
   }
 
+  /**
+   * Get the resource group name
+   *
+   * @return the name of the resource group this resource belongs to.
+   */
+  public String getResourceGroupName() {
+    return _record.getSimpleField(ExternalViewProperty.RESOURCE_GROUP_NAME.toString());
+  }
+
+  /**
+   * Check whether the group routing is enabled for this resource.
+   *
+   * @return true if the group routing enabled for this resource; false otherwise
+   */
+  public boolean isGroupRoutingEnabled() {
+    return _record.getBooleanField(ExternalViewProperty.GROUP_ROUTING_ENABLED.name(), false);
+  }
+
+  /**
+   * Check for a group tag of this resource
+   * @return the group tag, or null if none is present
+   */
+  public String getInstanceGroupTag() {
+    return _record.getSimpleField(ExternalViewProperty.INSTANCE_GROUP_TAG.toString());
+  }
+
   @Override
   public boolean isValid() {
     return true;

http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index bc31e1e..d2744ac 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -55,7 +55,9 @@ public class IdealState extends HelixProperty {
     MAX_PARTITIONS_PER_INSTANCE,
     INSTANCE_GROUP_TAG,
     REBALANCER_CLASS_NAME,
-    HELIX_ENABLED
+    HELIX_ENABLED,
+    RESOURCE_GROUP_NAME,
+    GROUP_ROUTING_ENABLED
   }
 
   public static final String QUERY_LIST = "PREFERENCE_LIST_QUERYS";
@@ -113,7 +115,7 @@ public class IdealState extends HelixProperty {
   }
 
   /**
-   * Get the rebalance mode of the ideal state
+   * Set the rebalance mode of the ideal state
    * @param mode {@link IdealStateModeProperty}
    */
   @Deprecated
@@ -124,7 +126,7 @@ public class IdealState extends HelixProperty {
   }
 
   /**
-   * Get the rebalance mode of the resource
+   * Set the rebalance mode of the resource
    * @param rebalancerType
    */
   public void setRebalanceMode(RebalanceMode rebalancerType) {
@@ -160,6 +162,43 @@ public class IdealState extends HelixProperty {
   }
 
   /**
+   * Set the resource group name
+   * @param resourceGroupName
+   */
+  public void setResourceGroupName(String resourceGroupName) {
+    _record.setSimpleField(IdealStateProperty.RESOURCE_GROUP_NAME.toString(), resourceGroupName);
+  }
+
+  /**
+   * Get the resource group name
+   *
+   * @return
+   */
+  public String getResourceGroupName() {
+    return _record.getSimpleField(IdealStateProperty.RESOURCE_GROUP_NAME.toString());
+  }
+
+  /**
+   * Get if the resource group routing feature is enabled or not
+   * By default, it's disabled
+   *
+   * @return true if enabled; false otherwise
+   */
+  public boolean isResourceGroupEnabled() {
+    return _record.getBooleanField(IdealStateProperty.GROUP_ROUTING_ENABLED.name(), false);
+  }
+
+  /**
+   * Enable/Disable the aggregated routing on resource group.
+   *
+   * @param enabled
+   */
+  public void enableGroupRouting(boolean enabled) {
+    _record.setSimpleField(IdealStateProperty.GROUP_ROUTING_ENABLED.name(),
+        Boolean.toString(enabled));
+  }
+
+  /**
    * Set the maximum number of partitions of this resource that an instance can serve
    * @param max the maximum number of partitions supported
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 937a28e..9fed87b 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -64,6 +64,8 @@ public class Message extends HelixProperty {
     MSG_STATE,
     PARTITION_NAME,
     RESOURCE_NAME,
+    RESOURCE_GROUP_NAME,
+    RESOURCE_TAG,
     FROM_STATE,
     TO_STATE,
     STATE_MODEL_DEF,
@@ -397,6 +399,42 @@ public class Message extends HelixProperty {
   }
 
   /**
+   * Set the resource group associated with this message
+   *
+   * @param resourceGroupName resource group name to set
+   */
+  public void setResourceGroupName(String resourceGroupName) {
+    _record.setSimpleField(Attributes.RESOURCE_GROUP_NAME.toString(), resourceGroupName);
+  }
+
+  /**
+   * Get the resource group name associated with this message
+   *
+   * @return resource group name
+   */
+  public String getResourceGroupName() {
+    return _record.getSimpleField(Attributes.RESOURCE_GROUP_NAME.toString());
+  }
+
+  /**
+   * Set the resource tag associated with this message
+   *
+   * @param resourceTag resource tag to set
+   */
+  public void setResourceTag(String resourceTag) {
+    _record.setSimpleField(Attributes.RESOURCE_TAG.toString(), resourceTag);
+  }
+
+  /**
+   * Get the resource tag associated with this message
+   *
+   * @return resource tag
+   */
+  public String getResourceTag() {
+    return _record.getSimpleField(Attributes.RESOURCE_TAG.toString());
+  }
+
+  /**
    * Get the resource partition associated with this message
    * @return partition name
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/model/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Resource.java b/helix-core/src/main/java/org/apache/helix/model/Resource.java
index 1544514..7a22686 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Resource.java
@@ -38,6 +38,8 @@ public class Resource {
   private String _stateModelFactoryName;
   private int _bucketSize = 0;
   private boolean _batchMessageMode = false;
+  private String _resourceGroupName;
+  private String _resourceTag;
 
   /**
    * Instantiate a resource by its name
@@ -149,6 +151,39 @@ public class Resource {
     return _batchMessageMode;
   }
 
+  /**
+   * Get the resource tag assigned to this resource
+   *
+   * @return the name of the tag
+   */
+  public String getResourceTag() {
+    return _resourceTag;
+  }
+
+  /**
+   * Set the resource tag
+   * @param resourceTag
+   */
+  public void setResourceTag(String resourceTag) {
+    _resourceTag = resourceTag;
+  }
+
+  /**
+   * Get resource group name
+   * @return the resource group name
+   */
+  public String getResourceGroupName() {
+    return _resourceGroupName;
+  }
+
+  /**
+   * Set resource group name
+   * @param resourceGroupName
+   */
+  public void setResourceGroupName(String resourceGroupName) {
+    _resourceGroupName = resourceGroupName;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index 9bba660..bd2b44d 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -50,6 +51,9 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC
   /**
    * returns the instances for {resource,partition} pair that are in a specific
    * {state}
+   *
+   * This method will be deprecated, please use the
+   * {@link #getInstancesForResource(String, String, String)} getInstancesForResource} method.
    * @param resourceName
    *          -
    * @param partitionName
@@ -57,6 +61,19 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC
    * @return empty list if there is no instance in a given state
    */
   public List<InstanceConfig> getInstances(String resourceName, String partitionName, String state) {
+    return getInstancesForResource(resourceName, partitionName, state);
+  }
+
+  /**
+   * returns the instances for {resource,partition} pair that are in a specific
+   * {state}
+   * @param resourceName
+   *          -
+   * @param partitionName
+   * @param state
+   * @return empty list if there is no instance in a given state
+   */
+  public List<InstanceConfig> getInstancesForResource(String resourceName, String partitionName, String state) {
     List<InstanceConfig> instanceList = null;
     RoutingTable _routingTable = _routingTableRef.get();
     ResourceInfo resourceInfo = _routingTable.get(resourceName);
@@ -73,15 +90,93 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC
   }
 
   /**
+   * returns the instances for {resource group,partition} pair in all resources belongs to the given
+   * resource group that are in a specific {state}.
+   *
+   * The return results aggregate all partition states from all the resources in the given resource
+   * group.
+   *
+   * @param resourceGroupName
+   * @param partitionName
+   * @param state
+   *
+   * @return empty list if there is no instance in a given state
+   */
+  public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
+      String partitionName, String state) {
+    List<InstanceConfig> instanceList = null;
+    RoutingTable _routingTable = _routingTableRef.get();
+    ResourceGroupInfo resourceGroupInfo = _routingTable.getResourceGroup(resourceGroupName);
+    if (resourceGroupInfo != null) {
+      PartitionInfo keyInfo = resourceGroupInfo.get(partitionName);
+      if (keyInfo != null) {
+        instanceList = keyInfo.get(state);
+      }
+    }
+    if (instanceList == null) {
+      instanceList = Collections.emptyList();
+    }
+    return instanceList;
+  }
+
+  /**
+   * returns the instances for {resource group,partition} pair contains any of the given tags
+   * that are in a specific {state}.
+   *
+   * Find all resources belongs to the given resource group that have any of the given resource tags
+   * and return the aggregated partition states from all these resources.
+   *
+   * @param resourceGroupName
+   * @param partitionName
+   * @param state
+   * @param resourceTags
+   *
+   * @return empty list if there is no instance in a given state
+   */
+  public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String partitionName,
+      String state, List<String> resourceTags) {
+    RoutingTable _routingTable = _routingTableRef.get();
+    ResourceGroupInfo resourceGroupInfo = _routingTable.getResourceGroup(resourceGroupName);
+    List<InstanceConfig> instanceList = null;
+    if (resourceGroupInfo != null) {
+      instanceList = new ArrayList<InstanceConfig>();
+      for (String tag : resourceTags) {
+        PartitionInfo keyInfo = resourceGroupInfo.get(partitionName, tag);
+        if (keyInfo != null && keyInfo.containsState(state)) {
+          instanceList.addAll(keyInfo.get(state));
+        }
+      }
+    }
+    if (instanceList == null) {
+      return Collections.emptyList();
+    }
+
+    return instanceList;
+  }
+
+  /**
    * returns all instances for {resource} that are in a specific {state}
-   * @param resource
+   *
+   * This method will be deprecated, please use the
+   * {@link #getInstancesForResource(String, String) getInstancesForResource} method.
+   * @param resourceName
    * @param state
    * @return empty list if there is no instance in a given state
    */
-  public Set<InstanceConfig> getInstances(String resource, String state) {
+  public Set<InstanceConfig> getInstances(String resourceName, String state) {
+    return getInstancesForResource(resourceName, state);
+  }
+
+  /**
+   * returns all instances for {resource} that are in a specific {state}.
+   * @param resourceName
+   * @param state
+   * @return empty list if there is no instance in a given state
+   */
+  public Set<InstanceConfig> getInstancesForResource(String resourceName, String state) {
     Set<InstanceConfig> instanceSet = null;
     RoutingTable routingTable = _routingTableRef.get();
-    ResourceInfo resourceInfo = routingTable.get(resource);
+    ResourceInfo resourceInfo = routingTable.get(resourceName);
     if (resourceInfo != null) {
       instanceSet = resourceInfo.getInstances(state);
     }
@@ -91,6 +186,56 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC
     return instanceSet;
   }
 
+  /**
+   * returns all instances for all resources in {resource group} that are in a specific {state}
+   *
+   * @param resourceGroupName
+   * @param state
+   *
+   * @return empty list if there is no instance in a given state
+   */
+  public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state) {
+    Set<InstanceConfig> instanceSet = null;
+    RoutingTable _routingTable = _routingTableRef.get();
+    ResourceGroupInfo resourceGroupInfo = _routingTable.getResourceGroup(resourceGroupName);
+    if (resourceGroupInfo != null) {
+      instanceSet = resourceGroupInfo.getInstances(state);
+    }
+    if (instanceSet == null) {
+      instanceSet = Collections.emptySet();
+    }
+    return instanceSet;
+  }
+
+  /**
+   * returns all instances for resources contains any given tags in {resource group} that are in a
+   * specific {state}
+   *
+   * @param resourceGroupName
+   * @param state
+   *
+   * @return empty list if there is no instance in a given state
+   */
+  public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state,
+      List<String> resourceTags) {
+    Set<InstanceConfig> instanceSet = null;
+    RoutingTable _routingTable = _routingTableRef.get();
+    ResourceGroupInfo resourceGroupInfo = _routingTable.getResourceGroup(resourceGroupName);
+    if (resourceGroupInfo != null) {
+      instanceSet = new HashSet<InstanceConfig>();
+      for (String tag : resourceTags) {
+        Set<InstanceConfig> instances = resourceGroupInfo.getInstances(state, tag);
+        if (instances != null) {
+          instanceSet.addAll(resourceGroupInfo.getInstances(state, tag));
+        }
+      }
+    }
+    if (instanceSet == null) {
+      return Collections.emptySet();
+    }
+    return instanceSet;
+  }
+
   @Override
   public void onExternalViewChange(List<ExternalView> externalViewList,
       NotificationContext changeContext) {
@@ -139,12 +284,16 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC
             String currentState = stateMap.get(instanceName);
             if (instanceConfigMap.containsKey(instanceName)) {
               InstanceConfig instanceConfig = instanceConfigMap.get(instanceName);
-              newRoutingTable.addEntry(resourceName, partitionName, currentState, instanceConfig);
+              if (extView.isGroupRoutingEnabled()) {
+                newRoutingTable.addEntry(resourceName, extView.getResourceGroupName(),
+                    extView.getInstanceGroupTag(), partitionName, currentState, instanceConfig);
+              } else {
+                newRoutingTable.addEntry(resourceName, partitionName, currentState, instanceConfig);
+              }
             } else {
               logger.error("Invalid instance name." + instanceName
                   + " .Not found in /cluster/configs/. instanceName: ");
             }
-
           }
         }
       }
@@ -153,10 +302,15 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC
   }
 
   class RoutingTable {
-    private final HashMap<String, ResourceInfo> resourceInfoMap;
+    // mapping a resourceName to the ResourceInfo
+    private final Map<String, ResourceInfo> resourceInfoMap;
+
+    // mapping a resource group name to a resourceGroupInfo
+    private final Map<String, ResourceGroupInfo> resourceGroupInfoMap;
 
     public RoutingTable() {
-      resourceInfoMap = new HashMap<String, RoutingTableProvider.ResourceInfo>();
+      resourceInfoMap = new HashMap<String, ResourceInfo>();
+      resourceGroupInfoMap = new HashMap<String, ResourceGroupInfo>();
     }
 
     public void addEntry(String resourceName, String partitionName, String state,
@@ -166,20 +320,64 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC
       }
       ResourceInfo resourceInfo = resourceInfoMap.get(resourceName);
       resourceInfo.addEntry(partitionName, state, config);
+    }
+
+    /**
+     * add an entry with a resource with resourceGrouping enabled.
+     */
+    public void addEntry(String resourceName, String resourceGroupName, String resourceTag,
+        String partitionName, String state, InstanceConfig config) {
+      addEntry(resourceName, partitionName, state, config);
 
+      if (!resourceGroupInfoMap.containsKey(resourceGroupName)) {
+        resourceGroupInfoMap.put(resourceGroupName, new ResourceGroupInfo());
+      }
+
+      ResourceGroupInfo resourceGroupInfo = resourceGroupInfoMap.get(resourceGroupName);
+      resourceGroupInfo.addEntry(resourceTag, partitionName, state, config);
     }
 
     ResourceInfo get(String resourceName) {
       return resourceInfoMap.get(resourceName);
     }
 
+    ResourceGroupInfo getResourceGroup(String resourceGroupName) {
+      return resourceGroupInfoMap.get(resourceGroupName);
+    }
   }
 
+  private static Comparator<InstanceConfig> INSTANCE_CONFIG_COMPARATOR =
+      new Comparator<InstanceConfig>() {
+        @Override
+        public int compare(InstanceConfig o1, InstanceConfig o2) {
+          if (o1 == o2) {
+            return 0;
+          }
+          if (o1 == null) {
+            return -1;
+          }
+          if (o2 == null) {
+            return 1;
+          }
+
+          int compareTo = o1.getHostName().compareTo(o2.getHostName());
+          if (compareTo == 0) {
+            return o1.getPort().compareTo(o2.getPort());
+          } else {
+            return compareTo;
+          }
+
+        }
+      };
+
+  /**
+   * Class to store instances, partitions and their states for each resource.
+   */
   class ResourceInfo {
     // store PartitionInfo for each partition
-    HashMap<String, PartitionInfo> partitionInfoMap;
+    Map<String, PartitionInfo> partitionInfoMap;
     // stores the Set of Instances in a given state
-    HashMap<String, Set<InstanceConfig>> stateInfoMap;
+    Map<String, Set<InstanceConfig>> stateInfoMap;
 
     public ResourceInfo() {
       partitionInfoMap = new HashMap<String, RoutingTableProvider.PartitionInfo>();
@@ -189,30 +387,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC
     public void addEntry(String stateUnitKey, String state, InstanceConfig config) {
       // add
       if (!stateInfoMap.containsKey(state)) {
-        Comparator<InstanceConfig> comparator = new Comparator<InstanceConfig>() {
-
-          @Override
-          public int compare(InstanceConfig o1, InstanceConfig o2) {
-            if (o1 == o2) {
-              return 0;
-            }
-            if (o1 == null) {
-              return -1;
-            }
-            if (o2 == null) {
-              return 1;
-            }
-
-            int compareTo = o1.getHostName().compareTo(o2.getHostName());
-            if (compareTo == 0) {
-              return o1.getPort().compareTo(o2.getPort());
-            } else {
-              return compareTo;
-            }
-
-          }
-        };
-        stateInfoMap.put(state, new TreeSet<InstanceConfig>(comparator));
+        stateInfoMap.put(state, new TreeSet<InstanceConfig>(INSTANCE_CONFIG_COMPARATOR));
       }
       Set<InstanceConfig> set = stateInfoMap.get(state);
       set.add(config);
@@ -222,7 +397,6 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC
       }
       PartitionInfo stateUnitKeyInfo = partitionInfoMap.get(stateUnitKey);
       stateUnitKeyInfo.addEntry(state, config);
-
     }
 
     public Set<InstanceConfig> getInstances(String state) {
@@ -235,8 +409,64 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC
     }
   }
 
+  /**
+   * Class to store instances, partitions and their states for each resource group.
+   */
+  class ResourceGroupInfo {
+    // aggregated partitions and instances info for all resources in the resource group.
+    ResourceInfo aggregatedResourceInfo;
+
+    // <ResourceTag, ResourceInfo> maps resource tag to the resource with the tag
+    // in this resource group.
+    // Each ResourceInfo saves only partitions and instances for that resource.
+    Map<String, ResourceInfo> tagToResourceMap;
+
+    public ResourceGroupInfo() {
+      aggregatedResourceInfo = new ResourceInfo();
+      tagToResourceMap = new HashMap<String, ResourceInfo>();
+    }
+
+    public void addEntry(String resourceTag, String stateUnitKey, String state, InstanceConfig config) {
+      // add the new entry to the aggregated resource info
+      aggregatedResourceInfo.addEntry(stateUnitKey, state, config);
+
+      // add the entry to the resourceInfo with given tag
+      if (!tagToResourceMap.containsKey(resourceTag)) {
+        tagToResourceMap.put(resourceTag, new ResourceInfo());
+      }
+      ResourceInfo resourceInfo = tagToResourceMap.get(resourceTag);
+      resourceInfo.addEntry(stateUnitKey, state, config);
+    }
+
+    public Set<InstanceConfig> getInstances(String state) {
+      return aggregatedResourceInfo.getInstances(state);
+    }
+
+    public Set<InstanceConfig> getInstances(String state, String resourceTag) {
+      ResourceInfo resourceInfo = tagToResourceMap.get(resourceTag);
+      if (resourceInfo != null) {
+        return resourceInfo.getInstances(state);
+      }
+
+      return null;
+    }
+
+    PartitionInfo get(String stateUnitKey) {
+      return aggregatedResourceInfo.get(stateUnitKey);
+    }
+
+    PartitionInfo get(String stateUnitKey, String resourceTag) {
+      ResourceInfo resourceInfo = tagToResourceMap.get(resourceTag);
+      if (resourceInfo == null) {
+        return null;
+      }
+
+      return resourceInfo.get(stateUnitKey);
+    }
+  }
+
   class PartitionInfo {
-    HashMap<String, List<InstanceConfig>> stateInfoMap;
+    Map<String, List<InstanceConfig>> stateInfoMap;
 
     public PartitionInfo() {
       stateInfoMap = new HashMap<String, List<InstanceConfig>>();
@@ -253,5 +483,9 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC
     List<InstanceConfig> get(String state) {
       return stateInfoMap.get(state);
     }
+
+    boolean containsState(String state) {
+      return stateInfoMap.containsKey(state);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 37c4915..9d411bb 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -37,6 +37,7 @@ import org.apache.commons.cli.OptionGroup;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixConstants.StateModelToken;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey.Builder;
@@ -57,6 +58,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.OnlineOfflineSMD;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.ConstraintItemBuilder;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
@@ -220,6 +222,10 @@ public class ClusterSetup {
     _admin.addInstance(clusterName, config);
   }
 
+  public void addInstanceTag(String clusterName, String instanceName, String tag) {
+    _admin.addInstanceTag(clusterName, instanceName, tag);
+  }
+
   public void dropInstancesFromCluster(String clusterName, String[] instanceInfoArray) {
     for (String instanceInfo : instanceInfoArray) {
       if (instanceInfo.length() > 0) {
@@ -340,6 +346,10 @@ public class ClusterSetup {
     _admin.addStateModelDef(clusterName, stateModelDef, record, overwritePrevious);
   }
 
+  public void addResourceToCluster(String clusterName, String resourceName, IdealState idealState) {
+    _admin.addResource(clusterName, resourceName, idealState);
+  }
+
   public void addResourceToCluster(String clusterName, String resourceName, int numResources,
       String stateModelRef) {
     addResourceToCluster(clusterName, resourceName, numResources, stateModelRef,
@@ -363,6 +373,50 @@ public class ClusterSetup {
         bucketSize, maxPartitionsPerInstance);
   }
 
+
+  /**
+   * Get the mangled IdealState name if resourceGroup/resourceTag is enable.
+   */
+  public static String genIdealStateNameWithResourceTag(String resourceName, String resourceTag) {
+    return resourceName + "$" + resourceTag;
+  }
+
+  /**
+   * Create an IdealState for a resource that belongs to a resource group We use
+   * "resourceGroupName$resourceInstanceTag" as the IdealState znode name to differetiate different
+   * resources from the same resourceGroup.
+   */
+  public IdealState createIdealStateForResourceGroup(String resourceGroupName,
+      String resourceTag, int numPartition, int replica, String rebalanceMode, String stateModelDefName) {
+    String idealStateId = genIdealStateNameWithResourceTag(resourceGroupName, resourceTag);
+    IdealState idealState = new IdealState(idealStateId);
+    idealState.setNumPartitions(numPartition);
+    idealState.setStateModelDefRef(stateModelDefName);
+    IdealState.RebalanceMode mode =
+        idealState.rebalanceModeFromString(rebalanceMode, IdealState.RebalanceMode.SEMI_AUTO);
+    idealState.setRebalanceMode(mode);
+    idealState.setReplicas("" + replica);
+    idealState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
+    idealState.setResourceGroupName(resourceGroupName);
+    idealState.setInstanceGroupTag(resourceTag);
+    idealState.enableGroupRouting(true);
+
+    return idealState;
+  }
+
+  /**
+   * Enable or disable a resource within a resource group associated with a given resource tag
+   *
+   * @param clusterName
+   * @param resourceName
+   * @param resourceTag
+   */
+  public void enableResource(String clusterName, String resourceName, String resourceTag,
+      boolean enabled) {
+    String idealStateId = genIdealStateNameWithResourceTag(resourceName, resourceTag);
+    _admin.enableResource(clusterName, idealStateId, enabled);
+  }
+
   public void dropResourceFromCluster(String clusterName, String resourceName) {
     _admin.dropResource(clusterName, resourceName);
   }
@@ -448,7 +502,7 @@ public class ClusterSetup {
   /**
    * set configs
    * @param type config-scope type, e.g. CLUSTER, RESOURCE, etc.
-   * @param scopesStr scopeArgsCsv csv-formatted scope-args, e.g myCluster,testDB
+   * @param scopeArgsCsv scopeArgsCsv csv-formatted scope-args, e.g myCluster,testDB
    * @param keyValuePairs csv-formatted key-value pairs. e.g. k1=v1,k2=v2
    */
   public void setConfig(ConfigScopeProperty type, String scopeArgsCsv, String keyValuePairs) {
@@ -463,7 +517,7 @@ public class ClusterSetup {
   /**
    * remove configs
    * @param type config-scope type, e.g. CLUSTER, RESOURCE, etc.
-   * @param scopesStr scopeArgsCsv csv-formatted scope-args, e.g myCluster,testDB
+   * @param scopeArgsCsv scopeArgsCsv csv-formatted scope-args, e.g myCluster,testDB
    * @param keysCsv csv-formatted keys. e.g. k1,k2
    */
   public void removeConfig(ConfigScopeProperty type, String scopeArgsCsv, String keysCsv) {
@@ -616,7 +670,7 @@ public class ClusterSetup {
             .create();
     listInstancesOption.setArgs(1);
     listInstancesOption.setRequired(false);
-    listInstancesOption.setArgName("clusterName");
+    listInstancesOption.setArgName("clusterName <-tag tagName>");
 
     Option addClusterOption =
         OptionBuilder.withLongOpt(addCluster).withDescription("Add a new cluster").create();
@@ -747,7 +801,8 @@ public class ClusterSetup {
 
     Option enableResourceOption =
         OptionBuilder.withLongOpt(enableResource).withDescription("Enable/disable a resource")
-            .hasArgs(3).isRequired(false).withArgName("clusterName resourceName true/false")
+            .hasArgs(3).isRequired(false)
+            .withArgName("clusterName resourceName true/false <-tag resourceTag>")
             .create();
 
     Option rebalanceOption =
@@ -1146,9 +1201,17 @@ public class ClusterSetup {
       return 0;
     } else if (cmd.hasOption(listInstances)) {
       String clusterName = cmd.getOptionValue(listInstances);
-      List<String> instances =
-          setupTool.getClusterManagementTool().getInstancesInCluster(clusterName);
 
+      List<String> instances;
+      if (cmd.hasOption(tag)) {
+        String instanceTag = cmd.getOptionValues(tag)[0];
+        instances = setupTool.getClusterManagementTool()
+            .getInstancesInClusterWithTag(clusterName, instanceTag);
+      } else {
+        instances =
+            setupTool.getClusterManagementTool().getInstancesInCluster(clusterName);
+      }
+      
       System.out.println("Instances in cluster " + clusterName + ":");
       for (String instanceName : instances) {
         System.out.println(instanceName);
@@ -1251,7 +1314,12 @@ public class ClusterSetup {
       String clusterName = cmd.getOptionValues(enableResource)[0];
       String resourceName = cmd.getOptionValues(enableResource)[1];
       boolean enabled = Boolean.parseBoolean(cmd.getOptionValues(enableResource)[2].toLowerCase());
-      setupTool.getClusterManagementTool().enableResource(clusterName, resourceName, enabled);
+      if (cmd.hasOption(tag)) {
+        String resourceTag = cmd.getOptionValues(tag)[0];
+        setupTool.enableResource(clusterName, resourceName, resourceTag, enabled);
+      } else {
+        setupTool.getClusterManagementTool().enableResource(clusterName, resourceName, enabled);
+      }
     } else if (cmd.hasOption(enablePartition)) {
       String[] args = cmd.getOptionValues(enablePartition);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
new file mode 100644
index 0000000..3466b2f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
@@ -0,0 +1,465 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.manager.ZkTestManager;
+import org.apache.helix.manager.zk.CallbackHandler;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.mock.participant.DummyProcess;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.OnlineOfflineSMD;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.log4j.Logger;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class TestResourceGroupEndtoEnd extends ZkIntegrationTestBase {
+
+  protected static final int GROUP_NODE_NR = 5;
+  protected static final int START_PORT = 12918;
+  protected static final String STATE_MODEL = "OnlineOffline";
+  protected static final String TEST_DB = "TestDB";
+  protected static final int PARTITIONS = 20;
+  protected static final int INSTANCE_GROUP_NR = 4;
+  protected static final int TOTAL_NODE_NR = GROUP_NODE_NR * INSTANCE_GROUP_NR;
+
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+  protected TestParticipantManager[] _participants = new TestParticipantManager[TOTAL_NODE_NR];
+  protected ClusterControllerManager _controller;
+  protected RoutingTableProvider _routingTableProvider;
+  private HelixAdmin _admin;
+  HelixManager _spectator;
+
+  int _replica = 3;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _admin = new ZKHelixAdmin(_gZkClient);
+
+    // setup storage cluster
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    List<String> instanceGroupTags = new ArrayList<String>();
+    for (int i = 0; i < INSTANCE_GROUP_NR; i++) {
+      String groupTag = "cluster_" + i;
+      addInstanceGroup(CLUSTER_NAME, groupTag, GROUP_NODE_NR);
+      instanceGroupTags.add(groupTag);
+    }
+
+    for (String tag : instanceGroupTags) {
+      List<String> instances = _admin.getInstancesInClusterWithTag(CLUSTER_NAME, tag);
+      IdealState idealState =
+          createIdealState(TEST_DB, tag, instances, PARTITIONS, _replica,
+              IdealState.RebalanceMode.CUSTOMIZED.toString());
+      _gSetupTool.addResourceToCluster(CLUSTER_NAME, idealState.getResourceName(), idealState);
+    }
+
+    // start dummy participants
+    int i = 0;
+    for (String group : instanceGroupTags) {
+      List<String> instances = _admin.getInstancesInClusterWithTag(CLUSTER_NAME, group);
+      for (String instance : instances) {
+        _participants[i] =
+            new TestParticipantManager(ZK_ADDR, CLUSTER_NAME, TEST_DB, group, instance);
+        _participants[i].syncStart();
+        i++;
+      }
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(
+            new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    // start speculator
+    _routingTableProvider = new RoutingTableProvider();
+    _spectator =
+        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "spectator", InstanceType.SPECTATOR,
+            ZK_ADDR);
+    _spectator.connect();
+    _spectator.addExternalViewChangeListener(_routingTableProvider);
+    Thread.sleep(1000);
+  }
+
+  @AfterClass
+  public void afterClass() {
+    // stop participants
+    for (int i = 0; i < TOTAL_NODE_NR; i++) {
+      _participants[i].syncStop();
+    }
+
+    _controller.syncStop();
+    _spectator.disconnect();
+  }
+
+  public IdealState createIdealState(String resourceGroupName, String instanceGroupTag,
+      List<String> instanceNames, int numPartition, int replica, String rebalanceMode) {
+    IdealState is =
+        _gSetupTool.createIdealStateForResourceGroup(resourceGroupName, instanceGroupTag,
+            numPartition, replica, rebalanceMode, "OnlineOffline");
+
+    // setup initial partition->instance mapping.
+    int nodeIdx = 0;
+    int numNode = instanceNames.size();
+    assert (numNode >= replica);
+    for (int i = 0; i < numPartition; i++) {
+      String partitionName = resourceGroupName + "_" + i;
+      for (int j = 0; j < replica; j++) {
+        is.setPartitionState(partitionName, instanceNames.get((nodeIdx + j) % numNode),
+            OnlineOfflineSMD.States.ONLINE.toString());
+      }
+      nodeIdx++;
+    }
+
+    return is;
+  }
+
+  private void addInstanceGroup(String clusterName, String instanceTag, int numInstance) {
+    List<String> instances = new ArrayList<String>();
+    for (int i = 0; i < numInstance; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + instanceTag + "_" + (START_PORT + i);
+      instances.add(storageNodeName);
+      _gSetupTool.addInstanceToCluster(clusterName, storageNodeName);
+      _gSetupTool.addInstanceTag(clusterName, storageNodeName, instanceTag);
+    }
+  }
+
+  @Test
+  public void testRoutingTable() throws Exception {
+    // Verify routing table works
+    Set<InstanceConfig> allOnlineNodes =
+        _routingTableProvider.getInstancesForResourceGroup(TEST_DB, "ONLINE");
+    Assert.assertEquals(allOnlineNodes.size(), TOTAL_NODE_NR);
+
+    List<InstanceConfig> onlinePartitions =
+        _routingTableProvider.getInstancesForResourceGroup(TEST_DB, TEST_DB + "_0", "ONLINE");
+    Assert.assertEquals(onlinePartitions.size(), INSTANCE_GROUP_NR * _replica);
+
+    Set<InstanceConfig> selectedNodes = _routingTableProvider
+        .getInstancesForResourceGroup(TEST_DB, "ONLINE", Arrays.asList("cluster_2", "cluster_3"));
+    Assert.assertEquals(selectedNodes.size(), GROUP_NODE_NR * 2);
+
+    List<InstanceConfig> selectedPartition = _routingTableProvider
+        .getInstancesForResourceGroup(TEST_DB, TEST_DB + "_0", "ONLINE",
+            Arrays.asList("cluster_2", "cluster_3"));
+    Assert.assertEquals(selectedPartition.size(), _replica * 2);
+  }
+
+  @Test(dependsOnMethods = { "testRoutingTable" })
+  public void testEnableDisableClusters() throws InterruptedException {
+    // disable a resource
+    _gSetupTool.enableResource(CLUSTER_NAME, TEST_DB, "cluster_2", false);
+
+    Thread.sleep(500);
+
+    Set<InstanceConfig> selectedNodes = _routingTableProvider
+        .getInstancesForResourceGroup(TEST_DB, "ONLINE", Arrays.asList("cluster_2", "cluster_3"));
+    Assert.assertEquals(selectedNodes.size(), GROUP_NODE_NR * 1);
+
+    List<InstanceConfig> selectedPartition = _routingTableProvider
+        .getInstancesForResourceGroup(TEST_DB, TEST_DB + "_0", "ONLINE",
+            Arrays.asList("cluster_2", "cluster_3"));
+    Assert.assertEquals(selectedPartition.size(), _replica * 1);
+
+    // enable a resource
+    _gSetupTool.enableResource(CLUSTER_NAME, TEST_DB, "cluster_2", true);
+    Thread.sleep(500);
+
+    selectedNodes = _routingTableProvider
+        .getInstancesForResourceGroup(TEST_DB, "ONLINE", Arrays.asList("cluster_2", "cluster_3"));
+    Assert.assertEquals(selectedNodes.size(), GROUP_NODE_NR * 2);
+
+    selectedPartition = _routingTableProvider
+        .getInstancesForResourceGroup(TEST_DB, TEST_DB + "_0", "ONLINE",
+            Arrays.asList("cluster_2", "cluster_3"));
+    Assert.assertEquals(selectedPartition.size(), _replica * 2);
+  }
+
+  public static class MockProcess {
+    private static final Logger logger = Logger.getLogger(DummyProcess.class);
+    // public static final String rootNamespace = "rootNamespace";
+
+    private final String _zkConnectString;
+    private final String _clusterName;
+    private final String _instanceName;
+    private final String _resourceName;
+    private final String _resourceTag;
+    // private StateMachineEngine genericStateMachineHandler;
+
+    private int _transDelayInMs = 0;
+    private final String _clusterMangerType;
+
+    public MockProcess(String zkConnectString, String clusterName, String resourceName,
+        String instanceName, String resourceTag,
+        String clusterMangerType, int delay) {
+      _zkConnectString = zkConnectString;
+      _clusterName = clusterName;
+      _resourceName = resourceName;
+      _resourceTag = resourceTag;
+      _instanceName = instanceName;
+      _clusterMangerType = clusterMangerType;
+      _transDelayInMs = delay > 0 ? delay : 0;
+    }
+
+    static void sleep(long transDelay) {
+      try {
+        if (transDelay > 0) {
+          Thread.sleep(transDelay);
+        }
+      } catch (InterruptedException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+    }
+
+    public HelixManager start() throws Exception {
+      HelixManager manager = null;
+      // zk cluster manager
+      if (_clusterMangerType.equalsIgnoreCase("zk")) {
+        manager =
+            HelixManagerFactory.getZKHelixManager(_clusterName, _instanceName,
+                InstanceType.PARTICIPANT, _zkConnectString);
+      } else {
+        throw new IllegalArgumentException("Unsupported cluster manager type:" + _clusterMangerType);
+      }
+
+      MockOnlineOfflineStateModelFactory stateModelFactory2 =
+          new MockOnlineOfflineStateModelFactory(_transDelayInMs, _resourceName, _resourceTag,
+              _instanceName);
+      // genericStateMachineHandler = new StateMachineEngine();
+      StateMachineEngine stateMach = manager.getStateMachineEngine();
+      stateMach.registerStateModelFactory("OnlineOffline", stateModelFactory2);
+
+      manager.connect();
+      //manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(), genericStateMachineHandler);
+      return manager;
+    }
+
+    public static class MockOnlineOfflineStateModelFactory extends
+        StateModelFactory<MockOnlineOfflineStateModel> {
+      int _delay;
+      String _instanceName;
+      String _resourceName;
+      String _resourceTag;
+
+      public MockOnlineOfflineStateModelFactory(int delay, String resourceName, String resourceTag,
+          String instanceName) {
+        _delay = delay;
+        _instanceName = instanceName;
+        _resourceName = resourceName;
+        _resourceTag = resourceTag;
+      }
+
+      @Override
+      public MockOnlineOfflineStateModel createNewStateModel(String resourceName, String stateUnitKey) {
+        MockOnlineOfflineStateModel model = new MockOnlineOfflineStateModel();
+        model.setDelay(_delay);
+        model.setInstanceName(_instanceName);
+        model.setResourceName(_resourceName);
+        model.setResourceTag(_resourceTag);
+        return model;
+      }
+    }
+
+    public static class MockOnlineOfflineStateModel extends StateModel {
+      int _transDelay = 0;
+      String _instanceName;
+      String _resourceName;
+      String _resourceTag;
+
+      public void setDelay(int delay) {
+        _transDelay = delay > 0 ? delay : 0;
+      }
+
+      public void setInstanceName(String instanceName) {_instanceName = instanceName;}
+
+      public void setResourceTag(String resourceTag) {
+        _resourceTag = resourceTag;
+      }
+
+      public void setResourceName(String resourceName) {
+        _resourceName = resourceName;
+      }
+
+      public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+        String db = message.getPartitionName();
+        String instanceName = context.getManager().getInstanceName();
+        MockProcess.sleep(_transDelay);
+
+        logger.info("MockStateModel.onBecomeOnlineFromOffline(), instance:" + instanceName + ", db:"
+            + db);
+
+        logger.info(
+            "MockStateModel.onBecomeOnlineFromOffline(), resource " + message.getResourceName()
+                + ", partition"
+                + message.getPartitionName());
+
+        verifyMessage(message);
+      }
+
+      public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+        MockProcess.sleep(_transDelay);
+
+        logger.info(
+            "MockStateModel.onBecomeOfflineFromOnline(), resource " + message.getResourceName()
+                + ", partition"
+                + message.getPartitionName() + ", targetName: " + message.getTgtName());
+
+        try {
+          Thread.sleep(1);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+
+        verifyMessage(message);
+      }
+
+      public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+        MockProcess.sleep(_transDelay);
+
+        logger.info(
+            "MockStateModel.onBecomeDroppedFromOffline(), resource " + message.getResourceName()
+                + ", partition"
+                + message.getPartitionName());
+
+        verifyMessage(message);
+      }
+
+      private void verifyMessage(Message message) {
+        assert _instanceName.equals(message.getTgtName());
+        assert _resourceName.equals(message.getResourceGroupName());
+        assert _resourceTag.equals(message.getResourceTag());
+      }
+    }
+  }
+
+  public static class TestParticipantManager extends ZKHelixManager implements Runnable, ZkTestManager {
+    private static Logger LOG = Logger.getLogger(TestParticipantManager.class);
+
+    private final CountDownLatch _startCountDown = new CountDownLatch(1);
+    private final CountDownLatch _stopCountDown = new CountDownLatch(1);
+    private final CountDownLatch _waitStopCompleteCountDown = new CountDownLatch(1);
+
+    private String _instanceGroup;
+    private String _resourceName;
+
+    public TestParticipantManager(String zkAddr, String clusterName, String resourceName,
+        String instanceGroup, String instanceName) {
+      super(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr);
+      _instanceGroup = instanceGroup;
+      _resourceName = resourceName;
+    }
+
+    public void syncStop() {
+      _stopCountDown.countDown();
+      try {
+        _waitStopCompleteCountDown.await();
+      } catch (InterruptedException e) {
+        LOG.error("exception in syncStop participant-manager", e);
+      }
+    }
+
+    public void syncStart() {
+      try {
+        new Thread(this).start();
+        _startCountDown.await();
+      } catch (InterruptedException e) {
+        LOG.error("exception in syncStart participant-manager", e);
+      }
+    }
+
+    @Override
+    public void run() {
+      try {
+        StateMachineEngine stateMach = getStateMachineEngine();
+        MockProcess.MockOnlineOfflineStateModelFactory
+            ofModelFactory =
+            new MockProcess.MockOnlineOfflineStateModelFactory(10, _resourceName, _instanceGroup,
+                getInstanceName());
+        stateMach.registerStateModelFactory("OnlineOffline", ofModelFactory);
+
+        connect();
+        _startCountDown.countDown();
+
+        _stopCountDown.await();
+      } catch (InterruptedException e) {
+        String msg =
+            "participant: " + getInstanceName() + ", " + Thread.currentThread().getName()
+                + " is interrupted";
+        LOG.info(msg);
+      } catch (Exception e) {
+        LOG.error("exception running participant-manager", e);
+      } finally {
+        _startCountDown.countDown();
+
+        disconnect();
+        _waitStopCompleteCountDown.countDown();
+      }
+    }
+
+    @Override
+    public ZkClient getZkClient() {
+      return _zkclient;
+    }
+
+    @Override
+    public List<CallbackHandler> getHandlers() {
+      return _handlers;
+    }
+  }
+}