You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by sm...@apache.org on 2020/04/27 14:58:16 UTC
[knox] branch master updated: KNOX-2351 - Catching any errors while
monitoring CM configuration changes (#324)
This is an automated email from the ASF dual-hosted git repository.
smolnar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git
The following commit(s) were added to refs/heads/master by this push:
new f96b62d KNOX-2351 - Catching any errors while monitoring CM configuration changes (#324)
f96b62d is described below
commit f96b62d133d300519fc45f295f5e7a2b58922949
Author: Sandor Molnar <sm...@apache.org>
AuthorDate: Mon Apr 27 16:58:09 2020 +0200
KNOX-2351 - Catching any errors while monitoring CM configuration changes (#324)
---
.../ClouderaManagerServiceDiscoveryMessages.java | 3 +
.../cm/monitor/PollingConfigurationAnalyzer.java | 153 +++++++++++----------
2 files changed, 82 insertions(+), 74 deletions(-)
diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
index 289c752..7f7c644 100644
--- a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
+++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
@@ -126,6 +126,9 @@ public interface ClouderaManagerServiceDiscoveryMessages {
@Message(level = MessageLevel.DEBUG, text = "Checking {0} @ {1} for configuration changes...")
void checkingClusterConfiguration(String clusterName, String discoveryAddress);
+ @Message(level = MessageLevel.ERROR, text = "Error while monitoring ClouderaManager configuration changes: {0}")
+ void clouderaManagerConfigurationChangesMonitoringError(@StackTrace(level = MessageLevel.DEBUG) Exception e);
+
@Message(level = MessageLevel.ERROR,
text = "Error getting service configuration details from ClouderaManager: {0}")
void clouderaManagerConfigurationAPIError(@StackTrace(level = MessageLevel.DEBUG) ApiException e);
diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
index fb8d73c..4b7935f 100644
--- a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
+++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
@@ -162,93 +162,98 @@ public class PollingConfigurationAnalyzer implements Runnable {
isActive = true;
while (isActive) {
- List<String> clustersToStopMonitoring = new ArrayList<>();
+ try {
+ final List<String> clustersToStopMonitoring = new ArrayList<>();
+
+ for (Map.Entry<String, List<String>> entry : configCache.getClusterNames().entrySet()) {
+ String address = entry.getKey();
+ for (String clusterName : entry.getValue()) {
+ log.checkingClusterConfiguration(clusterName, address);
+
+ // Check here for existing descriptor references, and add to the removal list if there are not any
+ if (!clusterReferencesExist(address, clusterName)) {
+ clustersToStopMonitoring.add(address + FQCN_DELIM + clusterName);
+ continue;
+ }
- for (Map.Entry<String, List<String>> entry : configCache.getClusterNames().entrySet()) {
- String address = entry.getKey();
- for (String clusterName : entry.getValue()) {
- log.checkingClusterConfiguration(clusterName, address);
+ // Configuration changes don't mean anything without corresponding service start/restarts. Therefore, monitor
+ // start events, and check the configuration only of the restarted service(s) to identify changes
+ // that should trigger re-discovery.
+ final List<StartEvent> relevantEvents = getRelevantEvents(address, clusterName);
- // Check here for existing descriptor references, and add to the removal list if there are not any
- if (!clusterReferencesExist(address, clusterName)) {
- clustersToStopMonitoring.add(address + FQCN_DELIM + clusterName);
- continue;
+ // If there are no recent start events, then nothing to do now
+ if (!relevantEvents.isEmpty()) {
+ // If a change has occurred, notify the listeners
+ if (hasConfigChanged(address, clusterName, relevantEvents)) {
+ notifyChangeListener(address, clusterName);
+ }
+ }
}
+ }
- // Configuration changes don't mean anything without corresponding service start/restarts. Therefore, monitor
- // start events, and check the configuration only of the restarted service(s) to identify changes
- // that should trigger re-discovery.
- List<StartEvent> relevantEvents = getRelevantEvents(address, clusterName);
-
- // If there are no recent start events, then nothing to do now
- if (!relevantEvents.isEmpty()) {
- boolean configHasChanged = false;
-
- // If there are start events, then check the previously-recorded properties for the same service to
- // identify if the configuration has changed
- Map<String, ServiceConfigurationModel> serviceConfigurations =
- configCache.getClusterServiceConfigurations(address, clusterName);
-
- // Those services for which a start even has been handled
- List<String> handledServiceTypes = new ArrayList<>();
-
- for (StartEvent re : relevantEvents) {
- String serviceType = re.getServiceType();
-
- // Determine if we've already handled a start event for this service type
- if (!handledServiceTypes.contains(serviceType)) {
-
- // Get the previously-recorded configuration
- ServiceConfigurationModel serviceConfig = serviceConfigurations.get(re.getServiceType());
-
- if (serviceConfig != null) {
- // Get the current config for the started service, and compare with the previously-recorded config
- ServiceConfigurationModel currentConfig =
- getCurrentServiceConfiguration(address, clusterName, re.getService());
-
- if (currentConfig != null) {
- log.analyzingCurrentServiceConfiguration(re.getService());
- try {
- configHasChanged = hasConfigurationChanged(serviceConfig, currentConfig);
- } catch (Exception e) {
- log.errorAnalyzingCurrentServiceConfiguration(re.getService(), e);
- }
- }
- } else {
- // A new service (no prior config) represent a config change, since a descriptor may have referenced
- // the "new" service, but discovery had previously not succeeded because the service had not been
- // configured (appropriately) at that time.
- log.serviceEnabled(re.getService());
- configHasChanged = true;
- }
-
- handledServiceTypes.add(serviceType);
- }
+ // Remove outdated entries from the cache
+ for (String fqcn : clustersToStopMonitoring) {
+ String[] parts = fqcn.split(FQCN_DELIM);
+ stopMonitoring(parts[0], parts[1]);
+ }
+ clustersToStopMonitoring.clear(); // reset the removal list
- if (configHasChanged) {
- break; // No need to continue checking once we've identified one reason to perform discovery again
- }
- }
+ waitFor(interval);
+ } catch (Exception e) {
+ log.clouderaManagerConfigurationChangesMonitoringError(e);
+ }
+ }
+
+ log.stoppedClouderaManagerConfigMonitor();
+ }
+
+ private boolean hasConfigChanged(String address, String clusterName, List<StartEvent> relevantEvents) {
+ // If there are start events, then check the previously-recorded properties for the same service to
+ // identify if the configuration has changed
+ final Map<String, ServiceConfigurationModel> serviceConfigurations = configCache.getClusterServiceConfigurations(address, clusterName);
+
+ // Those services for which a start event has been handled
+ final List<String> handledServiceTypes = new ArrayList<>();
+
+ boolean configHasChanged = false;
+ for (StartEvent re : relevantEvents) {
+ String serviceType = re.getServiceType();
+
+ // Determine if we've already handled a start event for this service type
+ if (!handledServiceTypes.contains(serviceType)) {
+
+ // Get the previously-recorded configuration
+ ServiceConfigurationModel serviceConfig = serviceConfigurations.get(re.getServiceType());
- // If a change has occurred, notify the listeners
- if (configHasChanged) {
- notifyChangeListener(address, clusterName);
+ if (serviceConfig != null) {
+ // Get the current config for the started service, and compare with the previously-recorded config
+ ServiceConfigurationModel currentConfig =
+ getCurrentServiceConfiguration(address, clusterName, re.getService());
+
+ if (currentConfig != null) {
+ log.analyzingCurrentServiceConfiguration(re.getService());
+ try {
+ configHasChanged = hasConfigurationChanged(serviceConfig, currentConfig);
+ } catch (Exception e) {
+ log.errorAnalyzingCurrentServiceConfiguration(re.getService(), e);
}
}
+ } else {
+ // A new service (no prior config) represents a config change, since a descriptor may have referenced
+ // the "new" service, but discovery had previously not succeeded because the service had not been
+ // configured (appropriately) at that time.
+ log.serviceEnabled(re.getService());
+ configHasChanged = true;
}
- }
- // Remove outdated entries from the cache
- for (String fqcn : clustersToStopMonitoring) {
- String[] parts = fqcn.split(FQCN_DELIM);
- stopMonitoring(parts[0], parts[1]);
+ handledServiceTypes.add(serviceType);
}
- clustersToStopMonitoring.clear(); // reset the removal list
- waitFor(interval);
+ if (configHasChanged) {
+ break; // No need to continue checking once we've identified one reason to perform discovery again
+ }
}
-
- log.stoppedClouderaManagerConfigMonitor();
+ return configHasChanged;
}
private TopologyService getTopologyService() {