You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by pz...@apache.org on 2020/04/30 21:09:00 UTC
[knox] branch master updated: KNOX-2368 - CM Cluster Configuration
Monitor Does Not Support Rolling Restart Events
This is an automated email from the ASF dual-hosted git repository.
pzampino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git
The following commit(s) were added to refs/heads/master by this push:
new 57184ab KNOX-2368 - CM Cluster Configuration Monitor Does Not Support Rolling Restart Events
57184ab is described below
commit 57184ab02ffe8020f9b91f64b73d73e1d7f38131
Author: Phil Zampino <pz...@apache.org>
AuthorDate: Thu Apr 30 17:08:51 2020 -0400
KNOX-2368 - CM Cluster Configuration Monitor Does Not Support Rolling Restart Events
---
.../ClouderaManagerServiceDiscoveryMessages.java | 9 +-
.../cm/monitor/PollingConfigurationAnalyzer.java | 51 ++-
.../monitor/PollingConfigurationAnalyzerTest.java | 437 +++++++++++++++------
3 files changed, 352 insertions(+), 145 deletions(-)
diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
index 7f7c644..d04c777 100644
--- a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
+++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
@@ -162,10 +162,11 @@ public interface ClouderaManagerServiceDiscoveryMessages {
String clusterName,
String discoveryAddress);
- @Message(level = MessageLevel.DEBUG, text = "Querying restart events from {0} @ {1} since {2}")
- void queryingRestartEventsFromCluster(String clusterName,
- String discoveryAddress,
- String sinceTimestamp);
+ @Message(level = MessageLevel.DEBUG,
+ text = "Querying configuration activation events from {0} @ {1} since {2}")
+ void queryingConfigActivationEventsFromCluster(String clusterName,
+ String discoveryAddress,
+ String sinceTimestamp);
@Message(level = MessageLevel.DEBUG, text = "Analyzing current {0} configuration for changes...")
void analyzingCurrentServiceConfiguration(String serviceName);
diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
index 4b7935f..2132d3f 100644
--- a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
+++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
@@ -48,6 +48,8 @@ import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -66,13 +68,20 @@ public class PollingConfigurationAnalyzer implements Runnable {
private static final String COMMAND_STATUS = "COMMAND_STATUS";
- private static final String STARTED_STATUS = "STARTED";
+ static final String SUCCEEDED_STATUS = "SUCCEEDED";
- private static final String SUCCEEDED_STATUS = "SUCCEEDED";
+ static final String RESTART_COMMAND = "Restart";
- private static final String RESTART_COMMAND = "Restart";
+ static final String START_COMMAND = "Start";
- private static final String START_COMMAND = "Start";
+ static final String ROLLING_RESTART_COMMAND = "RollingRestart";
+
+ static final String CM_SERVICE_TYPE = "ManagerServer";
+ static final String CM_SERVICE = "ClouderaManager";
+
+ // Collection of those commands which represent the potential activation of service configuration changes
+ private static final Collection<String> ACTIVATION_COMMANDS =
+ Arrays.asList(START_COMMAND, RESTART_COMMAND, ROLLING_RESTART_COMMAND);
// The format of the filter employed when start events are queried from ClouderaManager
private static final String EVENTS_QUERY_FORMAT =
@@ -210,7 +219,8 @@ public class PollingConfigurationAnalyzer implements Runnable {
private boolean hasConfigChanged(String address, String clusterName, List<StartEvent> relevantEvents) {
// If there are start events, then check the previously-recorded properties for the same service to
// identify if the configuration has changed
- final Map<String, ServiceConfigurationModel> serviceConfigurations = configCache.getClusterServiceConfigurations(address, clusterName);
+ final Map<String, ServiceConfigurationModel> serviceConfigurations =
+ configCache.getClusterServiceConfigurations(address, clusterName);
// Those services for which a start event has been handled
final List<String> handledServiceTypes = new ArrayList<>();
@@ -219,9 +229,15 @@ public class PollingConfigurationAnalyzer implements Runnable {
for (StartEvent re : relevantEvents) {
String serviceType = re.getServiceType();
- // Determine if we've already handled a start event for this service type
- if (!handledServiceTypes.contains(serviceType)) {
+ if (CM_SERVICE_TYPE.equals(serviceType)) {
+ if (CM_SERVICE.equals(re.getService())) {
+ // This is a rolling cluster restart event, so assume configuration has changed
+ configHasChanged = true;
+ }
+ }
+ // Determine if we've already handled a start event for this service type
+ if (!configHasChanged && !handledServiceTypes.contains(serviceType)) {
// Get the previously-recorded configuration
ServiceConfigurationModel serviceConfig = serviceConfigurations.get(re.getServiceType());
@@ -372,15 +388,15 @@ public class PollingConfigurationAnalyzer implements Runnable {
lastTimestamp = Instant.now().minus(eventQueryDefaultTimestampOffset, ChronoUnit.MILLIS).toString();
}
- log.queryingRestartEventsFromCluster(clusterName, address, lastTimestamp);
+ log.queryingConfigActivationEventsFromCluster(clusterName, address, lastTimestamp);
// Record the new event query timestamp for this address/cluster
setEventQueryTimestamp(address, clusterName, Instant.now());
// Query the event log from CM for service/cluster start events
List<ApiEvent> events = queryEvents(getApiClient(configCache.getDiscoveryConfig(address, clusterName)),
- clusterName,
- lastTimestamp);
+ clusterName,
+ lastTimestamp);
for (ApiEvent event : events) {
if(isRelevantEvent(event)) {
relevantEvents.add(new StartEvent(event));
@@ -393,12 +409,11 @@ public class PollingConfigurationAnalyzer implements Runnable {
@SuppressWarnings("unchecked")
private boolean isRelevantEvent(ApiEvent event) {
final Map<String, Object> attributeMap = getAttributeMap(event.getAttributes());
- final String command = attributeMap.containsKey(COMMAND) ? (String) ((List<String>) attributeMap.get(COMMAND)).get(0) : "";
- final String status = attributeMap.containsKey(COMMAND_STATUS) ? (String) ((List<String>) attributeMap.get(COMMAND_STATUS)).get(0) : "";
- if ((START_COMMAND.equals(command) || RESTART_COMMAND.equals(command)) && (SUCCEEDED_STATUS.equals(status) || STARTED_STATUS.equals(status))) {
- return true;
- }
- return false;
+ final String command =
+ attributeMap.containsKey(COMMAND) ? ((List<String>) attributeMap.get(COMMAND)).get(0) : "";
+ final String status =
+ attributeMap.containsKey(COMMAND_STATUS) ? ((List<String>) attributeMap.get(COMMAND_STATUS)).get(0) : "";
+ return (ACTIVATION_COMMANDS.contains(command) && SUCCEEDED_STATUS.equals(status));
}
private Map<String, Object> getAttributeMap(List<ApiEventAttribute> attributes) {
@@ -538,9 +553,9 @@ public class PollingConfigurationAnalyzer implements Runnable {
*/
static final class StartEvent {
- private static final String ATTR_CLUSTER = "CLUSTER";
+ private static final String ATTR_CLUSTER = "CLUSTER";
private static final String ATTR_SERVICE_TYPE = "SERVICE_TYPE";
- private static final String ATTR_SERVICE = "SERVICE";
+ private static final String ATTR_SERVICE = "SERVICE";
private static List<String> attrsOfInterest = new ArrayList<>();
diff --git a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
index 49e339c..00f83a3 100644
--- a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
+++ b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
@@ -27,7 +27,6 @@ import org.apache.knox.gateway.services.ServiceType;
import org.apache.knox.gateway.services.topology.TopologyService;
import org.apache.knox.gateway.topology.ClusterConfigurationMonitorService;
import org.apache.knox.gateway.topology.discovery.ServiceDiscoveryConfig;
-import org.apache.knox.gateway.topology.discovery.cm.model.cm.ClouderaManagerAPIServiceModelGenerator;
import org.apache.knox.gateway.topology.discovery.cm.model.hdfs.NameNodeServiceModelGenerator;
import org.apache.knox.gateway.topology.discovery.cm.model.hive.HiveOnTezServiceModelGenerator;
import org.easymock.EasyMock;
@@ -50,6 +49,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.knox.gateway.topology.discovery.ClusterConfigurationMonitor.ConfigurationChangeListener;
import static org.easymock.EasyMock.getCurrentArguments;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -58,148 +58,183 @@ public class PollingConfigurationAnalyzerTest {
@Test(expected = IllegalArgumentException.class)
public void testRestartEventWithWrongApiEventCategory() {
- doTestRestartEvent(ApiEventCategory.LOG_EVENT);
+ doTestStartEvent(ApiEventCategory.LOG_EVENT);
}
@Test
- public void testRestartEvent() {
- doTestRestartEvent(ApiEventCategory.AUDIT_EVENT);
+ public void testStartEvent() {
+ doTestStartEvent(ApiEventCategory.AUDIT_EVENT);
}
- private void doTestRestartEvent(final ApiEventCategory category) {
- final String clusterName = "My Cluster";
- final String serviceType = NameNodeServiceModelGenerator.SERVICE_TYPE;
- final String service = NameNodeServiceModelGenerator.SERVICE;
+ /**
+ * KNOX-2350
+ */
+ @Test
+ public void testEventWithoutCommandOrCommandStatus() {
+ final String clusterName = "Cluster T";
- List<ApiEventAttribute> apiEventAttrs = new ArrayList<>();
- apiEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
- apiEventAttrs.add(createEventAttribute("SERVICE_TYPE", serviceType));
- apiEventAttrs.add(createEventAttribute("SERVICE", service));
- ApiEvent apiEvent = createApiEvent(category, apiEventAttrs);
+ // Simulate an event w/o COMMAND and/or COMMAND_STATUS attributes
+ final List<ApiEventAttribute> revisionEventAttrs = new ArrayList<>();
+ revisionEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
+ revisionEventAttrs.add(createEventAttribute("SERVICE_TYPE", HiveOnTezServiceModelGenerator.SERVICE_TYPE));
+ revisionEventAttrs.add(createEventAttribute("SERVICE", HiveOnTezServiceModelGenerator.SERVICE));
+ revisionEventAttrs.add(createEventAttribute("REVISION", "215"));
+ revisionEventAttrs.add(createEventAttribute("EVENTCODE", "EV_REVISION_CREATED"));
+ final ApiEvent revisionEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, revisionEventAttrs);
- PollingConfigurationAnalyzer.StartEvent restartEvent = new PollingConfigurationAnalyzer.StartEvent(apiEvent);
- assertNotNull(restartEvent);
- assertEquals(clusterName, restartEvent.getClusterName());
- assertEquals(serviceType, restartEvent.getServiceType());
- assertEquals(service, restartEvent.getService());
- assertNotNull(restartEvent.getTimestamp());
+ doTestEventWithoutConfigChange(revisionEvent, clusterName);
}
+ /**
+ * Test the restart of an existing service when no relevant configuration has changed.
+ */
@Test
- public void testPollingConfigChangeNotificationForChangedPropertyValue() {
- final String address = "http://host1:1234";
- final String clusterName = "Cluster 5";
+ public void testRestartEventWithoutConfigChange() {
+ final String clusterName = "Cluster 2";
- final String failoverPropertyName = "autofailover_enabled";
- final String nsPropertyName = "dfs_federation_namenode_nameservice";
- final String portPropertyName = "namenode_port";
+ // Simulate a service restart event
+ ApiEvent restartEvent = createApiEvent(clusterName,
+ NameNodeServiceModelGenerator.SERVICE_TYPE,
+ NameNodeServiceModelGenerator.SERVICE,
+ PollingConfigurationAnalyzer.RESTART_COMMAND,
+ PollingConfigurationAnalyzer.SUCCEEDED_STATUS);
- // Mock the service discovery details
- ServiceDiscoveryConfig sdc = EasyMock.createNiceMock(ServiceDiscoveryConfig.class);
- EasyMock.expect(sdc.getCluster()).andReturn(clusterName).anyTimes();
- EasyMock.expect(sdc.getAddress()).andReturn(address).anyTimes();
- EasyMock.expect(sdc.getUser()).andReturn("u").anyTimes();
- EasyMock.expect(sdc.getPasswordAlias()).andReturn("a").anyTimes();
- EasyMock.replay(sdc);
+ doTestEventWithoutConfigChange(restartEvent, clusterName);
+ }
- final Map<String, List<String>> clusterNames = new HashMap<>();
- clusterNames.put(address, Collections.singletonList(clusterName));
+ /**
+ * Test the restart of an existing service when relevant configuration has changed.
+ */
+ @Test
+ public void testRestartEventWithConfigChange() {
+ final String clusterName = "Cluster 2";
- // Create the original ServiceConfigurationModel details
- final Map<String, ServiceConfigurationModel> serviceConfigurationModels = new HashMap<>();
- final Map<String, String> nnServiceConf = new HashMap<>();
- final Map<String, Map<String, String>> nnRoleConf = new HashMap<>();
- final Map<String, String> nnRoleProps = new HashMap<>();
- nnRoleProps.put(failoverPropertyName, "false");
- nnRoleProps.put(nsPropertyName, "");
- nnRoleProps.put(portPropertyName, "54321");
- nnRoleConf.put(NameNodeServiceModelGenerator.ROLE_TYPE, nnRoleProps);
- serviceConfigurationModels.put(NameNodeServiceModelGenerator.SERVICE_TYPE + "-1", createModel(nnServiceConf, nnRoleConf));
+ // Simulate a service restart event
+ ApiEvent restartEvent = createApiEvent(clusterName,
+ NameNodeServiceModelGenerator.SERVICE_TYPE,
+ NameNodeServiceModelGenerator.SERVICE,
+ PollingConfigurationAnalyzer.RESTART_COMMAND,
+ PollingConfigurationAnalyzer.SUCCEEDED_STATUS);
- // Mock a ClusterConfigurationCache for the monitor to use
- ClusterConfigurationCache configCache = EasyMock.createNiceMock(ClusterConfigurationCache.class);
- EasyMock.expect(configCache.getDiscoveryConfig(address, clusterName)).andReturn(sdc).anyTimes();
- EasyMock.expect(configCache.getClusterNames()).andReturn(clusterNames).anyTimes();
- EasyMock.expect(configCache.getClusterServiceConfigurations(address, clusterName))
- .andReturn(serviceConfigurationModels)
- .anyTimes();
- EasyMock.replay(configCache);
+ doTestEventWithConfigChange(restartEvent, clusterName);
+ }
- // Create the monitor, registering a listener so we can verify that change notification works
- ChangeListener listener = new ChangeListener();
- TestablePollingConfigAnalyzer pca = new TestablePollingConfigAnalyzer(configCache, listener);
- pca.setInterval(5);
+ /**
+ * Test the start of a new service.
+ */
+ @Test
+ public void testNewServiceStartEvent() {
+ final String address = "http://host1:1234";
+ final String clusterName = "Cluster N";
- // Create another version of the same ServiceConfigurationModel with a modified property value
- ServiceConfigurationModel updatedNNModel = new ServiceConfigurationModel();
- updatedNNModel.addRoleProperty(NameNodeServiceModelGenerator.ROLE_TYPE, failoverPropertyName, "false");
- updatedNNModel.addRoleProperty(NameNodeServiceModelGenerator.ROLE_TYPE, nsPropertyName, "");
- updatedNNModel.addRoleProperty(NameNodeServiceModelGenerator.ROLE_TYPE, portPropertyName, "12345");
- pca.addCurrentServiceConfigModel(address, clusterName, NameNodeServiceModelGenerator.SERVICE_TYPE + "-1", updatedNNModel);
+ // Simulate a service Start event
+ ApiEvent startEvent = createApiEvent(clusterName,
+ NameNodeServiceModelGenerator.SERVICE_TYPE,
+ NameNodeServiceModelGenerator.SERVICE,
+ PollingConfigurationAnalyzer.START_COMMAND,
+ PollingConfigurationAnalyzer.SUCCEEDED_STATUS);
+
+ ChangeListener listener =
+ doTestEvent(startEvent, address, clusterName, Collections.emptyMap(), Collections.emptyMap());
+ assertTrue("Expected a change notification", listener.wasNotified(address, clusterName));
+ }
- // Start the polling thread
- ExecutorService pollingThreadExecutor = Executors.newSingleThreadExecutor();
- pollingThreadExecutor.execute(pca);
- pollingThreadExecutor.shutdown();
+ /**
+ * Test the start of an existing service when no relevant configuration has changed.
+ */
+ @Test
+ public void testExistingServiceStartWithoutConfigChange() {
+ final String clusterName = "Cluster E";
- // Simulate a service restart event
- List<ApiEventAttribute> restartEventAttrs = new ArrayList<>();
- restartEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
- restartEventAttrs.add(createEventAttribute("SERVICE_TYPE", NameNodeServiceModelGenerator.SERVICE_TYPE));
- restartEventAttrs.add(createEventAttribute("SERVICE", NameNodeServiceModelGenerator.SERVICE));
- restartEventAttrs.add(createEventAttribute("COMMAND", "Restart"));
- restartEventAttrs.add(createEventAttribute("COMMAND_STATUS", "SUCCEEDED"));
- ApiEvent restartEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, restartEventAttrs);
- pca.addRestartEvent(clusterName, restartEvent);
+ // Simulate a service Start event
+ ApiEvent startEvent = createApiEvent(clusterName,
+ NameNodeServiceModelGenerator.SERVICE_TYPE,
+ NameNodeServiceModelGenerator.SERVICE,
+ PollingConfigurationAnalyzer.START_COMMAND,
+ PollingConfigurationAnalyzer.SUCCEEDED_STATUS);
+
+ doTestEventWithoutConfigChange(startEvent, clusterName);
+ }
+
+ /**
+ * Test the start of an existing service when relevant configuration has changed.
+ */
+ @Test
+ public void testExistingServiceStartWithConfigChange() {
+ final String clusterName = "Cluster E";
// Simulate a service Start event
- List<ApiEventAttribute> startEventAttrs = new ArrayList<>();
- startEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
- startEventAttrs.add(createEventAttribute("SERVICE_TYPE", ClouderaManagerAPIServiceModelGenerator.SERVICE_TYPE));
- startEventAttrs.add(createEventAttribute("SERVICE", ClouderaManagerAPIServiceModelGenerator.SERVICE));
- startEventAttrs.add(createEventAttribute("COMMAND", "Start"));
- startEventAttrs.add(createEventAttribute("COMMAND_STATUS", "STARTED"));
- ApiEvent startEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, startEventAttrs);
- pca.addRestartEvent(clusterName, startEvent);
-
- // Simulate a failed service Start event
- startEventAttrs = new ArrayList<>();
- startEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
- startEventAttrs.add(createEventAttribute("SERVICE_TYPE", HiveOnTezServiceModelGenerator.SERVICE_TYPE));
- startEventAttrs.add(createEventAttribute("SERVICE", HiveOnTezServiceModelGenerator.SERVICE));
- startEventAttrs.add(createEventAttribute("COMMAND", "Start"));
- startEventAttrs.add(createEventAttribute("COMMAND_STATUS", "FAILED"));
- ApiEvent failedStartEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, startEventAttrs);
- pca.addRestartEvent(clusterName, failedStartEvent);
+ ApiEvent startEvent = createApiEvent(clusterName,
+ NameNodeServiceModelGenerator.SERVICE_TYPE,
+ NameNodeServiceModelGenerator.SERVICE,
+ PollingConfigurationAnalyzer.START_COMMAND,
+ PollingConfigurationAnalyzer.SUCCEEDED_STATUS);
- // Simulate an event w/o COMMAND and/or COMMAND_STATUS attributes
- final List<ApiEventAttribute> revisionEventAttrs = new ArrayList<>();
- revisionEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
- revisionEventAttrs.add(createEventAttribute("SERVICE_TYPE", HiveOnTezServiceModelGenerator.SERVICE_TYPE));
- revisionEventAttrs.add(createEventAttribute("SERVICE", HiveOnTezServiceModelGenerator.SERVICE));
- revisionEventAttrs.add(createEventAttribute("REVISION", "215"));
- revisionEventAttrs.add(createEventAttribute("EVENTCODE", "EV_REVISION_CREATED"));
- final ApiEvent revisionEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, revisionEventAttrs);
- pca.addRestartEvent(clusterName, revisionEvent);
+ doTestEventWithConfigChange(startEvent, clusterName);
+ }
- try {
- pollingThreadExecutor.awaitTermination(10, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- //
- }
+ /**
+ * Test the rolling restart of an existing service when no relevant configuration has changed.
+ */
+ @Test
+ public void testRollingServiceRestartWithoutConfigChange() {
+ final String clusterName = "Cluster 1";
+
+ // Simulate a successful rolling service restart event
+ ApiEvent rollingRestartEvent = createApiEvent(clusterName,
+ NameNodeServiceModelGenerator.SERVICE_TYPE,
+ NameNodeServiceModelGenerator.SERVICE,
+ PollingConfigurationAnalyzer.ROLLING_RESTART_COMMAND,
+ PollingConfigurationAnalyzer.SUCCEEDED_STATUS,
+ "EV_SERVICE_ROLLING_RESTARTED");
+
+ doTestEventWithoutConfigChange(rollingRestartEvent, clusterName);
+ }
- // Stop the config analyzer thread
- pca.stop();
+ /**
+ * Test the rolling restart of an existing service when relevant configuration has changed.
+ */
+ @Test
+ public void testRollingServiceRestartWithConfigChange() {
+ final String clusterName = "Cluster 1";
+
+ // Simulate a successful rolling service restart event
+ ApiEvent rollingRestartEvent = createApiEvent(clusterName,
+ NameNodeServiceModelGenerator.SERVICE_TYPE,
+ NameNodeServiceModelGenerator.SERVICE,
+ PollingConfigurationAnalyzer.ROLLING_RESTART_COMMAND,
+ PollingConfigurationAnalyzer.SUCCEEDED_STATUS,
+ "EV_SERVICE_ROLLING_RESTARTED");
+
+ doTestEventWithConfigChange(rollingRestartEvent, clusterName);
+ }
+ /**
+ * Test the rolling restart of an entire cluster, for which it should be assumed that configuration has changed.
+ */
+ @Test
+ public void testRollingClusterRestartEvent() {
+ final String address = "http://host1:1234";
+ final String clusterName = "Cluster 6";
+
+ // Simulate a successful rolling cluster restart event
+ ApiEvent rollingRestartEvent = createApiEvent(clusterName,
+ PollingConfigurationAnalyzer.CM_SERVICE_TYPE,
+ PollingConfigurationAnalyzer.CM_SERVICE,
+ PollingConfigurationAnalyzer.ROLLING_RESTART_COMMAND,
+ PollingConfigurationAnalyzer.SUCCEEDED_STATUS,
+ "EV_CLUSTER_ROLLING_RESTARTED");
+
+ ChangeListener listener =
+ doTestEvent(rollingRestartEvent, address, clusterName, Collections.emptyMap(), Collections.emptyMap());
assertTrue("Expected a change notification", listener.wasNotified(address, clusterName));
- assertEquals(2, listener.howManyNotifications(address, clusterName));
}
@Test
public void testClusterConfigMonitorTerminationForNoLongerReferencedClusters() {
final String address = "http://host1:1234";
- final String clusterName = "Cluster 5";
+ final String clusterName = "Cluster 7";
final String updatedAddress = "http://host2:1234";
final String descContent =
@@ -231,15 +266,13 @@ public class PollingConfigurationAnalyzerTest {
EasyMock.expect(sdc.getPasswordAlias()).andReturn("a").anyTimes();
EasyMock.replay(sdc);
- final Map<String, List<String>> clusterNames = new HashMap<>();
- clusterNames.put(address, Collections.singletonList(clusterName));
-
// Create the original ServiceConfigurationModel details
final Map<String, ServiceConfigurationModel> serviceConfigurationModels = new HashMap<>();
final Map<String, String> nnServiceConf = new HashMap<>();
final Map<String, Map<String, String>> nnRoleConf = new HashMap<>();
nnRoleConf.put(NameNodeServiceModelGenerator.ROLE_TYPE, Collections.emptyMap());
- serviceConfigurationModels.put(NameNodeServiceModelGenerator.SERVICE_TYPE + "-1", createModel(nnServiceConf, nnRoleConf));
+ serviceConfigurationModels.put(NameNodeServiceModelGenerator.SERVICE_TYPE,
+ createModel(nnServiceConf, nnRoleConf));
// Create a ClusterConfigurationCache for the monitor to use
final ClusterConfigurationCache configCache = new ClusterConfigurationCache();
@@ -303,6 +336,145 @@ public class PollingConfigurationAnalyzerTest {
}
}
+ private void doTestStartEvent(final ApiEventCategory category) {
+ final String clusterName = "My Cluster";
+ final String serviceType = NameNodeServiceModelGenerator.SERVICE_TYPE;
+ final String service = NameNodeServiceModelGenerator.SERVICE;
+
+ List<ApiEventAttribute> apiEventAttrs = new ArrayList<>();
+ apiEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
+ apiEventAttrs.add(createEventAttribute("SERVICE_TYPE", serviceType));
+ apiEventAttrs.add(createEventAttribute("SERVICE", service));
+ ApiEvent apiEvent = createApiEvent(category, apiEventAttrs);
+
+ PollingConfigurationAnalyzer.StartEvent restartEvent = new PollingConfigurationAnalyzer.StartEvent(apiEvent);
+ assertNotNull(restartEvent);
+ assertEquals(clusterName, restartEvent.getClusterName());
+ assertEquals(serviceType, restartEvent.getServiceType());
+ assertEquals(service, restartEvent.getService());
+ assertNotNull(restartEvent.getTimestamp());
+ }
+
+ private ChangeListener doTestEvent(final ApiEvent event,
+ final String address,
+ final String clusterName,
+ final Map<String, ServiceConfigurationModel> serviceConfigurationModels,
+ final Map<String, ServiceConfigurationModel> updatedServiceConfigurationModels) {
+ // Mock the service discovery details
+ ServiceDiscoveryConfig sdc = EasyMock.createNiceMock(ServiceDiscoveryConfig.class);
+ EasyMock.expect(sdc.getCluster()).andReturn(clusterName).anyTimes();
+ EasyMock.expect(sdc.getAddress()).andReturn(address).anyTimes();
+ EasyMock.expect(sdc.getUser()).andReturn("u").anyTimes();
+ EasyMock.expect(sdc.getPasswordAlias()).andReturn("a").anyTimes();
+ EasyMock.replay(sdc);
+
+ final Map<String, List<String>> clusterNames = new HashMap<>();
+ clusterNames.put(address, Collections.singletonList(clusterName));
+
+ // Mock a ClusterConfigurationCache for the monitor to use
+ ClusterConfigurationCache configCache = EasyMock.createNiceMock(ClusterConfigurationCache.class);
+ EasyMock.expect(configCache.getDiscoveryConfig(address, clusterName)).andReturn(sdc).anyTimes();
+ EasyMock.expect(configCache.getClusterNames()).andReturn(clusterNames).anyTimes();
+ EasyMock.expect(configCache.getClusterServiceConfigurations(address, clusterName))
+ .andReturn(serviceConfigurationModels)
+ .anyTimes();
+ EasyMock.replay(configCache);
+
+ // Create the monitor, registering a listener so we can verify that change notification works
+ ChangeListener listener = new ChangeListener();
+ TestablePollingConfigAnalyzer pca = new TestablePollingConfigAnalyzer(configCache, listener);
+ pca.setInterval(5);
+
+ // Add updated service config models
+ for (String roleType : updatedServiceConfigurationModels.keySet()) {
+ pca.addCurrentServiceConfigModel(address, clusterName, roleType, updatedServiceConfigurationModels.get(roleType));
+ }
+
+ // Start the polling thread
+ ExecutorService pollingThreadExecutor = Executors.newSingleThreadExecutor();
+ pollingThreadExecutor.execute(pca);
+ pollingThreadExecutor.shutdown();
+
+ pca.addRestartEvent(clusterName, event);
+
+ try {
+ pollingThreadExecutor.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ //
+ }
+
+ // Stop the config analyzer thread
+ pca.stop();
+
+ return listener;
+ }
+
+ private void doTestEventWithConfigChange(final ApiEvent event, final String clusterName) {
+ final String address = "http://host1:1234";
+
+ final String failoverPropertyName = "autofailover_enabled";
+ final String nsPropertyName = "dfs_federation_namenode_nameservice";
+ final String portPropertyName = "namenode_port";
+
+ // Create the original ServiceConfigurationModel details
+ final Map<String, ServiceConfigurationModel> serviceConfigurationModels = new HashMap<>();
+ final Map<String, String> nnServiceConf = new HashMap<>();
+ final Map<String, Map<String, String>> nnRoleConf = new HashMap<>();
+ final Map<String, String> nnRoleProps = new HashMap<>();
+ nnRoleProps.put(failoverPropertyName, "false");
+ nnRoleProps.put(nsPropertyName, "");
+ nnRoleProps.put(portPropertyName, "54321");
+ nnRoleConf.put(NameNodeServiceModelGenerator.ROLE_TYPE, nnRoleProps);
+ serviceConfigurationModels.put(NameNodeServiceModelGenerator.SERVICE_TYPE,
+ createModel(nnServiceConf, nnRoleConf));
+
+ // Create another version of the same ServiceConfigurationModel with a modified property value
+ ServiceConfigurationModel updatedNNModel = new ServiceConfigurationModel();
+ updatedNNModel.addRoleProperty(NameNodeServiceModelGenerator.ROLE_TYPE, failoverPropertyName, "false");
+ updatedNNModel.addRoleProperty(NameNodeServiceModelGenerator.ROLE_TYPE, nsPropertyName, "");
+ updatedNNModel.addRoleProperty(NameNodeServiceModelGenerator.ROLE_TYPE, portPropertyName, "12345");
+ Map<String, ServiceConfigurationModel> updatedModels = new HashMap<>();
+ updatedModels.put(NameNodeServiceModelGenerator.ROLE_TYPE, updatedNNModel);
+
+ ChangeListener listener = doTestEvent(event, address, clusterName, serviceConfigurationModels, updatedModels);
+ assertTrue("Expected a change notification", listener.wasNotified(address, clusterName));
+ }
+
+ private void doTestEventWithoutConfigChange(final ApiEvent event, final String clusterName) {
+ final String address = "http://host1:1234";
+
+ final String failoverPropertyName = "autofailover_enabled";
+ final String nsPropertyName = "dfs_federation_namenode_nameservice";
+ final String portPropertyName = "namenode_port";
+
+ final String failoverEnabledValue = "false";
+ final String nsValue = "";
+ final String portValue = "54321";
+
+ // Create the original ServiceConfigurationModel details
+ final Map<String, ServiceConfigurationModel> serviceConfigurationModels = new HashMap<>();
+ final Map<String, String> nnServiceConf = new HashMap<>();
+ final Map<String, Map<String, String>> nnRoleConf = new HashMap<>();
+ final Map<String, String> nnRoleProps = new HashMap<>();
+ nnRoleProps.put(failoverPropertyName, failoverEnabledValue);
+ nnRoleProps.put(nsPropertyName, nsValue);
+ nnRoleProps.put(portPropertyName, portValue);
+ nnRoleConf.put(NameNodeServiceModelGenerator.ROLE_TYPE, nnRoleProps);
+ serviceConfigurationModels.put(NameNodeServiceModelGenerator.SERVICE_TYPE,
+ createModel(nnServiceConf, nnRoleConf));
+
+ // Create another version of the same ServiceConfigurationModel with unmodified property values
+ ServiceConfigurationModel updatedNNModel = new ServiceConfigurationModel();
+ updatedNNModel.addRoleProperty(NameNodeServiceModelGenerator.ROLE_TYPE, failoverPropertyName, failoverEnabledValue);
+ updatedNNModel.addRoleProperty(NameNodeServiceModelGenerator.ROLE_TYPE, nsPropertyName, nsValue);
+ updatedNNModel.addRoleProperty(NameNodeServiceModelGenerator.ROLE_TYPE, portPropertyName, portValue);
+ Map<String, ServiceConfigurationModel> updatedModels = new HashMap<>();
+ updatedModels.put(NameNodeServiceModelGenerator.ROLE_TYPE, updatedNNModel);
+
+ ChangeListener listener = doTestEvent(event, address, clusterName, serviceConfigurationModels, updatedModels);
+ assertFalse("Unexpected change notification", listener.wasNotified(address, clusterName));
+ }
+
/**
* Set the static GatewayServices field to the specified value.
*
@@ -318,6 +490,31 @@ public class PollingConfigurationAnalyzerTest {
}
}
+ private ApiEvent createApiEvent(final String clusterName,
+ final String serviceType,
+ final String service,
+ final String command,
+ final String commandStatues) {
+
+ return createApiEvent(clusterName, serviceType, service, command, commandStatues, "");
+ }
+
+ private ApiEvent createApiEvent(final String clusterName,
+ final String serviceType,
+ final String service,
+ final String command,
+ final String commandStatues,
+ final String eventCode) {
+ List<ApiEventAttribute> attrs = new ArrayList<>();
+ attrs.add(createEventAttribute("CLUSTER", clusterName));
+ attrs.add(createEventAttribute("SERVICE_TYPE", serviceType));
+ attrs.add(createEventAttribute("SERVICE", service));
+ attrs.add(createEventAttribute("COMMAND", command));
+ attrs.add(createEventAttribute("COMMAND_STATUS", commandStatues));
+ attrs.add(createEventAttribute("EVENTCODE", eventCode));
+ return createApiEvent(ApiEventCategory.AUDIT_EVENT, attrs);
+ }
+
private ApiEvent createApiEvent(final ApiEventCategory category, final List<ApiEventAttribute> attrs) {
ApiEvent event = EasyMock.createNiceMock(ApiEvent.class);
EasyMock.expect(event.getTimeOccurred()).andReturn(Instant.now().toString()).anyTimes();
@@ -359,8 +556,8 @@ public class PollingConfigurationAnalyzerTest {
*/
private static class TestablePollingConfigAnalyzer extends PollingConfigurationAnalyzer {
- private Map<String, List<ApiEvent>> restartEvents = new HashMap<>();
- private Map<String, ServiceConfigurationModel> serviceConfigModels = new HashMap<>();
+ private final Map<String, List<ApiEvent>> restartEvents = new HashMap<>();
+ private final Map<String, ServiceConfigurationModel> serviceConfigModels = new HashMap<>();
TestablePollingConfigAnalyzer(ClusterConfigurationCache cache) {
this(cache, null);
@@ -371,12 +568,6 @@ public class PollingConfigurationAnalyzerTest {
super(cache, null, null, listener);
}
- TestablePollingConfigAnalyzer(ClusterConfigurationCache cache,
- ConfigurationChangeListener listener,
- int interval) {
- super(cache, null, null, listener, interval);
- }
-
void addRestartEvent(final String service, final ApiEvent restartEvent) {
restartEvents.computeIfAbsent(service, l -> new ArrayList<>()).add(restartEvent);
}