You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by sm...@apache.org on 2020/04/27 14:58:16 UTC

[knox] branch master updated: KNOX-2351 - Catching any errors while monitoring CM configuration changes (#324)

This is an automated email from the ASF dual-hosted git repository.

smolnar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git


The following commit(s) were added to refs/heads/master by this push:
     new f96b62d  KNOX-2351 - Catching any errors while monitoring CM configuration changes (#324)
f96b62d is described below

commit f96b62d133d300519fc45f295f5e7a2b58922949
Author: Sandor Molnar <sm...@apache.org>
AuthorDate: Mon Apr 27 16:58:09 2020 +0200

    KNOX-2351 - Catching any errors while monitoring CM configuration changes (#324)
---
 .../ClouderaManagerServiceDiscoveryMessages.java   |   3 +
 .../cm/monitor/PollingConfigurationAnalyzer.java   | 153 +++++++++++----------
 2 files changed, 82 insertions(+), 74 deletions(-)

diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
index 289c752..7f7c644 100644
--- a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
+++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
@@ -126,6 +126,9 @@ public interface ClouderaManagerServiceDiscoveryMessages {
   @Message(level = MessageLevel.DEBUG, text = "Checking {0} @ {1} for configuration changes...")
   void checkingClusterConfiguration(String clusterName, String discoveryAddress);
 
+  @Message(level = MessageLevel.ERROR, text = "Error while monitoring ClouderaManager configuration changes: {0}")
+  void clouderaManagerConfigurationChangesMonitoringError(@StackTrace(level = MessageLevel.DEBUG) Exception e);
+
   @Message(level = MessageLevel.ERROR,
       text = "Error getting service configuration details from ClouderaManager: {0}")
   void clouderaManagerConfigurationAPIError(@StackTrace(level = MessageLevel.DEBUG) ApiException e);
diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
index fb8d73c..4b7935f 100644
--- a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
+++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
@@ -162,93 +162,98 @@ public class PollingConfigurationAnalyzer implements Runnable {
     isActive = true;
 
     while (isActive) {
-      List<String> clustersToStopMonitoring = new ArrayList<>();
+      try {
+        final List<String> clustersToStopMonitoring = new ArrayList<>();
+
+        for (Map.Entry<String, List<String>> entry : configCache.getClusterNames().entrySet()) {
+          String address = entry.getKey();
+          for (String clusterName : entry.getValue()) {
+            log.checkingClusterConfiguration(clusterName, address);
+
+            // Check here for existing descriptor references, and add to the removal list if there are not any
+            if (!clusterReferencesExist(address, clusterName)) {
+              clustersToStopMonitoring.add(address + FQCN_DELIM + clusterName);
+              continue;
+            }
 
-      for (Map.Entry<String, List<String>> entry : configCache.getClusterNames().entrySet()) {
-        String address = entry.getKey();
-        for (String clusterName : entry.getValue()) {
-          log.checkingClusterConfiguration(clusterName, address);
+            // Configuration changes don't mean anything without corresponding service start/restarts. Therefore, monitor
+            // start events, and check the configuration only of the restarted service(s) to identify changes
+            // that should trigger re-discovery.
+            final List<StartEvent> relevantEvents = getRelevantEvents(address, clusterName);
 
-          // Check here for existing descriptor references, and add to the removal list if there are not any
-          if (!clusterReferencesExist(address, clusterName)) {
-            clustersToStopMonitoring.add(address + FQCN_DELIM + clusterName);
-            continue;
+            // If there are no recent start events, then nothing to do now
+            if (!relevantEvents.isEmpty()) {
+              // If a change has occurred, notify the listeners
+              if (hasConfigChanged(address, clusterName, relevantEvents)) {
+                notifyChangeListener(address, clusterName);
+              }
+            }
           }
+        }
 
-          // Configuration changes don't mean anything without corresponding service start/restarts. Therefore, monitor
-          // start events, and check the configuration only of the restarted service(s) to identify changes
-          // that should trigger re-discovery.
-          List<StartEvent> relevantEvents = getRelevantEvents(address, clusterName);
-
-          // If there are no recent start events, then nothing to do now
-          if (!relevantEvents.isEmpty()) {
-            boolean configHasChanged = false;
-
-            // If there are start events, then check the previously-recorded properties for the same service to
-            // identify if the configuration has changed
-            Map<String, ServiceConfigurationModel> serviceConfigurations =
-                                    configCache.getClusterServiceConfigurations(address, clusterName);
-
-            // Those services for which a start even has been handled
-            List<String> handledServiceTypes = new ArrayList<>();
-
-            for (StartEvent re : relevantEvents) {
-              String serviceType = re.getServiceType();
-
-              // Determine if we've already handled a start event for this service type
-              if (!handledServiceTypes.contains(serviceType)) {
-
-                // Get the previously-recorded configuration
-                ServiceConfigurationModel serviceConfig = serviceConfigurations.get(re.getServiceType());
-
-                if (serviceConfig != null) {
-                  // Get the current config for the started service, and compare with the previously-recorded config
-                  ServiceConfigurationModel currentConfig =
-                                  getCurrentServiceConfiguration(address, clusterName, re.getService());
-
-                  if (currentConfig != null) {
-                    log.analyzingCurrentServiceConfiguration(re.getService());
-                    try {
-                      configHasChanged = hasConfigurationChanged(serviceConfig, currentConfig);
-                    } catch (Exception e) {
-                      log.errorAnalyzingCurrentServiceConfiguration(re.getService(), e);
-                    }
-                  }
-                } else {
-                  // A new service (no prior config) represent a config change, since a descriptor may have referenced
-                  // the "new" service, but discovery had previously not succeeded because the service had not been
-                  // configured (appropriately) at that time.
-                  log.serviceEnabled(re.getService());
-                  configHasChanged = true;
-                }
-
-                handledServiceTypes.add(serviceType);
-              }
+        // Remove outdated entries from the cache
+        for (String fqcn : clustersToStopMonitoring) {
+          String[] parts = fqcn.split(FQCN_DELIM);
+          stopMonitoring(parts[0], parts[1]);
+        }
+        clustersToStopMonitoring.clear(); // reset the removal list
 
-              if (configHasChanged) {
-                break; // No need to continue checking once we've identified one reason to perform discovery again
-              }
-            }
+        waitFor(interval);
+      } catch (Exception e) {
+        log.clouderaManagerConfigurationChangesMonitoringError(e);
+      }
+    }
+
+    log.stoppedClouderaManagerConfigMonitor();
+  }
+
+  private boolean hasConfigChanged(String address, String clusterName, List<StartEvent> relevantEvents) {
+    // If there are start events, then check the previously-recorded properties for the same service to
+    // identify if the configuration has changed
+    final Map<String, ServiceConfigurationModel> serviceConfigurations = configCache.getClusterServiceConfigurations(address, clusterName);
+
+    // Those services for which a start event has been handled
+    final List<String> handledServiceTypes = new ArrayList<>();
+
+    boolean configHasChanged = false;
+    for (StartEvent re : relevantEvents) {
+      String serviceType = re.getServiceType();
+
+      // Determine if we've already handled a start event for this service type
+      if (!handledServiceTypes.contains(serviceType)) {
+
+        // Get the previously-recorded configuration
+        ServiceConfigurationModel serviceConfig = serviceConfigurations.get(re.getServiceType());
 
-            // If a change has occurred, notify the listeners
-            if (configHasChanged) {
-              notifyChangeListener(address, clusterName);
+        if (serviceConfig != null) {
+          // Get the current config for the started service, and compare with the previously-recorded config
+          ServiceConfigurationModel currentConfig =
+                          getCurrentServiceConfiguration(address, clusterName, re.getService());
+
+          if (currentConfig != null) {
+            log.analyzingCurrentServiceConfiguration(re.getService());
+            try {
+              configHasChanged = hasConfigurationChanged(serviceConfig, currentConfig);
+            } catch (Exception e) {
+              log.errorAnalyzingCurrentServiceConfiguration(re.getService(), e);
             }
           }
+        } else {
+          // A new service (no prior config) represents a config change, since a descriptor may have referenced
+          // the "new" service, but discovery had previously not succeeded because the service had not been
+          // configured (appropriately) at that time.
+          log.serviceEnabled(re.getService());
+          configHasChanged = true;
         }
-      }
 
-      // Remove outdated entries from the cache
-      for (String fqcn : clustersToStopMonitoring) {
-        String[] parts = fqcn.split(FQCN_DELIM);
-        stopMonitoring(parts[0], parts[1]);
+        handledServiceTypes.add(serviceType);
       }
-      clustersToStopMonitoring.clear(); // reset the removal list
 
-      waitFor(interval);
+      if (configHasChanged) {
+        break; // No need to continue checking once we've identified one reason to perform discovery again
+      }
     }
-
-    log.stoppedClouderaManagerConfigMonitor();
+    return configHasChanged;
   }
 
   private TopologyService getTopologyService() {