You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/03/25 21:44:56 UTC

[5/6] helix git commit: Avoid redundant calculation for disabled instances

Avoid redundant calculation for disabled instances

Pinot reported that BestPossibleStage takes about 100 seconds for the initial resource calculation; part of that delay comes from recomputing the set of disabled instances for every partition.
Improve this by computing the disabled-instance data once, when the cluster data cache is refreshed, instead of on every lookup.
TODO: once a larger refactor lets the cache detect ClusterConfig changes, optimize this further so the data is recalculated only when the instance configs or the cluster config change.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/db949d0f
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/db949d0f
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/db949d0f

Branch: refs/heads/master
Commit: db949d0f64c5a4ac9923bc7bdf3736dd0e7a8640
Parents: 305add9
Author: Junkai Xue <jx...@linkedin.com>
Authored: Sat Mar 24 21:36:54 2018 -0700
Committer: dasahcc <ju...@gmail.com>
Committed: Sun Mar 25 14:43:42 2018 -0700

----------------------------------------------------------------------
 .../controller/stages/ClusterDataCache.java     | 58 +++++++++++++-------
 .../rebalancer/TestAutoRebalanceStrategy.java   |  2 +
 .../stages/TestMessageThrottleStage.java        |  4 ++
 .../stages/TestRebalancePipeline.java           | 23 +++-----
 .../stages/TestResourceValidationStage.java     |  5 +-
 .../task/TestTaskRebalancerStopResume.java      |  3 +-
 6 files changed, 59 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/db949d0f/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index e946a9d..2317197 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -83,6 +83,8 @@ public class ClusterDataCache {
   private Map<String, Map<String, String>> _idealStateRuleMap;
   private Map<String, Map<String, Long>> _missingTopStateMap = new HashMap<>();
   private Map<String, ExternalView> _targetExternalViewMap = new HashMap<>();
+  private Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>();
+  private Set<String> _disabledInstanceSet = new HashSet<>();
 
   private CurrentStateCache _currentStateCache;
   private TaskDataCache _taskDataCache;
@@ -207,6 +209,8 @@ public class ClusterDataCache {
     MaintenanceSignal maintenanceSignal = accessor.getProperty(keyBuilder.maintenance());
     _isMaintenanceModeEnabled = (maintenanceSignal != null) ? true : false;
 
+    updateDisabledInstances();
+
     long endTime = System.currentTimeMillis();
     LOG.info(
         "END: ClusterDataCache.refresh() for cluster " + getClusterName() + ", took " + (endTime
@@ -232,6 +236,33 @@ public class ClusterDataCache {
     return true;
   }
 
+  private void updateDisabledInstances() {
+    // Move the calculating disabled instances to refresh
+    _disabledInstanceForPartitionMap.clear();
+    _disabledInstanceSet.clear();
+    for (InstanceConfig config : _instanceConfigMap.values()) {
+      Map<String, List<String>> disabledPartitionMap = config.getDisabledPartitionsMap();
+      if (!config.getInstanceEnabled()) {
+        _disabledInstanceSet.add(config.getInstanceName());
+      }
+      for (String resource : disabledPartitionMap.keySet()) {
+        if (!_disabledInstanceForPartitionMap.containsKey(resource)) {
+          _disabledInstanceForPartitionMap.put(resource, new HashMap<String, Set<String>>());
+        }
+        for (String partition : disabledPartitionMap.get(resource)) {
+          if (!_disabledInstanceForPartitionMap.get(resource).containsKey(partition)) {
+            _disabledInstanceForPartitionMap.get(resource).put(partition, new HashSet<String>());
+          }
+          _disabledInstanceForPartitionMap.get(resource).get(partition)
+              .add(config.getInstanceName());
+        }
+      }
+    }
+    if (_clusterConfig.getDisabledInstances() != null) {
+      _disabledInstanceSet.addAll(_clusterConfig.getDisabledInstances().keySet());
+    }
+  }
+
   private void updateOfflineInstanceHistory(HelixDataAccessor accessor) {
     List<String> offlineNodes = new ArrayList<>(_instanceConfigMap.keySet());
     offlineNodes.removeAll(_liveInstanceMap.keySet());
@@ -533,16 +564,14 @@ public class ClusterDataCache {
    * @return
    */
   public Set<String> getDisabledInstancesForPartition(String resource, String partition) {
-    Set<String> disabledInstancesSet = new HashSet<String>();
-    for (String instance : _instanceConfigMap.keySet()) {
-      InstanceConfig config = _instanceConfigMap.get(instance);
-      if (config.getInstanceEnabled() == false || (_clusterConfig.getDisabledInstances() != null
-          && _clusterConfig.getDisabledInstances().containsKey(instance))
-          || config.getInstanceEnabledForPartition(resource, partition) == false) {
-        disabledInstancesSet.add(instance);
-      }
+    Set<String> disabledInstancesForPartition = new HashSet<>(_disabledInstanceSet);
+    if (_disabledInstanceForPartitionMap.containsKey(resource) && _disabledInstanceForPartitionMap
+        .get(resource).containsKey(partition)) {
+      disabledInstancesForPartition
+          .addAll(_disabledInstanceForPartitionMap.get(resource).get(partition));
     }
-    return disabledInstancesSet;
+
+    return disabledInstancesForPartition;
   }
 
   /**
@@ -551,16 +580,7 @@ public class ClusterDataCache {
    * @return
    */
   public Set<String> getDisabledInstances() {
-    Set<String> disabledInstancesSet = new HashSet<>();
-    for (String instance : _instanceConfigMap.keySet()) {
-      InstanceConfig config = _instanceConfigMap.get(instance);
-      if (!config.getInstanceEnabled()
-          || (_clusterConfig.getDisabledInstances() != null && _clusterConfig.getDisabledInstances()
-          .containsKey(instance))) {
-        disabledInstancesSet.add(instance);
-      }
-    }
-    return disabledInstancesSet;
+    return Collections.unmodifiableSet(_disabledInstanceSet);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/db949d0f/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
index 29aa863..56fc615 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
@@ -41,6 +41,7 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
 import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.StateModelDefinition;
@@ -217,6 +218,7 @@ public class TestAutoRebalanceStrategy {
       ClusterDataCache cache = new ClusterDataCache();
       MockAccessor accessor = new MockAccessor();
       Builder keyBuilder = accessor.keyBuilder();
+      accessor.setProperty(keyBuilder.clusterConfig(), new ClusterConfig("TestCluster"));
       for (String node : _liveNodes) {
         LiveInstance liveInstance = new LiveInstance(node);
         liveInstance.setSessionId("testSession");

http://git-wip-us.apache.org/repos/asf/helix/blob/db949d0f/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
index fa6b084..9054a1d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
@@ -72,7 +72,9 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     setupStateModel(clusterName);
 
     ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
+    ClusterDataCache cache = new ClusterDataCache(clusterName);
     event.addAttribute(AttributeName.helixmanager.name(), manager);
+    event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
 
     MessageThrottleStage throttleStage = new MessageThrottleStage();
     try {
@@ -259,7 +261,9 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
 
     // test messageThrottleStage
     ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
+    ClusterDataCache cache = new ClusterDataCache(clusterName);
     event.addAttribute(AttributeName.helixmanager.name(), manager);
+    event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
 
     Pipeline dataRefresh = new Pipeline();
     dataRefresh.addStage(new ReadClusterDataStage());

http://git-wip-us.apache.org/repos/asf/helix/blob/db949d0f/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index 7c4bba6..9d4790d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -56,7 +56,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-
+    refreshClusterConfig(clusterName, accessor);
     HelixManager manager = new DummyClusterManager(clusterName, accessor);
     ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
     event.addAttribute(AttributeName.helixmanager.name(), manager);
@@ -95,7 +95,6 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
         "OFFLINE");
 
     runPipeline(event, dataRefresh);
-    refreshClusterConfig(event, clusterName);
     runPipeline(event, rebalancePipeline);
     MessageSelectionStageOutput msgSelOutput =
         event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
@@ -113,7 +112,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
         "SLAVE");
 
     runPipeline(event, dataRefresh);
-    refreshClusterConfig(event, clusterName);
+    refreshClusterConfig(clusterName, accessor);
     runPipeline(event, rebalancePipeline);
     msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
@@ -132,7 +131,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     HelixManager manager = new DummyClusterManager(clusterName, accessor);
     ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
-
+    refreshClusterConfig(clusterName, accessor);
     final String resourceName = "testResource_dup";
     String[] resourceGroups = new String[] {
       resourceName
@@ -156,7 +155,6 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     ClusterControllerManager controller =
         new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
-
     // round1: controller sends O->S to both node0 and node1
     Thread.sleep(1000);
 
@@ -246,7 +244,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     ClusterDataCache cache = new ClusterDataCache();
     event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
-    refreshClusterConfig(event, clusterName);
+    refreshClusterConfig(clusterName, accessor);
 
     final String resourceName = "testResource_pending";
     String[] resourceGroups = new String[] {
@@ -282,7 +280,6 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
         "OFFLINE");
 
     runPipeline(event, dataRefresh);
-    refreshClusterConfig(event, clusterName);
     runPipeline(event, rebalancePipeline);
     MessageSelectionStageOutput msgSelOutput =
         event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
@@ -316,7 +313,6 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     List<String> msgIds = accessor.getChildNames(keyBuilder.messages("localhost_0"));
     accessor.removeProperty(keyBuilder.message("localhost_0", msgIds.get(0)));
     runPipeline(event, dataRefresh);
-    refreshClusterConfig(event, clusterName);
     runPipeline(event, rebalancePipeline);
     msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
@@ -341,6 +337,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     HelixManager manager = new DummyClusterManager(clusterName, accessor);
     ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
     event.addAttribute(AttributeName.helixmanager.name(), manager);
+    refreshClusterConfig(clusterName, accessor);
 
     final String resourceName = "testResource_xfer";
     String[] resourceGroups = new String[] {
@@ -376,7 +373,6 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
         "SLAVE");
 
     runPipeline(event, dataRefresh);
-    refreshClusterConfig(event, clusterName);
     runPipeline(event, rebalancePipeline);
     MessageSelectionStageOutput msgSelOutput =
         event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
@@ -397,7 +393,6 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
         "SLAVE");
 
     runPipeline(event, dataRefresh);
-    refreshClusterConfig(event, clusterName);
     runPipeline(event, rebalancePipeline);
     msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
@@ -418,6 +413,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     HelixManager manager = new DummyClusterManager(clusterName, accessor);
     ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
     event.addAttribute(AttributeName.helixmanager.name(), manager);
+    refreshClusterConfig(clusterName, accessor);
 
     final String resourceName = "testResource_no_duplicated_master";
     String[] resourceGroups = new String[] {
@@ -457,7 +453,6 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
         "MASTER");
 
     runPipeline(event, dataRefresh);
-    refreshClusterConfig(event, clusterName);
     runPipeline(event, rebalancePipeline);
     MessageSelectionStageOutput msgSelOutput =
         event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
@@ -493,9 +488,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     accessor.setProperty(keyBuilder.currentState(instance, sessionId, resourceGroupName), curState);
   }
 
-  private void refreshClusterConfig(ClusterEvent event, String clusterName) {
-    ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
-    cache.setClusterConfig(new ClusterConfig(clusterName));
-    event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
+  private void refreshClusterConfig(String clusterName, HelixDataAccessor accessor) {
+    accessor.setProperty(accessor.keyBuilder().clusterConfig(), new ClusterConfig(clusterName));
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/db949d0f/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
index 57cc478..6334720 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
@@ -25,6 +25,7 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.MockAccessor;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.IdealStateProperty;
 import org.apache.helix.model.IdealState.RebalanceMode;
@@ -44,6 +45,7 @@ public class TestResourceValidationStage {
   @Test
   public void testIdealStateValidity() throws Exception {
     MockAccessor accessor = new MockAccessor();
+    accessor.setProperty(accessor.keyBuilder().clusterConfig(), new ClusterConfig("TestCluster"));
 
     // create some ideal states
     String masterSlaveCustomResource = "masterSlaveCustomResource";
@@ -88,6 +90,7 @@ public class TestResourceValidationStage {
   @Test
   public void testNoSpec() throws Exception {
     MockAccessor accessor = new MockAccessor();
+    accessor.setProperty(accessor.keyBuilder().clusterConfig(), new ClusterConfig("TestCluster"));
 
     // create an ideal state and no spec
     String masterSlaveCustomResource = "masterSlaveCustomResource";
@@ -116,7 +119,7 @@ public class TestResourceValidationStage {
   @Test
   public void testMissingStateModel() throws Exception {
     MockAccessor accessor = new MockAccessor();
-
+    accessor.setProperty(accessor.keyBuilder().clusterConfig(), new ClusterConfig("TestCluster"));
     // create an ideal state and no spec
     String masterSlaveCustomResource = "masterSlaveCustomResource";
     String leaderStandbyCustomResource = "leaderStandbyCustomResource";

http://git-wip-us.apache.org/repos/asf/helix/blob/db949d0f/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index a808633..8540a84 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -253,12 +253,13 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
       preJobFinish = jobContext.getFinishTime();
     }
 
+    TimeUnit.MILLISECONDS.sleep(2000);
     // Flush queue
     LOG.info("Flusing job-queue: " + queueName);
     _driver.flushQueue(queueName);
 
     // TODO: Use TestHelper.verify() instead of waiting here.
-    TimeUnit.MILLISECONDS.sleep(5000);
+    TimeUnit.MILLISECONDS.sleep(2000);
 
     // verify the cleanup
     for (int i = 0; i < currentJobNames.size(); i++) {