You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:19:15 UTC

[07/53] [abbrv] git commit: [HELIX-236] Create a hierarchical cluster snapshot to replace ClusterDataCache

[HELIX-236] Create a hierarchical cluster snapshot to replace ClusterDataCache


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/0b67257a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/0b67257a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/0b67257a

Branch: refs/heads/master
Commit: 0b67257ac579042a80a30c180d51d3ec2aa8860f
Parents: 24fd868
Author: zzhang <zz...@uci.edu>
Authored: Wed Sep 25 15:42:26 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Nov 6 13:17:34 2013 -0800

----------------------------------------------------------------------
 .../controller/stages/ClusterDataCache.java     |  1 +
 .../helix/tools/ClusterStateVerifier.java       | 14 +++-----
 .../helix/integration/TestAutoRebalance.java    | 29 ++++++++++------
 .../TestAutoRebalancePartitionLimit.java        | 35 ++++++++++----------
 .../helix/integration/TestBatchMessage.java     | 11 +++---
 .../TestCustomizedIdealStateRebalancer.java     | 33 +++++++++---------
 6 files changed, 65 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0b67257a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index f50b95c..2c4d8e1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -42,6 +42,7 @@ import org.apache.log4j.Logger;
  * Reads the data from the cluster using data accessor. This output ClusterData which
  * provides useful methods to search/lookup properties
  */
+@Deprecated
 public class ClusterDataCache {
 
   Map<String, LiveInstance> _liveInstanceMap;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0b67257a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index d4c1691..3370c99 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -227,21 +227,15 @@ public class ClusterStateVerifier {
       Map<String, Map<String, String>> errStates, String clusterName) {
     try {
       Builder keyBuilder = accessor.keyBuilder();
-      // read cluster once and do verification
-      // TODO: stop using ClusterDataCache
-      ClusterDataCache cache = new ClusterDataCache();
-      cache.refresh(accessor);
-
-      Map<String, IdealState> idealStates = cache.getIdealStates();
-      if (idealStates == null) // || idealStates.isEmpty())
-      {
+
+      Map<String, IdealState> idealStates = accessor.getChildValuesMap(keyBuilder.idealStates());
+      if (idealStates == null) {
         // ideal state is null because ideal state is dropped
         idealStates = Collections.emptyMap();
       }
 
       Map<String, ExternalView> extViews = accessor.getChildValuesMap(keyBuilder.externalViews());
-      if (extViews == null) // || extViews.isEmpty())
-      {
+      if (extViews == null) {
         extViews = Collections.emptyMap();
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0b67257a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
index e837626..0731475 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
@@ -40,6 +40,9 @@ import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
@@ -264,22 +267,26 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
       }
 
       int numberOfPartitions = idealState.getRecord().getListFields().size();
-      ClusterDataCache cache = new ClusterDataCache();
-      cache.refresh(accessor);
-      State masterValue =
-          cache
-              .getStateModelDef(cache.getIdealState(_resourceName).getStateModelDefId().stringify())
-              .getStatesPriorityList().get(0);
-      int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas());
-      String instanceGroupTag = cache.getIdealState(_resourceName).getInstanceGroupTag();
+      String stateModelDefName = idealState.getStateModelDefId().stringify();
+      StateModelDefinition stateModelDef =
+          accessor.getProperty(keyBuilder.stateModelDef(stateModelDefName));
+      State masterValue = stateModelDef.getStatesPriorityList().get(0);
+      int replicas = Integer.parseInt(idealState.getReplicas());
+
+      String instanceGroupTag = idealState.getInstanceGroupTag();
+
       int instances = 0;
-      for (String liveInstanceName : cache.getLiveInstances().keySet()) {
-        if (cache.getInstanceConfigMap().get(liveInstanceName).containsTag(instanceGroupTag)) {
+      Map<String, LiveInstance> liveInstanceMap =
+          accessor.getChildValuesMap(keyBuilder.liveInstances());
+      Map<String, InstanceConfig> instanceConfigMap =
+          accessor.getChildValuesMap(keyBuilder.instanceConfigs());
+      for (String liveInstanceName : liveInstanceMap.keySet()) {
+        if (instanceConfigMap.get(liveInstanceName).containsTag(instanceGroupTag)) {
           instances++;
         }
       }
       if (instances == 0) {
-        instances = cache.getLiveInstances().size();
+        instances = liveInstanceMap.size();
       }
       ExternalView ev = accessor.getProperty(keyBuilder.externalView(_resourceName));
       if (ev == null) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0b67257a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
index 2446b67..19535ff 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
@@ -31,13 +31,15 @@ import org.apache.helix.TestHelper.StartCMResult;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.State;
 import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
@@ -50,6 +52,7 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
   private static final Logger LOG = Logger.getLogger(TestAutoRebalancePartitionLimit.class
       .getName());
 
+  @Override
   @BeforeClass
   public void beforeClass() throws Exception {
     // Logger.getRootLogger().setLevel(Level.INFO);
@@ -122,7 +125,7 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
     // kill 1 node
     String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
     _startCMResultMap.get(instanceName)._manager.disconnect();
-    Thread.currentThread().sleep(1000);
+    Thread.sleep(1000);
     _startCMResultMap.get(instanceName)._thread.interrupt();
 
     // verifyBalanceExternalView();
@@ -137,7 +140,7 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
 
     instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 1);
     _startCMResultMap.get(instanceName)._manager.disconnect();
-    Thread.currentThread().sleep(1000);
+    Thread.sleep(1000);
     _startCMResultMap.get(instanceName)._thread.interrupt();
 
     // verifyBalanceExternalView();
@@ -223,22 +226,20 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
     @Override
     public boolean verify() {
       HelixDataAccessor accessor =
-          new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor(_client));
+          new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_client));
       Builder keyBuilder = accessor.keyBuilder();
-      int numberOfPartitions =
-          accessor.getProperty(keyBuilder.idealState(_resourceName)).getRecord().getListFields()
-              .size();
-      ClusterDataCache cache = new ClusterDataCache();
-      cache.refresh(accessor);
-      State masterValue =
-          cache
-              .getStateModelDef(cache.getIdealState(_resourceName).getStateModelDefId().stringify())
-              .getStatesPriorityList().get(0);
-      int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas());
+      IdealState idealState = accessor.getProperty(keyBuilder.idealState(_resourceName));
+      int numberOfPartitions = idealState.getRecord().getListFields().size();
+      String stateModelDefName = idealState.getStateModelDefId().stringify();
+      StateModelDefinition stateModelDef =
+          accessor.getProperty(keyBuilder.stateModelDef(stateModelDefName));
+      State masterValue = stateModelDef.getStatesPriorityList().get(0);
+      Map<String, LiveInstance> liveInstanceMap =
+          accessor.getChildValuesMap(keyBuilder.liveInstances());
+      int replicas = Integer.parseInt(idealState.getReplicas());
       return verifyBalanceExternalView(accessor.getProperty(keyBuilder.externalView(_resourceName))
-          .getRecord(), numberOfPartitions, masterValue.toString(), replicas, cache
-          .getLiveInstances().size(), cache.getIdealState(_resourceName)
-          .getMaxPartitionsPerInstance());
+          .getRecord(), numberOfPartitions, masterValue.toString(), replicas,
+          liveInstanceMap.size(), idealState.getMaxPartitionsPerInstance());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0b67257a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
index 833f85e..ede4e12 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
@@ -226,14 +226,17 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
     idealState.setBatchMessageMode(true);
     accessor.setProperty(keyBuilder.idealState("TestDB0"), idealState);
 
+    final String hostToFail = "localhost_12921";
+    final String partitionToFail = "TestDB0_4";
+
     TestHelper
         .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      if (i == 1) {
+      if (instanceName.equals(hostToFail)) {
         Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
-        errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+        errPartitions.put("SLAVE-MASTER", TestHelper.setOf(partitionToFail));
         participants[i] =
             new MockParticipant(clusterName, instanceName, ZK_ADDR,
                 new ErrTransition(errPartitions));
@@ -245,14 +248,14 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
 
     Map<String, Map<String, String>> errStates = new HashMap<String, Map<String, String>>();
     errStates.put("TestDB0", new HashMap<String, String>());
-    errStates.get("TestDB0").put("TestDB0_4", "localhost_12919");
+    errStates.get("TestDB0").put(partitionToFail, hostToFail);
     boolean result =
         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
             ZK_ADDR, clusterName, errStates));
     Assert.assertTrue(result);
 
     Map<String, Set<String>> errorStateMap = new HashMap<String, Set<String>>();
-    errorStateMap.put("TestDB0_4", TestHelper.setOf("localhost_12919"));
+    errorStateMap.put(partitionToFail, TestHelper.setOf(hostToFail));
 
     // verify "TestDB0_4", "localhost_12919" is in ERROR state
     TestHelper.verifyState(clusterName, ZK_ADDR, errorStateMap, "ERROR");

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0b67257a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
index 90c53d6..a5001ed 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
@@ -35,7 +35,6 @@ import org.apache.helix.api.State;
 import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
 import org.apache.helix.controller.rebalancer.context.Rebalancer;
 import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -44,6 +43,8 @@ import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.IdealStateProperty;
 import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -142,26 +143,26 @@ public class TestCustomizedIdealStateRebalancer extends
         HelixDataAccessor accessor =
             new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_client));
         Builder keyBuilder = accessor.keyBuilder();
-        int numberOfPartitions =
-            accessor.getProperty(keyBuilder.idealState(_resourceName)).getRecord().getListFields()
-                .size();
-        ClusterDataCache cache = new ClusterDataCache();
-        cache.refresh(accessor);
-        State masterValue =
-            cache
-                .getStateModelDef(
-                    cache.getIdealState(_resourceName).getStateModelDefId().stringify())
-                .getStatesPriorityList().get(0);
-        int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas());
-        String instanceGroupTag = cache.getIdealState(_resourceName).getInstanceGroupTag();
+        IdealState idealState = accessor.getProperty(keyBuilder.idealState(_resourceName));
+        int numberOfPartitions = idealState.getRecord().getListFields().size();
+        String stateModelDefName = idealState.getStateModelDefId().stringify();
+        StateModelDefinition stateModelDef =
+            accessor.getProperty(keyBuilder.stateModelDef(stateModelDefName));
+        State masterValue = stateModelDef.getStatesPriorityList().get(0);
+        int replicas = Integer.parseInt(idealState.getReplicas());
+        String instanceGroupTag = idealState.getInstanceGroupTag();
         int instances = 0;
-        for (String liveInstanceName : cache.getLiveInstances().keySet()) {
-          if (cache.getInstanceConfigMap().get(liveInstanceName).containsTag(instanceGroupTag)) {
+        Map<String, LiveInstance> liveInstanceMap =
+            accessor.getChildValuesMap(keyBuilder.liveInstances());
+        Map<String, InstanceConfig> instanceCfgMap =
+            accessor.getChildValuesMap(keyBuilder.instanceConfigs());
+        for (String liveInstanceName : liveInstanceMap.keySet()) {
+          if (instanceCfgMap.get(liveInstanceName).containsTag(instanceGroupTag)) {
             instances++;
           }
         }
         if (instances == 0) {
-          instances = cache.getLiveInstances().size();
+          instances = liveInstanceMap.size();
         }
         ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName));
         return verifyBalanceExternalView(externalView.getRecord(), numberOfPartitions,