You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2017/01/09 22:39:52 UTC
ambari git commit: AMBARI-17596 : Collect & Publish AmbariServer
database metrics. (avijayan)
Repository: ambari
Updated Branches:
refs/heads/branch-2.5 61ba63f21 -> ec8809cea
AMBARI-17596 : Collect & Publish AmbariServer database metrics. (avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ec8809ce
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ec8809ce
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ec8809ce
Branch: refs/heads/branch-2.5
Commit: ec8809cea9f175ac8604eb7e3be817150e5fd47a
Parents: 61ba63f
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Mon Jan 9 14:39:41 2017 -0800
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Mon Jan 9 14:39:41 2017 -0800
----------------------------------------------------------------------
ambari-server/conf/unix/metrics.properties | 53 ++++-
ambari-server/conf/windows/metrics.properties | 53 ++++-
.../server/metrics/system/MetricsService.java | 15 +-
.../server/metrics/system/MetricsSource.java | 10 +-
.../system/impl/AbstractMetricsSource.java | 12 +-
.../system/impl/AmbariMetricSinkImpl.java | 18 +-
.../system/impl/AmbariPerformanceMonitor.java | 140 +++++++++++++
.../system/impl/DatabaseMetricsSource.java | 201 +++++++++++++++++++
.../metrics/system/impl/JvmMetricsSource.java | 38 +++-
.../system/impl/MetricsConfiguration.java | 38 ++++
.../metrics/system/impl/MetricsServiceImpl.java | 59 +++---
.../system/impl/JvmMetricsSourceTest.java | 105 ----------
.../metric/system/impl/MetricsServiceTest.java | 18 +-
.../metric/system/impl/MetricsSourceTest.java | 171 ++++++++++++++++
.../system/impl/TestAmbariMetricsSinkImpl.java | 1 +
.../metric/system/impl/TestMetricsSource.java | 11 +-
.../src/test/resources/metrics.properties | 5 +-
17 files changed, 747 insertions(+), 201 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/conf/unix/metrics.properties
----------------------------------------------------------------------
diff --git a/ambari-server/conf/unix/metrics.properties b/ambari-server/conf/unix/metrics.properties
index 3ee22d6..e0ec718 100644
--- a/ambari-server/conf/unix/metrics.properties
+++ b/ambari-server/conf/unix/metrics.properties
@@ -17,12 +17,53 @@
# limitations under the License.
-#### Source Configs #####
-# Source interval determines how often the metric is sent to sink. Its unit is in seconds
-metric.sources=jvm
+#################### Metrics Source Configs #####################
-source.jvm.interval=10
+metric.sources=jvm,database
+#Valid Values : jvm,database
+
+#### JVM Source Configs ###
source.jvm.class=org.apache.ambari.server.metrics.system.impl.JvmMetricsSource
+source.jvm.interval=10
+
+#### Database Metrics Source Configs ###
+
+# Note : To enable Database metrics source completely, add the following property to ambari.properties as well
+# server.persistence.properties.eclipselink.profiler=org.apache.ambari.server.metrics.system.impl.AmbariPerformanceMonitor
+
+source.database.class=org.apache.ambari.server.metrics.system.impl.DatabaseMetricsSource
+
+source.database.performance.monitor.query.weight=HEAVY
+#Valid Values : NONE / NORMAL / HEAVY / ALL
+
+#collection interval in milliseconds
+source.database.monitor.dumptime=60000
+
+# Database Metrics Source filter Configs.
+# Note : Aggregate Query stats (Across all entities) will be tracked by default
+
+# Include entities to be tracked.
+source.database.monitor.entities=Cluster(.*)Entity,Host(.*)Entity,ExecutionCommandEntity,ServiceComponentDesiredStateEntity,Alert(.*)Entity,StackEntity,StageEntity
+
+# Include some metrics which have the keyword even if they are not part of requested Entities.
+source.database.monitor.query.keywords.include=CacheMisses
+
+# Some examples of keywords that may be useful to include.
+
+# Query Types
+# ReadAllQuery,ReadObjectQuery,UpdateObjectQuery,ReportQuery,InsertObjectQuery,ValueReadQuery,DataModifyQuery,DoesExistQuery
+
+# Query stages
+# ObjectBuilding,QueryPreparation,SqlPrepare,SqlGeneration,StatementExecute,RowFetch,ConnectCalls,UnitOfWorkCommit,ClientSessionReleases,ConnectionManagement,CacheHits
+
+# Sequences
+# host_role_command_id_seq,alert_history_id_seq
+
+################################################################
+
+############## General Metrics Service Configs #################
+
+#ambariserver.hostname.override=
+
+################################################################
-#Override Ambari Server hostname for metrics
-#ambariserver.hostname.override=
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/conf/windows/metrics.properties
----------------------------------------------------------------------
diff --git a/ambari-server/conf/windows/metrics.properties b/ambari-server/conf/windows/metrics.properties
index 3ee22d6..5498858 100644
--- a/ambari-server/conf/windows/metrics.properties
+++ b/ambari-server/conf/windows/metrics.properties
@@ -17,12 +17,53 @@
# limitations under the License.
-#### Source Configs #####
-# Source interval determines how often the metric is sent to sink. Its unit is in seconds
-metric.sources=jvm
+#################### Metrics Source Configs #####################
-source.jvm.interval=10
+metric.sources=jvm,database
+#Valid Values : jvm,database
+
+#### JVM Source Configs ###
source.jvm.class=org.apache.ambari.server.metrics.system.impl.JvmMetricsSource
+source.jvm.interval=10
+
+#### Database Metrics Source Configs ###
+
+# Note : To enable Database metrics source completely, add the following property to ambari.properties as well
+# server.persistence.properties.eclipselink.profiler=org.apache.ambari.server.metrics.system.impl.AmbariPerformanceMonitor
+
+source.database.class=org.apache.ambari.server.metrics.system.impl.DatabaseMetricsSource
+
+source.database.performance.monitor.query.weight=HEAVY
+#Valid Values : NONE / NORMAL / HEAVY / ALL
+
+#collection interval in seconds
+source.database.monitor.dumptime=60000
+
+# Database Metrics Source filter Configs.
+# Note : Aggregate Query stats (Across all entities) will be tracked by default
+
+# Include entities to be tracked.
+source.database.monitor.entities=Cluster(.*)Entity,Host(.*)Entity,ExecutionCommandEntity,ServiceComponentDesiredStateEntity,Alert(.*)Entity,StackEntity,StageEntity
+
+# Include some metrics which have the keyword even if they are not part of requested Entities.
+source.database.monitor.query.keywords.include=CacheMisses
+
+# Some examples of keywords that may be useful to include.
+
+# Query Types
+# ReadAllQuery,ReadObjectQuery,UpdateObjectQuery,ReportQuery,InsertObjectQuery,ValueReadQuery,DataModifyQuery,DoesExistQuery
+
+# Query stages
+# ObjectBuilding,QueryPreparation,SqlPrepare,SqlGeneration,StatementExecute,RowFetch,ConnectCalls,UnitOfWorkCommit,ClientSessionReleases,ConnectionManagement,CacheHits
+
+# Sequences
+# host_role_command_id_seq,alert_history_id_seq
+
+################################################################
+
+############## General Metrics Service Configs #################
+
+#ambariserver.hostname.override=
+
+################################################################
-#Override Ambari Server hostname for metrics
-#ambariserver.hostname.override=
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java
index 4a613f0..82e1589 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java
@@ -18,20 +18,13 @@
package org.apache.ambari.server.metrics.system;
-import java.util.Collection;
-
-import org.apache.ambari.server.metrics.system.impl.AbstractMetricsSource;
-
-
+/**
+ * Interface for Metrics Service.
+ */
public interface MetricsService{
/**
- * Set up configuration
+ * Start the service
**/
void start();
- /**
- * Get Configured sources
- * @return
- */
- Collection<AbstractMetricsSource> getSources();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java
index 400dcb6..024ec48 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java
@@ -20,9 +20,15 @@ package org.apache.ambari.server.metrics.system;
import org.apache.ambari.server.metrics.system.impl.MetricsConfiguration;
-public interface MetricsSource extends Runnable{
+public interface MetricsSource{
+
/**
- * initialize sink
+ * Initialize the source, set up configs etc.
**/
void init(MetricsConfiguration configuration, MetricsSink sink);
+
+ /**
+ * Start the source
+ **/
+ void start();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java
index 6bdd0ba..3178c2b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java
@@ -19,24 +19,16 @@ package org.apache.ambari.server.metrics.system.impl;
import org.apache.ambari.server.metrics.system.MetricsSink;
import org.apache.ambari.server.metrics.system.MetricsSource;
-import org.apache.ambari.server.metrics.system.SingleMetric;
-
-import java.util.List;
public abstract class AbstractMetricsSource implements MetricsSource {
protected MetricsSink sink;
+ protected MetricsConfiguration configuration;
/**
* Pass metrics sink to metrics source
**/
@Override
public void init(MetricsConfiguration configuration, MetricsSink sink) {
- this.sink = sink;
+ this.sink = sink;
}
-
- /**
- * Get metrics at the instance
- * @return a map for metrics that maps metrics name to metrics value
- **/
- abstract public List<SingleMetric> getMetrics();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
index be24988..358b8fa 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
@@ -17,7 +17,6 @@
*/
package org.apache.ambari.server.metrics.system.impl;
-
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
@@ -55,6 +54,9 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
import org.springframework.security.core.context.SecurityContextHolder;
+/**
+ * Ambari Server Metrics Sink implementation to push collected metrics to AMS.
+ */
public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements MetricsSink {
private static final String AMBARI_SERVER_APP_ID = "ambari_server";
private Collection<String> collectorHosts;
@@ -82,6 +84,12 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements
authenticationToken.setAuthenticated(true);
SecurityContextHolder.getContext().setAuthentication(authenticationToken);
Clusters clusters = ambariManagementController.getClusters();
+
+ if (clusters == null || clusters.getClusters().isEmpty()) {
+ LOG.info("No clusters configured.");
+ return;
+ }
+
String ambariMetricsServiceName = "AMBARI_METRICS";
collectorHosts = new HashSet<>();
@@ -146,8 +154,9 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements
}
}
- collectorUri = getCollectorUri(findPreferredCollectHost());
hostName = configuration.getProperty("ambariserver.hostname.override", getDefaultLocalHostName());
+ LOG.info("Hostname used for ambari server metrics : " + hostName);
+ collectorUri = getCollectorUri(findPreferredCollectHost());
int maxRowCacheSize = Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE,
String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT)));
@@ -170,10 +179,13 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements
return null;
}
+ /**
+ * Publish metrics to AMS.
+ * @param metrics Set of metrics
+ */
@Override
public void publish(List<SingleMetric> metrics) {
-
//If Sink not yet initialized, drop the metrics on the floor.
if (isInitialized) {
List<TimelineMetric> metricList = getFilteredMetricList(metrics);
http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariPerformanceMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariPerformanceMonitor.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariPerformanceMonitor.java
new file mode 100644
index 0000000..236d8ff
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariPerformanceMonitor.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.metrics.system.impl;
+
+import com.google.inject.Singleton;
+import org.apache.commons.lang.StringUtils;
+import org.eclipse.persistence.sessions.SessionProfiler;
+import org.eclipse.persistence.tools.profiler.PerformanceMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Class to extend the EclipseLink PerformanceMonitor, and dump the collected metrics to the AmbariServer Database Metrics source.
+ */
+@Singleton
+public class AmbariPerformanceMonitor extends PerformanceMonitor {
+ private static Logger LOG = LoggerFactory.getLogger(AmbariPerformanceMonitor.class);
+ private boolean isInitialized = false;
+ private DatabaseMetricsSource metricsSource;
+ private static String entityPackagePrefix = "org.apache"; //Can be made into a set later if needed.
+
+ public AmbariPerformanceMonitor() {
+ super();
+ LOG.info("AmbariPerformanceMonitor instantiated");
+ init();
+ }
+
+ private void init() {
+
+ if (metricsSource == null) {
+ metricsSource = (DatabaseMetricsSource) MetricsServiceImpl.getSource("database");
+ }
+
+ if (metricsSource != null) {
+ LOG.info("AmbariPerformanceMonitor initialized");
+
+ long interval = Long.parseLong(metricsSource.getConfigurationValue("dumptime", "60000"));
+ this.setDumpTime(interval);
+
+ String profileWeight = metricsSource.getConfigurationValue("query.weight", "HEAVY");
+ this.setProfileWeight(getWeight(profileWeight));
+
+ isInitialized = true;
+
+ } else {
+ LOG.info("AmbariPerformanceMonitor not yet initialized.");
+ }
+ }
+
+ /**
+ * Overridden dump metrics method for dumping Metrics to source rather than writing to Log file.
+ */
+ @Override
+ public void dumpResults() {
+
+ lastDumpTime = System.currentTimeMillis();
+ Set<String> operations = new TreeSet<String>(this.operationTimings.keySet());
+ Map<String, Long> metrics = new HashMap<>();
+
+ for (String operation : operations) {
+
+ String[] splits = operation.split(":");
+
+ Object value = this.operationTimings.get(operation);
+ if (value == null) {
+ value = Long.valueOf(0);
+ }
+ //Cleaning up metric names.
+ if (value instanceof Long) {
+ List<String> list = new ArrayList<>();
+ for (int i = 0; i < splits.length; i++) {
+ //Removing full package paths from Entity names
+ if (splits[i].startsWith(entityPackagePrefix)) {
+ String[] queryClassSplits = splits[i].split("\\.");
+ list.add(queryClassSplits[queryClassSplits.length - 1]);
+ } else if (splits[i] != null && !splits[i].equals("null")) {
+ //Removing nulls in metric names.
+ list.add(splits[i]);
+ }
+ }
+ //Joining metric name portions by "." delimiter.
+ metrics.put(StringUtils.join(list, "."), (Long)value);
+ }
+ }
+ if (!metrics.isEmpty()) {
+ if (!isInitialized) {
+ init();
+ }
+ if (isInitialized) {
+ LOG.debug("Publishing " + metrics.size() + " metrics to sink.");
+ metricsSource.publish(metrics);
+ }
+ }
+ }
+
+ /**
+ * Utlity method to get Profiling weight in Integer from String.
+ * @param value NONE/HEAVY/ALL/NORMAL
+ * @return SessionProfiler.HEAVY/NONE/ALL/NORMAL
+ */
+ private int getWeight(String value) {
+
+ if (StringUtils.isEmpty(value) || value.equals("NONE")) {
+ return SessionProfiler.NONE;
+ }
+
+ if (value.equals("ALL")) {
+ return SessionProfiler.ALL;
+ }
+
+ if (value.equals("NORMAL")) {
+ return SessionProfiler.NORMAL;
+ }
+
+ return SessionProfiler.HEAVY;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/DatabaseMetricsSource.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/DatabaseMetricsSource.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/DatabaseMetricsSource.java
new file mode 100644
index 0000000..da5ad70
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/DatabaseMetricsSource.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.metrics.system.impl;
+
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.ambari.server.metrics.system.MetricsSink;
+import org.apache.ambari.server.metrics.system.SingleMetric;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * @{link DatabaseMetricsSource} collects database metrics which is generated through Eclipselink PerformanceMonitor,
+ * and publishes to configured Metric Sink.
+ **/
+public class DatabaseMetricsSource extends AbstractMetricsSource {
+ private static Logger LOG = LoggerFactory.getLogger(DatabaseMetricsSource.class);
+ private static String dbMonitorPrefix = "monitor.";
+ private ExecutorService executor;
+ private MetricsConfiguration configuration;
+ private Set<String> includedMetricKeywords = new HashSet<>();
+ private Set<Pattern> acceptedEntityPatterns = new HashSet<>();
+ private Set<String> acceptedEntities = new HashSet<>();
+ private static String TIMER = "Timer.";
+ private static String COUNTER = "Counter.";
+
+ @Override
+ public void init(MetricsConfiguration metricsConfig, MetricsSink sink) {
+ super.init(metricsConfig, sink);
+ configuration = metricsConfig;
+ initializeFilterSets();
+ LOG.info("DatabaseMetricsSource initialized.");
+ }
+
+ /**
+ * Initialize filter sets (Entities and keywords) to know which metrics to track vs drop.
+ */
+ private void initializeFilterSets() {
+
+ String commaSeparatedValues = configuration.getProperty(dbMonitorPrefix + "query.keywords.include");
+ if (StringUtils.isNotEmpty((commaSeparatedValues))) {
+ includedMetricKeywords.addAll(Arrays.asList(commaSeparatedValues.split(",")));
+ }
+
+ commaSeparatedValues = configuration.getProperty(dbMonitorPrefix + "entities");
+ if (StringUtils.isNotEmpty((commaSeparatedValues))) {
+ String[] entityPatterns = (commaSeparatedValues.split(","));
+ for (String pattern : entityPatterns) {
+ acceptedEntityPatterns.add(Pattern.compile(pattern));
+ }
+ }
+ }
+
+ @Override
+ public void start() {
+ LOG.info("Starting Database Metrics source...");
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("DatabaseMetricsSource-%d")
+ .build();
+ executor = Executors.newSingleThreadExecutor(threadFactory);
+ }
+
+ /**
+ * Method to publish metrics to Sink asynchronously.
+ * @param metricsMap Map of metrics to be published to Sink
+ */
+ public void publish(final Map<String, Long> metricsMap) {
+ try {
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ long currentTime = System.currentTimeMillis();
+
+ for (Iterator<Map.Entry<String, Long>> it = metricsMap.entrySet().iterator(); it.hasNext(); ) {
+ Map.Entry<String, Long> metricEntry = it.next();
+ String metricName = metricEntry.getKey();
+ if (!acceptMetric(metricName)) {
+ it.remove();
+ }
+ }
+ final List<SingleMetric> metrics = new ArrayList<>();
+ for (String metricName : metricsMap.keySet()) {
+ double value = metricsMap.get(metricName).doubleValue();
+ metrics.add(new SingleMetric(metricName, value, currentTime));
+
+ /**
+ * Add computed (Timer/Counter) metric.
+ * Example
+ * Counter Metric : Counter.ReadAllQuery.HostRoleCommandEntity = 10000
+ * Timer Metric : Timer.ReadAllQuery.HostRoleCommandEntity = 50
+ * Computed Metric (Avg time for the operation) : ReadAllQuery.HostRoleCommandEntity = 200 (10000 div by 50)
+ */
+
+ if (metricName.startsWith(COUNTER)) {
+ String baseMetricName = metricName.substring(COUNTER.length());
+ if (metricsMap.containsKey(TIMER + baseMetricName)) {
+ double timerValue = metricsMap.get(TIMER + baseMetricName).doubleValue();
+ if (value != 0.0) {
+ metrics.add(new SingleMetric(baseMetricName, timerValue / value , currentTime));
+ }
+ }
+ }
+ }
+ sink.publish(metrics);
+ }
+ });
+ } catch (Exception e) {
+ LOG.info("Exception when publishing Database metrics to sink", e);
+ }
+ }
+
+ /**
+ * Accept a metric to be passed to Sink or not.
+ * @param metricName
+ * @return true/false
+ */
+ public boolean acceptMetric(String metricName) {
+
+ boolean accept = false;
+
+ /*
+ Include entities to be tracked.
+ source.database.monitor.entities=Cluster(.*)Entity,Host(.*)Entity,ExecutionCommandEntity
+ */
+ if (acceptedEntities.contains(metricName)) {
+ accept = true;
+ } else {
+ for (Pattern p : acceptedEntityPatterns) {
+ Matcher m = p.matcher(metricName);
+ if (m.find()) {
+ accept = true;
+ }
+ }
+ }
+
+ /*
+ Include some metrics which have the keyword even if they are not part of requested Entities.
+ source.database.monitor.query.keywords.include=CacheMisses
+ */
+
+ for (String keyword : includedMetricKeywords) {
+ if (metricName.contains(keyword)) {
+ accept = true;
+ }
+ }
+
+ String[] splits = metricName.split("\\.");
+ if (splits.length <= 2) {
+ accept = true; //Aggregate Counter metrics are always ok. They are not Entity specific
+ }
+
+ if (accept) {
+ acceptedEntities.add(metricName);
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Method to get Configuration value given key. An extra prefix is added internally.
+ * @param key
+ * @param defaultValue
+ * @return Value corresponding to key = dbMonitorPrefix + key
+ */
+ public String getConfigurationValue(String key, String defaultValue) {
+ return this.configuration.getProperty(dbMonitorPrefix + key, defaultValue);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java
index cb9f275..dbb51cf 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java
@@ -21,6 +21,9 @@ import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.ambari.server.metrics.system.MetricsSink;
import org.apache.ambari.server.metrics.system.SingleMetric;
@@ -38,24 +41,42 @@ import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
+/**
+ * @{link JvmMetricsSource} collects JVM Metrics using codahale and publish to Metrics Sink.
+ */
public class JvmMetricsSource extends AbstractMetricsSource {
static final MetricRegistry registry = new MetricRegistry();
private static Logger LOG = LoggerFactory.getLogger(JvmMetricsSource.class);
+ private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+ private static String JVM_PREFIX = "jvm";
+ private int interval = 10;
@Override
public void init(MetricsConfiguration configuration, MetricsSink sink) {
super.init(configuration, sink);
- registerAll("jvm.gc", new GarbageCollectorMetricSet(), registry);
- registerAll("jvm.buffers", new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer()), registry);
- registerAll("jvm.memory", new MemoryUsageGaugeSet(), registry);
- registerAll("jvm.threads", new ThreadStatesGaugeSet(), registry);
- registry.register("jvm.file.open.descriptor.ratio", new FileDescriptorRatioGauge());
+ registerAll(JVM_PREFIX + ".gc", new GarbageCollectorMetricSet(), registry);
+ registerAll(JVM_PREFIX + ".buffers", new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer()), registry);
+ registerAll(JVM_PREFIX + ".memory", new MemoryUsageGaugeSet(), registry);
+ registerAll(JVM_PREFIX + ".threads", new ThreadStatesGaugeSet(), registry);
+ registry.register(JVM_PREFIX + ".file.open.descriptor.ratio", new FileDescriptorRatioGauge());
+ interval = Integer.parseInt(configuration.getProperty("interval", "10"));
+ LOG.info("JVM Metrics source initialized.");
}
@Override
- public void run() {
- sink.publish(getMetrics());
- LOG.debug("********* Published system metrics to sink **********");
+ public void start() {
+ LOG.info("Starting JVM Metrics source...");
+ try {
+ executor.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ sink.publish(getMetrics());
+ LOG.debug("********* Published JVM metrics to sink **********");
+ }
+ }, interval, interval, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.info("Throwing exception when starting metric source", e);
+ }
}
private void registerAll(String prefix, MetricSet metricSet, MetricRegistry registry) {
@@ -68,7 +89,6 @@ public class JvmMetricsSource extends AbstractMetricsSource {
}
}
- @Override
public List<SingleMetric> getMetrics() {
List<SingleMetric> metrics = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsConfiguration.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsConfiguration.java
index ca83a53..540272d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsConfiguration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsConfiguration.java
@@ -20,8 +20,10 @@ package org.apache.ambari.server.metrics.system.impl;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Map;
import java.util.Properties;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,4 +88,40 @@ public class MetricsConfiguration {
public String getProperty(String key, String defaultValue) {
return properties.getProperty(key, defaultValue);
}
+
+ public Properties getProperties() {
+ return properties;
+ }
+
+ /**
+ * Get
+ *
+ * @param metricsConfiguration
+ * @param prefix
+ * @return subset configuration which contains the Key-Value pairs whose keys start with the passed in prefix.
+ */
+ public static MetricsConfiguration getSubsetConfiguration(MetricsConfiguration metricsConfiguration, String prefix) {
+
+ if (null == metricsConfiguration) {
+ return null;
+ }
+
+ Properties properties = metricsConfiguration.getProperties();
+ if (null == properties || StringUtils.isEmpty(prefix)) {
+ return new MetricsConfiguration(properties);
+ }
+
+ Properties subsetProperties = new Properties();
+
+ for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+ String key = entry.getKey().toString();
+ String val = entry.getValue().toString();
+ if (key.startsWith(prefix)) {
+ key = key.substring(prefix.length());
+ subsetProperties.put(key, val);
+ }
+ }
+
+ return new MetricsConfiguration(subsetProperties);
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
index d0d2e69..d7d905c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,16 +17,15 @@
*/
package org.apache.ambari.server.metrics.system.impl;
-import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.ambari.server.controller.AmbariManagementController;
import org.apache.ambari.server.metrics.system.MetricsService;
import org.apache.ambari.server.metrics.system.MetricsSink;
+import org.apache.ambari.server.metrics.system.MetricsSource;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,8 +36,8 @@ import com.google.inject.Singleton;
@Singleton
public class MetricsServiceImpl implements MetricsService {
private static Logger LOG = LoggerFactory.getLogger(MetricsServiceImpl.class);
- private Map<String, AbstractMetricsSource> sources = new HashMap<>();
- private MetricsSink sink = null;
+ private static Map<String, MetricsSource> sources = new HashMap<>();
+ private static MetricsSink sink = null;
private MetricsConfiguration configuration = null;
@Inject
@@ -57,7 +56,8 @@ public class MetricsServiceImpl implements MetricsService {
initializeMetricSources();
if (!sink.isInitialized()) {
- //If Sink is not initialized, Service will check for every 5 mins.
+ // If Sink is not initialized (say, cluster has not yet been deployed or AMS had not been installed)
+ // Service will check for every 5 mins.
Executors.newScheduledThreadPool(1).scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
@@ -86,41 +86,42 @@ public class MetricsServiceImpl implements MetricsService {
String commaSeparatedSources = configuration.getProperty("metric.sources");
if (StringUtils.isEmpty(commaSeparatedSources)) {
- LOG.info("No sources configured.");
+ LOG.info("No metric sources configured.");
return;
}
String[] sourceNames = commaSeparatedSources.split(",");
- for (String sourceName: sourceNames) {
+ for (String sourceName : sourceNames) {
+
+ if (StringUtils.isEmpty(sourceName)) {
+ continue;
+ }
+ sourceName = sourceName.trim();
+
String className = configuration.getProperty("source." + sourceName + ".class");
- Class t = Class.forName(className);
- AbstractMetricsSource src = (AbstractMetricsSource)t.newInstance();
- src.init(configuration, sink);
+ Class sourceClass;
+ try {
+ sourceClass = Class.forName(className);
+ } catch (ClassNotFoundException ex) {
+ LOG.info("Source class not found for source name :" + sourceName);
+ continue;
+ }
+ AbstractMetricsSource src = (AbstractMetricsSource) sourceClass.newInstance();
+ src.init(MetricsConfiguration.getSubsetConfiguration(configuration, "source." + sourceName + "."), sink);
sources.put(sourceName, src);
+ src.start();
}
- final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
- for (Map.Entry<String, AbstractMetricsSource> entry : sources.entrySet()) {
- startSource(executor, entry);
- }
} catch (Exception e) {
LOG.error("Error when configuring metric sink and source", e);
}
}
- private void startSource(ScheduledExecutorService executor, Map.Entry<String, AbstractMetricsSource> entry) {
- String className = entry.getKey();
- AbstractMetricsSource source = entry.getValue();
- String interval = "source." + className + ".interval";
- int duration = Integer.parseInt(configuration.getProperty(interval, "10")); // default value 10 seconds
- try {
- executor.scheduleWithFixedDelay(source, 0, duration, TimeUnit.SECONDS);
- } catch (Exception e) {
- LOG.info("Throwing exception when starting metric source", e);
- }
+ public static MetricsSource getSource(String type) {
+ return sources.get(type);
}
- public Collection<AbstractMetricsSource> getSources() {
- return sources.values();
+ public static MetricsSink getSink() {
+ return sink;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/JvmMetricsSourceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/JvmMetricsSourceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/JvmMetricsSourceTest.java
deleted file mode 100644
index 84b2df4..0000000
--- a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/JvmMetricsSourceTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.server.metric.system.impl;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.ambari.server.metrics.system.MetricsSink;
-import org.apache.ambari.server.metrics.system.impl.JvmMetricsSource;
-import org.apache.ambari.server.metrics.system.impl.MetricsConfiguration;
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.junit.Assume;
-import org.junit.Test;
-
-public class JvmMetricsSourceTest {
-
- @Test
- public void testJvmSourceInit_PreJVM1_8() {
- Assume.assumeThat(System.getProperty("java.version"), new LessThanVersionMatcher("1.8"));
- testJvmSourceInit(39);
- }
-
- @Test
- public void testJvmSourceInit_JVM1_8() {
- Assume.assumeThat(System.getProperty("java.version"), new VersionMatcher("1.8"));
- testJvmSourceInit(40);
- }
-
- private void testJvmSourceInit(int metricsSize) {
- JvmMetricsSource jvmMetricsSource = new JvmMetricsSource();
- MetricsConfiguration configuration = MetricsConfiguration.getMetricsConfiguration();
- MetricsSink sink = new TestAmbariMetricsSinkImpl();
- jvmMetricsSource.init(configuration, sink);
- org.junit.Assert.assertEquals(jvmMetricsSource.getMetrics().size(), metricsSize);
- }
-
- /* ****************************************************************
- * Matcher classes used in Assume checks
- * **************************************************************** */
- private class VersionMatcher extends BaseMatcher<String> {
- private final float version;
-
- VersionMatcher(String version) {
- this.version = Float.parseFloat(version);
- }
-
- @Override
- public boolean matches(Object o) {
- return parseVersion((String) o) == this.version;
- }
-
- float parseVersion(String versionString) {
- Pattern p = Pattern.compile("(\\d+(?:\\.\\d+)).*");
- Matcher matcher = p.matcher(versionString);
- if (matcher.matches()) {
- return Float.parseFloat(matcher.group(1));
- } else {
- return 0f;
- }
- }
-
- @Override
- public void describeTo(Description description) {
-
- }
-
- public float getVersion() {
- return version;
- }
- }
-
- private class LessThanVersionMatcher extends VersionMatcher {
-
- LessThanVersionMatcher(String version) {
- super(version);
- }
-
- @Override
- public boolean matches(Object o) {
- return parseVersion((String) o) < getVersion();
- }
-
- @Override
- public void describeTo(Description description) {
-
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsServiceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsServiceTest.java
index 4029f25..7344f55 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsServiceTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsServiceTest.java
@@ -18,8 +18,7 @@
package org.apache.ambari.server.metric.system.impl;
-import org.apache.ambari.server.metrics.system.MetricsService;
-import org.apache.ambari.server.metrics.system.impl.AbstractMetricsSource;
+import org.apache.ambari.server.metrics.system.MetricsSource;
import org.apache.ambari.server.metrics.system.impl.JvmMetricsSource;
import org.apache.ambari.server.metrics.system.impl.MetricsServiceImpl;
import org.junit.Test;
@@ -30,11 +29,16 @@ public class MetricsServiceTest {
@Test
public void testMetricsServiceStart() {
- MetricsService metricsService = new MetricsServiceImpl();
+ MetricsServiceImpl metricsService = new MetricsServiceImpl();
metricsService.start();
- Assert.assertTrue(metricsService.getSources().size() == 2);
- for (AbstractMetricsSource source : metricsService.getSources()) {
- Assert.assertTrue ( source instanceof JvmMetricsSource || source instanceof TestMetricsSource);
- }
+
+ MetricsSource source = MetricsServiceImpl.getSource("jvm");
+ Assert.assertNotNull(source);
+ Assert.assertTrue(source instanceof JvmMetricsSource);
+
+ source = MetricsServiceImpl.getSource("testsource");
+ Assert.assertNotNull(source);
+ Assert.assertTrue(source instanceof TestMetricsSource);
}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsSourceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsSourceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsSourceTest.java
new file mode 100644
index 0000000..067d4fb
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsSourceTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.metric.system.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.ambari.server.metrics.system.MetricsSink;
+import org.apache.ambari.server.metrics.system.SingleMetric;
+import org.apache.ambari.server.metrics.system.impl.AmbariMetricSinkImpl;
+import org.apache.ambari.server.metrics.system.impl.DatabaseMetricsSource;
+import org.apache.ambari.server.metrics.system.impl.JvmMetricsSource;
+import org.apache.ambari.server.metrics.system.impl.MetricsConfiguration;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+import junit.framework.Assert;
+
+@RunWith(EasyMockRunner.class)
+public class MetricsSourceTest {
+
+ @Test
+ public void testJvmSourceInit_PreJVM1_8() {
+ Assume.assumeThat(System.getProperty("java.version"), new LessThanVersionMatcher("1.8"));
+ testJvmSourceInit(39);
+ }
+
+ @Test
+ public void testJvmSourceInit_JVM1_8() {
+ Assume.assumeThat(System.getProperty("java.version"), new VersionMatcher("1.8"));
+ testJvmSourceInit(40);
+ }
+
+ private void testJvmSourceInit(int metricsSize) {
+ JvmMetricsSource jvmMetricsSource = new JvmMetricsSource();
+ MetricsConfiguration configuration = MetricsConfiguration.getMetricsConfiguration();
+ MetricsSink sink = new TestAmbariMetricsSinkImpl();
+ jvmMetricsSource.init(configuration, sink);
+ org.junit.Assert.assertEquals(jvmMetricsSource.getMetrics().size(), metricsSize);
+ }
+
+ /* ****************************************************************
+ * Matcher classes used in Assume checks
+ * **************************************************************** */
+ private class VersionMatcher extends BaseMatcher<String> {
+ private final float version;
+
+ VersionMatcher(String version) {
+ this.version = Float.parseFloat(version);
+ }
+
+ @Override
+ public boolean matches(Object o) {
+ return parseVersion((String) o) == this.version;
+ }
+
+ float parseVersion(String versionString) {
+ Pattern p = Pattern.compile("(\\d+(?:\\.\\d+)).*");
+ Matcher matcher = p.matcher(versionString);
+ if (matcher.matches()) {
+ return Float.parseFloat(matcher.group(1));
+ } else {
+ return 0f;
+ }
+ }
+
+ @Override
+ public void describeTo(Description description) {
+
+ }
+
+ public float getVersion() {
+ return version;
+ }
+ }
+
+ private class LessThanVersionMatcher extends VersionMatcher {
+
+ LessThanVersionMatcher(String version) {
+ super(version);
+ }
+
+ @Override
+ public boolean matches(Object o) {
+ return parseVersion((String) o) < getVersion();
+ }
+
+ @Override
+ public void describeTo(Description description) {
+
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testDatabaseMetricSourcePublish() throws InterruptedException {
+ Map<String, Long> metricsMap = new HashMap<>();
+
+ metricsMap.put("Timer.UpdateObjectQuery.HostRoleCommandEntity", 10000l); // Should be accepted.
+ metricsMap.put("Timer.UpdateObjectQuery.HostRoleCommandEntity.SqlPrepare", 5000l); // Should be accepted.
+ metricsMap.put("Timer.DirectReadQuery", 6000l); // Should be accepted.
+ metricsMap.put("Timer.ReadAllQuery.StackEntity.StackEntity.findByNameAndVersion.SqlPrepare", 15000l); //Should be discarded
+
+ metricsMap.put("Counter.UpdateObjectQuery.HostRoleCommandEntity", 10l); // Should be accepted & should add a computed metric.
+ metricsMap.put("Counter.ReadObjectQuery.RequestEntity.request", 4330l); //Should be discarded
+ metricsMap.put("Counter.ReadObjectQuery.MetainfoEntity.readMetainfoEntity.CacheMisses", 15l); // Should be accepted.
+
+ DatabaseMetricsSource source = new DatabaseMetricsSource();
+
+ MetricsConfiguration configuration = MetricsConfiguration.getSubsetConfiguration(
+ MetricsConfiguration.getMetricsConfiguration(), "source.database.");
+
+ MetricsSink sink = EasyMock.createMock(AmbariMetricSinkImpl.class);
+ Capture<List<SingleMetric>> metricsCapture = EasyMock.newCapture();
+ sink.publish(capture(metricsCapture));
+ expectLastCall().once();
+
+ replay(sink);
+ source.init(configuration, sink);
+ source.start();
+ source.publish(metricsMap);
+ Thread.sleep(5000l);
+ verify(sink);
+
+ Assert.assertTrue(metricsCapture.getValue().size() == 6);
+ }
+
+ @Test
+ public void testDatabaseMetricsSourceAcceptMetric() {
+
+ DatabaseMetricsSource source = new DatabaseMetricsSource();
+ MetricsConfiguration configuration = MetricsConfiguration.getSubsetConfiguration(
+ MetricsConfiguration.getMetricsConfiguration(), "source.database.");
+ MetricsSink sink = new TestAmbariMetricsSinkImpl();
+ source.init(configuration, sink);
+
+ Assert.assertTrue(source.acceptMetric("Timer.UpdateObjectQuery.HostRoleCommandEntity.SqlPrepare"));
+ Assert.assertFalse(source.acceptMetric("Counter.ReadObjectQuery.RequestEntity.request"));
+ Assert.assertTrue(source.acceptMetric("Counter.ReadObjectQuery.MetainfoEntity.readMetainfoEntity.CacheMisses"));
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
index 3565504..36d8fee 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
@@ -30,6 +30,7 @@ public class TestAmbariMetricsSinkImpl extends AbstractTimelineMetricsSink imple
@Override
public void publish(List<SingleMetric> metrics) {
+ LOG.info("Published " + metrics.size() + " metrics.");
}
@Override
http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestMetricsSource.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestMetricsSource.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestMetricsSource.java
index acf1586..5f43b07 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestMetricsSource.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestMetricsSource.java
@@ -18,20 +18,11 @@
package org.apache.ambari.server.metric.system.impl;
-import java.util.List;
-
-import org.apache.ambari.server.metrics.system.SingleMetric;
import org.apache.ambari.server.metrics.system.impl.AbstractMetricsSource;
public class TestMetricsSource extends AbstractMetricsSource {
@Override
- public List<SingleMetric> getMetrics() {
- return null;
+ public void start() {
}
-
- @Override
- public void run() {
- }
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/test/resources/metrics.properties
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/resources/metrics.properties b/ambari-server/src/test/resources/metrics.properties
index 5eee064..d0ea1f3 100644
--- a/ambari-server/src/test/resources/metrics.properties
+++ b/ambari-server/src/test/resources/metrics.properties
@@ -21,9 +21,8 @@
# Source interval determines how often the metric is sent to sink. Its unit is in seconds
metric.sources=jvm,testsource
-source.jvm.interval=10
source.jvm.class=org.apache.ambari.server.metrics.system.impl.JvmMetricsSource
source.testsource.class=org.apache.ambari.server.metric.system.impl.TestMetricsSource
-#### Sink Configs #####
-# Sink frequency determines how often the sink publish the metrics from buffer to AMS.
\ No newline at end of file
+source.database.monitor.entities=Cluster(.*)Entity,Host(.*)Entity,ExecutionCommandEntity
+source.database.monitor.query.keywords.include=CacheMisses
\ No newline at end of file