You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2017/06/22 15:28:15 UTC

[4/4] ambari git commit: AMBARI-21200. Add serverside event metrics to track system health (mpapirkovskyy)

AMBARI-21200. Add serverside event metrics to track system health (mpapirkovskyy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/1e64135c
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/1e64135c
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/1e64135c

Branch: refs/heads/branch-3.0-perf
Commit: 1e64135c1c692870f8e831f5d5d2a3ff739627ba
Parents: 670a08e
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Thu Jun 8 17:55:19 2017 +0300
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Thu Jun 22 18:25:50 2017 +0300

----------------------------------------------------------------------
 ambari-server/conf/unix/metrics.properties      |   3 +-
 .../ambari/server/events/AlertUpdateEvent.java  |   8 +-
 .../ambari/server/events/AmbariUpdateEvent.java |  50 ++++++++-
 .../ambari/server/events/CommandEvent.java      |   7 +-
 .../server/events/ConfigsUpdateEvent.java       |   8 +-
 .../server/events/HostComponentUpdateEvent.java |   8 +-
 .../server/events/MetadataUpdateEvent.java      |   8 +-
 .../server/events/RequestUpdateEvent.java       |   8 +-
 .../server/events/TopologyUpdateEvent.java      |   8 +-
 .../metrics/system/impl/MetricsServiceImpl.java |   6 ++
 .../system/impl/StompEventsMetricsSource.java   | 107 +++++++++++++++++++
 11 files changed, 180 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/1e64135c/ambari-server/conf/unix/metrics.properties
----------------------------------------------------------------------
diff --git a/ambari-server/conf/unix/metrics.properties b/ambari-server/conf/unix/metrics.properties
index efbfb04..96748d2 100644
--- a/ambari-server/conf/unix/metrics.properties
+++ b/ambari-server/conf/unix/metrics.properties
@@ -20,10 +20,11 @@
 #################### Metrics Source Configs #####################
 
 #Metric sources : jvm,database
-metric.sources=jvm
+metric.sources=jvm,event
 
 #### JVM Source Configs ###
 source.jvm.class=org.apache.ambari.server.metrics.system.impl.JvmMetricsSource
+source.event.class=org.apache.ambari.server.metrics.system.impl.StompEventsMetricsSource
 source.jvm.interval=10
 
 #### Database Source Configs ###

http://git-wip-us.apache.org/repos/asf/ambari/blob/1e64135c/ambari-server/src/main/java/org/apache/ambari/server/events/AlertUpdateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AlertUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertUpdateEvent.java
index df5ab4d..73862d5 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/AlertUpdateEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertUpdateEvent.java
@@ -23,10 +23,11 @@ import java.util.List;
 
 import org.apache.ambari.server.state.Alert;
 
-public class AlertUpdateEvent implements AmbariUpdateEvent {
+public class AlertUpdateEvent extends AmbariUpdateEvent {
   private List<Alert> alerts = new ArrayList<>();
 
   public AlertUpdateEvent(List<Alert> alerts) {
+    super(Type.ALERT);
     this.alerts = alerts;
   }
 
@@ -37,9 +38,4 @@ public class AlertUpdateEvent implements AmbariUpdateEvent {
   public void setAlerts(List<Alert> alerts) {
     this.alerts = alerts;
   }
-
-  @Override
-  public String getDestination() {
-    return "/events/alerts";
-  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/1e64135c/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java
index 62f253b..2b7efa6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java
@@ -19,8 +19,54 @@ package org.apache.ambari.server.events;
 
 import java.beans.Transient;
 
-public interface AmbariUpdateEvent {
+public abstract class AmbariUpdateEvent {
+  protected final Type type;
+
+  public AmbariUpdateEvent(Type type) {
+    this.type = type;
+  }
+
+  @Transient
+  public Type getType() {
+    return type;
+  }
+
+  @Transient
+  public String getDestination() {
+    return type.getDestination();
+  }
 
   @Transient
-  String getDestination();
+  public String getMetricName() {
+    return type.getMetricName();
+  }
+
+  public enum Type {
+    ALERT("/events/alerts", "events.alerts"),
+    METADATA("/events/metadata", "events.metadata"),
+    TOPOLOGY("/events/topology", "events.topology_update"),
+    AGENT_CONFIGS("/events/configs", "events.agent.configs"),
+    CONFIGS("/events/configs", "events.configs"),
+    HOSTCOMPONENT("/events/hostcomponents", "events.hostcomponents"),
+    NAMEDHOSTCOMPONENT("/events/tasks/", "events.hostrolecommands.named"),
+    REQUEST("/events/requests", "events.requests"),
+    NAMEDREQUEST("/events/requests", "events.requests.named"),
+    COMMAND("/user/commands", "events.commands");
+
+    private String destination;
+    private String metricName;
+
+    Type(String destination, String metricName) {
+      this.destination = destination;
+      this.metricName = metricName;
+    }
+
+    public String getDestination() {
+      return destination;
+    }
+
+    public String getMetricName() {
+      return metricName;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/1e64135c/ambari-server/src/main/java/org/apache/ambari/server/events/CommandEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/CommandEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/CommandEvent.java
index cee2fa3..80245aa 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/CommandEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/CommandEvent.java
@@ -17,9 +17,8 @@
  */
 package org.apache.ambari.server.events;
 
-public class CommandEvent implements AmbariUpdateEvent {
-  @Override
-  public String getDestination() {
-    return "/command";
+public class CommandEvent extends AmbariUpdateEvent {
+  public CommandEvent() {
+    super(Type.COMMAND);
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/1e64135c/ambari-server/src/main/java/org/apache/ambari/server/events/ConfigsUpdateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/ConfigsUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/ConfigsUpdateEvent.java
index 738badb..b6eec10 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/ConfigsUpdateEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/ConfigsUpdateEvent.java
@@ -24,7 +24,7 @@ import java.util.List;
 import org.apache.ambari.server.orm.entities.ClusterConfigEntity;
 import org.apache.ambari.server.orm.entities.ServiceConfigEntity;
 
-public class ConfigsUpdateEvent implements AmbariUpdateEvent {
+public class ConfigsUpdateEvent extends AmbariUpdateEvent {
 
   private Long serviceConfigId;
   private Long clusterId;
@@ -41,6 +41,7 @@ public class ConfigsUpdateEvent implements AmbariUpdateEvent {
   private List<ClusterConfig> configs = new ArrayList<>();
 
   public ConfigsUpdateEvent(ServiceConfigEntity configs, String configGroupName, List<String> hostNames) {
+    super(Type.CONFIGS);
     this.serviceConfigId = configs.getServiceConfigId();
     this.clusterId = configs.getClusterEntity().getClusterId();
     this.serviceName = configs.getServiceName();
@@ -147,11 +148,6 @@ public class ConfigsUpdateEvent implements AmbariUpdateEvent {
     this.groupName = groupName;
   }
 
-  @Override
-  public String getDestination() {
-    return "/events/configs";
-  }
-
   public class ClusterConfig {
     private Long clusterId;
     private String type;

http://git-wip-us.apache.org/repos/asf/ambari/blob/1e64135c/ambari-server/src/main/java/org/apache/ambari/server/events/HostComponentUpdateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/HostComponentUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/HostComponentUpdateEvent.java
index 7500fca..08ed9e0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/HostComponentUpdateEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/HostComponentUpdateEvent.java
@@ -21,7 +21,7 @@ package org.apache.ambari.server.events;
 import org.apache.ambari.server.orm.entities.HostComponentStateEntity;
 import org.apache.ambari.server.state.State;
 
-public class HostComponentUpdateEvent implements AmbariUpdateEvent {
+public class HostComponentUpdateEvent extends AmbariUpdateEvent {
 
   private Long id;
   private Long clusterId;
@@ -31,6 +31,7 @@ public class HostComponentUpdateEvent implements AmbariUpdateEvent {
   private State currentState;
 
   public HostComponentUpdateEvent(HostComponentStateEntity stateEntity) {
+    super(Type.HOSTCOMPONENT);
     this.id = stateEntity.getId();
     this.clusterId = stateEntity.getClusterId();
     this.serviceName = stateEntity.getServiceName();
@@ -86,9 +87,4 @@ public class HostComponentUpdateEvent implements AmbariUpdateEvent {
   public void setCurrentState(State currentState) {
     this.currentState = currentState;
   }
-
-  @Override
-  public String getDestination() {
-    return "/events/hostcomponents";
-  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/1e64135c/ambari-server/src/main/java/org/apache/ambari/server/events/MetadataUpdateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/MetadataUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/MetadataUpdateEvent.java
index 956b73c..fc660a6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/MetadataUpdateEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/MetadataUpdateEvent.java
@@ -17,9 +17,9 @@
  */
 package org.apache.ambari.server.events;
 
-public class MetadataUpdateEvent implements AmbariUpdateEvent {
-  @Override
-  public String getDestination() {
-    return "/events/metadata";
+public class MetadataUpdateEvent extends AmbariUpdateEvent {
+
+  public MetadataUpdateEvent() {
+    super(Type.METADATA);
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/1e64135c/ambari-server/src/main/java/org/apache/ambari/server/events/RequestUpdateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/RequestUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/RequestUpdateEvent.java
index 682ce50..a2d2270 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/RequestUpdateEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/RequestUpdateEvent.java
@@ -24,7 +24,7 @@ import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
 import org.apache.ambari.server.orm.entities.RequestEntity;
 import org.apache.ambari.server.topology.TopologyManager;
 
-public class RequestUpdateEvent implements AmbariUpdateEvent {
+public class RequestUpdateEvent extends AmbariUpdateEvent {
 
   private Long clusterId;
   private Long endTime;
@@ -36,6 +36,7 @@ public class RequestUpdateEvent implements AmbariUpdateEvent {
 
 
   public RequestUpdateEvent(RequestEntity requestEntity, HostRoleCommandDAO hostRoleCommandDAO, TopologyManager topologyManager) {
+    super(Type.REQUEST);
     this.clusterId = requestEntity.getClusterId();
     this.endTime = requestEntity.getEndTime();
     this.requestId = requestEntity.getRequestId();
@@ -100,9 +101,4 @@ public class RequestUpdateEvent implements AmbariUpdateEvent {
   public void setStartTime(Long startTime) {
     this.startTime = startTime;
   }
-
-  @Override
-  public String getDestination() {
-    return "/events/requests";
-  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/1e64135c/ambari-server/src/main/java/org/apache/ambari/server/events/TopologyUpdateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/TopologyUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/TopologyUpdateEvent.java
index 9655924..49d2f78 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/TopologyUpdateEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/TopologyUpdateEvent.java
@@ -23,13 +23,14 @@ import org.apache.ambari.server.agent.stomp.dto.TopologyCluster;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class TopologyUpdateEvent implements AmbariUpdateEvent {
+public class TopologyUpdateEvent extends AmbariUpdateEvent {
   @JsonProperty("clustersTopologies")
   private Map<String, TopologyCluster> clusters;
 
   private EventType eventType;
 
   public TopologyUpdateEvent(Map<String, TopologyCluster> clusters, EventType eventType) {
+    super(Type.TOPOLOGY);
     this.clusters = clusters;
     this.eventType = eventType;
   }
@@ -50,11 +51,6 @@ public class TopologyUpdateEvent implements AmbariUpdateEvent {
     this.eventType = eventType;
   }
 
-  @Override
-  public String getDestination() {
-    return "/events/topology";
-  }
-
   public enum EventType {
     ADD,
     DELETE,

http://git-wip-us.apache.org/repos/asf/ambari/blob/1e64135c/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
index ab8ce8b..09a7a8f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
@@ -23,6 +23,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher;
 import org.apache.ambari.server.metrics.system.MetricsService;
 import org.apache.ambari.server.metrics.system.MetricsSink;
 import org.apache.ambari.server.metrics.system.MetricsSource;
@@ -42,6 +43,8 @@ public class MetricsServiceImpl implements MetricsService {
 
   @Inject
   AmbariManagementController amc;
+  @Inject
+  StateUpdateEventPublisher stateUpdateEventPublisher;
 
   @Override
   public void start() {
@@ -109,6 +112,9 @@ public class MetricsServiceImpl implements MetricsService {
         AbstractMetricsSource src = (AbstractMetricsSource) sourceClass.newInstance();
         src.init(MetricsConfiguration.getSubsetConfiguration(configuration, "source." + sourceName + "."), sink);
         sources.put(sourceName, src);
+        if (src instanceof StompEventsMetricsSource) {
+          stateUpdateEventPublisher.register(src);
+        }
         src.start();
       }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/1e64135c/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/StompEventsMetricsSource.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/StompEventsMetricsSource.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/StompEventsMetricsSource.java
new file mode 100644
index 0000000..dfc1484
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/StompEventsMetricsSource.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.metrics.system.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.ambari.server.events.AmbariUpdateEvent;
+import org.apache.ambari.server.metrics.system.MetricsSink;
+import org.apache.ambari.server.metrics.system.SingleMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.Subscribe;
+
+/**
+ * Collects metrics about number of events by types and publishes to configured Metric Sink.
+ */
+public class StompEventsMetricsSource extends AbstractMetricsSource {
+  private static Logger LOG = LoggerFactory.getLogger(StompEventsMetricsSource.class);
+
+  private Map<AmbariUpdateEvent.Type, Long> events = new HashMap<>();
+  private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+
+  private final String EVENTS_TOTAL_METRIC = "events.total";
+  private final String AVERAGE_METRIC_SUFFIX = ".avg";
+
+  private int interval = 60;
+
+  @Override
+  public void init(MetricsConfiguration configuration, MetricsSink sink) {
+    super.init(configuration, sink);
+    for (AmbariUpdateEvent.Type type : AmbariUpdateEvent.Type.values()) {
+      events.put(type, 0L);
+    }
+
+  }
+
+  @Override
+  public void start() {
+    LOG.info("Starting stomp events source...");
+    try {
+      executor.scheduleWithFixedDelay(new Runnable() {
+        @Override
+        public void run() {
+          List<SingleMetric> events = getEvents();
+          if (!events.isEmpty()) {
+            sink.publish(events);
+            LOG.debug("********* Published stomp events metrics to sink **********");
+          }
+        }
+      }, interval, interval, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      LOG.info("Throwing exception when starting stomp events source", e);
+    }
+  }
+
+  private List<SingleMetric> getEvents() {
+    List<SingleMetric> metrics = new ArrayList<>();
+    Long totalEventsCounter = 0L;
+    synchronized (events) {
+      for (Map.Entry<AmbariUpdateEvent.Type, Long> event : events.entrySet()) {
+        totalEventsCounter += event.getValue();
+        metrics.add(new SingleMetric(event.getKey().getMetricName(), event.getValue(), System.currentTimeMillis()));
+
+        String averageMetricName = event.getKey().getMetricName() + AVERAGE_METRIC_SUFFIX;
+        Double eventsPerSecond = event.getValue() == 0 ? -1 : (double) interval / (double) event.getValue();
+        metrics.add(new SingleMetric(averageMetricName,
+                eventsPerSecond, System.currentTimeMillis()));
+        events.put(event.getKey(), 0L);
+      }
+      metrics.add(new SingleMetric(EVENTS_TOTAL_METRIC, totalEventsCounter, System.currentTimeMillis()));
+
+      String totalAverageMetricName = EVENTS_TOTAL_METRIC + AVERAGE_METRIC_SUFFIX;
+      Double eventsPerSecond = totalEventsCounter == 0 ? -1 : (double) interval / (double) totalEventsCounter;
+      metrics.add(new SingleMetric(totalAverageMetricName,
+              eventsPerSecond, System.currentTimeMillis()));
+    }
+    return metrics;
+  }
+
+  @Subscribe
+  public void onUpdateEvent(AmbariUpdateEvent ambariUpdateEvent) {
+    AmbariUpdateEvent.Type metricType = ambariUpdateEvent.getType();
+    events.put(metricType, events.get(metricType) + 1);
+  }
+}