You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by lm...@apache.org on 2020/04/08 18:38:55 UTC
[knox] branch master updated: KNOX-2304 - CM discovery cluster config monitor needs to be aware of … (#307)
This is an automated email from the ASF dual-hosted git repository.
lmccay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git
The following commit(s) were added to refs/heads/master by this push:
new 8920651 KNOX-2304 - CM discovery cluster config monitor needs to be aware of … (#307)
8920651 is described below
commit 89206511aad7cacb91c7fa032490816e7cdd18ba
Author: lmccay <lm...@apache.org>
AuthorDate: Wed Apr 8 14:38:46 2020 -0400
KNOX-2304 - CM discovery cluster config monitor needs to be aware of … (#307)
* KNOX-2304 - CM discovery cluster config monitor needs to be aware of all relevant CM event types
Change-Id: Ic26ad36fcb110e01d30d636f14d5b383de01ff17
* KNOX-2304 - address review comments
Change-Id: I5c2009f505b31c5c69ee7672ed37d21e1a7c48bb
---
.../cm/monitor/PollingConfigurationAnalyzer.java | 100 ++++++++++++++-------
.../monitor/PollingConfigurationAnalyzerTest.java | 37 +++++++-
2 files changed, 101 insertions(+), 36 deletions(-)
diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
index b9f163b..380962a 100644
--- a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
+++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
@@ -60,17 +60,27 @@ import static org.apache.knox.gateway.topology.discovery.ClusterConfigurationMon
@SuppressWarnings("PMD.DoNotUseThreads")
public class PollingConfigurationAnalyzer implements Runnable {
- // The format of the filter employed when restart events are queried from ClouderaManager
- private static final String RESTART_EVENTS_QUERY_FORMAT =
+ private static final String COMMAND = "COMMAND";
+
+ private static final String COMMAND_STATUS = "COMMAND_STATUS";
+
+ private static final String STARTED_STATUS = "STARTED";
+
+ private static final String SUCCEEDED_STATUS = "SUCCEEDED";
+
+ private static final String RESTART_COMMAND = "Restart";
+
+ private static final String START_COMMAND = "Start";
+
+ // The format of the filter employed when audit events are queried from ClouderaManager (start/restart events are selected afterward)
+ private static final String EVENTS_QUERY_FORMAT =
"category==" + ApiEventCategory.AUDIT_EVENT.getValue() +
- ";attributes.command==Restart" +
- ";attributes.command_status==SUCCEEDED" +
";attributes.cluster==\"%s\"%s";
- // The format of the timestamp element of the restart events query filter
+ // The format of the timestamp element of the start events query filter
private static final String EVENTS_QUERY_TIMESTAMP_FORMAT = ";timeOccurred=gt=%s";
- // The default amount of time before "now" to check for restart events the first time
+ // The default amount of time before "now" to check for start events the first time
private static final long DEFAULT_EVENT_QUERY_DEFAULT_TIMESTAMP_OFFSET = (60 * 60 * 1000); // one hour
private static final int DEFAULT_POLLING_INTERVAL = 60;
@@ -100,10 +110,10 @@ public class PollingConfigurationAnalyzer implements Runnable {
// Cache of ClouderaManager API clients, keyed by discovery address
private final Map<String, DiscoveryApiClient> clients = new ConcurrentHashMap<>();
- // Timestamp records of the most recent restart event query per discovery address
+ // Timestamp records of the most recent start event query per discovery address
private Map<String, String> eventQueryTimestamps = new ConcurrentHashMap<>();
- // The amount of time before "now" to will check for restart events the first time
+ // The amount of time before "now" to check for start events the first time
private long eventQueryDefaultTimestampOffset = DEFAULT_EVENT_QUERY_DEFAULT_TIMESTAMP_OFFSET;
private boolean isActive;
@@ -163,34 +173,34 @@ public class PollingConfigurationAnalyzer implements Runnable {
continue;
}
- // Configuration changes don't mean anything without corresponding service restarts. Therefore, monitor
- // restart events, and check the configuration only of the restarted service(s) to identify changes
+ // Configuration changes don't mean anything without corresponding service starts/restarts. Therefore, monitor
+ // start events, and check the configuration only of the (re)started service(s) to identify changes
// that should trigger re-discovery.
- List<RestartEvent> restartEvents = getRestartEvents(address, clusterName);
+ List<StartEvent> relevantEvents = getRelevantEvents(address, clusterName);
- // If there are no recent restart events, then nothing to do now
- if (!restartEvents.isEmpty()) {
+ // If there are no recent start events, then nothing to do now
+ if (!relevantEvents.isEmpty()) {
boolean configHasChanged = false;
- // If there are restart events, then check the previously-recorded properties for the same service to
+ // If there are start events, then check the previously-recorded properties for the same service to
// identify if the configuration has changed
Map<String, ServiceConfigurationModel> serviceConfigurations =
configCache.getClusterServiceConfigurations(address, clusterName);
- // Those services for which a restart even has been handled
+ // Those services for which a start event has been handled
List<String> handledServiceTypes = new ArrayList<>();
- for (RestartEvent re : restartEvents) {
+ for (StartEvent re : relevantEvents) {
String serviceType = re.getServiceType();
- // Determine if we've already handled a restart event for this service type
+ // Determine if we've already handled a start event for this service type
if (!handledServiceTypes.contains(serviceType)) {
// Get the previously-recorded configuration
ServiceConfigurationModel serviceConfig = serviceConfigurations.get(re.getServiceType());
if (serviceConfig != null) {
- // Get the current config for the restarted service, and compare with the previously-recorded config
+ // Get the current config for the started service, and compare with the previously-recorded config
ServiceConfigurationModel currentConfig =
getCurrentServiceConfiguration(address, clusterName, re.getService());
@@ -337,15 +347,15 @@ public class PollingConfigurationAnalyzer implements Runnable {
}
/**
- * Get restart events for the specified ClouderaManager cluster.
+ * Get relevant events for the specified ClouderaManager cluster.
*
* @param address The address of the ClouderaManager instance.
* @param clusterName The name of the cluster.
*
- * @return A List of RestartEvent objects for service restart events since the last time they were queried.
+ * @return A List of StartEvent objects for service start events since the last time they were queried.
*/
- private List<RestartEvent> getRestartEvents(final String address, final String clusterName) {
- List<RestartEvent> restartEvents = new ArrayList<>();
+ private List<StartEvent> getRelevantEvents(final String address, final String clusterName) {
+ List<StartEvent> relevantEvents = new ArrayList<>();
// Get the last event query timestamp
String lastTimestamp = getEventQueryTimestamp(address, clusterName);
@@ -360,19 +370,43 @@ public class PollingConfigurationAnalyzer implements Runnable {
// Record the new event query timestamp for this address/cluster
setEventQueryTimestamp(address, clusterName, Instant.now());
- // Query the event log from CM for service/cluster restart events
- List<ApiEvent> events = queryRestartEvents(getApiClient(configCache.getDiscoveryConfig(address, clusterName)),
+ // Query the event log from CM for service/cluster start events
+ List<ApiEvent> events = queryEvents(getApiClient(configCache.getDiscoveryConfig(address, clusterName)),
clusterName,
lastTimestamp);
for (ApiEvent event : events) {
- restartEvents.add(new RestartEvent(event));
+ if(isRelevantEvent(event)) {
+ relevantEvents.add(new StartEvent(event));
+ }
+ }
+
+ return relevantEvents;
+ }
+
+ /**
+ * Determine whether the specified event should trigger a configuration check.
+ * An event is relevant only when its COMMAND attribute is Start or Restart AND its
+ * COMMAND_STATUS attribute is STARTED or SUCCEEDED.
+ *
+ * @param event An event returned by the ClouderaManager event query.
+ *
+ * @return true, if the event is a started/succeeded Start or Restart command; otherwise, false.
+ */
+ @SuppressWarnings("unchecked")
+ private boolean isRelevantEvent(ApiEvent event) {
+ boolean rc = false;
+ Map<String,Object> map = getAttributeMap(event.getAttributes());
+ // The event query filters only on category and cluster, so an event may lack these attributes
+ List<String> commands = (List<String>) map.get(COMMAND);
+ List<String> statuses = (List<String>) map.get(COMMAND_STATUS);
+ String command = (commands == null || commands.isEmpty()) ? null : commands.get(0);
+ String status = (statuses == null || statuses.isEmpty()) ? null : statuses.get(0);
+ // Grouped explicitly: && binds tighter than ||, so without the parentheses a failed Start
+ // (or any command with a STARTED status) would be treated as relevant
+ if ((START_COMMAND.equals(command) || RESTART_COMMAND.equals(command)) &&
+ (SUCCEEDED_STATUS.equals(status) || STARTED_STATUS.equals(status))) {
+ rc = true;
}
+ return rc;
+ }
- return restartEvents;
+ /**
+ * Index the specified event attributes by name.
+ *
+ * @param attributes The event attributes; may be null.
+ *
+ * @return A Map of attribute name to the result of the attribute's getValues(); empty if attributes is null or empty.
+ */
+ private Map<String, Object> getAttributeMap(List<ApiEventAttribute> attributes) {
+ Map<String,Object> map = new HashMap<>();
+ // Guard against an event with no attribute list, rather than failing the whole polling pass
+ if (attributes != null) {
+ attributes.forEach(attr -> map.put(attr.getName(), attr.getValues()));
+ }
+ return map;
}
/**
- * Query the ClouderaManager instance associated with the specified client for any service restart events in the
+ * Query the ClouderaManager instance associated with the specified client for any service start events in the
* specified cluster since the specified time.
*
* @param client A ClouderaManager API client.
@@ -381,15 +415,15 @@ public class PollingConfigurationAnalyzer implements Runnable {
*
* @return A List of ApiEvent objects representing the relevant events since the specified time.
*/
- protected List<ApiEvent> queryRestartEvents(final ApiClient client, final String clusterName, final String since) {
+ protected List<ApiEvent> queryEvents(final ApiClient client, final String clusterName, final String since) {
List<ApiEvent> events = new ArrayList<>();
- // Setup the query for restart events
+ // Setup the query for events
String timeFilter =
(since != null) ? String.format(Locale.ROOT, EVENTS_QUERY_TIMESTAMP_FORMAT, since) : "";
String queryString = String.format(Locale.ROOT,
- RESTART_EVENTS_QUERY_FORMAT,
+ EVENTS_QUERY_FORMAT,
clusterName,
timeFilter);
@@ -500,9 +534,9 @@ public class PollingConfigurationAnalyzer implements Runnable {
}
/**
- * Internal representation of a ClouderaManager service restart event
+ * Internal representation of a ClouderaManager service start event
*/
- static final class RestartEvent {
+ static final class StartEvent {
private static final String ATTR_CLUSTER = "CLUSTER";
private static final String ATTR_SERVICE_TYPE = "SERVICE_TYPE";
@@ -521,7 +555,7 @@ public class PollingConfigurationAnalyzer implements Runnable {
private String serviceType;
private String service;
- RestartEvent(final ApiEvent auditEvent) {
+ StartEvent(final ApiEvent auditEvent) {
if (ApiEventCategory.AUDIT_EVENT != auditEvent.getCategory()) {
throw new IllegalArgumentException("Invalid event category " + auditEvent.getCategory().getValue());
}
diff --git a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
index a817c83..cb2066e 100644
--- a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
+++ b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
@@ -27,7 +27,9 @@ import org.apache.knox.gateway.services.ServiceType;
import org.apache.knox.gateway.services.topology.TopologyService;
import org.apache.knox.gateway.topology.ClusterConfigurationMonitorService;
import org.apache.knox.gateway.topology.discovery.ServiceDiscoveryConfig;
+import org.apache.knox.gateway.topology.discovery.cm.model.cm.ClouderaManagerAPIServiceModelGenerator;
import org.apache.knox.gateway.topology.discovery.cm.model.hdfs.NameNodeServiceModelGenerator;
+import org.apache.knox.gateway.topology.discovery.cm.model.hive.HiveOnTezServiceModelGenerator;
import org.easymock.EasyMock;
import org.junit.Test;
@@ -75,7 +77,7 @@ public class PollingConfigurationAnalyzerTest {
apiEventAttrs.add(createEventAttribute("SERVICE", service));
ApiEvent apiEvent = createApiEvent(category, apiEventAttrs);
- PollingConfigurationAnalyzer.RestartEvent restartEvent = new PollingConfigurationAnalyzer.RestartEvent(apiEvent);
+ PollingConfigurationAnalyzer.StartEvent restartEvent = new PollingConfigurationAnalyzer.StartEvent(apiEvent);
assertNotNull(restartEvent);
assertEquals(clusterName, restartEvent.getClusterName());
assertEquals(serviceType, restartEvent.getServiceType());
@@ -145,11 +147,33 @@ public class PollingConfigurationAnalyzerTest {
restartEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
restartEventAttrs.add(createEventAttribute("SERVICE_TYPE", NameNodeServiceModelGenerator.SERVICE_TYPE));
restartEventAttrs.add(createEventAttribute("SERVICE", NameNodeServiceModelGenerator.SERVICE));
+ restartEventAttrs.add(createEventAttribute("COMMAND", "Restart"));
+ restartEventAttrs.add(createEventAttribute("COMMAND_STATUS", "SUCCEEDED"));
ApiEvent restartEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, restartEventAttrs);
pca.addRestartEvent(clusterName, restartEvent);
+ // Simulate a service Start event
+ List<ApiEventAttribute> startEventAttrs = new ArrayList<>();
+ startEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
+ startEventAttrs.add(createEventAttribute("SERVICE_TYPE", ClouderaManagerAPIServiceModelGenerator.SERVICE_TYPE));
+ startEventAttrs.add(createEventAttribute("SERVICE", ClouderaManagerAPIServiceModelGenerator.SERVICE));
+ startEventAttrs.add(createEventAttribute("COMMAND", "Start"));
+ startEventAttrs.add(createEventAttribute("COMMAND_STATUS", "STARTED"));
+ ApiEvent startEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, startEventAttrs);
+ pca.addRestartEvent(clusterName, startEvent);
+
+ // Simulate a failed service Start event
+ startEventAttrs = new ArrayList<>();
+ startEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
+ startEventAttrs.add(createEventAttribute("SERVICE_TYPE", HiveOnTezServiceModelGenerator.SERVICE_TYPE));
+ startEventAttrs.add(createEventAttribute("SERVICE", HiveOnTezServiceModelGenerator.SERVICE));
+ startEventAttrs.add(createEventAttribute("COMMAND", "Start"));
+ startEventAttrs.add(createEventAttribute("COMMAND_STATUS", "FAILED"));
+ ApiEvent failedStartEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, startEventAttrs);
+ pca.addRestartEvent(clusterName, failedStartEvent);
+
try {
- pollingThreadExecutor.awaitTermination(15, TimeUnit.SECONDS);
+ pollingThreadExecutor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
//
}
@@ -158,6 +182,7 @@ public class PollingConfigurationAnalyzerTest {
pca.stop();
assertTrue("Expected a change notification", listener.wasNotified(address, clusterName));
+ assertEquals(2, listener.howManyNotifications(address, clusterName));
}
@@ -351,7 +376,7 @@ public class PollingConfigurationAnalyzerTest {
}
@Override
- protected List<ApiEvent> queryRestartEvents(ApiClient client, String clusterName, String since) {
+ protected List<ApiEvent> queryEvents(ApiClient client, String clusterName, String since) {
return restartEvents.computeIfAbsent(clusterName, l -> new ArrayList<>());
}
@@ -370,15 +395,21 @@ public class PollingConfigurationAnalyzerTest {
/**
 * Test listener that records every configuration change notification it receives.
 */
private static class ChangeListener implements ConfigurationChangeListener {
// Most recent cluster name notified per source
private final Map<String, String> notifications = new HashMap<>();
+ // One "source+clusterName" entry per notification received, in arrival order
+ private final List<String> events = new ArrayList<>();
@Override
public void onConfigurationChange(String source, String clusterName) {
notifications.put(source, clusterName);
+ events.add(source + "+" + clusterName);
}
boolean wasNotified(final String source, final String clusterName) {
return clusterName.equals(notifications.get(source));
}
+
+ /**
+ * @param source The notification source (discovery address).
+ * @param clusterName The name of the cluster.
+ *
+ * @return The number of notifications received for the specified source and cluster.
+ */
+ int howManyNotifications(final String source, final String clusterName) {
+ // Count only the notifications matching the arguments, rather than every recorded event
+ final String key = source + "+" + clusterName;
+ return (int) events.stream().filter(key::equals).count();
+ }
}
}