You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@knox.apache.org by sm...@apache.org on 2024/01/04 07:31:03 UTC
(knox) branch master updated: KNOX-2994 - PollingConfigurationAnalyzer starts after the Knox GW is up and running (#831)
This is an automated email from the ASF dual-hosted git repository.
smolnar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git
The following commit(s) were added to refs/heads/master by this push:
new bb6719f3c KNOX-2994 - PollingConfigurationAnalyzer starts after the Knox GW is up and running (#831)
bb6719f3c is described below
commit bb6719f3cad33cc89c990a2ab5bc61756c497d4f
Author: Sandor Molnar <sm...@apache.org>
AuthorDate: Thu Jan 4 08:30:58 2024 +0100
KNOX-2994 - PollingConfigurationAnalyzer starts after the Knox GW is up and running (#831)
---
.../ClouderaManagerServiceDiscoveryMessages.java | 4 +
.../cm/monitor/PollingConfigurationAnalyzer.java | 105 +++++++++++++--------
.../monitor/PollingConfigurationAnalyzerTest.java | 52 +++++++++-
3 files changed, 120 insertions(+), 41 deletions(-)
diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
index ca331241d..c86fe2912 100644
--- a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
+++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
@@ -133,6 +133,10 @@ public interface ClouderaManagerServiceDiscoveryMessages {
text = "Started ClouderaManager cluster configuration monitor (checking every {0} seconds)")
void startedClouderaManagerConfigMonitor(long pollingInterval);
+ @Message(level = MessageLevel.INFO,
+ text = "The Knox Gateway is not yet ready to monitor ClouderaManager cluster configuration changes.")
+ void gatewayIsNotYetReadyToMonitorClouderaManagerConfigs();
+
@Message(level = MessageLevel.INFO, text = "Stopping ClouderaManager cluster configuration monitor")
void stoppingClouderaManagerConfigMonitor();
diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
index c4e511d36..251ce6d24 100644
--- a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
+++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
@@ -42,6 +42,7 @@ import org.apache.knox.gateway.services.security.AliasService;
import org.apache.knox.gateway.services.security.KeystoreService;
import org.apache.knox.gateway.services.security.KeystoreServiceException;
import org.apache.knox.gateway.services.topology.TopologyService;
+import org.apache.knox.gateway.services.topology.impl.GatewayStatusService;
import org.apache.knox.gateway.topology.ClusterConfigurationMonitorService;
import org.apache.knox.gateway.topology.discovery.ServiceDiscoveryConfig;
import org.apache.knox.gateway.topology.discovery.cm.ClouderaManagerServiceDiscoveryMessages;
@@ -155,6 +156,8 @@ public class PollingConfigurationAnalyzer implements Runnable {
private final GatewayConfig gatewayConfig;
+ private GatewayStatusService gatewayStatusService;
+
PollingConfigurationAnalyzer(final GatewayConfig gatewayConfig,
final ClusterConfigurationCache configCache,
final AliasService aliasService,
@@ -207,56 +210,68 @@ public class PollingConfigurationAnalyzer implements Runnable {
log.startedClouderaManagerConfigMonitor(interval);
isActive = true;
+ boolean gatewayStatusOk = false;
while (isActive) {
- try {
- final List<String> clustersToStopMonitoring = new ArrayList<>();
-
- for (Map.Entry<String, List<String>> entry : configCache.getClusterNames().entrySet()) {
- String address = entry.getKey();
- for (String clusterName : entry.getValue()) {
- if (configCache.getDiscoveryConfig(address, clusterName) == null) {
- log.noClusterConfiguration(clusterName, address);
- continue;
- }
- log.checkingClusterConfiguration(clusterName, address);
+ if (!gatewayStatusOk) {
+ gatewayStatusOk = getGatewayStatusService() != null && getGatewayStatusService().status();
+ }
+ if (gatewayStatusOk) {
+ monitorClusterConfigurationChanges();
+ } else {
+ log.gatewayIsNotYetReadyToMonitorClouderaManagerConfigs();
+ }
+ waitFor(interval);
+ }
- // Check here for existing descriptor references, and add to the removal list if there are not any
- if (!clusterReferencesExist(address, clusterName)) {
- clustersToStopMonitoring.add(address + FQCN_DELIM + clusterName);
- continue;
- }
+ log.stoppedClouderaManagerConfigMonitor();
+ }
- // Configuration changes don't mean anything without corresponding service start/restarts. Therefore, monitor
- // start events, and check the configuration only of the restarted service(s) to identify changes
- // that should trigger re-discovery.
- final List<RelevantEvent> relevantEvents = getRelevantEvents(address, clusterName);
+ private void monitorClusterConfigurationChanges() {
+ try {
+ final List<String> clustersToStopMonitoring = new ArrayList<>();
+
+ for (Map.Entry<String, List<String>> entry : configCache.getClusterNames().entrySet()) {
+ String address = entry.getKey();
+ for (String clusterName : entry.getValue()) {
+ if (configCache.getDiscoveryConfig(address, clusterName) == null) {
+ log.noClusterConfiguration(clusterName, address);
+ continue;
+ }
+ log.checkingClusterConfiguration(clusterName, address);
- // If there are no recent start events, then nothing to do now
- if (!relevantEvents.isEmpty()) {
- // If a change has occurred, notify the listeners
- if (hasConfigChanged(address, clusterName, relevantEvents) || hasScaleEvent(relevantEvents)) {
- notifyChangeListener(address, clusterName);
- }
- // these events should not be processed again even if the next CM query result contains them
- relevantEvents.forEach(re -> processedEvents.put(re.auditEvent.getId(), 1L));
- }
+ // Check here for existing descriptor references, and add to the removal list if there are not any
+ if (!clusterReferencesExist(address, clusterName)) {
+ clustersToStopMonitoring.add(address + FQCN_DELIM + clusterName);
+ continue;
}
- }
- // Remove outdated entries from the cache
- for (String fqcn : clustersToStopMonitoring) {
- String[] parts = fqcn.split(FQCN_DELIM);
- stopMonitoring(parts[0], parts[1]);
+ // Configuration changes don't mean anything without corresponding service start/restarts. Therefore, monitor
+ // start events, and check the configuration only of the restarted service(s) to identify changes
+ // that should trigger re-discovery.
+ final List<RelevantEvent> relevantEvents = getRelevantEvents(address, clusterName);
+
+ // If there are no recent start events, then nothing to do now
+ if (!relevantEvents.isEmpty()) {
+ // If a change has occurred, notify the listeners
+ if (hasConfigChanged(address, clusterName, relevantEvents) || hasScaleEvent(relevantEvents)) {
+ notifyChangeListener(address, clusterName);
+ }
+ // these events should not be processed again even if the next CM query result contains them
+ relevantEvents.forEach(re -> processedEvents.put(re.auditEvent.getId(), 1L));
+ }
}
- clustersToStopMonitoring.clear(); // reset the removal list
+ }
- } catch (Exception e) {
- log.clouderaManagerConfigurationChangesMonitoringError(e);
+ // Remove outdated entries from the cache
+ for (String fqcn : clustersToStopMonitoring) {
+ String[] parts = fqcn.split(FQCN_DELIM);
+ stopMonitoring(parts[0], parts[1]);
}
- waitFor(interval);
- }
+ clustersToStopMonitoring.clear(); // reset the removal list
- log.stoppedClouderaManagerConfigMonitor();
+ } catch (Exception e) {
+ log.clouderaManagerConfigurationChangesMonitoringError(e);
+ }
}
private boolean hasScaleEvent(List<RelevantEvent> relevantEvents) {
@@ -372,6 +387,16 @@ public class PollingConfigurationAnalyzer implements Runnable {
return ccms;
}
+ private GatewayStatusService getGatewayStatusService() {
+ if (gatewayStatusService == null) {
+ final GatewayServices gatewayServices = GatewayServer.getGatewayServices();
+ if (gatewayServices != null) {
+ gatewayStatusService = gatewayServices.getService(ServiceType.GATEWAY_STATUS_SERVICE);
+ }
+ }
+ return gatewayStatusService;
+ }
+
/**
* Determine if any descriptors reference the specified discovery source and cluster.
*
diff --git a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
index d6dc186f4..50df250f0 100644
--- a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
+++ b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
@@ -27,11 +27,13 @@ import org.apache.knox.gateway.config.GatewayConfig;
import org.apache.knox.gateway.services.GatewayServices;
import org.apache.knox.gateway.services.ServiceType;
import org.apache.knox.gateway.services.topology.TopologyService;
+import org.apache.knox.gateway.services.topology.impl.GatewayStatusService;
import org.apache.knox.gateway.topology.ClusterConfigurationMonitorService;
import org.apache.knox.gateway.topology.discovery.ServiceDiscoveryConfig;
import org.apache.knox.gateway.topology.discovery.cm.model.hdfs.NameNodeServiceModelGenerator;
import org.apache.knox.gateway.topology.discovery.cm.model.hive.HiveOnTezServiceModelGenerator;
import org.easymock.EasyMock;
+import org.junit.After;
import org.junit.Test;
import java.io.File;
@@ -58,6 +60,11 @@ import static org.junit.Assert.assertTrue;
public class PollingConfigurationAnalyzerTest {
+ @After
+ public void tearDown() {
+ setGatewayServices(null);
+ }
+
@Test(expected = IllegalArgumentException.class)
public void testRestartEventWithWrongApiEventCategory() {
doTestStartEvent(ApiEventCategory.LOG_EVENT);
@@ -341,11 +348,16 @@ public class PollingConfigurationAnalyzerTest {
return null;
}).once();
+ //GatewayStatusService mock
+ final GatewayStatusService gatewayStatusService = EasyMock.createNiceMock(GatewayStatusService.class);
+ EasyMock.expect(gatewayStatusService.status()).andReturn(Boolean.TRUE).anyTimes();
+
// GatewayServices mock
GatewayServices gws = EasyMock.createNiceMock(GatewayServices.class);
EasyMock.expect(gws.getService(ServiceType.TOPOLOGY_SERVICE)).andReturn(ts).anyTimes();
EasyMock.expect(gws.getService(ServiceType.CLUSTER_CONFIGURATION_MONITOR_SERVICE)).andReturn(ccms).anyTimes();
- EasyMock.replay(ts, ccms, gws);
+ EasyMock.expect(gws.getService(ServiceType.GATEWAY_STATUS_SERVICE)).andReturn(gatewayStatusService).anyTimes();
+ EasyMock.replay(ts, ccms, gatewayStatusService, gws);
try {
setGatewayServices(gws);
@@ -413,6 +425,26 @@ public class PollingConfigurationAnalyzerTest {
doTestEventWithConfigChange(revisionEvent, clusterName);
}
+ @Test
+ public void shouldNotPerformClusterConfigurationChangeMonitoringIfKnoxGatewayIsNotYetReady() {
+ final String address = "http://host1:1234";
+ final String clusterName = "Cluster 10";
+
+ // Simulate a successful restart waiting for staleness event with id = 123
+ final ApiEvent rollingRestartEvent = createApiEvent(clusterName, HiveOnTezServiceModelGenerator.SERVICE_TYPE, HiveOnTezServiceModelGenerator.SERVICE,
+ PollingConfigurationAnalyzer.RESTART_WAITING_FOR_STALENESS_SUCCESS_COMMAND, PollingConfigurationAnalyzer.SUCCEEDED_STATUS, "EV_CLUSTER_RESTARTED",
+ "123");
+
+ final ChangeListener listener = new ChangeListener();
+ final TestablePollingConfigAnalyzer pca = buildPollingConfigAnalyzer(address, clusterName, Collections.emptyMap(), listener, false);
+
+ // this should NOT trigger a notification because the Knox Gateway is not yet
+ // ready (by GatewayStatusService.status())
+ listener.clearNotification();
+ doTestEvent(rollingRestartEvent, address, clusterName, Collections.emptyMap(), Collections.emptyMap(), pca);
+ assertFalse("Unexpected change notification", listener.wasNotified(address, clusterName));
+ }
+
private void doTestStartEvent(final ApiEventCategory category) {
final String clusterName = "My Cluster";
final String serviceType = NameNodeServiceModelGenerator.SERVICE_TYPE;
@@ -472,6 +504,11 @@ public class PollingConfigurationAnalyzerTest {
private TestablePollingConfigAnalyzer buildPollingConfigAnalyzer(final String address, final String clusterName,
final Map<String, ServiceConfigurationModel> serviceConfigurationModels, ChangeListener listener) {
+ return buildPollingConfigAnalyzer(address, clusterName, serviceConfigurationModels, listener, true);
+ }
+
+ private TestablePollingConfigAnalyzer buildPollingConfigAnalyzer(final String address, final String clusterName,
+ final Map<String, ServiceConfigurationModel> serviceConfigurationModels, ChangeListener listener, boolean isKnoxGatewayReady) {
final GatewayConfig gatewayConfig = EasyMock.createNiceMock(GatewayConfig.class);
EasyMock.expect(gatewayConfig.getIncludedSSLCiphers()).andReturn(Collections.emptyList()).anyTimes();
EasyMock.expect(gatewayConfig.getIncludedSSLProtocols()).andReturn(Collections.emptySet()).anyTimes();
@@ -495,6 +532,19 @@ public class PollingConfigurationAnalyzerTest {
EasyMock.expect(configCache.getClusterServiceConfigurations(address, clusterName)).andReturn(serviceConfigurationModels).anyTimes();
EasyMock.replay(configCache);
+ if (isKnoxGatewayReady) {
+ // GatewayStatusService mock
+ final GatewayStatusService gatewayStatusService = EasyMock.createNiceMock(GatewayStatusService.class);
+ EasyMock.expect(gatewayStatusService.status()).andReturn(Boolean.TRUE).anyTimes();
+
+ // GatewayServices mock
+ GatewayServices gws = EasyMock.createNiceMock(GatewayServices.class);
+ EasyMock.expect(gws.getService(ServiceType.GATEWAY_STATUS_SERVICE)).andReturn(gatewayStatusService).anyTimes();
+ EasyMock.replay(gatewayStatusService, gws);
+
+ setGatewayServices(gws);
+ }
+
return new TestablePollingConfigAnalyzer(gatewayConfig, configCache, listener);
}