You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by lm...@apache.org on 2020/04/08 18:38:55 UTC

[knox] branch master updated: KNOX-2304 - CM discovery cluster config monitor needs to be aware of … (#307)

This is an automated email from the ASF dual-hosted git repository.

lmccay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git


The following commit(s) were added to refs/heads/master by this push:
     new 8920651  KNOX-2304 - CM discovery cluster config monitor needs to be aware of … (#307)
8920651 is described below

commit 89206511aad7cacb91c7fa032490816e7cdd18ba
Author: lmccay <lm...@apache.org>
AuthorDate: Wed Apr 8 14:38:46 2020 -0400

    KNOX-2304 - CM discovery cluster config monitor needs to be aware of … (#307)
    
    * KNOX-2304 - CM discovery cluster config monitor needs to be aware of all relevant CM event types
    
    Change-Id: Ic26ad36fcb110e01d30d636f14d5b383de01ff17
    
    * KNOX-2304 - address review comments
    
    Change-Id: I5c2009f505b31c5c69ee7672ed37d21e1a7c48bb
---
 .../cm/monitor/PollingConfigurationAnalyzer.java   | 100 ++++++++++++++-------
 .../monitor/PollingConfigurationAnalyzerTest.java  |  37 +++++++-
 2 files changed, 101 insertions(+), 36 deletions(-)

diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
index b9f163b..380962a 100644
--- a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
+++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
@@ -60,17 +60,27 @@ import static org.apache.knox.gateway.topology.discovery.ClusterConfigurationMon
 @SuppressWarnings("PMD.DoNotUseThreads")
 public class PollingConfigurationAnalyzer implements Runnable {
 
-  // The format of the filter employed when restart events are queried from ClouderaManager
-  private static final String RESTART_EVENTS_QUERY_FORMAT =
+  private static final String COMMAND = "COMMAND";
+
+  private static final String COMMAND_STATUS = "COMMAND_STATUS";
+
+  private static final String STARTED_STATUS = "STARTED";
+
+  private static final String SUCCEEDED_STATUS = "SUCCEEDED";
+
+  private static final String RESTART_COMMAND = "Restart";
+
+  private static final String START_COMMAND = "Start";
+
+  // Filter format for a cluster's audit events in ClouderaManager; relevant events are selected client-side
+  private static final String EVENTS_QUERY_FORMAT =
                                 "category==" + ApiEventCategory.AUDIT_EVENT.getValue() +
-                                ";attributes.command==Restart" +
-                                ";attributes.command_status==SUCCEEDED" +
                                 ";attributes.cluster==\"%s\"%s";
 
-  // The format of the timestamp element of the restart events query filter
+  // The format of the timestamp element of the events query filter
   private static final String EVENTS_QUERY_TIMESTAMP_FORMAT = ";timeOccurred=gt=%s";
 
-  // The default amount of time before "now" to check for restart events the first time
+  // The default amount of time before "now" to check for relevant events the first time
   private static final long DEFAULT_EVENT_QUERY_DEFAULT_TIMESTAMP_OFFSET = (60 * 60 * 1000); // one hour
 
   private static final int DEFAULT_POLLING_INTERVAL = 60;
@@ -100,10 +110,10 @@ public class PollingConfigurationAnalyzer implements Runnable {
   // Cache of ClouderaManager API clients, keyed by discovery address
   private final Map<String, DiscoveryApiClient> clients = new ConcurrentHashMap<>();
 
-  // Timestamp records of the most recent restart event query per discovery address
+  // Timestamp records of the most recent event query (recorded per address and cluster)
   private Map<String, String> eventQueryTimestamps = new ConcurrentHashMap<>();
 
-  // The amount of time before "now" to will check for restart events the first time
+  // The amount of time before "now" to check for relevant events the first time
   private long eventQueryDefaultTimestampOffset = DEFAULT_EVENT_QUERY_DEFAULT_TIMESTAMP_OFFSET;
 
   private boolean isActive;
@@ -163,34 +173,34 @@ public class PollingConfigurationAnalyzer implements Runnable {
             continue;
           }
 
-          // Configuration changes don't mean anything without corresponding service restarts. Therefore, monitor
-          // restart events, and check the configuration only of the restarted service(s) to identify changes
+          // Configuration changes don't mean anything without corresponding service start/restarts. Therefore, monitor
+          // start events, and check the configuration only of the restarted service(s) to identify changes
           // that should trigger re-discovery.
-          List<RestartEvent> restartEvents = getRestartEvents(address, clusterName);
+          List<StartEvent> relevantEvents = getRelevantEvents(address, clusterName);
 
-          // If there are no recent restart events, then nothing to do now
-          if (!restartEvents.isEmpty()) {
+          // If there are no recent start events, then nothing to do now
+          if (!relevantEvents.isEmpty()) {
             boolean configHasChanged = false;
 
-            // If there are restart events, then check the previously-recorded properties for the same service to
+            // If there are start events, then check the previously-recorded properties for the same service to
             // identify if the configuration has changed
             Map<String, ServiceConfigurationModel> serviceConfigurations =
                                     configCache.getClusterServiceConfigurations(address, clusterName);
 
-            // Those services for which a restart even has been handled
+            // Those services for which a start even has been handled
             List<String> handledServiceTypes = new ArrayList<>();
 
-            for (RestartEvent re : restartEvents) {
+            for (StartEvent re : relevantEvents) {
               String serviceType = re.getServiceType();
 
-              // Determine if we've already handled a restart event for this service type
+              // Determine if we've already handled a start event for this service type
               if (!handledServiceTypes.contains(serviceType)) {
 
                 // Get the previously-recorded configuration
                 ServiceConfigurationModel serviceConfig = serviceConfigurations.get(re.getServiceType());
 
                 if (serviceConfig != null) {
-                  // Get the current config for the restarted service, and compare with the previously-recorded config
+                  // Get the current config for the started service, and compare with the previously-recorded config
                   ServiceConfigurationModel currentConfig =
                                   getCurrentServiceConfiguration(address, clusterName, re.getService());
 
@@ -337,15 +347,15 @@ public class PollingConfigurationAnalyzer implements Runnable {
   }
 
   /**
-   * Get restart events for the specified ClouderaManager cluster.
+   * Get relevant events for the specified ClouderaManager cluster.
    *
    * @param address     The address of the ClouderaManager instance.
    * @param clusterName The name of the cluster.
    *
-   * @return A List of RestartEvent objects for service restart events since the last time they were queried.
+   * @return A List of StartEvent objects for service start events since the last time they were queried.
    */
-  private List<RestartEvent> getRestartEvents(final String address, final String clusterName) {
-    List<RestartEvent> restartEvents = new ArrayList<>();
+  private List<StartEvent> getRelevantEvents(final String address, final String clusterName) {
+    List<StartEvent> relevantEvents = new ArrayList<>();
 
     // Get the last event query timestamp
     String lastTimestamp = getEventQueryTimestamp(address, clusterName);
@@ -360,19 +370,43 @@ public class PollingConfigurationAnalyzer implements Runnable {
     // Record the new event query timestamp for this address/cluster
     setEventQueryTimestamp(address, clusterName, Instant.now());
 
-    // Query the event log from CM for service/cluster restart events
-    List<ApiEvent> events = queryRestartEvents(getApiClient(configCache.getDiscoveryConfig(address, clusterName)),
+    // Query the event log from CM for service/cluster start events
+    List<ApiEvent> events = queryEvents(getApiClient(configCache.getDiscoveryConfig(address, clusterName)),
                                                clusterName,
                                                lastTimestamp);
     for (ApiEvent event : events) {
-      restartEvents.add(new RestartEvent(event));
+      if(isRelevantEvent(event)) {
+        relevantEvents.add(new StartEvent(event));
+      }
+    }
+
+    return relevantEvents;
+  }
+
+  @SuppressWarnings("unchecked")
+  private boolean isRelevantEvent(ApiEvent event) {
+    // Relevant = Start/Restart command whose status is SUCCEEDED or STARTED; events lacking either are not
+    Map<String,Object> map = getAttributeMap(event.getAttributes());
+    List<String> commands = (List<String>) map.get(COMMAND);
+    List<String> statuses = (List<String>) map.get(COMMAND_STATUS);
+    boolean rc = false;
+    if (commands != null && !commands.isEmpty() && statuses != null && !statuses.isEmpty()) {
+      String command = commands.get(0);
+      String status = statuses.get(0);
+      rc = (START_COMMAND.equals(command) || RESTART_COMMAND.equals(command)) &&
+           (SUCCEEDED_STATUS.equals(status) || STARTED_STATUS.equals(status));
     }
+    return rc;
+  }
 
-    return restartEvents;
+  private Map<String, Object> getAttributeMap(List<ApiEventAttribute> attributes) {
+    final Map<String, Object> attributesByName = new HashMap<>(); // event attribute name -> its values
+    attributes.forEach(attribute -> attributesByName.put(attribute.getName(), attribute.getValues()));
+    return attributesByName;
+  }
 
   /**
-   * Query the ClouderaManager instance associated with the specified client for any service restart events in the
+   * Query the ClouderaManager instance associated with the specified client for any service start events in the
    * specified cluster since the specified time.
    *
    * @param client      A ClouderaManager API client.
@@ -381,15 +415,15 @@ public class PollingConfigurationAnalyzer implements Runnable {
    *
    * @return A List of ApiEvent objects representing the relevant events since the specified time.
    */
-  protected List<ApiEvent> queryRestartEvents(final ApiClient client, final String clusterName, final String since) {
+  protected List<ApiEvent> queryEvents(final ApiClient client, final String clusterName, final String since) {
     List<ApiEvent> events = new ArrayList<>();
 
-    // Setup the query for restart events
+    // Setup the query for events
     String timeFilter =
         (since != null) ? String.format(Locale.ROOT, EVENTS_QUERY_TIMESTAMP_FORMAT, since) : "";
 
     String queryString = String.format(Locale.ROOT,
-                                       RESTART_EVENTS_QUERY_FORMAT,
+                                       EVENTS_QUERY_FORMAT,
                                        clusterName,
                                        timeFilter);
 
@@ -500,9 +534,9 @@ public class PollingConfigurationAnalyzer implements Runnable {
   }
 
   /**
-   * Internal representation of a ClouderaManager service restart event
+   * Internal representation of a ClouderaManager service start event
    */
-  static final class RestartEvent {
+  static final class StartEvent {
 
     private static final String ATTR_CLUSTER = "CLUSTER";
     private static final String ATTR_SERVICE_TYPE = "SERVICE_TYPE";
@@ -521,7 +555,7 @@ public class PollingConfigurationAnalyzer implements Runnable {
     private String serviceType;
     private String service;
 
-    RestartEvent(final ApiEvent auditEvent) {
+    StartEvent(final ApiEvent auditEvent) {
       if (ApiEventCategory.AUDIT_EVENT != auditEvent.getCategory()) {
         throw new IllegalArgumentException("Invalid event category " + auditEvent.getCategory().getValue());
       }
diff --git a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
index a817c83..cb2066e 100644
--- a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
+++ b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
@@ -27,7 +27,9 @@ import org.apache.knox.gateway.services.ServiceType;
 import org.apache.knox.gateway.services.topology.TopologyService;
 import org.apache.knox.gateway.topology.ClusterConfigurationMonitorService;
 import org.apache.knox.gateway.topology.discovery.ServiceDiscoveryConfig;
+import org.apache.knox.gateway.topology.discovery.cm.model.cm.ClouderaManagerAPIServiceModelGenerator;
 import org.apache.knox.gateway.topology.discovery.cm.model.hdfs.NameNodeServiceModelGenerator;
+import org.apache.knox.gateway.topology.discovery.cm.model.hive.HiveOnTezServiceModelGenerator;
 import org.easymock.EasyMock;
 import org.junit.Test;
 
@@ -75,7 +77,7 @@ public class PollingConfigurationAnalyzerTest {
     apiEventAttrs.add(createEventAttribute("SERVICE", service));
     ApiEvent apiEvent = createApiEvent(category, apiEventAttrs);
 
-    PollingConfigurationAnalyzer.RestartEvent restartEvent = new PollingConfigurationAnalyzer.RestartEvent(apiEvent);
+    PollingConfigurationAnalyzer.StartEvent restartEvent = new PollingConfigurationAnalyzer.StartEvent(apiEvent);
     assertNotNull(restartEvent);
     assertEquals(clusterName, restartEvent.getClusterName());
     assertEquals(serviceType, restartEvent.getServiceType());
@@ -145,11 +147,33 @@ public class PollingConfigurationAnalyzerTest {
     restartEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
     restartEventAttrs.add(createEventAttribute("SERVICE_TYPE", NameNodeServiceModelGenerator.SERVICE_TYPE));
     restartEventAttrs.add(createEventAttribute("SERVICE", NameNodeServiceModelGenerator.SERVICE));
+    restartEventAttrs.add(createEventAttribute("COMMAND", "Restart"));
+    restartEventAttrs.add(createEventAttribute("COMMAND_STATUS", "SUCCEEDED"));
     ApiEvent restartEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, restartEventAttrs);
     pca.addRestartEvent(clusterName, restartEvent);
 
+    // Simulate a service Start event
+    List<ApiEventAttribute> startEventAttrs = new ArrayList<>();
+    startEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
+    startEventAttrs.add(createEventAttribute("SERVICE_TYPE", ClouderaManagerAPIServiceModelGenerator.SERVICE_TYPE));
+    startEventAttrs.add(createEventAttribute("SERVICE", ClouderaManagerAPIServiceModelGenerator.SERVICE));
+    startEventAttrs.add(createEventAttribute("COMMAND", "Start"));
+    startEventAttrs.add(createEventAttribute("COMMAND_STATUS", "STARTED"));
+    ApiEvent startEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, startEventAttrs);
+    pca.addRestartEvent(clusterName, startEvent);
+
+    // Simulate a failed service Start event
+    startEventAttrs = new ArrayList<>();
+    startEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
+    startEventAttrs.add(createEventAttribute("SERVICE_TYPE", HiveOnTezServiceModelGenerator.SERVICE_TYPE));
+    startEventAttrs.add(createEventAttribute("SERVICE", HiveOnTezServiceModelGenerator.SERVICE));
+    startEventAttrs.add(createEventAttribute("COMMAND", "Start"));
+    startEventAttrs.add(createEventAttribute("COMMAND_STATUS", "FAILED"));
+    ApiEvent failedStartEvent = createApiEvent(ApiEventCategory.AUDIT_EVENT, startEventAttrs);
+    pca.addRestartEvent(clusterName, failedStartEvent);
+
     try {
-      pollingThreadExecutor.awaitTermination(15, TimeUnit.SECONDS);
+      pollingThreadExecutor.awaitTermination(10, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
       //
     }
@@ -158,6 +182,7 @@ public class PollingConfigurationAnalyzerTest {
     pca.stop();
 
     assertTrue("Expected a change notification", listener.wasNotified(address, clusterName));
+    assertEquals(2, listener.howManyNotifications(address, clusterName));
   }
 
 
@@ -351,7 +376,7 @@ public class PollingConfigurationAnalyzerTest {
     }
 
     @Override
-    protected List<ApiEvent> queryRestartEvents(ApiClient client, String clusterName, String since) {
+    protected List<ApiEvent> queryEvents(ApiClient client, String clusterName, String since) { // stub: no CM call
       return restartEvents.computeIfAbsent(clusterName, l -> new ArrayList<>());
     }
 
@@ -370,15 +395,21 @@ public class PollingConfigurationAnalyzerTest {
 
   private static class ChangeListener implements ConfigurationChangeListener {
     private final Map<String, String> notifications = new HashMap<>();
+    private final List<String> events = new ArrayList<>(); // one "<source>+<clusterName>" entry per notification
 
     @Override
     public void onConfigurationChange(String source, String clusterName) {
       notifications.put(source, clusterName);
+      events.add(source + "+" + clusterName);
     }
 
     boolean wasNotified(final String source, final String clusterName) {
       return clusterName.equals(notifications.get(source));
     }
+
+    int howManyNotifications(final String source, final String clusterName) {
+      return (int) events.stream().filter((source + "+" + clusterName)::equals).count();
+    }
   }
 
 }