You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/28 22:32:46 UTC

[helix] 08/50: Add ChangeDetector interface and ResourceChangeDetector implementation (#388)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git

commit afa297fcae31abc07d5a1186bc5bd0f02a5b4f95
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Thu Aug 15 14:33:02 2019 -0700

    Add ChangeDetector interface and ResourceChangeDetector implementation (#388)
    
    Add ChangeDetector interface and ResourceChangeDetector implementation
    
    In order to efficiently react to changes happening to the cluster in the new WAGED rebalancer, a new component called ChangeDetector was added.
    
    Changelist:
    1. Add ChangeDetector interface
    2. Implement ResourceChangeDetector
    3. Add ResourceChangeCache, a wrapper for critical cluster metadata
    4. Add an integration test, TestResourceChangeDetector
---
 .../controller/changedetector/ChangeDetector.java  |  57 ++++
 .../changedetector/ResourceChangeDetector.java     | 158 +++++++++
 .../changedetector/ResourceChangeSnapshot.java     | 105 ++++++
 .../ResourceControllerDataProvider.java            |  33 +-
 .../changedetector/TestResourceChangeDetector.java | 357 +++++++++++++++++++++
 5 files changed, 705 insertions(+), 5 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ChangeDetector.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ChangeDetector.java
new file mode 100644
index 0000000..fbe4afc
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ChangeDetector.java
@@ -0,0 +1,57 @@
+package org.apache.helix.controller.changedetector;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import org.apache.helix.HelixConstants;
+
+/**
+ * ChangeDetector interface that will be used to track deltas in the cluster from one pipeline run
+ * to another. The interface methods are designed to be flexible for both the resource pipeline and
+ * the task pipeline.
+ * TODO: Consider splitting this up into two different ChangeDetector interfaces:
+ * TODO: PropertyBasedChangeDetector and PathBasedChangeDetector.
+ */
+public interface ChangeDetector {
+
+  /**
+   * Returns all types of changes detected.
+   * @return a collection of ChangeTypes
+   */
+  Collection<HelixConstants.ChangeType> getChangeTypes();
+
+  /**
+   * Returns the names of items that changed based on the change type given.
+   * @return a collection of names of items that changed
+   */
+  Collection<String> getChangesByType(HelixConstants.ChangeType changeType);
+
+  /**
+   * Returns the names of items that were added based on the change type given.
+   * @return a collection of names of items that were added
+   */
+  Collection<String> getAdditionsByType(HelixConstants.ChangeType changeType);
+
+  /**
+   * Returns the names of items that were removed based on the change type given.
+   * @return a collection of names of items that were removed
+   */
+  Collection<String> getRemovalsByType(HelixConstants.ChangeType changeType);
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
new file mode 100644
index 0000000..d65e609
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
@@ -0,0 +1,158 @@
+package org.apache.helix.controller.changedetector;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+
+/**
+ * ResourceChangeDetector implements ChangeDetector. It caches resource-related metadata from
+ * Helix's main resource pipeline cache (DataProvider) and the computation results of change
+ * detection.
+ * WARNING: the methods of this class are not thread-safe.
+ */
+public class ResourceChangeDetector implements ChangeDetector {
+
+  private ResourceChangeSnapshot _oldSnapshot; // snapshot for previous pipeline run
+  private ResourceChangeSnapshot _newSnapshot; // snapshot for this pipeline run
+
+  // The following caches the computation results
+  private Map<HelixConstants.ChangeType, Collection<String>> _changedItems = new HashMap<>();
+  private Map<HelixConstants.ChangeType, Collection<String>> _addedItems = new HashMap<>();
+  private Map<HelixConstants.ChangeType, Collection<String>> _removedItems = new HashMap<>();
+
+  public ResourceChangeDetector() {
+    _newSnapshot = new ResourceChangeSnapshot();
+  }
+
+  /**
+   * Compare the underlying HelixProperty objects and produce a collection of names of changed
+   * properties.
+   * @return
+   */
+  private Collection<String> getChangedItems(Map<String, ? extends HelixProperty> oldPropertyMap,
+      Map<String, ? extends HelixProperty> newPropertyMap) {
+    Collection<String> changedItems = new HashSet<>();
+    oldPropertyMap.forEach((name, property) -> {
+      if (newPropertyMap.containsKey(name)
+          && !property.getRecord().equals(newPropertyMap.get(name).getRecord())) {
+        changedItems.add(name);
+      }
+    });
+    return changedItems;
+  }
+
+  /**
+   * Return a collection of names that are newly added.
+   * @return
+   */
+  private Collection<String> getAddedItems(Map<String, ? extends HelixProperty> oldPropertyMap,
+      Map<String, ? extends HelixProperty> newPropertyMap) {
+    return Sets.difference(newPropertyMap.keySet(), oldPropertyMap.keySet());
+  }
+
+  /**
+   * Return a collection of names that were removed.
+   * @return
+   */
+  private Collection<String> getRemovedItems(Map<String, ? extends HelixProperty> oldPropertyMap,
+      Map<String, ? extends HelixProperty> newPropertyMap) {
+    return Sets.difference(oldPropertyMap.keySet(), newPropertyMap.keySet());
+  }
+
+  private void clearCachedComputation() {
+    _changedItems.clear();
+    _addedItems.clear();
+    _removedItems.clear();
+  }
+
+  /**
+   * Based on the change type given and propertyMap type, call the right getters for propertyMap.
+   * @param changeType
+   * @param snapshot
+   * @return
+   */
+  private Map<String, ? extends HelixProperty> determinePropertyMapByType(
+      HelixConstants.ChangeType changeType, ResourceChangeSnapshot snapshot) {
+    switch (changeType) {
+    case INSTANCE_CONFIG:
+      return snapshot.getInstanceConfigMap();
+    case IDEAL_STATE:
+      return snapshot.getIdealStateMap();
+    case RESOURCE_CONFIG:
+      return snapshot.getResourceConfigMap();
+    case LIVE_INSTANCE:
+      return snapshot.getLiveInstances();
+    default:
+      throw new HelixException(String.format(
+          "ResourceChangeDetector cannot compute the names of changes for the given ChangeType: %s",
+          changeType));
+    }
+  }
+
+  /**
+   * Makes the current newSnapshot the oldSnapshot and reads in the up-to-date snapshot for change
+   * computation. To be called in the controller pipeline.
+   * @param dataProvider newly refreshed DataProvider (cache)
+   */
+  public synchronized void updateSnapshots(ResourceControllerDataProvider dataProvider) {
+    // If there are changes, update internal states
+    _oldSnapshot = new ResourceChangeSnapshot(_newSnapshot);
+    _newSnapshot = new ResourceChangeSnapshot(dataProvider);
+    dataProvider.clearRefreshedChangeTypes();
+
+    // Invalidate cached computation
+    clearCachedComputation();
+  }
+
+  @Override
+  public Collection<HelixConstants.ChangeType> getChangeTypes() {
+    return Collections.unmodifiableSet(_newSnapshot.getChangedTypes());
+  }
+
+  @Override
+  public Collection<String> getChangesByType(HelixConstants.ChangeType changeType) {
+    return _changedItems.computeIfAbsent(changeType,
+        changedItems -> getChangedItems(determinePropertyMapByType(changeType, _oldSnapshot),
+            determinePropertyMapByType(changeType, _newSnapshot)));
+  }
+
+  @Override
+  public Collection<String> getAdditionsByType(HelixConstants.ChangeType changeType) {
+    return _addedItems.computeIfAbsent(changeType,
+        changedItems -> getAddedItems(determinePropertyMapByType(changeType, _oldSnapshot),
+            determinePropertyMapByType(changeType, _newSnapshot)));
+  }
+
+  @Override
+  public Collection<String> getRemovalsByType(HelixConstants.ChangeType changeType) {
+    return _removedItems.computeIfAbsent(changeType,
+        changedItems -> getRemovedItems(determinePropertyMapByType(changeType, _oldSnapshot),
+            determinePropertyMapByType(changeType, _newSnapshot)));
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java
new file mode 100644
index 0000000..cbc3539
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java
@@ -0,0 +1,105 @@
+package org.apache.helix.controller.changedetector;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.ResourceConfig;
+
+/**
+ * ResourceChangeSnapshot is a POJO that contains the following Helix metadata:
+ * 1. InstanceConfig
+ * 2. IdealState
+ * 3. ResourceConfig
+ * 4. LiveInstance
+ * 5. Changed property types
+ * It serves as a snapshot of the main controller cache to enable the difference (change)
+ * calculation between two rounds of the pipeline run.
+ */
+class ResourceChangeSnapshot {
+
+  private Set<HelixConstants.ChangeType> _changedTypes;
+  private Map<String, InstanceConfig> _instanceConfigMap;
+  private Map<String, IdealState> _idealStateMap;
+  private Map<String, ResourceConfig> _resourceConfigMap;
+  private Map<String, LiveInstance> _liveInstances;
+
+  /**
+   * Default constructor that constructs an empty snapshot.
+   */
+  ResourceChangeSnapshot() {
+    _changedTypes = new HashSet<>();
+    _instanceConfigMap = new HashMap<>();
+    _idealStateMap = new HashMap<>();
+    _resourceConfigMap = new HashMap<>();
+    _liveInstances = new HashMap<>();
+  }
+
+  /**
+   * Constructor using controller cache (ResourceControllerDataProvider).
+   * @param dataProvider
+   */
+  ResourceChangeSnapshot(ResourceControllerDataProvider dataProvider) {
+    _changedTypes = new HashSet<>(dataProvider.getRefreshedChangeTypes());
+    _instanceConfigMap = new HashMap<>(dataProvider.getInstanceConfigMap());
+    _idealStateMap = new HashMap<>(dataProvider.getIdealStates());
+    _resourceConfigMap = new HashMap<>(dataProvider.getResourceConfigMap());
+    _liveInstances = new HashMap<>(dataProvider.getLiveInstances());
+  }
+
+  /**
+   * Copy constructor for ResourceChangeCache.
+   * @param cache
+   */
+  ResourceChangeSnapshot(ResourceChangeSnapshot cache) {
+    _changedTypes = new HashSet<>(cache._changedTypes);
+    _instanceConfigMap = new HashMap<>(cache._instanceConfigMap);
+    _idealStateMap = new HashMap<>(cache._idealStateMap);
+    _resourceConfigMap = new HashMap<>(cache._resourceConfigMap);
+    _liveInstances = new HashMap<>(cache._liveInstances);
+  }
+
+  Set<HelixConstants.ChangeType> getChangedTypes() {
+    return _changedTypes;
+  }
+
+  Map<String, InstanceConfig> getInstanceConfigMap() {
+    return _instanceConfigMap;
+  }
+
+  Map<String, IdealState> getIdealStateMap() {
+    return _idealStateMap;
+  }
+
+  Map<String, ResourceConfig> getResourceConfigMap() {
+    return _resourceConfigMap;
+  }
+
+  Map<String, LiveInstance> getLiveInstances() {
+    return _liveInstances;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index b1dc215..9e1550a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
@@ -64,6 +65,9 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider {
   private Map<String, Map<String, MissingTopStateRecord>> _missingTopStateMap;
   private Map<String, Map<String, String>> _lastTopStateLocationMap;
 
+  // Maintain a set of all ChangeTypes for change detection
+  private Set<HelixConstants.ChangeType> _refreshedChangeTypes;
+
   public ResourceControllerDataProvider() {
     this(AbstractDataCache.UNKNOWN_CLUSTER);
   }
@@ -106,19 +110,21 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider {
     _idealMappingCache = new HashMap<>();
     _missingTopStateMap = new HashMap<>();
     _lastTopStateLocationMap = new HashMap<>();
+    _refreshedChangeTypes = ConcurrentHashMap.newKeySet();
   }
 
   public synchronized void refresh(HelixDataAccessor accessor) {
     long startTime = System.currentTimeMillis();
 
     // Refresh base
-    Set<HelixConstants.ChangeType> propertyRefreshed = super.doRefresh(accessor);
+    Set<HelixConstants.ChangeType> changedTypes = super.doRefresh(accessor);
+    _refreshedChangeTypes.addAll(changedTypes);
 
     // Invalidate cached information if any of the important data has been refreshed
-    if (propertyRefreshed.contains(HelixConstants.ChangeType.IDEAL_STATE)
-        || propertyRefreshed.contains(HelixConstants.ChangeType.LIVE_INSTANCE)
-        || propertyRefreshed.contains(HelixConstants.ChangeType.INSTANCE_CONFIG)
-        || propertyRefreshed.contains(HelixConstants.ChangeType.RESOURCE_CONFIG)) {
+    if (changedTypes.contains(HelixConstants.ChangeType.IDEAL_STATE)
+        || changedTypes.contains(HelixConstants.ChangeType.LIVE_INSTANCE)
+        || changedTypes.contains(HelixConstants.ChangeType.INSTANCE_CONFIG)
+        || changedTypes.contains(HelixConstants.ChangeType.RESOURCE_CONFIG)) {
       clearCachedResourceAssignments();
     }
 
@@ -261,6 +267,23 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider {
     _idealMappingCache.put(resource, mapping);
   }
 
+  /**
+   * Return the set of all PropertyTypes that changed prior to this round of rebalance. The caller
+   * should clear this set by calling {@link #clearRefreshedChangeTypes()}.
+   * @return
+   */
+  public Set<HelixConstants.ChangeType> getRefreshedChangeTypes() {
+    return _refreshedChangeTypes;
+  }
+
+  /**
+   * Clears the set of all PropertyTypes that changed. The caller will have consumed all change
+   * types by calling {@link #getRefreshedChangeTypes()}.
+   */
+  public void clearRefreshedChangeTypes() {
+    _refreshedChangeTypes.clear();
+  }
+
   public void clearCachedResourceAssignments() {
     _resourceAssignmentCache.clear();
     _idealMappingCache.clear();
diff --git a/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java b/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java
new file mode 100644
index 0000000..3ef41e4
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java
@@ -0,0 +1,357 @@
+package org.apache.helix.controller.changedetector;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceConfig;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * This test contains a series of unit tests for ResourceChangeDetector.
+ */
+public class TestResourceChangeDetector extends ZkTestBase {
+
+  // All possible change types for ResourceChangeDetector except for ClusterConfig
+  // since we don't provide the names of changed fields for ClusterConfig
+  private static final ChangeType[] RESOURCE_CHANGE_TYPES = {
+      ChangeType.IDEAL_STATE, ChangeType.INSTANCE_CONFIG, ChangeType.LIVE_INSTANCE,
+      ChangeType.RESOURCE_CONFIG
+  };
+
+  private static final String CLUSTER_NAME = TestHelper.getTestClassName();
+  private static final String RESOURCE_NAME = "TestDB";
+  private static final String NEW_RESOURCE_NAME = "TestDB2";
+  private static final String STATE_MODEL = "MasterSlave";
+  // There are 5 possible change types for ResourceChangeDetector
+  private static final int NUM_CHANGE_TYPES = 5;
+  private static final int NUM_RESOURCES = 1;
+  private static final int NUM_PARTITIONS = 10;
+  private static final int NUM_REPLICAS = 3;
+  private static final int NUM_NODES = 5;
+
+  // Create a mock of ResourceControllerDataProvider so that we could manipulate it
+  private ResourceControllerDataProvider _dataProvider;
+  private ResourceChangeDetector _resourceChangeDetector;
+  private ClusterControllerManager _controller;
+  private MockParticipantManager[] _participants = new MockParticipantManager[NUM_NODES];
+  private HelixDataAccessor _dataAccessor;
+  private PropertyKey.Builder _keyBuilder;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+
+    // Set up a mock cluster
+    TestHelper.setupCluster(CLUSTER_NAME, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        RESOURCE_NAME, // resource name prefix
+        NUM_RESOURCES, // resources
+        NUM_PARTITIONS, // partitions per resource
+        NUM_NODES, // nodes
+        NUM_REPLICAS, // replicas
+        STATE_MODEL, true); // do rebalance
+
+    // Start a controller
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, "controller_0");
+    _controller.syncStart();
+
+    // Start Participants
+    for (int i = 0; i < NUM_NODES; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      _participants[i].syncStart();
+    }
+
+    _dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+    _keyBuilder = _dataAccessor.keyBuilder();
+    _resourceChangeDetector = new ResourceChangeDetector();
+
+    // Create a custom data provider
+    _dataProvider = new ResourceControllerDataProvider(CLUSTER_NAME);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    for (MockParticipantManager participant : _participants) {
+      if (participant != null && participant.isConnected()) {
+        participant.syncStop();
+      }
+    }
+    _controller.syncStop();
+    deleteCluster(CLUSTER_NAME);
+    Assert.assertFalse(TestHelper.verify(() -> _dataAccessor.getBaseDataAccessor()
+        .exists("/" + CLUSTER_NAME, AccessOption.PERSISTENT), 20000L));
+  }
+
+  /**
+   * Tests the initialization of the change detector. It should tell us that there's been changes
+   * for every change type and for all items per type.
+   * @throws Exception
+   */
+  @Test
+  public void testResourceChangeDetectorInit() throws Exception {
+    _dataProvider.refresh(_dataAccessor);
+    _resourceChangeDetector.updateSnapshots(_dataProvider);
+
+    Collection<ChangeType> changeTypes = _resourceChangeDetector.getChangeTypes();
+    Assert.assertEquals(changeTypes.size(), NUM_CHANGE_TYPES,
+        "Not all change types have been detected for ResourceChangeDetector!");
+
+    // Check that the right amount of resources show up as added
+    checkDetectionCounts(ChangeType.IDEAL_STATE, NUM_RESOURCES, 0, 0);
+
+    // Check that the right amount of instances show up as added
+    checkDetectionCounts(ChangeType.LIVE_INSTANCE, NUM_NODES, 0, 0);
+    checkDetectionCounts(ChangeType.INSTANCE_CONFIG, NUM_NODES, 0, 0);
+  }
+
+  /**
+   * Add a resource (IS and ResourceConfig) and see if the detector detects it.
+   */
+  @Test(dependsOnMethods = "testResourceChangeDetectorInit")
+  public void testAddResource() {
+    // Create an IS and ResourceConfig
+    _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, NEW_RESOURCE_NAME,
+        NUM_PARTITIONS, STATE_MODEL);
+    ResourceConfig resourceConfig = new ResourceConfig(NEW_RESOURCE_NAME);
+    _dataAccessor.setProperty(_keyBuilder.resourceConfig(NEW_RESOURCE_NAME), resourceConfig);
+    // Manually notify dataProvider
+    _dataProvider.notifyDataChange(ChangeType.IDEAL_STATE);
+    _dataProvider.notifyDataChange(ChangeType.RESOURCE_CONFIG);
+
+    // Refresh the data provider
+    _dataProvider.refresh(_dataAccessor);
+
+    // Update the detector
+    _resourceChangeDetector.updateSnapshots(_dataProvider);
+
+    checkChangeTypes(ChangeType.IDEAL_STATE, ChangeType.RESOURCE_CONFIG);
+    // Check the counts
+    for (ChangeType type : RESOURCE_CHANGE_TYPES) {
+      if (type == ChangeType.IDEAL_STATE || type == ChangeType.RESOURCE_CONFIG) {
+        checkDetectionCounts(type, 1, 0, 0);
+      } else {
+        checkDetectionCounts(type, 0, 0, 0);
+      }
+    }
+    // Check that detector gives the right item
+    Assert.assertTrue(_resourceChangeDetector.getAdditionsByType(ChangeType.RESOURCE_CONFIG)
+        .contains(NEW_RESOURCE_NAME));
+  }
+
+  /**
+   * Modify a resource config for the new resource and test that detector detects it.
+   */
+  @Test(dependsOnMethods = "testAddResource")
+  public void testModifyResource() {
+    // Modify resource config
+    ResourceConfig resourceConfig =
+        _dataAccessor.getProperty(_keyBuilder.resourceConfig(NEW_RESOURCE_NAME));
+    resourceConfig.getRecord().setSimpleField("Did I change?", "Yes!");
+    _dataAccessor.updateProperty(_keyBuilder.resourceConfig(NEW_RESOURCE_NAME), resourceConfig);
+
+    // Notify data provider and check
+    _dataProvider.notifyDataChange(ChangeType.RESOURCE_CONFIG);
+    _dataProvider.refresh(_dataAccessor);
+    _resourceChangeDetector.updateSnapshots(_dataProvider);
+
+    checkChangeTypes(ChangeType.RESOURCE_CONFIG);
+    // Check the counts
+    for (ChangeType type : RESOURCE_CHANGE_TYPES) {
+      if (type == ChangeType.RESOURCE_CONFIG) {
+        checkDetectionCounts(type, 0, 1, 0);
+      } else {
+        checkDetectionCounts(type, 0, 0, 0);
+      }
+    }
+    Assert.assertTrue(_resourceChangeDetector.getChangesByType(ChangeType.RESOURCE_CONFIG)
+        .contains(NEW_RESOURCE_NAME));
+  }
+
+  /**
+   * Delete the new resource and test that detector detects it.
+   */
+  @Test(dependsOnMethods = "testModifyResource")
+  public void testDeleteResource() {
+    // Delete the newly added resource
+    _dataAccessor.removeProperty(_keyBuilder.idealStates(NEW_RESOURCE_NAME));
+    _dataAccessor.removeProperty(_keyBuilder.resourceConfig(NEW_RESOURCE_NAME));
+
+    // Notify data provider and check
+    _dataProvider.notifyDataChange(ChangeType.IDEAL_STATE);
+    _dataProvider.notifyDataChange(ChangeType.RESOURCE_CONFIG);
+    _dataProvider.refresh(_dataAccessor);
+    _resourceChangeDetector.updateSnapshots(_dataProvider);
+
+    checkChangeTypes(ChangeType.RESOURCE_CONFIG, ChangeType.IDEAL_STATE);
+    // Check the counts
+    for (ChangeType type : RESOURCE_CHANGE_TYPES) {
+      if (type == ChangeType.IDEAL_STATE || type == ChangeType.RESOURCE_CONFIG) {
+        checkDetectionCounts(type, 0, 0, 1);
+      } else {
+        checkDetectionCounts(type, 0, 0, 0);
+      }
+    }
+  }
+
+  /**
+   * Disconnect and reconnect a Participant and see if detector detects.
+   */
+  @Test(dependsOnMethods = "testDeleteResource")
+  public void testDisconnectReconnectInstance() {
+    // Disconnect a Participant
+    _participants[0].syncStop();
+    _dataProvider.notifyDataChange(ChangeType.LIVE_INSTANCE);
+    _dataProvider.refresh(_dataAccessor);
+    _resourceChangeDetector.updateSnapshots(_dataProvider);
+
+    checkChangeTypes(ChangeType.LIVE_INSTANCE);
+    // Check the counts
+    for (ChangeType type : RESOURCE_CHANGE_TYPES) {
+      if (type == ChangeType.LIVE_INSTANCE) {
+        checkDetectionCounts(type, 0, 0, 1);
+      } else {
+        checkDetectionCounts(type, 0, 0, 0);
+      }
+    }
+
+    // Reconnect the Participant
+    _participants[0] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, "localhost_12918");
+    _participants[0].syncStart();
+    _dataProvider.notifyDataChange(ChangeType.LIVE_INSTANCE);
+    _dataProvider.refresh(_dataAccessor);
+    _resourceChangeDetector.updateSnapshots(_dataProvider);
+
+    checkChangeTypes(ChangeType.LIVE_INSTANCE);
+    // Check the counts
+    for (ChangeType type : RESOURCE_CHANGE_TYPES) {
+      if (type == ChangeType.LIVE_INSTANCE) {
+        checkDetectionCounts(type, 1, 0, 0);
+      } else {
+        checkDetectionCounts(type, 0, 0, 0);
+      }
+    }
+  }
+
+  /**
+   * Remove an instance completely and see if detector detects.
+   */
+  @Test(dependsOnMethods = "testDisconnectReconnectInstance")
+  public void testRemoveInstance() {
+    _participants[0].syncStop();
+    InstanceConfig instanceConfig =
+        _dataAccessor.getProperty(_keyBuilder.instanceConfig(_participants[0].getInstanceName()));
+    _gSetupTool.getClusterManagementTool().dropInstance(CLUSTER_NAME, instanceConfig);
+
+    _dataProvider.notifyDataChange(ChangeType.LIVE_INSTANCE);
+    _dataProvider.notifyDataChange(ChangeType.INSTANCE_CONFIG);
+    _dataProvider.refresh(_dataAccessor);
+    _resourceChangeDetector.updateSnapshots(_dataProvider);
+
+    checkChangeTypes(ChangeType.LIVE_INSTANCE, ChangeType.INSTANCE_CONFIG);
+    // Check the counts
+    for (ChangeType type : RESOURCE_CHANGE_TYPES) {
+      if (type == ChangeType.LIVE_INSTANCE || type == ChangeType.INSTANCE_CONFIG) {
+        checkDetectionCounts(type, 0, 0, 1);
+      } else {
+        checkDetectionCounts(type, 0, 0, 0);
+      }
+    }
+  }
+
+  /**
+   * Modify cluster config and see if detector detects.
+   */
+  @Test(dependsOnMethods = "testRemoveInstance")
+  public void testModifyClusterConfig() {
+    // Modify cluster config
+    ClusterConfig clusterConfig = _dataAccessor.getProperty(_keyBuilder.clusterConfig());
+    clusterConfig.setTopology("Change");
+    _dataAccessor.updateProperty(_keyBuilder.clusterConfig(), clusterConfig);
+
+    _dataProvider.notifyDataChange(ChangeType.CLUSTER_CONFIG);
+    _dataProvider.refresh(_dataAccessor);
+    _resourceChangeDetector.updateSnapshots(_dataProvider);
+
+    checkChangeTypes(ChangeType.CLUSTER_CONFIG);
+    // Check the counts for other types
+    for (ChangeType type : RESOURCE_CHANGE_TYPES) {
+      checkDetectionCounts(type, 0, 0, 0);
+    }
+  }
+
+  /**
+   * Test that change detector gives correct results when there are no changes after updating
+   * snapshots.
+   */
+  @Test(dependsOnMethods = "testModifyClusterConfig")
+  public void testNoChange() {
+    // Test twice to make sure that no change is stable across different runs
+    for (int i = 0; i < 2; i++) {
+      _dataProvider.refresh(_dataAccessor);
+      _resourceChangeDetector.updateSnapshots(_dataProvider);
+
+      Assert.assertEquals(_resourceChangeDetector.getChangeTypes().size(), 0);
+      // Check the counts for all the other types
+      for (ChangeType type : RESOURCE_CHANGE_TYPES) {
+        checkDetectionCounts(type, 0, 0, 0);
+      }
+    }
+  }
+
+  /**
+   * Check that the given change types appear in detector's change types.
+   * @param types
+   */
+  private void checkChangeTypes(ChangeType... types) {
+    for (ChangeType type : types) {
+      Assert.assertTrue(_resourceChangeDetector.getChangeTypes().contains(type));
+    }
+  }
+
+  /**
+   * Convenience method for checking three types of detections.
+   * @param changeType
+   * @param additions
+   * @param changes
+   * @param deletions
+   */
+  private void checkDetectionCounts(ChangeType changeType, int additions, int changes,
+      int deletions) {
+    Assert.assertEquals(_resourceChangeDetector.getAdditionsByType(changeType).size(), additions);
+    Assert.assertEquals(_resourceChangeDetector.getChangesByType(changeType).size(), changes);
+    Assert.assertEquals(_resourceChangeDetector.getRemovalsByType(changeType).size(), deletions);
+  }
+}