You are viewing a plain-text version of this content. The hyperlink to the canonical version is not preserved in this copy.
Posted to commits@knox.apache.org by sm...@apache.org on 2024/01/04 07:31:03 UTC

(knox) branch master updated: KNOX-2994 - PollingConfigurationAnalyzer starts after the Knox GW is up and running (#831)

This is an automated email from the ASF dual-hosted git repository.

smolnar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git


The following commit(s) were added to refs/heads/master by this push:
     new bb6719f3c KNOX-2994 - PollingConfigurationAnalyzer starts after the Knox GW is up and running (#831)
bb6719f3c is described below

commit bb6719f3cad33cc89c990a2ab5bc61756c497d4f
Author: Sandor Molnar <sm...@apache.org>
AuthorDate: Thu Jan 4 08:30:58 2024 +0100

    KNOX-2994 - PollingConfigurationAnalyzer starts after the Knox GW is up and running (#831)
---
 .../ClouderaManagerServiceDiscoveryMessages.java   |   4 +
 .../cm/monitor/PollingConfigurationAnalyzer.java   | 105 +++++++++++++--------
 .../monitor/PollingConfigurationAnalyzerTest.java  |  52 +++++++++-
 3 files changed, 120 insertions(+), 41 deletions(-)

diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
index ca331241d..c86fe2912 100644
--- a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
+++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
@@ -133,6 +133,10 @@ public interface ClouderaManagerServiceDiscoveryMessages {
            text = "Started ClouderaManager cluster configuration monitor (checking every {0} seconds)")
   void startedClouderaManagerConfigMonitor(long pollingInterval);
 
+  @Message(level = MessageLevel.INFO,
+      text = "The Knox Gateway is not yet ready to monitor ClouderaManager cluster configuration changes.")
+  void gatewayIsNotYetReadyToMonitorClouderaManagerConfigs();
+
   @Message(level = MessageLevel.INFO, text = "Stopping ClouderaManager cluster configuration monitor")
   void stoppingClouderaManagerConfigMonitor();
 
diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
index c4e511d36..251ce6d24 100644
--- a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
+++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
@@ -42,6 +42,7 @@ import org.apache.knox.gateway.services.security.AliasService;
 import org.apache.knox.gateway.services.security.KeystoreService;
 import org.apache.knox.gateway.services.security.KeystoreServiceException;
 import org.apache.knox.gateway.services.topology.TopologyService;
+import org.apache.knox.gateway.services.topology.impl.GatewayStatusService;
 import org.apache.knox.gateway.topology.ClusterConfigurationMonitorService;
 import org.apache.knox.gateway.topology.discovery.ServiceDiscoveryConfig;
 import org.apache.knox.gateway.topology.discovery.cm.ClouderaManagerServiceDiscoveryMessages;
@@ -155,6 +156,8 @@ public class PollingConfigurationAnalyzer implements Runnable {
 
   private final GatewayConfig gatewayConfig;
 
+  private GatewayStatusService gatewayStatusService;
+
   PollingConfigurationAnalyzer(final GatewayConfig gatewayConfig,
                                final ClusterConfigurationCache   configCache,
                                final AliasService                aliasService,
@@ -207,56 +210,68 @@ public class PollingConfigurationAnalyzer implements Runnable {
     log.startedClouderaManagerConfigMonitor(interval);
     isActive = true;
 
+    boolean gatewayStatusOk = false;
     while (isActive) {
-      try {
-        final List<String> clustersToStopMonitoring = new ArrayList<>();
-
-        for (Map.Entry<String, List<String>> entry : configCache.getClusterNames().entrySet()) {
-          String address = entry.getKey();
-          for (String clusterName : entry.getValue()) {
-            if (configCache.getDiscoveryConfig(address, clusterName) == null) {
-              log.noClusterConfiguration(clusterName, address);
-              continue;
-            }
-            log.checkingClusterConfiguration(clusterName, address);
+      if (!gatewayStatusOk) {
+        gatewayStatusOk = getGatewayStatusService() != null && getGatewayStatusService().status();
+      }
+      if (gatewayStatusOk) {
+        monitorClusterConfigurationChanges();
+      } else {
+        log.gatewayIsNotYetReadyToMonitorClouderaManagerConfigs();
+      }
+      waitFor(interval);
+    }
 
-            // Check here for existing descriptor references, and add to the removal list if there are not any
-            if (!clusterReferencesExist(address, clusterName)) {
-              clustersToStopMonitoring.add(address + FQCN_DELIM + clusterName);
-              continue;
-            }
+    log.stoppedClouderaManagerConfigMonitor();
+  }
 
-            // Configuration changes don't mean anything without corresponding service start/restarts. Therefore, monitor
-            // start events, and check the configuration only of the restarted service(s) to identify changes
-            // that should trigger re-discovery.
-            final List<RelevantEvent> relevantEvents = getRelevantEvents(address, clusterName);
+  private void monitorClusterConfigurationChanges() {
+    try {
+      final List<String> clustersToStopMonitoring = new ArrayList<>();
+
+      for (Map.Entry<String, List<String>> entry : configCache.getClusterNames().entrySet()) {
+        String address = entry.getKey();
+        for (String clusterName : entry.getValue()) {
+          if (configCache.getDiscoveryConfig(address, clusterName) == null) {
+            log.noClusterConfiguration(clusterName, address);
+            continue;
+          }
+          log.checkingClusterConfiguration(clusterName, address);
 
-            // If there are no recent start events, then nothing to do now
-            if (!relevantEvents.isEmpty()) {
-              // If a change has occurred, notify the listeners
-              if (hasConfigChanged(address, clusterName, relevantEvents) || hasScaleEvent(relevantEvents)) {
-                notifyChangeListener(address, clusterName);
-              }
-              // these events should not be processed again even if the next CM query result contains them
-              relevantEvents.forEach(re -> processedEvents.put(re.auditEvent.getId(), 1L));
-            }
+          // Check here for existing descriptor references, and add to the removal list if there are not any
+          if (!clusterReferencesExist(address, clusterName)) {
+            clustersToStopMonitoring.add(address + FQCN_DELIM + clusterName);
+            continue;
           }
-        }
 
-        // Remove outdated entries from the cache
-        for (String fqcn : clustersToStopMonitoring) {
-          String[] parts = fqcn.split(FQCN_DELIM);
-          stopMonitoring(parts[0], parts[1]);
+          // Configuration changes don't mean anything without corresponding service start/restarts. Therefore, monitor
+          // start events, and check the configuration only of the restarted service(s) to identify changes
+          // that should trigger re-discovery.
+          final List<RelevantEvent> relevantEvents = getRelevantEvents(address, clusterName);
+
+          // If there are no recent start events, then nothing to do now
+          if (!relevantEvents.isEmpty()) {
+            // If a change has occurred, notify the listeners
+            if (hasConfigChanged(address, clusterName, relevantEvents) || hasScaleEvent(relevantEvents)) {
+              notifyChangeListener(address, clusterName);
+            }
+            // these events should not be processed again even if the next CM query result contains them
+            relevantEvents.forEach(re -> processedEvents.put(re.auditEvent.getId(), 1L));
+          }
         }
-        clustersToStopMonitoring.clear(); // reset the removal list
+      }
 
-      } catch (Exception e) {
-        log.clouderaManagerConfigurationChangesMonitoringError(e);
+      // Remove outdated entries from the cache
+      for (String fqcn : clustersToStopMonitoring) {
+        String[] parts = fqcn.split(FQCN_DELIM);
+        stopMonitoring(parts[0], parts[1]);
       }
-      waitFor(interval);
-    }
+      clustersToStopMonitoring.clear(); // reset the removal list
 
-    log.stoppedClouderaManagerConfigMonitor();
+    } catch (Exception e) {
+      log.clouderaManagerConfigurationChangesMonitoringError(e);
+    }
   }
 
   private boolean hasScaleEvent(List<RelevantEvent> relevantEvents) {
@@ -372,6 +387,16 @@ public class PollingConfigurationAnalyzer implements Runnable {
     return ccms;
   }
 
+  private GatewayStatusService getGatewayStatusService() {
+    if (gatewayStatusService == null) {
+      final GatewayServices gatewayServices = GatewayServer.getGatewayServices();
+      if (gatewayServices != null) {
+        gatewayStatusService = gatewayServices.getService(ServiceType.GATEWAY_STATUS_SERVICE);
+      }
+    }
+    return gatewayStatusService;
+  }
+
   /**
    * Determine if any descriptors reference the specified discovery source and cluster.
    *
diff --git a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
index d6dc186f4..50df250f0 100644
--- a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
+++ b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
@@ -27,11 +27,13 @@ import org.apache.knox.gateway.config.GatewayConfig;
 import org.apache.knox.gateway.services.GatewayServices;
 import org.apache.knox.gateway.services.ServiceType;
 import org.apache.knox.gateway.services.topology.TopologyService;
+import org.apache.knox.gateway.services.topology.impl.GatewayStatusService;
 import org.apache.knox.gateway.topology.ClusterConfigurationMonitorService;
 import org.apache.knox.gateway.topology.discovery.ServiceDiscoveryConfig;
 import org.apache.knox.gateway.topology.discovery.cm.model.hdfs.NameNodeServiceModelGenerator;
 import org.apache.knox.gateway.topology.discovery.cm.model.hive.HiveOnTezServiceModelGenerator;
 import org.easymock.EasyMock;
+import org.junit.After;
 import org.junit.Test;
 
 import java.io.File;
@@ -58,6 +60,11 @@ import static org.junit.Assert.assertTrue;
 
 public class PollingConfigurationAnalyzerTest {
 
+  @After
+  public void tearDown() {
+    setGatewayServices(null);
+  }
+
   @Test(expected = IllegalArgumentException.class)
   public void testRestartEventWithWrongApiEventCategory() {
     doTestStartEvent(ApiEventCategory.LOG_EVENT);
@@ -341,11 +348,16 @@ public class PollingConfigurationAnalyzerTest {
                                               return null;
                                             }).once();
 
+    //GatewayStatusService mock
+    final GatewayStatusService gatewayStatusService = EasyMock.createNiceMock(GatewayStatusService.class);
+    EasyMock.expect(gatewayStatusService.status()).andReturn(Boolean.TRUE).anyTimes();
+
     // GatewayServices mock
     GatewayServices gws = EasyMock.createNiceMock(GatewayServices.class);
     EasyMock.expect(gws.getService(ServiceType.TOPOLOGY_SERVICE)).andReturn(ts).anyTimes();
     EasyMock.expect(gws.getService(ServiceType.CLUSTER_CONFIGURATION_MONITOR_SERVICE)).andReturn(ccms).anyTimes();
-    EasyMock.replay(ts, ccms, gws);
+    EasyMock.expect(gws.getService(ServiceType.GATEWAY_STATUS_SERVICE)).andReturn(gatewayStatusService).anyTimes();
+    EasyMock.replay(ts, ccms, gatewayStatusService, gws);
 
     try {
       setGatewayServices(gws);
@@ -413,6 +425,26 @@ public class PollingConfigurationAnalyzerTest {
     doTestEventWithConfigChange(revisionEvent, clusterName);
   }
 
+  @Test
+  public void shouldNotPerformClusterConfigurationChangeMonitoringIfKnoxGatewayIsNotYetReady() {
+    final String address = "http://host1:1234";
+    final String clusterName = "Cluster 10";
+
+    // Simulate a successful restart waiting for staleness event with id = 123
+    final ApiEvent rollingRestartEvent = createApiEvent(clusterName, HiveOnTezServiceModelGenerator.SERVICE_TYPE, HiveOnTezServiceModelGenerator.SERVICE,
+        PollingConfigurationAnalyzer.RESTART_WAITING_FOR_STALENESS_SUCCESS_COMMAND, PollingConfigurationAnalyzer.SUCCEEDED_STATUS, "EV_CLUSTER_RESTARTED",
+        "123");
+
+    final ChangeListener listener = new ChangeListener();
+    final TestablePollingConfigAnalyzer pca = buildPollingConfigAnalyzer(address, clusterName, Collections.emptyMap(), listener, false);
+
+    // this should NOT trigger a notification because the Knox Gateway is not yet
+    // ready (by GatewayStatusService.status())
+    listener.clearNotification();
+    doTestEvent(rollingRestartEvent, address, clusterName, Collections.emptyMap(), Collections.emptyMap(), pca);
+    assertFalse("Unexpected change notification", listener.wasNotified(address, clusterName));
+  }
+
   private void doTestStartEvent(final ApiEventCategory category) {
     final String clusterName = "My Cluster";
     final String serviceType = NameNodeServiceModelGenerator.SERVICE_TYPE;
@@ -472,6 +504,11 @@ public class PollingConfigurationAnalyzerTest {
 
   private TestablePollingConfigAnalyzer buildPollingConfigAnalyzer(final String address, final String clusterName,
       final Map<String, ServiceConfigurationModel> serviceConfigurationModels, ChangeListener listener) {
+    return buildPollingConfigAnalyzer(address, clusterName, serviceConfigurationModels, listener, true);
+  }
+
+  private TestablePollingConfigAnalyzer buildPollingConfigAnalyzer(final String address, final String clusterName,
+      final Map<String, ServiceConfigurationModel> serviceConfigurationModels, ChangeListener listener, boolean isKnoxGatewayReady) {
     final GatewayConfig gatewayConfig = EasyMock.createNiceMock(GatewayConfig.class);
     EasyMock.expect(gatewayConfig.getIncludedSSLCiphers()).andReturn(Collections.emptyList()).anyTimes();
     EasyMock.expect(gatewayConfig.getIncludedSSLProtocols()).andReturn(Collections.emptySet()).anyTimes();
@@ -495,6 +532,19 @@ public class PollingConfigurationAnalyzerTest {
     EasyMock.expect(configCache.getClusterServiceConfigurations(address, clusterName)).andReturn(serviceConfigurationModels).anyTimes();
     EasyMock.replay(configCache);
 
+    if (isKnoxGatewayReady) {
+      // GatewayStatusService mock
+      final GatewayStatusService gatewayStatusService = EasyMock.createNiceMock(GatewayStatusService.class);
+      EasyMock.expect(gatewayStatusService.status()).andReturn(Boolean.TRUE).anyTimes();
+
+      // GatewayServices mock
+      GatewayServices gws = EasyMock.createNiceMock(GatewayServices.class);
+      EasyMock.expect(gws.getService(ServiceType.GATEWAY_STATUS_SERVICE)).andReturn(gatewayStatusService).anyTimes();
+      EasyMock.replay(gatewayStatusService, gws);
+
+      setGatewayServices(gws);
+    }
+
     return new TestablePollingConfigAnalyzer(gatewayConfig, configCache, listener);
   }