You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by sl...@apache.org on 2013/02/20 00:55:43 UTC

git commit: [Helix-49] add support for specifying per-node partition limit for helix auto-rebalance mode

Updated Branches:
  refs/heads/master e46f0a1a7 -> 1609d5cb3


[Helix-49] add support for specifying per-node partition limit for helix
auto-rebalance mode

Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/1609d5cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/1609d5cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/1609d5cb

Branch: refs/heads/master
Commit: 1609d5cb3451c0fe24afc49fbae577e80fd6d2be
Parents: e46f0a1
Author: slu2011 <lu...@gmail.com>
Authored: Tue Feb 19 15:55:01 2013 -0800
Committer: slu2011 <lu...@gmail.com>
Committed: Tue Feb 19 15:55:01 2013 -0800

----------------------------------------------------------------------
 .../helix/webapp/resources/JsonParameters.java     |    3 +
 .../webapp/resources/ResourceGroupsResource.java   |   31 ++-
 .../src/main/java/org/apache/helix/HelixAdmin.java |   21 ++-
 .../stages/BestPossibleStateCalcStage.java         |   11 +-
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  |   18 +-
 .../java/org/apache/helix/model/IdealState.java    |   25 ++-
 .../java/org/apache/helix/tools/ClusterSetup.java  |   40 +++-
 .../TestAutoRebalancePartitionLimit.java           |  251 +++++++++++++++
 .../helix/integration/TestSchedulerMessage.java    |    4 +-
 9 files changed, 391 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/1609d5cb/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
index 801055d..fc77dcf 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
@@ -59,6 +59,9 @@ public class JsonParameters
   public static final String             RESOURCE_GROUP_NAME = "resourceGroupName";
   public static final String             STATE_MODEL_DEF_REF = "stateModelDefRef";
   public static final String             IDEAL_STATE_MODE    = "mode";
+  public static final String             MAX_PARTITIONS_PER_NODE = "maxPartitionsPerNode";
+  public static final String             BUCKET_SIZE         = "bucketSize";
+  
 
   // zk commands
   public static final String             ZK_DELETE_CHILDREN  = "zkDeleteChildren";

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/1609d5cb/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupsResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupsResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupsResource.java
index 1d16716..dbbf0a1 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupsResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupsResource.java
@@ -145,6 +145,32 @@ public class ResourceGroupsResource extends Resource
         {
           mode = jsonParameters.getParameter(JsonParameters.IDEAL_STATE_MODE);
         }
+        
+        int bucketSize = 0;
+        if (jsonParameters.getParameter(JsonParameters.BUCKET_SIZE) != null)
+        {
+          try
+          {
+            bucketSize = Integer.parseInt(jsonParameters.getParameter(JsonParameters.BUCKET_SIZE));
+          }
+          catch(Exception e)
+          {
+            
+          }
+        }
+        
+        int maxPartitionsPerNode = -1;
+        if (jsonParameters.getParameter(JsonParameters.MAX_PARTITIONS_PER_NODE) != null)
+        {
+          try
+          {
+            maxPartitionsPerNode = Integer.parseInt(jsonParameters.getParameter(JsonParameters.MAX_PARTITIONS_PER_NODE));
+          }
+          catch(Exception e)
+          {
+            
+          }
+        }
 
         ZkClient zkClient =
             (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT);
@@ -154,7 +180,10 @@ public class ResourceGroupsResource extends Resource
                                        entityName,
                                        partitions,
                                        stateModelDefRef,
-                                       mode);
+                                       mode,
+                                       bucketSize,
+                                       maxPartitionsPerNode
+                                       );
       }
       else
       {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/1609d5cb/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 7d3ecfa..86eb57d 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -134,7 +134,26 @@ public interface HelixAdmin
                    String stateModelRef,
                    String idealStateMode,
                    int bucketSize);
-
+  
+  /**
+   * Add a resource to a cluster, using a bucket size > 1 and a limit on the number of partitions per instance
+   * 
+   * @param clusterName
+   * @param resourceName
+   * @param numResources
+   * @param stateModelRef
+   * @param idealStateMode
+   * @param bucketSize
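+   * @param maxPartitionsPerInstance maximum number of partitions assigned to one instance; the ZKHelixAdmin implementation applies no limit when the value is not positive (e.g. -1)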
+   */
+  void addResource(String clusterName,
+                   String resourceName,
+                   int numResources,
+                   String stateModelRef,
+                   String idealStateMode,
+                   int bucketSize,
+                   int maxPartitionsPerInstance);
+  
   /**
    * Add an instance to a cluster
    * 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/1609d5cb/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index e802c04..84092e4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -244,7 +244,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage
     }
     List<String> orphanedPartitionsList = new ArrayList<String>();
     orphanedPartitionsList.addAll(orphanedPartitions);
-    normalizeAssignmentMap(masterAssignmentMap, orphanedPartitionsList);
+    int maxPartitionsPerInstance = idealState.getMaxPartitionsPerInstance();
+    normalizeAssignmentMap(masterAssignmentMap, orphanedPartitionsList, maxPartitionsPerInstance);
     idealState.getRecord()
               .setListFields(generateListFieldFromMasterAssignment(masterAssignmentMap,
                                                                    replicas));
@@ -262,7 +263,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage
    * @return
    */
   private void normalizeAssignmentMap(Map<String, List<String>> masterAssignmentMap,
-                                      List<String> orphanPartitions)
+                                      List<String> orphanPartitions, int maxPartitionsPerInstance)
   {
     int totalPartitions = 0;
     String[] instanceNames = new String[masterAssignmentMap.size()];
@@ -300,6 +301,10 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage
     {
       int targetPartitionNo = leave > 0 ? (partitionNumber + 1) : partitionNumber;
       leave--;
+      if(targetPartitionNo > maxPartitionsPerInstance)
+      {
+        targetPartitionNo = maxPartitionsPerInstance;
+      }
       while (masterAssignmentMap.get(instanceNames[i]).size() < targetPartitionNo)
       {
         int lastElementIndex = orphanPartitions.size() - 1;
@@ -310,7 +315,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage
     }
     if (orphanPartitions.size() > 0)
     {
-      logger.error("orphanPartitions still contains elements");
+      logger.warn("orphanPartitions still contains elements");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/1609d5cb/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index d618cd3..be96dbe 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -688,6 +688,16 @@ public class ZKHelixAdmin implements HelixAdmin
                           String idealStateMode,
                           int bucketSize)
   {
+    addResource(clusterName, resourceName, partitions, stateModelRef, idealStateMode,
+        bucketSize, -1);
+
+  }
+  
+  @Override
+  public void addResource(String clusterName, String resourceName,
+      int partitions, String stateModelRef, String idealStateMode,
+      int bucketSize, int maxPartitionsPerInstance)
+  {
     if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
     {
       throw new HelixException("cluster " + clusterName + " is not setup yet");
@@ -708,13 +718,15 @@ public class ZKHelixAdmin implements HelixAdmin
     idealState.setIdealStateMode(mode.toString());
     idealState.setReplicas("" + 0);
     idealState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
-
+    if(maxPartitionsPerInstance > 0 && maxPartitionsPerInstance < Integer.MAX_VALUE)
+    {
+      idealState.setMaxPartitionsPerInstance(maxPartitionsPerInstance);
+    }
     if (bucketSize > 0)
     {
       idealState.setBucketSize(bucketSize);
     }
     addResource(clusterName, resourceName, idealState);
-
   }
 
   @Override
@@ -1379,5 +1391,7 @@ public class ZKHelixAdmin implements HelixAdmin
     IdealState newIdealState = new IdealState(newIdealStateRecord);
     setResourceIdealState(clusterName, newIdealStateRecord.getId(), newIdealState);
   }
+
+  
  
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/1609d5cb/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index f1121fe..871545f 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -39,7 +39,7 @@ public class IdealState extends HelixProperty
 {
   public enum IdealStateProperty
   {
-    NUM_PARTITIONS, STATE_MODEL_DEF_REF, STATE_MODEL_FACTORY_NAME, REPLICAS, IDEAL_STATE_MODE, REBALANCE_TIMER_PERIOD
+    NUM_PARTITIONS, STATE_MODEL_DEF_REF, STATE_MODEL_FACTORY_NAME, REPLICAS, IDEAL_STATE_MODE, REBALANCE_TIMER_PERIOD, MAX_PARTITONS_PER_INSTANCE
   }
 
   public static final String QUERY_LIST = "PREFERENCE_LIST_QUERYS";
@@ -72,6 +72,29 @@ public class IdealState extends HelixProperty
     _record
         .setSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString(), mode);
   }
+  
+  public int getMaxPartitionsPerInstance()
+  {
+    try
+    {
+      String strVal =  _record
+          .getSimpleField(IdealStateProperty.MAX_PARTITONS_PER_INSTANCE.toString());
+      if(strVal != null)
+      {
+        return Integer.parseInt(strVal);
+      }
+    } 
+    catch (Exception e)
+    {
+    }
+    return Integer.MAX_VALUE;
+  }
+  
+  public void setMaxPartitionsPerInstance(int max)
+  {
+    _record
+    .setSimpleField(IdealStateProperty.MAX_PARTITONS_PER_INSTANCE.toString(), Integer.toString(max));
+  }
 
   public IdealStateModeProperty getIdealStateMode()
   {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/1609d5cb/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index bde82f6..29ed82f 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -93,6 +93,8 @@ public class ClusterSetup
   public static final String mode = "mode";
   public static final String bucketSize = "bucketSize";
   public static final String resourceKeyPrefix = "key";
+  public static final String maxPartitionsPerNode = "maxPartitionsPerNode";
+  
   public static final String addResourceProperty = "addResourceProperty";
   public static final String removeResourceProperty = "removeResourceProperty";
 
@@ -431,6 +433,23 @@ public class ClusterSetup
                        idealStateMode,
                        bucketSize);
   }
+  
+  public void addResourceToCluster(String clusterName,
+      String resourceName,
+      int numResources,
+      String stateModelRef,
+      String idealStateMode,
+      int bucketSize,
+      int maxPartitionsPerInstance)
+  {
+    _admin.addResource(clusterName,
+      resourceName,
+      numResources,
+      stateModelRef,
+      idealStateMode,
+      bucketSize,
+      maxPartitionsPerInstance);
+  }
 
   public void dropResourceFromCluster(String clusterName, String resourceName)
   {
@@ -443,7 +462,7 @@ public class ClusterSetup
     rebalanceStorageCluster(clusterName, resourceName, replica, resourceName);
   }
 
-  public void reblanceResource(String clusterName, String resourceName, int replica)
+  public void rebalanceResource(String clusterName, String resourceName, int replica)
   {
     rebalanceStorageCluster(clusterName, resourceName, replica, resourceName); 
   }
@@ -738,6 +757,14 @@ public class ClusterSetup
     resourceBucketSizeOption.setArgs(1);
     resourceBucketSizeOption.setRequired(false);
     resourceBucketSizeOption.setArgName("Size of a bucket for a resource");
+    
+    Option maxPartitionsPerNodeOption =
+        OptionBuilder.withLongOpt(maxPartitionsPerNode)
+                     .withDescription("Specify max partitions per node, used with addResourceGroup command")
+                     .create();
+    maxPartitionsPerNodeOption.setArgs(1);
+    maxPartitionsPerNodeOption.setRequired(false);
+    maxPartitionsPerNodeOption.setArgName("Max partitions per node for a resource");
 
     Option resourceKeyOption =
         OptionBuilder.withLongOpt(resourceKeyPrefix)
@@ -952,6 +979,7 @@ public class ClusterSetup
     group.addOption(addResourceOption);
     group.addOption(resourceModeOption);
     group.addOption(resourceBucketSizeOption);
+    group.addOption(maxPartitionsPerNodeOption);
     group.addOption(expandResourceOption);
     group.addOption(expandClusterOption);
     group.addOption(resourceKeyOption);
@@ -1083,13 +1111,19 @@ public class ClusterSetup
       {
         bucketSizeVal = Integer.parseInt(cmd.getOptionValues(bucketSize)[0]);
       }
-
+      
+      int maxPartitionsPerNodeVal = -1;
+      if (cmd.hasOption(maxPartitionsPerNode))
+      {
+        maxPartitionsPerNodeVal = Integer.parseInt(cmd.getOptionValues(maxPartitionsPerNode)[0]);
+      }
       setupTool.addResourceToCluster(clusterName,
                                      resourceName,
                                      partitions,
                                      stateModelRef,
                                      modeValue,
-                                     bucketSizeVal);
+                                     bucketSizeVal,
+                                     maxPartitionsPerNodeVal);
       return 0;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/1609d5cb/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
new file mode 100644
index 0000000..4980c59
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
@@ -0,0 +1,251 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithPropertyServerCheck
+{
+private static final Logger LOG = Logger.getLogger(TestAutoRebalance.class.getName());
+  
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    System.out.println("START " + CLASS_NAME + " at "
+        + new Date(System.currentTimeMillis()));
+
+    _zkClient = new ZkClient(ZK_ADDR);
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
+    String namespace = "/" + CLUSTER_NAME;
+    if (_zkClient.exists(namespace))
+    {
+      _zkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(ZK_ADDR);
+
+    // setup storage cluster
+    _setupTool.addCluster(CLUSTER_NAME, true);
+    
+    _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, 100, "OnlineOffline", IdealStateModeProperty.AUTO_REBALANCE+"", 0, 25);
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      String storageNodeName = PARTICIPANT_PREFIX + ":" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 1);
+    
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    StartCMResult startResult =
+        TestHelper.startController(CLUSTER_NAME,
+                                   controllerName,
+                                   ZK_ADDR,
+                                   HelixControllerMain.STANDALONE);
+    _startCMResultMap.put(controllerName, startResult);
+
+    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    // start dummy participants
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      if (_startCMResultMap.get(instanceName) != null)
+      {
+        LOG.error("fail to start particpant:" + instanceName
+            + "(participant with same name already exists)");
+      }
+      else
+      {
+        startResult =
+            TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
+        _startCMResultMap.put(instanceName, startResult);
+        Thread.sleep(1000);
+        boolean result =
+            ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+                                                                                  CLUSTER_NAME, TEST_DB));
+        Assert.assertTrue(result);
+        ExternalView ev = manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
+        if(i < 3)
+        {
+          Assert.assertEquals(ev.getPartitionSet().size(), 25*(i+1));
+        }
+        else
+        {
+          Assert.assertEquals(ev.getPartitionSet().size(), 100);
+        }
+      }
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+                                                                              CLUSTER_NAME, TEST_DB));
+
+    Assert.assertTrue(result);
+  }
+  
+  @Test()
+  public void testAutoRebalanceWithMaxPartitionPerNode() throws Exception
+  {
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+    // kill 1 node
+    String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
+    _startCMResultMap.get(instanceName)._manager.disconnect();
+    Thread.currentThread().sleep(1000);
+    _startCMResultMap.get(instanceName)._thread.interrupt();
+    
+    //verifyBalanceExternalView();
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+                                                                              CLUSTER_NAME, TEST_DB));
+    Assert.assertTrue(result);
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    ExternalView ev = manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
+    Assert.assertEquals(ev.getPartitionSet().size(), 100);
+    
+    instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 1);
+    _startCMResultMap.get(instanceName)._manager.disconnect();
+    Thread.currentThread().sleep(1000);
+    _startCMResultMap.get(instanceName)._thread.interrupt();
+    
+    //verifyBalanceExternalView();
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+                                                                              CLUSTER_NAME, TEST_DB));
+    Assert.assertTrue(result);
+    ev = manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
+    Assert.assertEquals(ev.getPartitionSet().size(), 75);
+    
+    // add 2 nodes
+    for (int i = 0; i < 2; i++)
+    {
+      String storageNodeName = PARTICIPANT_PREFIX + ":" + (1000 + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+      
+      StartCMResult resultx =
+          TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':', '_'));
+      _startCMResultMap.put(storageNodeName, resultx);
+    }
+    Thread.sleep(1000);
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+                                                                              CLUSTER_NAME, TEST_DB));
+    Assert.assertTrue(result);
+  }
+  
+  static boolean verifyBalanceExternalView(ZNRecord externalView, int partitionCount, String masterState, int replica, int instances, int maxPerInstance)
+  {
+    Map<String, Integer> masterPartitionsCountMap = new HashMap<String, Integer>();
+    for(String partitionName : externalView.getMapFields().keySet())
+    {
+      Map<String, String> assignmentMap = externalView.getMapField(partitionName);
+      //Assert.assertTrue(assignmentMap.size() >= replica);
+      for(String instance : assignmentMap.keySet())
+      {
+        if(assignmentMap.get(instance).equals(masterState))
+        {
+          if(!masterPartitionsCountMap.containsKey(instance))
+          {
+            masterPartitionsCountMap.put(instance, 0);
+          }
+          masterPartitionsCountMap.put(instance, masterPartitionsCountMap.get(instance) + 1);
+        }
+      }
+    }
+    
+    int perInstancePartition = partitionCount / instances;
+    
+    int totalCount = 0;
+    for(String instanceName : masterPartitionsCountMap.keySet())
+    {
+      int instancePartitionCount = masterPartitionsCountMap.get(instanceName);
+      totalCount += instancePartitionCount;
+      if(!(instancePartitionCount == perInstancePartition || instancePartitionCount == perInstancePartition +1  || instancePartitionCount == maxPerInstance))
+      {
+        return false;
+      }
+      if(instancePartitionCount == maxPerInstance)
+      {
+        continue;
+      }
+      if(instancePartitionCount == perInstancePartition +1)
+      {
+        if(partitionCount % instances == 0)
+        {
+          return false;
+        }
+      }
+    }
+    if(totalCount == maxPerInstance * instances)
+    {
+      return true;
+    }
+    if(partitionCount != totalCount )
+    {
+      return false;
+    }
+    return true;
+    
+  }
+  
+  public static class ExternalViewBalancedVerifier implements ZkVerifier
+  {
+    ZkClient _client;
+    String _clusterName;
+    String _resourceName;
+    
+    public ExternalViewBalancedVerifier(ZkClient client, String clusterName, String resourceName)
+    {
+      _client = client;
+      _clusterName = clusterName;
+      _resourceName = resourceName;
+    }
+    @Override
+    public boolean verify()
+    {
+      HelixDataAccessor accessor = new ZKHelixDataAccessor( _clusterName, new ZkBaseDataAccessor(_client));
+      Builder keyBuilder = accessor.keyBuilder();
+      int numberOfPartitions = accessor.getProperty(keyBuilder.idealStates(_resourceName)).getRecord().getListFields().size();
+      ClusterDataCache cache = new ClusterDataCache();
+      cache.refresh(accessor);
+      String masterValue = cache.getStateModelDef(cache.getIdealState(_resourceName).getStateModelDefRef()).getStatesPriorityList().get(0);
+      int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas());
+      return verifyBalanceExternalView(accessor.getProperty(keyBuilder.externalView(_resourceName)).getRecord(), numberOfPartitions, masterValue, replicas, cache.getLiveInstances().size(),cache.getIdealState(_resourceName).getMaxPartitionsPerInstance());
+    }
+
+    @Override
+    public ZkClient getZkClient()
+    {
+      return _client;
+    }
+
+    @Override
+    public String getClusterName()
+    {
+      return _clusterName;
+    }
+    }
+  }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/1609d5cb/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 5ee4f85..aaa7959 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -334,7 +334,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
         Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() > 0);
       }
     }
-    
+    Thread.sleep(3000);
     ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, _zkClient, 0);
     dumpTask.run();
     
@@ -922,7 +922,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     
     for(int i = 0; i < _PARTITIONS * 3 / 5; i++)
     {
-      for(int j = 0; i< 10; j++)
+      for(int j = 0; j< 10; j++)
       {
         Thread.sleep(300);
         if(factory._messageCount == 5*(i+1)) break;