You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/28 22:32:46 UTC
[helix] 08/50: Add ChangeDetector interface and
ResourceChangeDetector implementation (#388)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git
commit afa297fcae31abc07d5a1186bc5bd0f02a5b4f95
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Thu Aug 15 14:33:02 2019 -0700
Add ChangeDetector interface and ResourceChangeDetector implementation (#388)
Add ChangeDetector interface and ResourceChangeDetector implementation
In order to efficiently react to changes happening to the cluster in the new WAGED rebalancer, a new component called ChangeDetector was added.
Changelist:
1. Add ChangeDetector interface
2. Implement ResourceChangeDetector
3. Add ResourceChangeSnapshot, a wrapper for critical cluster metadata
4. Add an integration test, TestResourceChangeDetector
---
.../controller/changedetector/ChangeDetector.java | 57 ++++
.../changedetector/ResourceChangeDetector.java | 158 +++++++++
.../changedetector/ResourceChangeSnapshot.java | 105 ++++++
.../ResourceControllerDataProvider.java | 33 +-
.../changedetector/TestResourceChangeDetector.java | 357 +++++++++++++++++++++
5 files changed, 705 insertions(+), 5 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ChangeDetector.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ChangeDetector.java
new file mode 100644
index 0000000..fbe4afc
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ChangeDetector.java
@@ -0,0 +1,57 @@
+package org.apache.helix.controller.changedetector;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import org.apache.helix.HelixConstants;
+
+/**
+ * ChangeDetector interface that will be used to track deltas in the cluster from one pipeline run
+ * to another. The interface methods are designed to be flexible for both the resource pipeline and
+ * the task pipeline.
+ * Changes are reported per {@link HelixConstants.ChangeType}: which types changed at all, and, for
+ * a given type, the names of the items that were changed, added, or removed.
+ * TODO: Consider splitting this up into two different ChangeDetector interfaces:
+ * TODO: PropertyBasedChangeDetector and PathBasedChangeDetector.
+ */
+public interface ChangeDetector {
+
+ /**
+ * Returns all types of changes detected.
+ * @return a collection of the ChangeTypes for which a change was detected
+ */
+ Collection<HelixConstants.ChangeType> getChangeTypes();
+
+ /**
+ * Returns the names of items that changed based on the change type given.
+ * @param changeType the type of change to report on
+ * @return a collection of names of items that changed
+ */
+ Collection<String> getChangesByType(HelixConstants.ChangeType changeType);
+
+ /**
+ * Returns the names of items that were added based on the change type given.
+ * @param changeType the type of change to report on
+ * @return a collection of names of items that were added
+ */
+ Collection<String> getAdditionsByType(HelixConstants.ChangeType changeType);
+
+ /**
+ * Returns the names of items that were removed based on the change type given.
+ * @param changeType the type of change to report on
+ * @return a collection of names of items that were removed
+ */
+ Collection<String> getRemovalsByType(HelixConstants.ChangeType changeType);
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
new file mode 100644
index 0000000..d65e609
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
@@ -0,0 +1,158 @@
+package org.apache.helix.controller.changedetector;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+
+/**
+ * ResourceChangeDetector implements ChangeDetector. It caches resource-related metadata from
+ * Helix's main resource pipeline cache (DataProvider) as two snapshots (previous and current
+ * pipeline run) and lazily computes, then caches, the differences between them per ChangeType.
+ * WARNING: the methods of this class are not thread-safe.
+ */
+public class ResourceChangeDetector implements ChangeDetector {
+
+  private ResourceChangeSnapshot _oldSnapshot; // snapshot for previous pipeline run
+  private ResourceChangeSnapshot _newSnapshot; // snapshot for this pipeline run
+
+  // The following cache the computation results until the next updateSnapshots() call
+  private final Map<HelixConstants.ChangeType, Collection<String>> _changedItems = new HashMap<>();
+  private final Map<HelixConstants.ChangeType, Collection<String>> _addedItems = new HashMap<>();
+  private final Map<HelixConstants.ChangeType, Collection<String>> _removedItems = new HashMap<>();
+
+  /**
+   * Creates a detector whose previous and current snapshots are both empty, so that every getter
+   * is safe to call (and reports nothing) before the first {@link #updateSnapshots} call.
+   */
+  public ResourceChangeDetector() {
+    // Without an initial old snapshot, the get*ByType() methods would dereference null.
+    _oldSnapshot = new ResourceChangeSnapshot();
+    _newSnapshot = new ResourceChangeSnapshot();
+  }
+
+  /**
+   * Compare the underlying HelixProperty objects and produce a collection of names of changed
+   * properties. Only names present in both maps are considered; a name counts as changed when the
+   * records of its old and new property are not equal.
+   * @param oldPropertyMap properties by name from the previous snapshot
+   * @param newPropertyMap properties by name from the current snapshot
+   * @return an unmodifiable set of the names whose records differ
+   */
+  private Collection<String> getChangedItems(Map<String, ? extends HelixProperty> oldPropertyMap,
+      Map<String, ? extends HelixProperty> newPropertyMap) {
+    HashSet<String> changedItems = new HashSet<>();
+    oldPropertyMap.forEach((name, oldProperty) -> {
+      // A single lookup doubles as the presence check
+      HelixProperty newProperty = newPropertyMap.get(name);
+      if (newProperty != null && !oldProperty.getRecord().equals(newProperty.getRecord())) {
+        changedItems.add(name);
+      }
+    });
+    // The result is cached and handed out to callers, so do not let them mutate it
+    return Collections.unmodifiableSet(changedItems);
+  }
+
+  /**
+   * Return a collection of names that are newly added.
+   * @param oldPropertyMap properties by name from the previous snapshot
+   * @param newPropertyMap properties by name from the current snapshot
+   * @return an unmodifiable set of the names present only in newPropertyMap
+   */
+  private Collection<String> getAddedItems(Map<String, ? extends HelixProperty> oldPropertyMap,
+      Map<String, ? extends HelixProperty> newPropertyMap) {
+    // Sets.difference() is a lazy view; copy it so that the cached value is an actual result
+    // instead of something that is recomputed on every size()/iteration.
+    return Collections.unmodifiableSet(
+        new HashSet<String>(Sets.difference(newPropertyMap.keySet(), oldPropertyMap.keySet())));
+  }
+
+  /**
+   * Return a collection of names that were removed.
+   * @param oldPropertyMap properties by name from the previous snapshot
+   * @param newPropertyMap properties by name from the current snapshot
+   * @return an unmodifiable set of the names present only in oldPropertyMap
+   */
+  private Collection<String> getRemovedItems(Map<String, ? extends HelixProperty> oldPropertyMap,
+      Map<String, ? extends HelixProperty> newPropertyMap) {
+    // Materialized for the same reason as in getAddedItems()
+    return Collections.unmodifiableSet(
+        new HashSet<String>(Sets.difference(oldPropertyMap.keySet(), newPropertyMap.keySet())));
+  }
+
+  /**
+   * Drops all cached per-ChangeType results so that they are recomputed on the next query.
+   */
+  private void clearCachedComputation() {
+    _changedItems.clear();
+    _addedItems.clear();
+    _removedItems.clear();
+  }
+
+  /**
+   * Based on the change type given and propertyMap type, call the right getters for propertyMap.
+   * @param changeType the change type whose property map is wanted
+   * @param snapshot the snapshot to read the property map from
+   * @return the snapshot's map of property name to property for the given change type
+   * @throws HelixException if the change type is not one of INSTANCE_CONFIG, IDEAL_STATE,
+   *           RESOURCE_CONFIG or LIVE_INSTANCE
+   */
+  private Map<String, ? extends HelixProperty> determinePropertyMapByType(
+      HelixConstants.ChangeType changeType, ResourceChangeSnapshot snapshot) {
+    switch (changeType) {
+      case INSTANCE_CONFIG:
+        return snapshot.getInstanceConfigMap();
+      case IDEAL_STATE:
+        return snapshot.getIdealStateMap();
+      case RESOURCE_CONFIG:
+        return snapshot.getResourceConfigMap();
+      case LIVE_INSTANCE:
+        return snapshot.getLiveInstances();
+      default:
+        throw new HelixException(String.format(
+            "ResourceChangeDetector cannot compute the names of changes for the given ChangeType: %s",
+            changeType));
+    }
+  }
+
+  /**
+   * Makes the current newSnapshot the oldSnapshot and reads in the up-to-date snapshot for change
+   * computation. To be called in the controller pipeline.
+   * @param dataProvider newly refreshed DataProvider (cache)
+   */
+  public synchronized void updateSnapshots(ResourceControllerDataProvider dataProvider) {
+    // Rotate the snapshots unconditionally: current becomes previous, provider becomes current
+    _oldSnapshot = new ResourceChangeSnapshot(_newSnapshot);
+    _newSnapshot = new ResourceChangeSnapshot(dataProvider);
+    // NOTE(review): change types added to the provider between the snapshot above and this clear
+    // would be dropped without being reported - confirm refresh() cannot run concurrently.
+    dataProvider.clearRefreshedChangeTypes();
+
+    // Invalidate cached computation
+    clearCachedComputation();
+  }
+
+  /**
+   * Returns the change types recorded in the current snapshot.
+   * @return an unmodifiable view of the current snapshot's changed types
+   */
+  @Override
+  public Collection<HelixConstants.ChangeType> getChangeTypes() {
+    return Collections.unmodifiableSet(_newSnapshot.getChangedTypes());
+  }
+
+  /**
+   * Returns the names present in both snapshots whose records differ, computed on first request
+   * per change type and cached until the next {@link #updateSnapshots} call.
+   * @param changeType one of the change types supported by this detector
+   * @return an unmodifiable collection of names of items that changed
+   * @throws HelixException if names cannot be computed for the given change type
+   */
+  @Override
+  public Collection<String> getChangesByType(HelixConstants.ChangeType changeType) {
+    return _changedItems.computeIfAbsent(changeType,
+        type -> getChangedItems(determinePropertyMapByType(type, _oldSnapshot),
+            determinePropertyMapByType(type, _newSnapshot)));
+  }
+
+  /**
+   * Returns the names present only in the current snapshot, computed on first request per change
+   * type and cached until the next {@link #updateSnapshots} call.
+   * @param changeType one of the change types supported by this detector
+   * @return an unmodifiable collection of names of items that were added
+   * @throws HelixException if names cannot be computed for the given change type
+   */
+  @Override
+  public Collection<String> getAdditionsByType(HelixConstants.ChangeType changeType) {
+    return _addedItems.computeIfAbsent(changeType,
+        type -> getAddedItems(determinePropertyMapByType(type, _oldSnapshot),
+            determinePropertyMapByType(type, _newSnapshot)));
+  }
+
+  /**
+   * Returns the names present only in the previous snapshot, computed on first request per change
+   * type and cached until the next {@link #updateSnapshots} call.
+   * @param changeType one of the change types supported by this detector
+   * @return an unmodifiable collection of names of items that were removed
+   * @throws HelixException if names cannot be computed for the given change type
+   */
+  @Override
+  public Collection<String> getRemovalsByType(HelixConstants.ChangeType changeType) {
+    return _removedItems.computeIfAbsent(changeType,
+        type -> getRemovedItems(determinePropertyMapByType(type, _oldSnapshot),
+            determinePropertyMapByType(type, _newSnapshot)));
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java
new file mode 100644
index 0000000..cbc3539
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java
@@ -0,0 +1,105 @@
+package org.apache.helix.controller.changedetector;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.ResourceConfig;
+
+/**
+ * ResourceChangeSnapshot is a POJO that holds a copy of the following Helix metadata:
+ * 1. InstanceConfig
+ * 2. IdealState
+ * 3. ResourceConfig
+ * 4. LiveInstance
+ * 5. Changed property types
+ * It serves as a snapshot of the main controller cache to enable the difference (change)
+ * calculation between two rounds of the pipeline run. The collections are copied shallowly: each
+ * snapshot owns its own set and maps, while the property objects inside them are shared.
+ */
+class ResourceChangeSnapshot {
+
+  private final Set<HelixConstants.ChangeType> _changedTypes;
+  private final Map<String, InstanceConfig> _instanceConfigMap;
+  private final Map<String, IdealState> _idealStateMap;
+  private final Map<String, ResourceConfig> _resourceConfigMap;
+  private final Map<String, LiveInstance> _liveInstances;
+
+  /**
+   * Canonical constructor: stores a fresh copy of every collection handed in.
+   */
+  private ResourceChangeSnapshot(Set<HelixConstants.ChangeType> changedTypes,
+      Map<String, ? extends InstanceConfig> instanceConfigs,
+      Map<String, ? extends IdealState> idealStates,
+      Map<String, ? extends ResourceConfig> resourceConfigs,
+      Map<String, ? extends LiveInstance> liveInstances) {
+    _changedTypes = new HashSet<>(changedTypes);
+    _instanceConfigMap = new HashMap<>(instanceConfigs);
+    _idealStateMap = new HashMap<>(idealStates);
+    _resourceConfigMap = new HashMap<>(resourceConfigs);
+    _liveInstances = new HashMap<>(liveInstances);
+  }
+
+  /**
+   * Default constructor that constructs an empty snapshot.
+   */
+  ResourceChangeSnapshot() {
+    _changedTypes = new HashSet<>();
+    _instanceConfigMap = new HashMap<>();
+    _idealStateMap = new HashMap<>();
+    _resourceConfigMap = new HashMap<>();
+    _liveInstances = new HashMap<>();
+  }
+
+  /**
+   * Constructor using controller cache (ResourceControllerDataProvider).
+   * @param dataProvider the controller cache to copy the refreshed change types and the
+   *          instance config, ideal state, resource config and live instance maps from
+   */
+  ResourceChangeSnapshot(ResourceControllerDataProvider dataProvider) {
+    this(dataProvider.getRefreshedChangeTypes(), dataProvider.getInstanceConfigMap(),
+        dataProvider.getIdealStates(), dataProvider.getResourceConfigMap(),
+        dataProvider.getLiveInstances());
+  }
+
+  /**
+   * Copy constructor for ResourceChangeSnapshot.
+   * @param other the snapshot whose collections are copied
+   */
+  ResourceChangeSnapshot(ResourceChangeSnapshot other) {
+    this(other._changedTypes, other._instanceConfigMap, other._idealStateMap,
+        other._resourceConfigMap, other._liveInstances);
+  }
+
+  /** @return the change types recorded in this snapshot */
+  Set<HelixConstants.ChangeType> getChangedTypes() {
+    return _changedTypes;
+  }
+
+  /** @return this snapshot's InstanceConfigs keyed by name */
+  Map<String, InstanceConfig> getInstanceConfigMap() {
+    return _instanceConfigMap;
+  }
+
+  /** @return this snapshot's IdealStates keyed by name */
+  Map<String, IdealState> getIdealStateMap() {
+    return _idealStateMap;
+  }
+
+  /** @return this snapshot's ResourceConfigs keyed by name */
+  Map<String, ResourceConfig> getResourceConfigMap() {
+    return _resourceConfigMap;
+  }
+
+  /** @return this snapshot's LiveInstances keyed by name */
+  Map<String, LiveInstance> getLiveInstances() {
+    return _liveInstances;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index b1dc215..9e1550a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
@@ -64,6 +65,9 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider {
private Map<String, Map<String, MissingTopStateRecord>> _missingTopStateMap;
private Map<String, Map<String, String>> _lastTopStateLocationMap;
+ // Maintain a set of all ChangeTypes for change detection
+ private Set<HelixConstants.ChangeType> _refreshedChangeTypes;
+
public ResourceControllerDataProvider() {
this(AbstractDataCache.UNKNOWN_CLUSTER);
}
@@ -106,19 +110,21 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider {
_idealMappingCache = new HashMap<>();
_missingTopStateMap = new HashMap<>();
_lastTopStateLocationMap = new HashMap<>();
+ _refreshedChangeTypes = ConcurrentHashMap.newKeySet();
}
public synchronized void refresh(HelixDataAccessor accessor) {
long startTime = System.currentTimeMillis();
// Refresh base
- Set<HelixConstants.ChangeType> propertyRefreshed = super.doRefresh(accessor);
+ Set<HelixConstants.ChangeType> changedTypes = super.doRefresh(accessor);
+ _refreshedChangeTypes.addAll(changedTypes);
// Invalidate cached information if any of the important data has been refreshed
- if (propertyRefreshed.contains(HelixConstants.ChangeType.IDEAL_STATE)
- || propertyRefreshed.contains(HelixConstants.ChangeType.LIVE_INSTANCE)
- || propertyRefreshed.contains(HelixConstants.ChangeType.INSTANCE_CONFIG)
- || propertyRefreshed.contains(HelixConstants.ChangeType.RESOURCE_CONFIG)) {
+ if (changedTypes.contains(HelixConstants.ChangeType.IDEAL_STATE)
+ || changedTypes.contains(HelixConstants.ChangeType.LIVE_INSTANCE)
+ || changedTypes.contains(HelixConstants.ChangeType.INSTANCE_CONFIG)
+ || changedTypes.contains(HelixConstants.ChangeType.RESOURCE_CONFIG)) {
clearCachedResourceAssignments();
}
@@ -261,6 +267,23 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider {
_idealMappingCache.put(resource, mapping);
}
+ /**
+ * Return the set of all ChangeTypes that refresh() has recorded since the set was last cleared,
+ * i.e. the types that changed prior to this round of rebalance. The internal set itself is
+ * returned, not a copy. The caller should clear this set by calling
+ * {@link #clearRefreshedChangeTypes()} once the change types have been consumed.
+ * @return the internal set of refreshed ChangeTypes
+ */
+ public Set<HelixConstants.ChangeType> getRefreshedChangeTypes() {
+ return _refreshedChangeTypes;
+ }
+
+ /**
+ * Clears the set of all ChangeTypes that changed. The caller is expected to have consumed all
+ * change types by calling {@link #getRefreshedChangeTypes()} beforehand.
+ */
+ public void clearRefreshedChangeTypes() {
+ _refreshedChangeTypes.clear();
+ }
+
public void clearCachedResourceAssignments() {
_resourceAssignmentCache.clear();
_idealMappingCache.clear();
diff --git a/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java b/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java
new file mode 100644
index 0000000..3ef41e4
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java
@@ -0,0 +1,357 @@
+package org.apache.helix.controller.changedetector;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceConfig;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * This test contains a series of tests for ResourceChangeDetector. They run against a test
+ * cluster with a started controller and participants (see beforeClass()) and are chained with
+ * dependsOnMethods, so they execute in a fixed order and share the same data provider and
+ * detector state.
+ * NOTE(review): afterClass() asserts that TestHelper.verify() returns false for a "cluster path
+ * exists" condition with a 20000 ms timeout. Presumably verify() polls until the condition holds
+ * or the timeout expires, in which case a successful run always waits out the full timeout -
+ * confirm whether that is intended.
+ */
+public class TestResourceChangeDetector extends ZkTestBase {
+
+ // All possible change types for ResourceChangeDetector except for ClusterConfig
+ // since we don't provide the names of changed fields for ClusterConfig
+ private static final ChangeType[] RESOURCE_CHANGE_TYPES = {
+ ChangeType.IDEAL_STATE, ChangeType.INSTANCE_CONFIG, ChangeType.LIVE_INSTANCE,
+ ChangeType.RESOURCE_CONFIG
+ };
+
+ private static final String CLUSTER_NAME = TestHelper.getTestClassName();
+ private static final String RESOURCE_NAME = "TestDB";
+ private static final String NEW_RESOURCE_NAME = "TestDB2";
+ private static final String STATE_MODEL = "MasterSlave";
+ // Number of change types the detector is expected to report after the very first refresh
+ private static final int NUM_CHANGE_TYPES = 5;
+ private static final int NUM_RESOURCES = 1;
+ private static final int NUM_PARTITIONS = 10;
+ private static final int NUM_REPLICAS = 3;
+ private static final int NUM_NODES = 5;
+
+ // A standalone ResourceControllerDataProvider (a real instance, not a mock) that the tests
+ // notify and refresh manually
+ private ResourceControllerDataProvider _dataProvider;
+ private ResourceChangeDetector _resourceChangeDetector;
+ private ClusterControllerManager _controller;
+ private MockParticipantManager[] _participants = new MockParticipantManager[NUM_NODES];
+ private HelixDataAccessor _dataAccessor;
+ private PropertyKey.Builder _keyBuilder;
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ super.beforeClass();
+
+ // Set up a mock cluster
+ TestHelper.setupCluster(CLUSTER_NAME, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ RESOURCE_NAME, // resource name prefix
+ NUM_RESOURCES, // resources
+ NUM_PARTITIONS, // partitions per resource
+ NUM_NODES, // nodes
+ NUM_REPLICAS, // replicas
+ STATE_MODEL, true); // do rebalance
+
+ // Start a controller
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, "controller_0");
+ _controller.syncStart();
+
+ // Start Participants
+ for (int i = 0; i < NUM_NODES; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+ _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+ _participants[i].syncStart();
+ }
+
+ _dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+ _keyBuilder = _dataAccessor.keyBuilder();
+ _resourceChangeDetector = new ResourceChangeDetector();
+
+ // Create a custom data provider
+ _dataProvider = new ResourceControllerDataProvider(CLUSTER_NAME);
+ }
+
+ @AfterClass
+ public void afterClass() throws Exception {
+ for (MockParticipantManager participant : _participants) {
+ if (participant != null && participant.isConnected()) {
+ participant.syncStop();
+ }
+ }
+ _controller.syncStop();
+ deleteCluster(CLUSTER_NAME);
+ Assert.assertFalse(TestHelper.verify(() -> _dataAccessor.getBaseDataAccessor()
+ .exists("/" + CLUSTER_NAME, AccessOption.PERSISTENT), 20000L));
+ }
+
+ /**
+ * Tests the initialization of the change detector. It should tell us that there's been changes
+ * for every change type and for all items per type: after the first refresh, all resources and
+ * instances are expected to show up as additions, with no changes or removals.
+ * @throws Exception declared by the test signature; nothing in the body is caught or rethrown
+ */
+ @Test
+ public void testResourceChangeDetectorInit() throws Exception {
+ _dataProvider.refresh(_dataAccessor);
+ _resourceChangeDetector.updateSnapshots(_dataProvider);
+
+ Collection<ChangeType> changeTypes = _resourceChangeDetector.getChangeTypes();
+ Assert.assertEquals(changeTypes.size(), NUM_CHANGE_TYPES,
+ "Not all change types have been detected for ResourceChangeDetector!");
+
+ // Check that the right number of resources show up as added
+ checkDetectionCounts(ChangeType.IDEAL_STATE, NUM_RESOURCES, 0, 0);
+
+ // Check that the right number of instances show up as added
+ checkDetectionCounts(ChangeType.LIVE_INSTANCE, NUM_NODES, 0, 0);
+ checkDetectionCounts(ChangeType.INSTANCE_CONFIG, NUM_NODES, 0, 0);
+ }
+
+ /**
+ * Add a resource (IS and ResourceConfig) and see if the detector detects it.
+ */
+ @Test(dependsOnMethods = "testResourceChangeDetectorInit")
+ public void testAddResource() {
+ // Create an IS and ResourceConfig
+ _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, NEW_RESOURCE_NAME,
+ NUM_PARTITIONS, STATE_MODEL);
+ ResourceConfig resourceConfig = new ResourceConfig(NEW_RESOURCE_NAME);
+ _dataAccessor.setProperty(_keyBuilder.resourceConfig(NEW_RESOURCE_NAME), resourceConfig);
+ // Manually notify dataProvider
+ _dataProvider.notifyDataChange(ChangeType.IDEAL_STATE);
+ _dataProvider.notifyDataChange(ChangeType.RESOURCE_CONFIG);
+
+ // Refresh the data provider
+ _dataProvider.refresh(_dataAccessor);
+
+ // Update the detector
+ _resourceChangeDetector.updateSnapshots(_dataProvider);
+
+ checkChangeTypes(ChangeType.IDEAL_STATE, ChangeType.RESOURCE_CONFIG);
+ // Check the counts
+ for (ChangeType type : RESOURCE_CHANGE_TYPES) {
+ if (type == ChangeType.IDEAL_STATE || type == ChangeType.RESOURCE_CONFIG) {
+ checkDetectionCounts(type, 1, 0, 0);
+ } else {
+ checkDetectionCounts(type, 0, 0, 0);
+ }
+ }
+ // Check that detector gives the right item
+ Assert.assertTrue(_resourceChangeDetector.getAdditionsByType(ChangeType.RESOURCE_CONFIG)
+ .contains(NEW_RESOURCE_NAME));
+ }
+
+ /**
+ * Modify a resource config for the new resource and test that detector detects it.
+ */
+ @Test(dependsOnMethods = "testAddResource")
+ public void testModifyResource() {
+ // Modify resource config
+ ResourceConfig resourceConfig =
+ _dataAccessor.getProperty(_keyBuilder.resourceConfig(NEW_RESOURCE_NAME));
+ resourceConfig.getRecord().setSimpleField("Did I change?", "Yes!");
+ _dataAccessor.updateProperty(_keyBuilder.resourceConfig(NEW_RESOURCE_NAME), resourceConfig);
+
+ // Notify data provider and check
+ _dataProvider.notifyDataChange(ChangeType.RESOURCE_CONFIG);
+ _dataProvider.refresh(_dataAccessor);
+ _resourceChangeDetector.updateSnapshots(_dataProvider);
+
+ checkChangeTypes(ChangeType.RESOURCE_CONFIG);
+ // Check the counts
+ for (ChangeType type : RESOURCE_CHANGE_TYPES) {
+ if (type == ChangeType.RESOURCE_CONFIG) {
+ checkDetectionCounts(type, 0, 1, 0);
+ } else {
+ checkDetectionCounts(type, 0, 0, 0);
+ }
+ }
+ Assert.assertTrue(_resourceChangeDetector.getChangesByType(ChangeType.RESOURCE_CONFIG)
+ .contains(NEW_RESOURCE_NAME));
+ }
+
+ /**
+ * Delete the new resource and test that detector detects it.
+ */
+ @Test(dependsOnMethods = "testModifyResource")
+ public void testDeleteResource() {
+ // Delete the newly added resource
+ _dataAccessor.removeProperty(_keyBuilder.idealStates(NEW_RESOURCE_NAME));
+ _dataAccessor.removeProperty(_keyBuilder.resourceConfig(NEW_RESOURCE_NAME));
+
+ // Notify data provider and check
+ _dataProvider.notifyDataChange(ChangeType.IDEAL_STATE);
+ _dataProvider.notifyDataChange(ChangeType.RESOURCE_CONFIG);
+ _dataProvider.refresh(_dataAccessor);
+ _resourceChangeDetector.updateSnapshots(_dataProvider);
+
+ checkChangeTypes(ChangeType.RESOURCE_CONFIG, ChangeType.IDEAL_STATE);
+ // Check the counts
+ for (ChangeType type : RESOURCE_CHANGE_TYPES) {
+ if (type == ChangeType.IDEAL_STATE || type == ChangeType.RESOURCE_CONFIG) {
+ checkDetectionCounts(type, 0, 0, 1);
+ } else {
+ checkDetectionCounts(type, 0, 0, 0);
+ }
+ }
+ }
+
+ /**
+ * Disconnect and reconnect a Participant and see if detector detects.
+ */
+ @Test(dependsOnMethods = "testDeleteResource")
+ public void testDisconnectReconnectInstance() {
+ // Disconnect a Participant
+ _participants[0].syncStop();
+ _dataProvider.notifyDataChange(ChangeType.LIVE_INSTANCE);
+ _dataProvider.refresh(_dataAccessor);
+ _resourceChangeDetector.updateSnapshots(_dataProvider);
+
+ checkChangeTypes(ChangeType.LIVE_INSTANCE);
+ // Check the counts
+ for (ChangeType type : RESOURCE_CHANGE_TYPES) {
+ if (type == ChangeType.LIVE_INSTANCE) {
+ checkDetectionCounts(type, 0, 0, 1);
+ } else {
+ checkDetectionCounts(type, 0, 0, 0);
+ }
+ }
+
+ // Reconnect the Participant
+ _participants[0] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, "localhost_12918");
+ _participants[0].syncStart();
+ _dataProvider.notifyDataChange(ChangeType.LIVE_INSTANCE);
+ _dataProvider.refresh(_dataAccessor);
+ _resourceChangeDetector.updateSnapshots(_dataProvider);
+
+ checkChangeTypes(ChangeType.LIVE_INSTANCE);
+ // Check the counts
+ for (ChangeType type : RESOURCE_CHANGE_TYPES) {
+ if (type == ChangeType.LIVE_INSTANCE) {
+ checkDetectionCounts(type, 1, 0, 0);
+ } else {
+ checkDetectionCounts(type, 0, 0, 0);
+ }
+ }
+ }
+
+ /**
+ * Remove an instance completely and see if detector detects.
+ */
+ @Test(dependsOnMethods = "testDisconnectReconnectInstance")
+ public void testRemoveInstance() {
+ _participants[0].syncStop();
+ InstanceConfig instanceConfig =
+ _dataAccessor.getProperty(_keyBuilder.instanceConfig(_participants[0].getInstanceName()));
+ _gSetupTool.getClusterManagementTool().dropInstance(CLUSTER_NAME, instanceConfig);
+
+ _dataProvider.notifyDataChange(ChangeType.LIVE_INSTANCE);
+ _dataProvider.notifyDataChange(ChangeType.INSTANCE_CONFIG);
+ _dataProvider.refresh(_dataAccessor);
+ _resourceChangeDetector.updateSnapshots(_dataProvider);
+
+ checkChangeTypes(ChangeType.LIVE_INSTANCE, ChangeType.INSTANCE_CONFIG);
+ // Check the counts
+ for (ChangeType type : RESOURCE_CHANGE_TYPES) {
+ if (type == ChangeType.LIVE_INSTANCE || type == ChangeType.INSTANCE_CONFIG) {
+ checkDetectionCounts(type, 0, 0, 1);
+ } else {
+ checkDetectionCounts(type, 0, 0, 0);
+ }
+ }
+ }
+
+ /**
+ * Modify cluster config and see if detector detects.
+ */
+ @Test(dependsOnMethods = "testRemoveInstance")
+ public void testModifyClusterConfig() {
+ // Modify cluster config
+ ClusterConfig clusterConfig = _dataAccessor.getProperty(_keyBuilder.clusterConfig());
+ clusterConfig.setTopology("Change");
+ _dataAccessor.updateProperty(_keyBuilder.clusterConfig(), clusterConfig);
+
+ _dataProvider.notifyDataChange(ChangeType.CLUSTER_CONFIG);
+ _dataProvider.refresh(_dataAccessor);
+ _resourceChangeDetector.updateSnapshots(_dataProvider);
+
+ checkChangeTypes(ChangeType.CLUSTER_CONFIG);
+ // Check the counts for other types
+ for (ChangeType type : RESOURCE_CHANGE_TYPES) {
+ checkDetectionCounts(type, 0, 0, 0);
+ }
+ }
+
+ /**
+ * Test that change detector gives correct results when there are no changes after updating
+ * snapshots.
+ */
+ @Test(dependsOnMethods = "testModifyClusterConfig")
+ public void testNoChange() {
+ // Test twice to make sure that no change is stable across different runs
+ for (int i = 0; i < 2; i++) {
+ _dataProvider.refresh(_dataAccessor);
+ _resourceChangeDetector.updateSnapshots(_dataProvider);
+
+ Assert.assertEquals(_resourceChangeDetector.getChangeTypes().size(), 0);
+ // Check the counts for all the resource change types
+ for (ChangeType type : RESOURCE_CHANGE_TYPES) {
+ checkDetectionCounts(type, 0, 0, 0);
+ }
+ }
+ }
+
+ /**
+ * Check that the given change types appear in detector's change types. Only containment is
+ * asserted; additional change types reported by the detector do not fail the check.
+ * @param types the change types that must be present in the detector's change types
+ */
+ private void checkChangeTypes(ChangeType... types) {
+ for (ChangeType type : types) {
+ Assert.assertTrue(_resourceChangeDetector.getChangeTypes().contains(type));
+ }
+ }
+
+ /**
+ * Convenience method for checking three types of detections.
+ * @param changeType the change type to query the detector with
+ * @param additions expected number of names returned by getAdditionsByType
+ * @param changes expected number of names returned by getChangesByType
+ * @param deletions expected number of names returned by getRemovalsByType
+ */
+ private void checkDetectionCounts(ChangeType changeType, int additions, int changes,
+ int deletions) {
+ Assert.assertEquals(_resourceChangeDetector.getAdditionsByType(changeType).size(), additions);
+ Assert.assertEquals(_resourceChangeDetector.getChangesByType(changeType).size(), changes);
+ Assert.assertEquals(_resourceChangeDetector.getRemovalsByType(changeType).size(), deletions);
+ }
+}