You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:19:15 UTC
[07/53] [abbrv] git commit: [HELIX-236] Create a hierarchical cluster
snapshot to replace ClusterDataCache
[HELIX-236] Create a hierarchical cluster snapshot to replace ClusterDataCache
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/0b67257a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/0b67257a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/0b67257a
Branch: refs/heads/master
Commit: 0b67257ac579042a80a30c180d51d3ec2aa8860f
Parents: 24fd868
Author: zzhang <zz...@uci.edu>
Authored: Wed Sep 25 15:42:26 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Nov 6 13:17:34 2013 -0800
----------------------------------------------------------------------
.../controller/stages/ClusterDataCache.java | 1 +
.../helix/tools/ClusterStateVerifier.java | 14 +++-----
.../helix/integration/TestAutoRebalance.java | 29 ++++++++++------
.../TestAutoRebalancePartitionLimit.java | 35 ++++++++++----------
.../helix/integration/TestBatchMessage.java | 11 +++---
.../TestCustomizedIdealStateRebalancer.java | 33 +++++++++---------
6 files changed, 65 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0b67257a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index f50b95c..2c4d8e1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -42,6 +42,7 @@ import org.apache.log4j.Logger;
* Reads the data from the cluster using data accessor. This output ClusterData which
* provides useful methods to search/lookup properties
*/
+@Deprecated
public class ClusterDataCache {
Map<String, LiveInstance> _liveInstanceMap;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0b67257a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index d4c1691..3370c99 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -227,21 +227,15 @@ public class ClusterStateVerifier {
Map<String, Map<String, String>> errStates, String clusterName) {
try {
Builder keyBuilder = accessor.keyBuilder();
- // read cluster once and do verification
- // TODO: stop using ClusterDataCache
- ClusterDataCache cache = new ClusterDataCache();
- cache.refresh(accessor);
-
- Map<String, IdealState> idealStates = cache.getIdealStates();
- if (idealStates == null) // || idealStates.isEmpty())
- {
+
+ Map<String, IdealState> idealStates = accessor.getChildValuesMap(keyBuilder.idealStates());
+ if (idealStates == null) {
// ideal state is null because ideal state is dropped
idealStates = Collections.emptyMap();
}
Map<String, ExternalView> extViews = accessor.getChildValuesMap(keyBuilder.externalViews());
- if (extViews == null) // || extViews.isEmpty())
- {
+ if (extViews == null) {
extViews = Collections.emptyMap();
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0b67257a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
index e837626..0731475 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
@@ -40,6 +40,9 @@ import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
@@ -264,22 +267,26 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
}
int numberOfPartitions = idealState.getRecord().getListFields().size();
- ClusterDataCache cache = new ClusterDataCache();
- cache.refresh(accessor);
- State masterValue =
- cache
- .getStateModelDef(cache.getIdealState(_resourceName).getStateModelDefId().stringify())
- .getStatesPriorityList().get(0);
- int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas());
- String instanceGroupTag = cache.getIdealState(_resourceName).getInstanceGroupTag();
+ String stateModelDefName = idealState.getStateModelDefId().stringify();
+ StateModelDefinition stateModelDef =
+ accessor.getProperty(keyBuilder.stateModelDef(stateModelDefName));
+ State masterValue = stateModelDef.getStatesPriorityList().get(0);
+ int replicas = Integer.parseInt(idealState.getReplicas());
+
+ String instanceGroupTag = idealState.getInstanceGroupTag();
+
int instances = 0;
- for (String liveInstanceName : cache.getLiveInstances().keySet()) {
- if (cache.getInstanceConfigMap().get(liveInstanceName).containsTag(instanceGroupTag)) {
+ Map<String, LiveInstance> liveInstanceMap =
+ accessor.getChildValuesMap(keyBuilder.liveInstances());
+ Map<String, InstanceConfig> instanceConfigMap =
+ accessor.getChildValuesMap(keyBuilder.instanceConfigs());
+ for (String liveInstanceName : liveInstanceMap.keySet()) {
+ if (instanceConfigMap.get(liveInstanceName).containsTag(instanceGroupTag)) {
instances++;
}
}
if (instances == 0) {
- instances = cache.getLiveInstances().size();
+ instances = liveInstanceMap.size();
}
ExternalView ev = accessor.getProperty(keyBuilder.externalView(_resourceName));
if (ev == null) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0b67257a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
index 2446b67..19535ff 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
@@ -31,13 +31,15 @@ import org.apache.helix.TestHelper.StartCMResult;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.State;
import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
@@ -50,6 +52,7 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
private static final Logger LOG = Logger.getLogger(TestAutoRebalancePartitionLimit.class
.getName());
+ @Override
@BeforeClass
public void beforeClass() throws Exception {
// Logger.getRootLogger().setLevel(Level.INFO);
@@ -122,7 +125,7 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
// kill 1 node
String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
_startCMResultMap.get(instanceName)._manager.disconnect();
- Thread.currentThread().sleep(1000);
+ Thread.sleep(1000);
_startCMResultMap.get(instanceName)._thread.interrupt();
// verifyBalanceExternalView();
@@ -137,7 +140,7 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 1);
_startCMResultMap.get(instanceName)._manager.disconnect();
- Thread.currentThread().sleep(1000);
+ Thread.sleep(1000);
_startCMResultMap.get(instanceName)._thread.interrupt();
// verifyBalanceExternalView();
@@ -223,22 +226,20 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
@Override
public boolean verify() {
HelixDataAccessor accessor =
- new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor(_client));
+ new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_client));
Builder keyBuilder = accessor.keyBuilder();
- int numberOfPartitions =
- accessor.getProperty(keyBuilder.idealState(_resourceName)).getRecord().getListFields()
- .size();
- ClusterDataCache cache = new ClusterDataCache();
- cache.refresh(accessor);
- State masterValue =
- cache
- .getStateModelDef(cache.getIdealState(_resourceName).getStateModelDefId().stringify())
- .getStatesPriorityList().get(0);
- int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas());
+ IdealState idealState = accessor.getProperty(keyBuilder.idealState(_resourceName));
+ int numberOfPartitions = idealState.getRecord().getListFields().size();
+ String stateModelDefName = idealState.getStateModelDefId().stringify();
+ StateModelDefinition stateModelDef =
+ accessor.getProperty(keyBuilder.stateModelDef(stateModelDefName));
+ State masterValue = stateModelDef.getStatesPriorityList().get(0);
+ Map<String, LiveInstance> liveInstanceMap =
+ accessor.getChildValuesMap(keyBuilder.liveInstances());
+ int replicas = Integer.parseInt(idealState.getReplicas());
return verifyBalanceExternalView(accessor.getProperty(keyBuilder.externalView(_resourceName))
- .getRecord(), numberOfPartitions, masterValue.toString(), replicas, cache
- .getLiveInstances().size(), cache.getIdealState(_resourceName)
- .getMaxPartitionsPerInstance());
+ .getRecord(), numberOfPartitions, masterValue.toString(), replicas,
+ liveInstanceMap.size(), idealState.getMaxPartitionsPerInstance());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0b67257a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
index 833f85e..ede4e12 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
@@ -226,14 +226,17 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
idealState.setBatchMessageMode(true);
accessor.setProperty(keyBuilder.idealState("TestDB0"), idealState);
+ final String hostToFail = "localhost_12921";
+ final String partitionToFail = "TestDB0_4";
+
TestHelper
.startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
for (int i = 0; i < n; i++) {
String instanceName = "localhost_" + (12918 + i);
- if (i == 1) {
+ if (instanceName.equals(hostToFail)) {
Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
- errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+ errPartitions.put("SLAVE-MASTER", TestHelper.setOf(partitionToFail));
participants[i] =
new MockParticipant(clusterName, instanceName, ZK_ADDR,
new ErrTransition(errPartitions));
@@ -245,14 +248,14 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
Map<String, Map<String, String>> errStates = new HashMap<String, Map<String, String>>();
errStates.put("TestDB0", new HashMap<String, String>());
- errStates.get("TestDB0").put("TestDB0_4", "localhost_12919");
+ errStates.get("TestDB0").put(partitionToFail, hostToFail);
boolean result =
ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
ZK_ADDR, clusterName, errStates));
Assert.assertTrue(result);
Map<String, Set<String>> errorStateMap = new HashMap<String, Set<String>>();
- errorStateMap.put("TestDB0_4", TestHelper.setOf("localhost_12919"));
+ errorStateMap.put(partitionToFail, TestHelper.setOf(hostToFail));
// verify "TestDB0_4", "localhost_12919" is in ERROR state
TestHelper.verifyState(clusterName, ZK_ADDR, errorStateMap, "ERROR");
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0b67257a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
index 90c53d6..a5001ed 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
@@ -35,7 +35,6 @@ import org.apache.helix.api.State;
import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
import org.apache.helix.controller.rebalancer.context.Rebalancer;
import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -44,6 +43,8 @@ import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.IdealStateProperty;
import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -142,26 +143,26 @@ public class TestCustomizedIdealStateRebalancer extends
HelixDataAccessor accessor =
new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_client));
Builder keyBuilder = accessor.keyBuilder();
- int numberOfPartitions =
- accessor.getProperty(keyBuilder.idealState(_resourceName)).getRecord().getListFields()
- .size();
- ClusterDataCache cache = new ClusterDataCache();
- cache.refresh(accessor);
- State masterValue =
- cache
- .getStateModelDef(
- cache.getIdealState(_resourceName).getStateModelDefId().stringify())
- .getStatesPriorityList().get(0);
- int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas());
- String instanceGroupTag = cache.getIdealState(_resourceName).getInstanceGroupTag();
+ IdealState idealState = accessor.getProperty(keyBuilder.idealState(_resourceName));
+ int numberOfPartitions = idealState.getRecord().getListFields().size();
+ String stateModelDefName = idealState.getStateModelDefId().stringify();
+ StateModelDefinition stateModelDef =
+ accessor.getProperty(keyBuilder.stateModelDef(stateModelDefName));
+ State masterValue = stateModelDef.getStatesPriorityList().get(0);
+ int replicas = Integer.parseInt(idealState.getReplicas());
+ String instanceGroupTag = idealState.getInstanceGroupTag();
int instances = 0;
- for (String liveInstanceName : cache.getLiveInstances().keySet()) {
- if (cache.getInstanceConfigMap().get(liveInstanceName).containsTag(instanceGroupTag)) {
+ Map<String, LiveInstance> liveInstanceMap =
+ accessor.getChildValuesMap(keyBuilder.liveInstances());
+ Map<String, InstanceConfig> instanceCfgMap =
+ accessor.getChildValuesMap(keyBuilder.instanceConfigs());
+ for (String liveInstanceName : liveInstanceMap.keySet()) {
+ if (instanceCfgMap.get(liveInstanceName).containsTag(instanceGroupTag)) {
instances++;
}
}
if (instances == 0) {
- instances = cache.getLiveInstances().size();
+ instances = liveInstanceMap.size();
}
ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName));
return verifyBalanceExternalView(externalView.getRecord(), numberOfPartitions,