You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/04/23 19:28:02 UTC

[helix] 16/23: minor fix for customized view aggregation (#917)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit fc78947a4c42b3e2f905861ac697a3b301e52b3b
Author: zhangmeng916 <56...@users.noreply.github.com>
AuthorDate: Mon Mar 30 14:26:57 2020 -0700

    minor fix for customized view aggregation (#917)
    
    Fix minor issues in customized view aggregation logic and add some more tests.
    
    Co-authored-by: Meng Zhang <mn...@mnzhang-mn1.linkedin.biz>
---
 .../helix/controller/GenericHelixController.java   | 30 ++++++++++++++++------
 .../ResourceControllerDataProvider.java            | 12 ++++-----
 .../stages/CustomizedViewAggregationStage.java     |  6 ++---
 .../controller/stages/TestCustomizedViewStage.java | 19 ++++++++++----
 .../TestComputeAndCleanupCustomizedView.java       | 26 ++++++++++++++++---
 5 files changed, 66 insertions(+), 27 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 8644441..5678a5c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -142,7 +141,9 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
   final AtomicReference<Map<String, LiveInstance>> _lastSeenInstances;
   final AtomicReference<Map<String, LiveInstance>> _lastSeenSessions;
 
-  final AtomicReference<Set<String>> _lastSeenCustomizedStateTypes;
+  // map that stores the mapping between instance and the customized state types available on that
+  //instance
+  final AtomicReference<Map<String, Set<String>>> _lastSeenCustomizedStateTypesMapRef;
 
   // By default not reporting status until controller status is changed to activate
   // TODO This flag should be inside ClusterStatusMonitor. When false, no MBean registering.
@@ -585,7 +586,7 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
     _taskRegistry = taskRegistry;
     _lastSeenInstances = new AtomicReference<>();
     _lastSeenSessions = new AtomicReference<>();
-    _lastSeenCustomizedStateTypes = new AtomicReference<>();
+    _lastSeenCustomizedStateTypesMapRef = new AtomicReference<>();
     _clusterName = clusterName;
     _lastPipelineEndTimestamp = TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED;
     _clusterStatusMonitor = new ClusterStatusMonitor(_clusterName);
@@ -881,12 +882,24 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
     }
 
     // TODO: remove the synchronization here once we move this update into dataCache.
-    synchronized (_lastSeenCustomizedStateTypes) {
-      Set<String> lastSeenCustomizedStateTypes = _lastSeenCustomizedStateTypes.get();
+    synchronized (_lastSeenCustomizedStateTypesMapRef) {
+      Map<String, Set<String>> lastSeenCustomizedStateTypesMap =
+          _lastSeenCustomizedStateTypesMapRef.get();
+      if (null == lastSeenCustomizedStateTypesMap) {
+        lastSeenCustomizedStateTypesMap = new HashMap();
+        // lazy init the AtomicReference
+        _lastSeenCustomizedStateTypesMapRef.set(lastSeenCustomizedStateTypesMap);
+      }
+
+      if (!lastSeenCustomizedStateTypesMap.containsKey(instanceName)) {
+        lastSeenCustomizedStateTypesMap.put(instanceName, Collections.emptySet());
+      }
+
+      Set<String> lastSeenCustomizedStateTypes = lastSeenCustomizedStateTypesMap.get(instanceName);
+
       for (String customizedState : customizedStateTypes) {
         try {
-          if (lastSeenCustomizedStateTypes == null || !lastSeenCustomizedStateTypes
-              .contains(customizedState)) {
+          if (!lastSeenCustomizedStateTypes.contains(customizedState)) {
             manager.addCustomizedStateChangeListener(this, instanceName, customizedState);
             logger.info(
                 manager.getInstanceName() + " added customized state listener for " + instanceName
@@ -904,7 +917,8 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
         }
       }
 
-      _lastSeenCustomizedStateTypes.set(new HashSet<>(customizedStateTypes));
+      lastSeenCustomizedStateTypes.clear();
+      lastSeenCustomizedStateTypes.addAll(customizedStateTypes);
     }
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index 50a3ac9..3625630 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -19,7 +19,6 @@ package org.apache.helix.controller.dataproviders;
  * under the License.
  */
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -220,9 +219,8 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider {
   public void refreshCustomizedViewMap(final HelixDataAccessor accessor) {
     // As we are not listening on customized view change, customized view will be
     // refreshed once during the cache's first refresh() call, or when full refresh is required
-    List<String> newStateTypes = accessor.getChildNames(accessor.keyBuilder().customizedViews());
     if (_propertyDataChangedMap.get(HelixConstants.ChangeType.CUSTOMIZED_VIEW).getAndSet(false)) {
-      for (String stateType : newStateTypes) {
+      for (String stateType : _aggregationEnabledTypes) {
         if (!_customizedViewCacheMap.containsKey(stateType)) {
           CustomizedViewCache newCustomizedViewCache =
               new CustomizedViewCache(getClusterName(), stateType);
@@ -230,10 +228,10 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider {
         }
         _customizedViewCacheMap.get(stateType).refresh(accessor);
       }
-      Set<String> previousCachedStateTypes = _customizedViewCacheMap.keySet();
-      previousCachedStateTypes.removeAll(newStateTypes);
+      Set<String> previousCachedStateTypes = new HashSet<>(_customizedViewCacheMap.keySet());
+      previousCachedStateTypes.removeAll(_aggregationEnabledTypes);
       logger.info("Remove customizedView for state: " + previousCachedStateTypes);
-      removeCustomizedViewTypes(new ArrayList<>(previousCachedStateTypes));
+      removeCustomizedViewTypes(previousCachedStateTypes);
     }
   }
 
@@ -306,7 +304,7 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider {
    * @param stateTypeNames
    */
 
-  private void removeCustomizedViewTypes(List<String> stateTypeNames) {
+  private void removeCustomizedViewTypes(Set<String> stateTypeNames) {
     for (String stateType : stateTypeNames) {
       _customizedViewCacheMap.remove(stateType);
     }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java
index de0bc1c..cdcdfd2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java
@@ -72,19 +72,17 @@ public class CustomizedViewAggregationStage extends AbstractAsyncBaseStage {
 
     Map<String, CustomizedViewCache> customizedViewCacheMap = cache.getCustomizedViewCacheMap();
 
-    // remove stale customized view type from ZK and cache
-    List<String> customizedViewTypesToRemove = new ArrayList<>();
+    // remove stale customized view type from ZK
     for (String stateType : customizedViewCacheMap.keySet()) {
       if (!customizedStateOutput.getAllStateTypes().contains(stateType)) {
         LogUtil.logInfo(LOG, _eventId, "Remove customizedView for stateType: " + stateType);
         dataAccessor.removeProperty(keyBuilder.customizedView(stateType));
-        customizedViewTypesToRemove.add(stateType);
       }
     }
 
-    List<CustomizedView> updatedCustomizedViews = new ArrayList<>();
     // update customized view
     for (String stateType : customizedStateOutput.getAllStateTypes()) {
+      List<CustomizedView> updatedCustomizedViews = new ArrayList<>();
       Map<String, CustomizedView> curCustomizedViews = new HashMap<>();
       CustomizedViewCache customizedViewCache = customizedViewCacheMap.get(stateType);
       if (customizedViewCache != null) {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java
index a8a167b..5c04daf 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java
@@ -39,8 +39,8 @@ import org.testng.annotations.Test;
 
 
 public class TestCustomizedViewStage extends ZkUnitTestBase {
-  private final String RESOURCE_NAME = "testResourceName";
-  private final String PARTITION_NAME = "testResourceName_0";
+  private final String RESOURCE_NAME = "TestDB";
+  private final String PARTITION_NAME = "TestDB_0";
   private final String CUSTOMIZED_STATE_NAME = "customizedState1";
   private final String INSTANCE_NAME = "localhost_1";
 
@@ -52,7 +52,16 @@ public class TestCustomizedViewStage extends ZkUnitTestBase {
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
     HelixManager manager = new DummyClusterManager(clusterName, accessor);
 
-    setupLiveInstances(clusterName, new int[]{0, 1});
+    // ideal state: node0 is MASTER, node1 is SLAVE
+    // replica=2 means 1 master and 1 slave
+    setupIdealState(clusterName, new int[] {
+        0, 1
+    }, new String[] {
+        "TestDB"
+    }, 1, 2);
+    setupLiveInstances(clusterName, new int[] {
+        0, 1
+    });
     setupStateModel(clusterName);
 
     ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
@@ -82,8 +91,8 @@ public class TestCustomizedViewStage extends ZkUnitTestBase {
     runStage(event, new ResourceComputationStage());
     runStage(event, new CustomizedStateComputationStage());
     runStage(event, customizedViewComputeStage);
-    Assert.assertEquals(cache.getCustomizedViewCacheMap().values(),
-        accessor.getChildValues(accessor.keyBuilder().customizedViews()));
+    Assert.assertEquals(cache.getCustomizedViewCacheMap().size(),
+        accessor.getChildNames(accessor.keyBuilder().customizedViews()).size());
 
     // Assure there is no customized view got updated when running the stage again
     List<CustomizedView> oldCustomizedViews =
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestComputeAndCleanupCustomizedView.java b/helix-core/src/test/java/org/apache/helix/integration/TestComputeAndCleanupCustomizedView.java
index 6cdf72d..6de2521 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestComputeAndCleanupCustomizedView.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestComputeAndCleanupCustomizedView.java
@@ -126,7 +126,6 @@ public class TestComputeAndCleanupCustomizedView extends ZkUnitTestBase {
     // add CUSTOMIZED_STATE_NAME1 to aggregation enabled types
     aggregationEnabledTypes.add(CUSTOMIZED_STATE_NAME1);
     config.setAggregationEnabledTypes(aggregationEnabledTypes);
-    config.setAggregationEnabledTypes(aggregationEnabledTypes);
     accessor.setProperty(keyBuilder.customizedStateConfig(), config);
 
     // verify the customized view should have "STARTED" for CUSTOMIZED_STATE_NAME1 for INSTANCE1,
@@ -164,8 +163,8 @@ public class TestComputeAndCleanupCustomizedView extends ZkUnitTestBase {
     }, 12000);
 
     Thread.sleep(50);
-    Assert.assertTrue(result, String.format("Customized view should not have state for instance: "
-            + "%s", INSTANCE_NAME2));
+    Assert.assertTrue(result, String
+        .format("Customized view should not have state for instance: " + "%s", INSTANCE_NAME2));
 
     // set INSTANCE2 to "STARTED" for CUSTOMIZED_STATE_NAME1
     customizedState = new CustomizedState(RESOURCE_NAME);
@@ -222,6 +221,27 @@ public class TestComputeAndCleanupCustomizedView extends ZkUnitTestBase {
             + " resource: %s, partition: %s and state: %s", INSTANCE_NAME2, RESOURCE_NAME,
         PARTITION_NAME2, CUSTOMIZED_STATE_NAME2));
 
+    // remove CUSTOMIZED_STATE_NAME1 from aggregation enabled types
+    aggregationEnabledTypes.remove(CUSTOMIZED_STATE_NAME1);
+    config.setAggregationEnabledTypes(aggregationEnabledTypes);
+    accessor.setProperty(keyBuilder.customizedStateConfig(), config);
+
+    result = TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() {
+        CustomizedView customizedView =
+            accessor.getProperty(keyBuilder.customizedView(CUSTOMIZED_STATE_NAME1, RESOURCE_NAME));
+        if (customizedView == null) {
+          return true;
+        }
+        return false;
+      }
+    }, 12000);
+
+    Thread.sleep(50);
+    Assert.assertTrue(result,
+        String.format("Customized view should not have state %s", CUSTOMIZED_STATE_NAME1));
+
     // disable controller
     ZKHelixAdmin admin = new ZKHelixAdmin(_gZkClient);
     admin.enableCluster(clusterName, false);