You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by pz...@apache.org on 2020/04/30 21:09:00 UTC

[knox] branch master updated: KNOX-2368 - CM Cluster Configuration Monitor Does Not Support Rolling Restart Events

This is an automated email from the ASF dual-hosted git repository.

pzampino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git


The following commit(s) were added to refs/heads/master by this push:
     new 57184ab  KNOX-2368 - CM Cluster Configuration Monitor Does Not Support Rolling Restart Events
57184ab is described below

commit 57184ab02ffe8020f9b91f64b73d73e1d7f38131
Author: Phil Zampino <pz...@apache.org>
AuthorDate: Thu Apr 30 17:08:51 2020 -0400

    KNOX-2368 - CM Cluster Configuration Monitor Does Not Support Rolling Restart Events
---
 .../ClouderaManagerServiceDiscoveryMessages.java   |   9 +-
 .../cm/monitor/PollingConfigurationAnalyzer.java   |  51 ++-
 .../monitor/PollingConfigurationAnalyzerTest.java  | 437 +++++++++++++++------
 3 files changed, 352 insertions(+), 145 deletions(-)

diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
index 7f7c644..d04c777 100644
--- a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
+++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
@@ -162,10 +162,11 @@ public interface ClouderaManagerServiceDiscoveryMessages {
                                           String clusterName,
                                           String discoveryAddress);
 
-  @Message(level = MessageLevel.DEBUG, text = "Querying restart events from {0} @ {1} since {2}")
-  void queryingRestartEventsFromCluster(String clusterName,
-                                        String discoveryAddress,
-                                        String sinceTimestamp);
+  @Message(level = MessageLevel.DEBUG,
+          text = "Querying configuration activation events from {0} @ {1} since {2}")
+  void queryingConfigActivationEventsFromCluster(String clusterName,
+                                                 String discoveryAddress,
+                                                 String sinceTimestamp);
 
   @Message(level = MessageLevel.DEBUG, text = "Analyzing current {0} configuration for changes...")
   void analyzingCurrentServiceConfiguration(String serviceName);
diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
index 4b7935f..2132d3f 100644
--- a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
+++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
@@ -48,6 +48,8 @@ import java.io.IOException;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -66,13 +68,20 @@ public class PollingConfigurationAnalyzer implements Runnable {
 
   private static final String COMMAND_STATUS = "COMMAND_STATUS";
 
-  private static final String STARTED_STATUS = "STARTED";
+  static final String SUCCEEDED_STATUS = "SUCCEEDED";
 
-  private static final String SUCCEEDED_STATUS = "SUCCEEDED";
+  static final String RESTART_COMMAND = "Restart";
 
-  private static final String RESTART_COMMAND = "Restart";
+  static final String START_COMMAND = "Start";
 
-  private static final String START_COMMAND = "Start";
+  static final String ROLLING_RESTART_COMMAND = "RollingRestart";
+
+  static final String CM_SERVICE_TYPE = "ManagerServer";
+  static final String CM_SERVICE      = "ClouderaManager";
+
+  // Collection of those commands which represent the potential activation of service configuration changes
+  private static final Collection<String> ACTIVATION_COMMANDS =
+                          Arrays.asList(START_COMMAND, RESTART_COMMAND, ROLLING_RESTART_COMMAND);
 
   // The format of the filter employed when start events are queried from ClouderaManager
   private static final String EVENTS_QUERY_FORMAT =
@@ -210,7 +219,8 @@ public class PollingConfigurationAnalyzer implements Runnable {
   private boolean hasConfigChanged(String address, String clusterName, List<StartEvent> relevantEvents) {
     // If there are start events, then check the previously-recorded properties for the same service to
     // identify if the configuration has changed
-    final Map<String, ServiceConfigurationModel> serviceConfigurations = configCache.getClusterServiceConfigurations(address, clusterName);
+    final Map<String, ServiceConfigurationModel> serviceConfigurations =
+                          configCache.getClusterServiceConfigurations(address, clusterName);
 
     // Those services for which a start even has been handled
     final List<String> handledServiceTypes = new ArrayList<>();
@@ -219,9 +229,15 @@ public class PollingConfigurationAnalyzer implements Runnable {
     for (StartEvent re : relevantEvents) {
       String serviceType = re.getServiceType();
 
-      // Determine if we've already handled a start event for this service type
-      if (!handledServiceTypes.contains(serviceType)) {
+      if (CM_SERVICE_TYPE.equals(serviceType)) {
+        if (CM_SERVICE.equals(re.getService())) {
+          // This is a rolling cluster restart event, so assume configuration has changed
+          configHasChanged = true;
+        }
+      }
 
+      // Determine if we've already handled a start event for this service type
+      if (!configHasChanged && !handledServiceTypes.contains(serviceType)) {
         // Get the previously-recorded configuration
         ServiceConfigurationModel serviceConfig = serviceConfigurations.get(re.getServiceType());
 
@@ -372,15 +388,15 @@ public class PollingConfigurationAnalyzer implements Runnable {
       lastTimestamp = Instant.now().minus(eventQueryDefaultTimestampOffset, ChronoUnit.MILLIS).toString();
     }
 
-    log.queryingRestartEventsFromCluster(clusterName, address, lastTimestamp);
+    log.queryingConfigActivationEventsFromCluster(clusterName, address, lastTimestamp);
 
     // Record the new event query timestamp for this address/cluster
     setEventQueryTimestamp(address, clusterName, Instant.now());
 
     // Query the event log from CM for service/cluster start events
     List<ApiEvent> events = queryEvents(getApiClient(configCache.getDiscoveryConfig(address, clusterName)),
-                                               clusterName,
-                                               lastTimestamp);
+                                        clusterName,
+                                        lastTimestamp);
     for (ApiEvent event : events) {
       if(isRelevantEvent(event)) {
         relevantEvents.add(new StartEvent(event));
@@ -393,12 +409,11 @@ public class PollingConfigurationAnalyzer implements Runnable {
   @SuppressWarnings("unchecked")
   private boolean isRelevantEvent(ApiEvent event) {
     final Map<String, Object> attributeMap = getAttributeMap(event.getAttributes());
-    final String command = attributeMap.containsKey(COMMAND) ? (String) ((List<String>) attributeMap.get(COMMAND)).get(0) : "";
-    final String status = attributeMap.containsKey(COMMAND_STATUS) ? (String) ((List<String>) attributeMap.get(COMMAND_STATUS)).get(0) : "";
-    if ((START_COMMAND.equals(command) || RESTART_COMMAND.equals(command)) && (SUCCEEDED_STATUS.equals(status) || STARTED_STATUS.equals(status))) {
-      return true;
-    }
-    return false;
+    final String command =
+            attributeMap.containsKey(COMMAND) ? ((List<String>) attributeMap.get(COMMAND)).get(0) : "";
+    final String status =
+            attributeMap.containsKey(COMMAND_STATUS) ? ((List<String>) attributeMap.get(COMMAND_STATUS)).get(0) : "";
+    return (ACTIVATION_COMMANDS.contains(command) && SUCCEEDED_STATUS.equals(status));
   }
 
   private Map<String, Object> getAttributeMap(List<ApiEventAttribute> attributes) {
@@ -538,9 +553,9 @@ public class PollingConfigurationAnalyzer implements Runnable {
    */
   static final class StartEvent {
 
-    private static final String ATTR_CLUSTER = "CLUSTER";
+    private static final String ATTR_CLUSTER      = "CLUSTER";
     private static final String ATTR_SERVICE_TYPE = "SERVICE_TYPE";
-    private static final String ATTR_SERVICE = "SERVICE";
+    private static final String ATTR_SERVICE      = "SERVICE";
 
     private static List<String> attrsOfInterest = new ArrayList<>();
 
diff --git a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
index 49e339c..00f83a3 100644
--- a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
+++ b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
@@ -27,7 +27,6 @@ import org.apache.knox.gateway.services.ServiceType;
 import org.apache.knox.gateway.services.topology.TopologyService;
 import org.apache.knox.gateway.topology.ClusterConfigurationMonitorService;
 import org.apache.knox.gateway.topology.discovery.ServiceDiscoveryConfig;
-import org.apache.knox.gateway.topology.discovery.cm.model.cm.ClouderaManagerAPIServiceModelGenerator;
 import org.apache.knox.gateway.topology.discovery.cm.model.hdfs.NameNodeServiceModelGenerator;
 import org.apache.knox.gateway.topology.discovery.cm.model.hive.HiveOnTezServiceModelGenerator;
 import org.easymock.EasyMock;
@@ -50,6 +49,7 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.knox.gateway.topology.discovery.ClusterConfigurationMonitor.ConfigurationChangeListener;
 import static org.easymock.EasyMock.getCurrentArguments;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -58,148 +58,183 @@ public class PollingConfigurationAnalyzerTest {
 
   @Test(expected = IllegalArgumentException.class)
   public void testRestartEventWithWrongApiEventCategory() {
-    doTestRestartEvent(ApiEventCategory.LOG_EVENT);
+    doTestStartEvent(ApiEventCategory.LOG_EVENT);
   }
 
   @Test
-  public void testRestartEvent() {
-    doTestRestartEvent(ApiEventCategory.AUDIT_EVENT);
+  public void testStartEvent() {
+    doTestStartEvent(ApiEventCategory.AUDIT_EVENT);
   }
 
-  private void doTestRestartEvent(final ApiEventCategory category) {
-    final String clusterName = "My Cluster";
-    final String serviceType = NameNodeServiceModelGenerator.SERVICE_TYPE;
-    final String service     = NameNodeServiceModelGenerator.SERVICE;
+  /**
+   * KNOX-2350
+   */
+  @Test
+  public void testEventWithoutCommandOrCommandStatus() {
+    final String clusterName = "Cluster T";
 
-    List<ApiEventAttribute> apiEventAttrs = new ArrayList<>();
-    apiEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
-    apiEventAttrs.add(createEventAttribute("SERVICE_TYPE", serviceType));
-    apiEventAttrs.add(createEventAttribute("SERVICE", service));
-    ApiEvent apiEvent = createApiEvent(category, apiEventAttrs);
+    // Simulate an event w/o COMMAND and/or COMMAND_STATUS attributes
+    final List<ApiEventAttribute> revisionEventAttrs = new ArrayList<>();
+    revisionEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
+    revisionEventAttrs.add(createEventAttribute("SERVICE_TYPE", HiveOnTezServiceModelGenerator.SERVICE_TYPE));
+    revisionEventAttrs.add(createEventAttribute("SERVICE", HiveOnTezServiceModelGenerator.SERVICE));
+    revisionEventAttrs.add(createEventAttribute("REVISION", "215"));
+    revisionEventAttrs.add(createEventAttribute("EVENTCODE", "EV_REVISION_CREATED"));
+    final ApiEvent revisionEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, revisionEventAttrs);
 
-    PollingConfigurationAnalyzer.StartEvent restartEvent = new PollingConfigurationAnalyzer.StartEvent(apiEvent);
-    assertNotNull(restartEvent);
-    assertEquals(clusterName, restartEvent.getClusterName());
-    assertEquals(serviceType, restartEvent.getServiceType());
-    assertEquals(service, restartEvent.getService());
-    assertNotNull(restartEvent.getTimestamp());
+    doTestEventWithoutConfigChange(revisionEvent, clusterName);
   }
 
+  /**
+   * Test the restart of an existing service when no relevant configuration has changed.
+   */
   @Test
-  public void testPollingConfigChangeNotificationForChangedPropertyValue() {
-    final String address = "http://host1:1234";
-    final String clusterName = "Cluster 5";
+  public void testRestartEventWithoutConfigChange() {
+    final String clusterName = "Cluster 2";
 
-    final String failoverPropertyName = "autofailover_enabled";
-    final String nsPropertyName = "dfs_federation_namenode_nameservice";
-    final String portPropertyName = "namenode_port";
+    // Simulate a service restart event
+    ApiEvent restartEvent = createApiEvent(clusterName,
+                                           NameNodeServiceModelGenerator.SERVICE_TYPE,
+                                           NameNodeServiceModelGenerator.SERVICE,
+                                           PollingConfigurationAnalyzer.RESTART_COMMAND,
+                                           PollingConfigurationAnalyzer.SUCCEEDED_STATUS);
 
-    // Mock the service discovery details
-    ServiceDiscoveryConfig sdc = EasyMock.createNiceMock(ServiceDiscoveryConfig.class);
-    EasyMock.expect(sdc.getCluster()).andReturn(clusterName).anyTimes();
-    EasyMock.expect(sdc.getAddress()).andReturn(address).anyTimes();
-    EasyMock.expect(sdc.getUser()).andReturn("u").anyTimes();
-    EasyMock.expect(sdc.getPasswordAlias()).andReturn("a").anyTimes();
-    EasyMock.replay(sdc);
+    doTestEventWithoutConfigChange(restartEvent, clusterName);
+  }
 
-    final Map<String, List<String>> clusterNames = new HashMap<>();
-    clusterNames.put(address, Collections.singletonList(clusterName));
+  /**
+   * Test the restart of an existing service when relevant configuration has changed.
+   */
+  @Test
+  public void testRestartEventWithConfigChange() {
+    final String clusterName = "Cluster 2";
 
-    // Create the original ServiceConfigurationModel details
-    final Map<String, ServiceConfigurationModel> serviceConfigurationModels = new HashMap<>();
-    final Map<String, String> nnServiceConf = new HashMap<>();
-    final Map<String, Map<String, String>> nnRoleConf = new HashMap<>();
-    final Map<String, String> nnRoleProps = new HashMap<>();
-    nnRoleProps.put(failoverPropertyName, "false");
-    nnRoleProps.put(nsPropertyName, "");
-    nnRoleProps.put(portPropertyName, "54321");
-    nnRoleConf.put(NameNodeServiceModelGenerator.ROLE_TYPE, nnRoleProps);
-    serviceConfigurationModels.put(NameNodeServiceModelGenerator.SERVICE_TYPE + "-1", createModel(nnServiceConf, nnRoleConf));
+    // Simulate a service restart event
+    ApiEvent restartEvent = createApiEvent(clusterName,
+                                           NameNodeServiceModelGenerator.SERVICE_TYPE,
+                                           NameNodeServiceModelGenerator.SERVICE,
+                                           PollingConfigurationAnalyzer.RESTART_COMMAND,
+                                           PollingConfigurationAnalyzer.SUCCEEDED_STATUS);
 
-    // Mock a ClusterConfigurationCache for the monitor to use
-    ClusterConfigurationCache configCache = EasyMock.createNiceMock(ClusterConfigurationCache.class);
-    EasyMock.expect(configCache.getDiscoveryConfig(address, clusterName)).andReturn(sdc).anyTimes();
-    EasyMock.expect(configCache.getClusterNames()).andReturn(clusterNames).anyTimes();
-    EasyMock.expect(configCache.getClusterServiceConfigurations(address, clusterName))
-            .andReturn(serviceConfigurationModels)
-            .anyTimes();
-    EasyMock.replay(configCache);
+    doTestEventWithConfigChange(restartEvent, clusterName);
+  }
 
-    // Create the monitor, registering a listener so we can verify that change notification works
-    ChangeListener listener = new ChangeListener();
-    TestablePollingConfigAnalyzer pca = new TestablePollingConfigAnalyzer(configCache, listener);
-    pca.setInterval(5);
+  /**
+   * Test the start of a new service.
+   */
+  @Test
+  public void testNewServiceStartEvent() {
+    final String address = "http://host1:1234";
+    final String clusterName = "Cluster N";
 
-    // Create another version of the same ServiceConfigurationModel with a modified property value
-    ServiceConfigurationModel updatedNNModel = new ServiceConfigurationModel();
-    updatedNNModel.addRoleProperty(NameNodeServiceModelGenerator.ROLE_TYPE, failoverPropertyName, "false");
-    updatedNNModel.addRoleProperty(NameNodeServiceModelGenerator.ROLE_TYPE, nsPropertyName, "");
-    updatedNNModel.addRoleProperty(NameNodeServiceModelGenerator.ROLE_TYPE, portPropertyName, "12345");
-    pca.addCurrentServiceConfigModel(address, clusterName, NameNodeServiceModelGenerator.SERVICE_TYPE + "-1", updatedNNModel);
+    // Simulate a service Start event
+    ApiEvent startEvent = createApiEvent(clusterName,
+                                         NameNodeServiceModelGenerator.SERVICE_TYPE,
+                                         NameNodeServiceModelGenerator.SERVICE,
+                                         PollingConfigurationAnalyzer.START_COMMAND,
+                                         PollingConfigurationAnalyzer.SUCCEEDED_STATUS);
+
+    ChangeListener listener =
+            doTestEvent(startEvent, address, clusterName, Collections.emptyMap(), Collections.emptyMap());
+    assertTrue("Expected a change notification", listener.wasNotified(address, clusterName));
+  }
 
-    // Start the polling thread
-    ExecutorService pollingThreadExecutor = Executors.newSingleThreadExecutor();
-    pollingThreadExecutor.execute(pca);
-    pollingThreadExecutor.shutdown();
+  /**
+   * Test the start of an existing service when no relevant configuration has changed.
+   */
+  @Test
+  public void testExistingServiceStartWithoutConfigChange() {
+    final String clusterName = "Cluster E";
 
-    // Simulate a service restart event
-    List<ApiEventAttribute> restartEventAttrs = new ArrayList<>();
-    restartEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
-    restartEventAttrs.add(createEventAttribute("SERVICE_TYPE", NameNodeServiceModelGenerator.SERVICE_TYPE));
-    restartEventAttrs.add(createEventAttribute("SERVICE", NameNodeServiceModelGenerator.SERVICE));
-    restartEventAttrs.add(createEventAttribute("COMMAND", "Restart"));
-    restartEventAttrs.add(createEventAttribute("COMMAND_STATUS", "SUCCEEDED"));
-    ApiEvent restartEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, restartEventAttrs);
-    pca.addRestartEvent(clusterName, restartEvent);
+    // Simulate a service Start event
+    ApiEvent startEvent = createApiEvent(clusterName,
+                                         NameNodeServiceModelGenerator.SERVICE_TYPE,
+                                         NameNodeServiceModelGenerator.SERVICE,
+                                         PollingConfigurationAnalyzer.START_COMMAND,
+                                         PollingConfigurationAnalyzer.SUCCEEDED_STATUS);
+
+    doTestEventWithoutConfigChange(startEvent, clusterName);
+  }
+
+  /**
+   * Test the start of an existing service when relevant configuration has changed.
+   */
+  @Test
+  public void testExistingServiceStartWithConfigChange() {
+    final String clusterName = "Cluster E";
 
     // Simulate a service Start event
-    List<ApiEventAttribute> startEventAttrs = new ArrayList<>();
-    startEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
-    startEventAttrs.add(createEventAttribute("SERVICE_TYPE", ClouderaManagerAPIServiceModelGenerator.SERVICE_TYPE));
-    startEventAttrs.add(createEventAttribute("SERVICE", ClouderaManagerAPIServiceModelGenerator.SERVICE));
-    startEventAttrs.add(createEventAttribute("COMMAND", "Start"));
-    startEventAttrs.add(createEventAttribute("COMMAND_STATUS", "STARTED"));
-    ApiEvent startEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, startEventAttrs);
-    pca.addRestartEvent(clusterName, startEvent);
-
-    // Simulate a failed service Start event
-    startEventAttrs = new ArrayList<>();
-    startEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
-    startEventAttrs.add(createEventAttribute("SERVICE_TYPE", HiveOnTezServiceModelGenerator.SERVICE_TYPE));
-    startEventAttrs.add(createEventAttribute("SERVICE", HiveOnTezServiceModelGenerator.SERVICE));
-    startEventAttrs.add(createEventAttribute("COMMAND", "Start"));
-    startEventAttrs.add(createEventAttribute("COMMAND_STATUS", "FAILED"));
-    ApiEvent failedStartEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, startEventAttrs);
-    pca.addRestartEvent(clusterName, failedStartEvent);
+    ApiEvent startEvent = createApiEvent(clusterName,
+                                         NameNodeServiceModelGenerator.SERVICE_TYPE,
+                                         NameNodeServiceModelGenerator.SERVICE,
+                                         PollingConfigurationAnalyzer.START_COMMAND,
+                                         PollingConfigurationAnalyzer.SUCCEEDED_STATUS);
 
-    // Simulate an event w/o COMMAND and/or COMMAND_STATUS attributes
-    final List<ApiEventAttribute> revisionEventAttrs = new ArrayList<>();
-    revisionEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
-    revisionEventAttrs.add(createEventAttribute("SERVICE_TYPE", HiveOnTezServiceModelGenerator.SERVICE_TYPE));
-    revisionEventAttrs.add(createEventAttribute("SERVICE", HiveOnTezServiceModelGenerator.SERVICE));
-    revisionEventAttrs.add(createEventAttribute("REVISION", "215"));
-    revisionEventAttrs.add(createEventAttribute("EVENTCODE", "EV_REVISION_CREATED"));
-    final ApiEvent revisionEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, revisionEventAttrs);
-    pca.addRestartEvent(clusterName, revisionEvent);
+    doTestEventWithConfigChange(startEvent, clusterName);
+  }
 
-    try {
-      pollingThreadExecutor.awaitTermination(10, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      //
-    }
+  /**
+   * Test the rolling restart of an existing service when no relevant configuration has changed.
+   */
+  @Test
+  public void testRollingServiceRestartWithoutConfigChange() {
+    final String clusterName = "Cluster 1";
+
+    // Simulate a successful rolling service restart event
+    ApiEvent rollingRestartEvent = createApiEvent(clusterName,
+                                                  NameNodeServiceModelGenerator.SERVICE_TYPE,
+                                                  NameNodeServiceModelGenerator.SERVICE,
+                                                  PollingConfigurationAnalyzer.ROLLING_RESTART_COMMAND,
+                                                  PollingConfigurationAnalyzer.SUCCEEDED_STATUS,
+                                                  "EV_SERVICE_ROLLING_RESTARTED");
+
+    doTestEventWithoutConfigChange(rollingRestartEvent, clusterName);
+  }
 
-    // Stop the config analyzer thread
-    pca.stop();
+  /**
+   * Test the rolling restart of an existing service when relevant configuration has changed.
+   */
+  @Test
+  public void testRollingServiceRestartWithConfigChange() {
+    final String clusterName = "Cluster 1";
+
+    // Simulate a successful rolling service restart event
+    ApiEvent rollingRestartEvent = createApiEvent(clusterName,
+                                                  NameNodeServiceModelGenerator.SERVICE_TYPE,
+                                                  NameNodeServiceModelGenerator.SERVICE,
+                                                  PollingConfigurationAnalyzer.ROLLING_RESTART_COMMAND,
+                                                  PollingConfigurationAnalyzer.SUCCEEDED_STATUS,
+                                                  "EV_SERVICE_ROLLING_RESTARTED");
+
+    doTestEventWithConfigChange(rollingRestartEvent, clusterName);
+  }
 
+  /**
+   * Test the rolling restart of an entire cluster, for which it should be assumed that configuration has changed.
+   */
+  @Test
+  public void testRollingClusterRestartEvent() {
+    final String address = "http://host1:1234";
+    final String clusterName = "Cluster 6";
+
+    // Simulate a successful rolling cluster restart event
+    ApiEvent rollingRestartEvent = createApiEvent(clusterName,
+                                                  PollingConfigurationAnalyzer.CM_SERVICE_TYPE,
+                                                  PollingConfigurationAnalyzer.CM_SERVICE,
+                                                  PollingConfigurationAnalyzer.ROLLING_RESTART_COMMAND,
+                                                  PollingConfigurationAnalyzer.SUCCEEDED_STATUS,
+                                                  "EV_CLUSTER_ROLLING_RESTARTED");
+
+    ChangeListener listener =
+            doTestEvent(rollingRestartEvent, address, clusterName, Collections.emptyMap(), Collections.emptyMap());
     assertTrue("Expected a change notification", listener.wasNotified(address, clusterName));
-    assertEquals(2, listener.howManyNotifications(address, clusterName));
   }
 
 
   @Test
   public void testClusterConfigMonitorTerminationForNoLongerReferencedClusters() {
     final String address = "http://host1:1234";
-    final String clusterName = "Cluster 5";
+    final String clusterName = "Cluster 7";
 
     final String updatedAddress = "http://host2:1234";
     final String descContent =
@@ -231,15 +266,13 @@ public class PollingConfigurationAnalyzerTest {
     EasyMock.expect(sdc.getPasswordAlias()).andReturn("a").anyTimes();
     EasyMock.replay(sdc);
 
-    final Map<String, List<String>> clusterNames = new HashMap<>();
-    clusterNames.put(address, Collections.singletonList(clusterName));
-
     // Create the original ServiceConfigurationModel details
     final Map<String, ServiceConfigurationModel> serviceConfigurationModels = new HashMap<>();
     final Map<String, String> nnServiceConf = new HashMap<>();
     final Map<String, Map<String, String>> nnRoleConf = new HashMap<>();
     nnRoleConf.put(NameNodeServiceModelGenerator.ROLE_TYPE, Collections.emptyMap());
-    serviceConfigurationModels.put(NameNodeServiceModelGenerator.SERVICE_TYPE + "-1", createModel(nnServiceConf, nnRoleConf));
+    serviceConfigurationModels.put(NameNodeServiceModelGenerator.SERVICE_TYPE,
+                                   createModel(nnServiceConf, nnRoleConf));
 
     // Create a ClusterConfigurationCache for the monitor to use
     final ClusterConfigurationCache configCache = new ClusterConfigurationCache();
@@ -303,6 +336,145 @@ public class PollingConfigurationAnalyzerTest {
     }
   }
 
+  private void doTestStartEvent(final ApiEventCategory category) {
+    final String clusterName = "My Cluster";
+    final String serviceType = NameNodeServiceModelGenerator.SERVICE_TYPE;
+    final String service     = NameNodeServiceModelGenerator.SERVICE;
+
+    List<ApiEventAttribute> apiEventAttrs = new ArrayList<>();
+    apiEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
+    apiEventAttrs.add(createEventAttribute("SERVICE_TYPE", serviceType));
+    apiEventAttrs.add(createEventAttribute("SERVICE", service));
+    ApiEvent apiEvent = createApiEvent(category, apiEventAttrs);
+
+    PollingConfigurationAnalyzer.StartEvent restartEvent = new PollingConfigurationAnalyzer.StartEvent(apiEvent);
+    assertNotNull(restartEvent);
+    assertEquals(clusterName, restartEvent.getClusterName());
+    assertEquals(serviceType, restartEvent.getServiceType());
+    assertEquals(service, restartEvent.getService());
+    assertNotNull(restartEvent.getTimestamp());
+  }
+
+  private ChangeListener doTestEvent(final ApiEvent                               event,
+                                     final String                                 address,
+                                     final String                                 clusterName,
+                                     final Map<String, ServiceConfigurationModel> serviceConfigurationModels,
+                                     final Map<String, ServiceConfigurationModel> updatedServiceConfigurationModels) {
+    // Mock the service discovery details
+    ServiceDiscoveryConfig sdc = EasyMock.createNiceMock(ServiceDiscoveryConfig.class);
+    EasyMock.expect(sdc.getCluster()).andReturn(clusterName).anyTimes();
+    EasyMock.expect(sdc.getAddress()).andReturn(address).anyTimes();
+    EasyMock.expect(sdc.getUser()).andReturn("u").anyTimes();
+    EasyMock.expect(sdc.getPasswordAlias()).andReturn("a").anyTimes();
+    EasyMock.replay(sdc);
+
+    final Map<String, List<String>> clusterNames = new HashMap<>();
+    clusterNames.put(address, Collections.singletonList(clusterName));
+
+    // Mock a ClusterConfigurationCache for the monitor to use
+    ClusterConfigurationCache configCache = EasyMock.createNiceMock(ClusterConfigurationCache.class);
+    EasyMock.expect(configCache.getDiscoveryConfig(address, clusterName)).andReturn(sdc).anyTimes();
+    EasyMock.expect(configCache.getClusterNames()).andReturn(clusterNames).anyTimes();
+    EasyMock.expect(configCache.getClusterServiceConfigurations(address, clusterName))
+            .andReturn(serviceConfigurationModels)
+            .anyTimes();
+    EasyMock.replay(configCache);
+
+    // Create the monitor, registering a listener so we can verify that change notification works
+    ChangeListener listener = new ChangeListener();
+    TestablePollingConfigAnalyzer pca = new TestablePollingConfigAnalyzer(configCache, listener);
+    pca.setInterval(5);
+
+    // Add updated service config models
+    for (String roleType : updatedServiceConfigurationModels.keySet()) {
+      pca.addCurrentServiceConfigModel(address, clusterName, roleType, updatedServiceConfigurationModels.get(roleType));
+    }
+
+    // Start the polling thread
+    ExecutorService pollingThreadExecutor = Executors.newSingleThreadExecutor();
+    pollingThreadExecutor.execute(pca);
+    pollingThreadExecutor.shutdown();
+
+    pca.addRestartEvent(clusterName, event);
+
+    try {
+      pollingThreadExecutor.awaitTermination(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      //
+    }
+
+    // Stop the config analyzer thread
+    pca.stop();
+
+    return listener;
+  }
+
+  private void doTestEventWithConfigChange(final ApiEvent event, final String clusterName) {
+    final String address = "http://host1:1234";
+
+    final String failoverPropertyName = "autofailover_enabled";
+    final String nsPropertyName = "dfs_federation_namenode_nameservice";
+    final String portPropertyName = "namenode_port";
+
+    // Create the original ServiceConfigurationModel details
+    final Map<String, ServiceConfigurationModel> serviceConfigurationModels = new HashMap<>();
+    final Map<String, String> nnServiceConf = new HashMap<>();
+    final Map<String, Map<String, String>> nnRoleConf = new HashMap<>();
+    final Map<String, String> nnRoleProps = new HashMap<>();
+    nnRoleProps.put(failoverPropertyName, "false");
+    nnRoleProps.put(nsPropertyName, "");
+    nnRoleProps.put(portPropertyName, "54321");
+    nnRoleConf.put(NameNodeServiceModelGenerator.ROLE_TYPE, nnRoleProps);
+    serviceConfigurationModels.put(NameNodeServiceModelGenerator.SERVICE_TYPE,
+                                   createModel(nnServiceConf, nnRoleConf));
+
+    // Create another version of the same ServiceConfigurationModel with a modified property value
+    ServiceConfigurationModel updatedNNModel = new ServiceConfigurationModel();
+    updatedNNModel.addRoleProperty(NameNodeServiceModelGenerator.ROLE_TYPE, failoverPropertyName, "false");
+    updatedNNModel.addRoleProperty(NameNodeServiceModelGenerator.ROLE_TYPE, nsPropertyName, "");
+    updatedNNModel.addRoleProperty(NameNodeServiceModelGenerator.ROLE_TYPE, portPropertyName, "12345");
+    Map<String, ServiceConfigurationModel> updatedModels = new HashMap<>();
+    updatedModels.put(NameNodeServiceModelGenerator.ROLE_TYPE, updatedNNModel);
+
+    ChangeListener listener = doTestEvent(event, address, clusterName, serviceConfigurationModels, updatedModels);
+    assertTrue("Expected a change notification", listener.wasNotified(address, clusterName));
+  }
+
+  private void doTestEventWithoutConfigChange(final ApiEvent event, final String clusterName) {
+    final String address = "http://host1:1234";
+
+    final String failoverPropertyName = "autofailover_enabled";
+    final String nsPropertyName = "dfs_federation_namenode_nameservice";
+    final String portPropertyName = "namenode_port";
+
+    final String failoverEnabledValue = "false";
+    final String nsValue = "";
+    final String portValue = "54321";
+
+    // Create the original ServiceConfigurationModel details
+    final Map<String, ServiceConfigurationModel> serviceConfigurationModels = new HashMap<>();
+    final Map<String, String> nnServiceConf = new HashMap<>();
+    final Map<String, Map<String, String>> nnRoleConf = new HashMap<>();
+    final Map<String, String> nnRoleProps = new HashMap<>();
+    nnRoleProps.put(failoverPropertyName, failoverEnabledValue);
+    nnRoleProps.put(nsPropertyName, nsValue);
+    nnRoleProps.put(portPropertyName, portValue);
+    nnRoleConf.put(NameNodeServiceModelGenerator.ROLE_TYPE, nnRoleProps);
+    serviceConfigurationModels.put(NameNodeServiceModelGenerator.SERVICE_TYPE,
+                                   createModel(nnServiceConf, nnRoleConf));
+
+    // Create another version of the same ServiceConfigurationModel with unmodified property values
+    ServiceConfigurationModel updatedNNModel = new ServiceConfigurationModel();
+    updatedNNModel.addRoleProperty(NameNodeServiceModelGenerator.ROLE_TYPE, failoverPropertyName, failoverEnabledValue);
+    updatedNNModel.addRoleProperty(NameNodeServiceModelGenerator.ROLE_TYPE, nsPropertyName, nsValue);
+    updatedNNModel.addRoleProperty(NameNodeServiceModelGenerator.ROLE_TYPE, portPropertyName, portValue);
+    Map<String, ServiceConfigurationModel> updatedModels = new HashMap<>();
+    updatedModels.put(NameNodeServiceModelGenerator.ROLE_TYPE, updatedNNModel);
+
+    ChangeListener listener = doTestEvent(event, address, clusterName, serviceConfigurationModels, updatedModels);
+    assertFalse("Unexpected change notification", listener.wasNotified(address, clusterName));
+  }
+
   /**
    * Set the static GatewayServices field to the specified value.
    *
@@ -318,6 +490,31 @@ public class PollingConfigurationAnalyzerTest {
     }
   }
 
+  private ApiEvent createApiEvent(final String clusterName,
+                                  final String serviceType,
+                                  final String service,
+                                  final String command,
+                                  final String commandStatues) {
+
+    return createApiEvent(clusterName, serviceType, service, command, commandStatues, "");
+  }
+
+  private ApiEvent createApiEvent(final String clusterName,
+                                  final String serviceType,
+                                  final String service,
+                                  final String command,
+                                  final String commandStatues,
+                                  final String eventCode) {
+    List<ApiEventAttribute> attrs = new ArrayList<>();
+    attrs.add(createEventAttribute("CLUSTER", clusterName));
+    attrs.add(createEventAttribute("SERVICE_TYPE", serviceType));
+    attrs.add(createEventAttribute("SERVICE", service));
+    attrs.add(createEventAttribute("COMMAND", command));
+    attrs.add(createEventAttribute("COMMAND_STATUS", commandStatues));
+    attrs.add(createEventAttribute("EVENTCODE", eventCode));
+    return createApiEvent(ApiEventCategory.AUDIT_EVENT, attrs);
+  }
+
   private ApiEvent createApiEvent(final ApiEventCategory category, final List<ApiEventAttribute> attrs) {
     ApiEvent event = EasyMock.createNiceMock(ApiEvent.class);
     EasyMock.expect(event.getTimeOccurred()).andReturn(Instant.now().toString()).anyTimes();
@@ -359,8 +556,8 @@ public class PollingConfigurationAnalyzerTest {
    */
   private static class TestablePollingConfigAnalyzer extends PollingConfigurationAnalyzer {
 
-    private Map<String, List<ApiEvent>> restartEvents = new HashMap<>();
-    private Map<String, ServiceConfigurationModel> serviceConfigModels = new HashMap<>();
+    private final Map<String, List<ApiEvent>> restartEvents = new HashMap<>();
+    private final Map<String, ServiceConfigurationModel> serviceConfigModels = new HashMap<>();
 
     TestablePollingConfigAnalyzer(ClusterConfigurationCache cache) {
       this(cache, null);
@@ -371,12 +568,6 @@ public class PollingConfigurationAnalyzerTest {
       super(cache, null, null, listener);
     }
 
-    TestablePollingConfigAnalyzer(ClusterConfigurationCache cache,
-                                  ConfigurationChangeListener listener,
-                                  int interval) {
-      super(cache, null, null, listener, interval);
-    }
-
     void addRestartEvent(final String service, final ApiEvent restartEvent) {
       restartEvents.computeIfAbsent(service, l -> new ArrayList<>()).add(restartEvent);
     }