You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by sl...@apache.org on 2013/02/20 00:55:43 UTC
git commit: [Helix-49] add support for specifying per-node partition
limit for helix aut-rebalance mode
Updated Branches:
refs/heads/master e46f0a1a7 -> 1609d5cb3
[Helix-49] add support for specifying per-node partition limit for helix
aut-rebalance mode
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/1609d5cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/1609d5cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/1609d5cb
Branch: refs/heads/master
Commit: 1609d5cb3451c0fe24afc49fbae577e80fd6d2be
Parents: e46f0a1
Author: slu2011 <lu...@gmail.com>
Authored: Tue Feb 19 15:55:01 2013 -0800
Committer: slu2011 <lu...@gmail.com>
Committed: Tue Feb 19 15:55:01 2013 -0800
----------------------------------------------------------------------
.../helix/webapp/resources/JsonParameters.java | 3 +
.../webapp/resources/ResourceGroupsResource.java | 31 ++-
.../src/main/java/org/apache/helix/HelixAdmin.java | 21 ++-
.../stages/BestPossibleStateCalcStage.java | 11 +-
.../org/apache/helix/manager/zk/ZKHelixAdmin.java | 18 +-
.../java/org/apache/helix/model/IdealState.java | 25 ++-
.../java/org/apache/helix/tools/ClusterSetup.java | 40 +++-
.../TestAutoRebalancePartitionLimit.java | 251 +++++++++++++++
.../helix/integration/TestSchedulerMessage.java | 4 +-
9 files changed, 391 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/1609d5cb/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
index 801055d..fc77dcf 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
@@ -59,6 +59,9 @@ public class JsonParameters
public static final String RESOURCE_GROUP_NAME = "resourceGroupName";
public static final String STATE_MODEL_DEF_REF = "stateModelDefRef";
public static final String IDEAL_STATE_MODE = "mode";
+ public static final String MAX_PARTITIONS_PER_NODE = "maxPartitionsPerNode";
+ public static final String BUCKET_SIZE = "bucketSize";
+
// zk commands
public static final String ZK_DELETE_CHILDREN = "zkDeleteChildren";
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/1609d5cb/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupsResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupsResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupsResource.java
index 1d16716..dbbf0a1 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupsResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupsResource.java
@@ -145,6 +145,32 @@ public class ResourceGroupsResource extends Resource
{
mode = jsonParameters.getParameter(JsonParameters.IDEAL_STATE_MODE);
}
+
+ int bucketSize = 0;
+ if (jsonParameters.getParameter(JsonParameters.BUCKET_SIZE) != null)
+ {
+ try
+ {
+ bucketSize = Integer.parseInt(jsonParameters.getParameter(JsonParameters.BUCKET_SIZE));
+ }
+ catch(Exception e)
+ {
+
+ }
+ }
+
+ int maxPartitionsPerNode = -1;
+ if (jsonParameters.getParameter(JsonParameters.MAX_PARTITIONS_PER_NODE) != null)
+ {
+ try
+ {
+ maxPartitionsPerNode = Integer.parseInt(jsonParameters.getParameter(JsonParameters.MAX_PARTITIONS_PER_NODE));
+ }
+ catch(Exception e)
+ {
+
+ }
+ }
ZkClient zkClient =
(ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT);
@@ -154,7 +180,10 @@ public class ResourceGroupsResource extends Resource
entityName,
partitions,
stateModelDefRef,
- mode);
+ mode,
+ bucketSize,
+ maxPartitionsPerNode
+ );
}
else
{
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/1609d5cb/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 7d3ecfa..86eb57d 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -134,7 +134,26 @@ public interface HelixAdmin
String stateModelRef,
String idealStateMode,
int bucketSize);
-
+
+ /**
+ * Add a resource to a cluster, using a bucket size > 1
+ *
+ * @param clusterName
+ * @param resourceName
+ * @param numResources
+ * @param stateModelRef
+ * @param idealStateMode
+ * @param bucketSize
+ * @param maxPartitionsPerInstance
+ */
+ void addResource(String clusterName,
+ String resourceName,
+ int numResources,
+ String stateModelRef,
+ String idealStateMode,
+ int bucketSize,
+ int maxPartitionsPerInstance);
+
/**
* Add an instance to a cluster
*
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/1609d5cb/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index e802c04..84092e4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -244,7 +244,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage
}
List<String> orphanedPartitionsList = new ArrayList<String>();
orphanedPartitionsList.addAll(orphanedPartitions);
- normalizeAssignmentMap(masterAssignmentMap, orphanedPartitionsList);
+ int maxPartitionsPerInstance = idealState.getMaxPartitionsPerInstance();
+ normalizeAssignmentMap(masterAssignmentMap, orphanedPartitionsList, maxPartitionsPerInstance);
idealState.getRecord()
.setListFields(generateListFieldFromMasterAssignment(masterAssignmentMap,
replicas));
@@ -262,7 +263,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage
* @return
*/
private void normalizeAssignmentMap(Map<String, List<String>> masterAssignmentMap,
- List<String> orphanPartitions)
+ List<String> orphanPartitions, int maxPartitionsPerInstance)
{
int totalPartitions = 0;
String[] instanceNames = new String[masterAssignmentMap.size()];
@@ -300,6 +301,10 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage
{
int targetPartitionNo = leave > 0 ? (partitionNumber + 1) : partitionNumber;
leave--;
+ if(targetPartitionNo > maxPartitionsPerInstance)
+ {
+ targetPartitionNo = maxPartitionsPerInstance;
+ }
while (masterAssignmentMap.get(instanceNames[i]).size() < targetPartitionNo)
{
int lastElementIndex = orphanPartitions.size() - 1;
@@ -310,7 +315,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage
}
if (orphanPartitions.size() > 0)
{
- logger.error("orphanPartitions still contains elements");
+ logger.warn("orphanPartitions still contains elements");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/1609d5cb/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index d618cd3..be96dbe 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -688,6 +688,16 @@ public class ZKHelixAdmin implements HelixAdmin
String idealStateMode,
int bucketSize)
{
+ addResource(clusterName, resourceName, partitions, stateModelRef, idealStateMode,
+ bucketSize, -1);
+
+ }
+
+ @Override
+ public void addResource(String clusterName, String resourceName,
+ int partitions, String stateModelRef, String idealStateMode,
+ int bucketSize, int maxPartitionsPerInstance)
+ {
if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
{
throw new HelixException("cluster " + clusterName + " is not setup yet");
@@ -708,13 +718,15 @@ public class ZKHelixAdmin implements HelixAdmin
idealState.setIdealStateMode(mode.toString());
idealState.setReplicas("" + 0);
idealState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
-
+ if(maxPartitionsPerInstance > 0 && maxPartitionsPerInstance < Integer.MAX_VALUE)
+ {
+ idealState.setMaxPartitionsPerInstance(maxPartitionsPerInstance);
+ }
if (bucketSize > 0)
{
idealState.setBucketSize(bucketSize);
}
addResource(clusterName, resourceName, idealState);
-
}
@Override
@@ -1379,5 +1391,7 @@ public class ZKHelixAdmin implements HelixAdmin
IdealState newIdealState = new IdealState(newIdealStateRecord);
setResourceIdealState(clusterName, newIdealStateRecord.getId(), newIdealState);
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/1609d5cb/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index f1121fe..871545f 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -39,7 +39,7 @@ public class IdealState extends HelixProperty
{
public enum IdealStateProperty
{
- NUM_PARTITIONS, STATE_MODEL_DEF_REF, STATE_MODEL_FACTORY_NAME, REPLICAS, IDEAL_STATE_MODE, REBALANCE_TIMER_PERIOD
+ NUM_PARTITIONS, STATE_MODEL_DEF_REF, STATE_MODEL_FACTORY_NAME, REPLICAS, IDEAL_STATE_MODE, REBALANCE_TIMER_PERIOD, MAX_PARTITONS_PER_INSTANCE
}
public static final String QUERY_LIST = "PREFERENCE_LIST_QUERYS";
@@ -72,6 +72,29 @@ public class IdealState extends HelixProperty
_record
.setSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString(), mode);
}
+
+ public int getMaxPartitionsPerInstance()
+ {
+ try
+ {
+ String strVal = _record
+ .getSimpleField(IdealStateProperty.MAX_PARTITONS_PER_INSTANCE.toString());
+ if(strVal != null)
+ {
+ return Integer.parseInt(strVal);
+ }
+ }
+ catch (Exception e)
+ {
+ }
+ return Integer.MAX_VALUE;
+ }
+
+ public void setMaxPartitionsPerInstance(int max)
+ {
+ _record
+ .setSimpleField(IdealStateProperty.MAX_PARTITONS_PER_INSTANCE.toString(), Integer.toString(max));
+ }
public IdealStateModeProperty getIdealStateMode()
{
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/1609d5cb/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index bde82f6..29ed82f 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -93,6 +93,8 @@ public class ClusterSetup
public static final String mode = "mode";
public static final String bucketSize = "bucketSize";
public static final String resourceKeyPrefix = "key";
+ public static final String maxPartitionsPerNode = "maxPartitionsPerNode";
+
public static final String addResourceProperty = "addResourceProperty";
public static final String removeResourceProperty = "removeResourceProperty";
@@ -431,6 +433,23 @@ public class ClusterSetup
idealStateMode,
bucketSize);
}
+
+ public void addResourceToCluster(String clusterName,
+ String resourceName,
+ int numResources,
+ String stateModelRef,
+ String idealStateMode,
+ int bucketSize,
+ int maxPartitionsPerInstance)
+ {
+ _admin.addResource(clusterName,
+ resourceName,
+ numResources,
+ stateModelRef,
+ idealStateMode,
+ bucketSize,
+ maxPartitionsPerInstance);
+ }
public void dropResourceFromCluster(String clusterName, String resourceName)
{
@@ -443,7 +462,7 @@ public class ClusterSetup
rebalanceStorageCluster(clusterName, resourceName, replica, resourceName);
}
- public void reblanceResource(String clusterName, String resourceName, int replica)
+ public void rebalanceResource(String clusterName, String resourceName, int replica)
{
rebalanceStorageCluster(clusterName, resourceName, replica, resourceName);
}
@@ -738,6 +757,14 @@ public class ClusterSetup
resourceBucketSizeOption.setArgs(1);
resourceBucketSizeOption.setRequired(false);
resourceBucketSizeOption.setArgName("Size of a bucket for a resource");
+
+ Option maxPartitionsPerNodeOption =
+ OptionBuilder.withLongOpt(maxPartitionsPerNode)
+ .withDescription("Specify max partitions per node, used with addResourceGroup command")
+ .create();
+ maxPartitionsPerNodeOption.setArgs(1);
+ maxPartitionsPerNodeOption.setRequired(false);
+ maxPartitionsPerNodeOption.setArgName("Max partitions per node for a resource");
Option resourceKeyOption =
OptionBuilder.withLongOpt(resourceKeyPrefix)
@@ -952,6 +979,7 @@ public class ClusterSetup
group.addOption(addResourceOption);
group.addOption(resourceModeOption);
group.addOption(resourceBucketSizeOption);
+ group.addOption(maxPartitionsPerNodeOption);
group.addOption(expandResourceOption);
group.addOption(expandClusterOption);
group.addOption(resourceKeyOption);
@@ -1083,13 +1111,19 @@ public class ClusterSetup
{
bucketSizeVal = Integer.parseInt(cmd.getOptionValues(bucketSize)[0]);
}
-
+
+ int maxPartitionsPerNodeVal = -1;
+ if (cmd.hasOption(maxPartitionsPerNode))
+ {
+ maxPartitionsPerNodeVal = Integer.parseInt(cmd.getOptionValues(maxPartitionsPerNode)[0]);
+ }
setupTool.addResourceToCluster(clusterName,
resourceName,
partitions,
stateModelRef,
modeValue,
- bucketSizeVal);
+ bucketSizeVal,
+ maxPartitionsPerNodeVal);
return 0;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/1609d5cb/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
new file mode 100644
index 0000000..4980c59
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
@@ -0,0 +1,251 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithPropertyServerCheck
+{
+private static final Logger LOG = Logger.getLogger(TestAutoRebalance.class.getName());
+
+ @BeforeClass
+ public void beforeClass() throws Exception
+ {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ System.out.println("START " + CLASS_NAME + " at "
+ + new Date(System.currentTimeMillis()));
+
+ _zkClient = new ZkClient(ZK_ADDR);
+ _zkClient.setZkSerializer(new ZNRecordSerializer());
+ String namespace = "/" + CLUSTER_NAME;
+ if (_zkClient.exists(namespace))
+ {
+ _zkClient.deleteRecursive(namespace);
+ }
+ _setupTool = new ClusterSetup(ZK_ADDR);
+
+ // setup storage cluster
+ _setupTool.addCluster(CLUSTER_NAME, true);
+
+ _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, 100, "OnlineOffline", IdealStateModeProperty.AUTO_REBALANCE+"", 0, 25);
+ for (int i = 0; i < NODE_NR; i++)
+ {
+ String storageNodeName = PARTICIPANT_PREFIX + ":" + (START_PORT + i);
+ _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ }
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 1);
+
+ // start controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ StartCMResult startResult =
+ TestHelper.startController(CLUSTER_NAME,
+ controllerName,
+ ZK_ADDR,
+ HelixControllerMain.STANDALONE);
+ _startCMResultMap.put(controllerName, startResult);
+
+ HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ // start dummy participants
+ for (int i = 0; i < NODE_NR; i++)
+ {
+ String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ if (_startCMResultMap.get(instanceName) != null)
+ {
+ LOG.error("fail to start particpant:" + instanceName
+ + "(participant with same name already exists)");
+ }
+ else
+ {
+ startResult =
+ TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
+ _startCMResultMap.put(instanceName, startResult);
+ Thread.sleep(1000);
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ CLUSTER_NAME, TEST_DB));
+ Assert.assertTrue(result);
+ ExternalView ev = manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
+ if(i < 3)
+ {
+ Assert.assertEquals(ev.getPartitionSet().size(), 25*(i+1));
+ }
+ else
+ {
+ Assert.assertEquals(ev.getPartitionSet().size(), 100);
+ }
+ }
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ CLUSTER_NAME, TEST_DB));
+
+ Assert.assertTrue(result);
+ }
+
+ @Test()
+ public void testAutoRebalanceWithMaxPartitionPerNode() throws Exception
+ {
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+ // kill 1 node
+ String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
+ _startCMResultMap.get(instanceName)._manager.disconnect();
+ Thread.currentThread().sleep(1000);
+ _startCMResultMap.get(instanceName)._thread.interrupt();
+
+ //verifyBalanceExternalView();
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ CLUSTER_NAME, TEST_DB));
+ Assert.assertTrue(result);
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ ExternalView ev = manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
+ Assert.assertEquals(ev.getPartitionSet().size(), 100);
+
+ instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 1);
+ _startCMResultMap.get(instanceName)._manager.disconnect();
+ Thread.currentThread().sleep(1000);
+ _startCMResultMap.get(instanceName)._thread.interrupt();
+
+ //verifyBalanceExternalView();
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ CLUSTER_NAME, TEST_DB));
+ Assert.assertTrue(result);
+ ev = manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
+ Assert.assertEquals(ev.getPartitionSet().size(), 75);
+
+ // add 2 nodes
+ for (int i = 0; i < 2; i++)
+ {
+ String storageNodeName = PARTICIPANT_PREFIX + ":" + (1000 + i);
+ _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+
+ StartCMResult resultx =
+ TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':', '_'));
+ _startCMResultMap.put(storageNodeName, resultx);
+ }
+ Thread.sleep(1000);
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ CLUSTER_NAME, TEST_DB));
+ Assert.assertTrue(result);
+ }
+
+ static boolean verifyBalanceExternalView(ZNRecord externalView, int partitionCount, String masterState, int replica, int instances, int maxPerInstance)
+ {
+ Map<String, Integer> masterPartitionsCountMap = new HashMap<String, Integer>();
+ for(String partitionName : externalView.getMapFields().keySet())
+ {
+ Map<String, String> assignmentMap = externalView.getMapField(partitionName);
+ //Assert.assertTrue(assignmentMap.size() >= replica);
+ for(String instance : assignmentMap.keySet())
+ {
+ if(assignmentMap.get(instance).equals(masterState))
+ {
+ if(!masterPartitionsCountMap.containsKey(instance))
+ {
+ masterPartitionsCountMap.put(instance, 0);
+ }
+ masterPartitionsCountMap.put(instance, masterPartitionsCountMap.get(instance) + 1);
+ }
+ }
+ }
+
+ int perInstancePartition = partitionCount / instances;
+
+ int totalCount = 0;
+ for(String instanceName : masterPartitionsCountMap.keySet())
+ {
+ int instancePartitionCount = masterPartitionsCountMap.get(instanceName);
+ totalCount += instancePartitionCount;
+ if(!(instancePartitionCount == perInstancePartition || instancePartitionCount == perInstancePartition +1 || instancePartitionCount == maxPerInstance))
+ {
+ return false;
+ }
+ if(instancePartitionCount == maxPerInstance)
+ {
+ continue;
+ }
+ if(instancePartitionCount == perInstancePartition +1)
+ {
+ if(partitionCount % instances == 0)
+ {
+ return false;
+ }
+ }
+ }
+ if(totalCount == maxPerInstance * instances)
+ {
+ return true;
+ }
+ if(partitionCount != totalCount )
+ {
+ return false;
+ }
+ return true;
+
+ }
+
+ public static class ExternalViewBalancedVerifier implements ZkVerifier
+ {
+ ZkClient _client;
+ String _clusterName;
+ String _resourceName;
+
+ public ExternalViewBalancedVerifier(ZkClient client, String clusterName, String resourceName)
+ {
+ _client = client;
+ _clusterName = clusterName;
+ _resourceName = resourceName;
+ }
+ @Override
+ public boolean verify()
+ {
+ HelixDataAccessor accessor = new ZKHelixDataAccessor( _clusterName, new ZkBaseDataAccessor(_client));
+ Builder keyBuilder = accessor.keyBuilder();
+ int numberOfPartitions = accessor.getProperty(keyBuilder.idealStates(_resourceName)).getRecord().getListFields().size();
+ ClusterDataCache cache = new ClusterDataCache();
+ cache.refresh(accessor);
+ String masterValue = cache.getStateModelDef(cache.getIdealState(_resourceName).getStateModelDefRef()).getStatesPriorityList().get(0);
+ int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas());
+ return verifyBalanceExternalView(accessor.getProperty(keyBuilder.externalView(_resourceName)).getRecord(), numberOfPartitions, masterValue, replicas, cache.getLiveInstances().size(),cache.getIdealState(_resourceName).getMaxPartitionsPerInstance());
+ }
+
+ @Override
+ public ZkClient getZkClient()
+ {
+ return _client;
+ }
+
+ @Override
+ public String getClusterName()
+ {
+ return _clusterName;
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/1609d5cb/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 5ee4f85..aaa7959 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -334,7 +334,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() > 0);
}
}
-
+ Thread.sleep(3000);
ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, _zkClient, 0);
dumpTask.run();
@@ -922,7 +922,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
for(int i = 0; i < _PARTITIONS * 3 / 5; i++)
{
- for(int j = 0; i< 10; j++)
+ for(int j = 0; j< 10; j++)
{
Thread.sleep(300);
if(factory._messageCount == 5*(i+1)) break;