You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/03/30 21:27:04 UTC
[helix] branch customizeView updated: minor fix for customized view aggregation (#917)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch customizeView
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/customizeView by this push:
new b7faf39 minor fix for customized view aggregation (#917)
b7faf39 is described below
commit b7faf39530fac3cd8d83729d6f3e04e5d9f87393
Author: zhangmeng916 <56...@users.noreply.github.com>
AuthorDate: Mon Mar 30 14:26:57 2020 -0700
minor fix for customized view aggregation (#917)
Fix minor issues in customized view aggregation logic and add some more tests.
Co-authored-by: Meng Zhang <mn...@mnzhang-mn1.linkedin.biz>
---
.../helix/controller/GenericHelixController.java | 30 ++++++++++++++++------
.../ResourceControllerDataProvider.java | 12 ++++-----
.../stages/CustomizedViewAggregationStage.java | 6 ++---
.../controller/stages/TestCustomizedViewStage.java | 19 ++++++++++----
.../TestComputeAndCleanupCustomizedView.java | 26 ++++++++++++++++---
5 files changed, 66 insertions(+), 27 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index c5c1b95..33c436a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -139,7 +138,9 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
final AtomicReference<Map<String, LiveInstance>> _lastSeenInstances;
final AtomicReference<Map<String, LiveInstance>> _lastSeenSessions;
- final AtomicReference<Set<String>> _lastSeenCustomizedStateTypes;
+ // map that stores the mapping between instance and the customized state types available on that
+ //instance
+ final AtomicReference<Map<String, Set<String>>> _lastSeenCustomizedStateTypesMapRef;
// By default not reporting status until controller status is changed to activate
// TODO This flag should be inside ClusterStatusMonitor. When false, no MBean registering.
@@ -548,7 +549,7 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
_taskRegistry = taskRegistry;
_lastSeenInstances = new AtomicReference<>();
_lastSeenSessions = new AtomicReference<>();
- _lastSeenCustomizedStateTypes = new AtomicReference<>();
+ _lastSeenCustomizedStateTypesMapRef = new AtomicReference<>();
_clusterName = clusterName;
_lastPipelineEndTimestamp = TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED;
_clusterStatusMonitor = new ClusterStatusMonitor(_clusterName);
@@ -828,12 +829,24 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
}
// TODO: remove the synchronization here once we move this update into dataCache.
- synchronized (_lastSeenCustomizedStateTypes) {
- Set<String> lastSeenCustomizedStateTypes = _lastSeenCustomizedStateTypes.get();
+ synchronized (_lastSeenCustomizedStateTypesMapRef) {
+ Map<String, Set<String>> lastSeenCustomizedStateTypesMap =
+ _lastSeenCustomizedStateTypesMapRef.get();
+ if (null == lastSeenCustomizedStateTypesMap) {
+ lastSeenCustomizedStateTypesMap = new HashMap();
+ // lazy init the AtomicReference
+ _lastSeenCustomizedStateTypesMapRef.set(lastSeenCustomizedStateTypesMap);
+ }
+
+ if (!lastSeenCustomizedStateTypesMap.containsKey(instanceName)) {
+ lastSeenCustomizedStateTypesMap.put(instanceName, Collections.emptySet());
+ }
+
+ Set<String> lastSeenCustomizedStateTypes = lastSeenCustomizedStateTypesMap.get(instanceName);
+
for (String customizedState : customizedStateTypes) {
try {
- if (lastSeenCustomizedStateTypes == null || !lastSeenCustomizedStateTypes
- .contains(customizedState)) {
+ if (!lastSeenCustomizedStateTypes.contains(customizedState)) {
manager.addCustomizedStateChangeListener(this, instanceName, customizedState);
logger.info(
manager.getInstanceName() + " added customized state listener for " + instanceName
@@ -851,7 +864,8 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
}
}
- _lastSeenCustomizedStateTypes.set(new HashSet<>(customizedStateTypes));
+ lastSeenCustomizedStateTypes.clear();
+ lastSeenCustomizedStateTypes.addAll(customizedStateTypes);
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index fc4ee1b..57f477f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -19,7 +19,6 @@ package org.apache.helix.controller.dataproviders;
* under the License.
*/
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -197,9 +196,8 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider {
public void refreshCustomizedViewMap(final HelixDataAccessor accessor) {
// As we are not listening on customized view change, customized view will be
// refreshed once during the cache's first refresh() call, or when full refresh is required
- List<String> newStateTypes = accessor.getChildNames(accessor.keyBuilder().customizedViews());
if (_propertyDataChangedMap.get(HelixConstants.ChangeType.CUSTOMIZED_VIEW).getAndSet(false)) {
- for (String stateType : newStateTypes) {
+ for (String stateType : _aggregationEnabledTypes) {
if (!_customizedViewCacheMap.containsKey(stateType)) {
CustomizedViewCache newCustomizedViewCache =
new CustomizedViewCache(getClusterName(), stateType);
@@ -207,10 +205,10 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider {
}
_customizedViewCacheMap.get(stateType).refresh(accessor);
}
- Set<String> previousCachedStateTypes = _customizedViewCacheMap.keySet();
- previousCachedStateTypes.removeAll(newStateTypes);
+ Set<String> previousCachedStateTypes = new HashSet<>(_customizedViewCacheMap.keySet());
+ previousCachedStateTypes.removeAll(_aggregationEnabledTypes);
logger.info("Remove customizedView for state: " + previousCachedStateTypes);
- removeCustomizedViewTypes(new ArrayList<>(previousCachedStateTypes));
+ removeCustomizedViewTypes(previousCachedStateTypes);
}
}
@@ -283,7 +281,7 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider {
* @param stateTypeNames
*/
- private void removeCustomizedViewTypes(List<String> stateTypeNames) {
+ private void removeCustomizedViewTypes(Set<String> stateTypeNames) {
for (String stateType : stateTypeNames) {
_customizedViewCacheMap.remove(stateType);
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java
index de0bc1c..cdcdfd2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java
@@ -72,19 +72,17 @@ public class CustomizedViewAggregationStage extends AbstractAsyncBaseStage {
Map<String, CustomizedViewCache> customizedViewCacheMap = cache.getCustomizedViewCacheMap();
- // remove stale customized view type from ZK and cache
- List<String> customizedViewTypesToRemove = new ArrayList<>();
+ // remove stale customized view type from ZK
for (String stateType : customizedViewCacheMap.keySet()) {
if (!customizedStateOutput.getAllStateTypes().contains(stateType)) {
LogUtil.logInfo(LOG, _eventId, "Remove customizedView for stateType: " + stateType);
dataAccessor.removeProperty(keyBuilder.customizedView(stateType));
- customizedViewTypesToRemove.add(stateType);
}
}
- List<CustomizedView> updatedCustomizedViews = new ArrayList<>();
// update customized view
for (String stateType : customizedStateOutput.getAllStateTypes()) {
+ List<CustomizedView> updatedCustomizedViews = new ArrayList<>();
Map<String, CustomizedView> curCustomizedViews = new HashMap<>();
CustomizedViewCache customizedViewCache = customizedViewCacheMap.get(stateType);
if (customizedViewCache != null) {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java
index a8a167b..5c04daf 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java
@@ -39,8 +39,8 @@ import org.testng.annotations.Test;
public class TestCustomizedViewStage extends ZkUnitTestBase {
- private final String RESOURCE_NAME = "testResourceName";
- private final String PARTITION_NAME = "testResourceName_0";
+ private final String RESOURCE_NAME = "TestDB";
+ private final String PARTITION_NAME = "TestDB_0";
private final String CUSTOMIZED_STATE_NAME = "customizedState1";
private final String INSTANCE_NAME = "localhost_1";
@@ -52,7 +52,16 @@ public class TestCustomizedViewStage extends ZkUnitTestBase {
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
HelixManager manager = new DummyClusterManager(clusterName, accessor);
- setupLiveInstances(clusterName, new int[]{0, 1});
+ // ideal state: node0 is MASTER, node1 is SLAVE
+ // replica=2 means 1 master and 1 slave
+ setupIdealState(clusterName, new int[] {
+ 0, 1
+ }, new String[] {
+ "TestDB"
+ }, 1, 2);
+ setupLiveInstances(clusterName, new int[] {
+ 0, 1
+ });
setupStateModel(clusterName);
ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
@@ -82,8 +91,8 @@ public class TestCustomizedViewStage extends ZkUnitTestBase {
runStage(event, new ResourceComputationStage());
runStage(event, new CustomizedStateComputationStage());
runStage(event, customizedViewComputeStage);
- Assert.assertEquals(cache.getCustomizedViewCacheMap().values(),
- accessor.getChildValues(accessor.keyBuilder().customizedViews()));
+ Assert.assertEquals(cache.getCustomizedViewCacheMap().size(),
+ accessor.getChildNames(accessor.keyBuilder().customizedViews()).size());
// Assure there is no customized view got updated when running the stage again
List<CustomizedView> oldCustomizedViews =
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestComputeAndCleanupCustomizedView.java b/helix-core/src/test/java/org/apache/helix/integration/TestComputeAndCleanupCustomizedView.java
index 6cdf72d..6de2521 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestComputeAndCleanupCustomizedView.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestComputeAndCleanupCustomizedView.java
@@ -126,7 +126,6 @@ public class TestComputeAndCleanupCustomizedView extends ZkUnitTestBase {
// add CUSTOMIZED_STATE_NAME1 to aggregation enabled types
aggregationEnabledTypes.add(CUSTOMIZED_STATE_NAME1);
config.setAggregationEnabledTypes(aggregationEnabledTypes);
- config.setAggregationEnabledTypes(aggregationEnabledTypes);
accessor.setProperty(keyBuilder.customizedStateConfig(), config);
// verify the customized view should have "STARTED" for CUSTOMIZED_STATE_NAME1 for INSTANCE1,
@@ -164,8 +163,8 @@ public class TestComputeAndCleanupCustomizedView extends ZkUnitTestBase {
}, 12000);
Thread.sleep(50);
- Assert.assertTrue(result, String.format("Customized view should not have state for instance: "
- + "%s", INSTANCE_NAME2));
+ Assert.assertTrue(result, String
+ .format("Customized view should not have state for instance: " + "%s", INSTANCE_NAME2));
// set INSTANCE2 to "STARTED" for CUSTOMIZED_STATE_NAME1
customizedState = new CustomizedState(RESOURCE_NAME);
@@ -222,6 +221,27 @@ public class TestComputeAndCleanupCustomizedView extends ZkUnitTestBase {
+ " resource: %s, partition: %s and state: %s", INSTANCE_NAME2, RESOURCE_NAME,
PARTITION_NAME2, CUSTOMIZED_STATE_NAME2));
+ // remove CUSTOMIZED_STATE_NAME1 from aggregation enabled types
+ aggregationEnabledTypes.remove(CUSTOMIZED_STATE_NAME1);
+ config.setAggregationEnabledTypes(aggregationEnabledTypes);
+ accessor.setProperty(keyBuilder.customizedStateConfig(), config);
+
+ result = TestHelper.verify(new TestHelper.Verifier() {
+ @Override
+ public boolean verify() {
+ CustomizedView customizedView =
+ accessor.getProperty(keyBuilder.customizedView(CUSTOMIZED_STATE_NAME1, RESOURCE_NAME));
+ if (customizedView == null) {
+ return true;
+ }
+ return false;
+ }
+ }, 12000);
+
+ Thread.sleep(50);
+ Assert.assertTrue(result,
+ String.format("Customized view should not have state %s", CUSTOMIZED_STATE_NAME1));
+
// disable controller
ZKHelixAdmin admin = new ZKHelixAdmin(_gZkClient);
admin.enableCluster(clusterName, false);