You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2017/01/09 22:39:52 UTC

ambari git commit: AMBARI-17596 : Collect & Publish AmbariServer database metrics. (avijayan)

Repository: ambari
Updated Branches:
  refs/heads/branch-2.5 61ba63f21 -> ec8809cea


AMBARI-17596 : Collect & Publish AmbariServer database metrics. (avijayan)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ec8809ce
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ec8809ce
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ec8809ce

Branch: refs/heads/branch-2.5
Commit: ec8809cea9f175ac8604eb7e3be817150e5fd47a
Parents: 61ba63f
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Mon Jan 9 14:39:41 2017 -0800
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Mon Jan 9 14:39:41 2017 -0800

----------------------------------------------------------------------
 ambari-server/conf/unix/metrics.properties      |  53 ++++-
 ambari-server/conf/windows/metrics.properties   |  53 ++++-
 .../server/metrics/system/MetricsService.java   |  15 +-
 .../server/metrics/system/MetricsSource.java    |  10 +-
 .../system/impl/AbstractMetricsSource.java      |  12 +-
 .../system/impl/AmbariMetricSinkImpl.java       |  18 +-
 .../system/impl/AmbariPerformanceMonitor.java   | 140 +++++++++++++
 .../system/impl/DatabaseMetricsSource.java      | 201 +++++++++++++++++++
 .../metrics/system/impl/JvmMetricsSource.java   |  38 +++-
 .../system/impl/MetricsConfiguration.java       |  38 ++++
 .../metrics/system/impl/MetricsServiceImpl.java |  59 +++---
 .../system/impl/JvmMetricsSourceTest.java       | 105 ----------
 .../metric/system/impl/MetricsServiceTest.java  |  18 +-
 .../metric/system/impl/MetricsSourceTest.java   | 171 ++++++++++++++++
 .../system/impl/TestAmbariMetricsSinkImpl.java  |   1 +
 .../metric/system/impl/TestMetricsSource.java   |  11 +-
 .../src/test/resources/metrics.properties       |   5 +-
 17 files changed, 747 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/conf/unix/metrics.properties
----------------------------------------------------------------------
diff --git a/ambari-server/conf/unix/metrics.properties b/ambari-server/conf/unix/metrics.properties
index 3ee22d6..e0ec718 100644
--- a/ambari-server/conf/unix/metrics.properties
+++ b/ambari-server/conf/unix/metrics.properties
@@ -17,12 +17,53 @@
 # limitations under the License.
 
 
-#### Source Configs #####
-# Source interval determines how often the metric is sent to sink. Its unit is in seconds
-metric.sources=jvm
+#################### Metrics Source Configs #####################
 
-source.jvm.interval=10
+metric.sources=jvm,database
+#Valid Values : jvm,database
+
+#### JVM Source Configs ###
 source.jvm.class=org.apache.ambari.server.metrics.system.impl.JvmMetricsSource
+source.jvm.interval=10
+
+#### Database Metrics Source Configs ###
+
+# Note : To enable Database metrics source completely, add the following property to ambari.properties as well
+# server.persistence.properties.eclipselink.profiler=org.apache.ambari.server.metrics.system.impl.AmbariPerformanceMonitor
+
+source.database.class=org.apache.ambari.server.metrics.system.impl.DatabaseMetricsSource
+
+source.database.performance.monitor.query.weight=HEAVY
+#Valid Values : NONE / NORMAL / HEAVY / ALL
+
+#collection interval in milliseconds
+source.database.monitor.dumptime=60000
+
+# Database Metrics Source filter Configs.
+# Note : Aggregate Query stats (Across all entities) will be tracked by default
+
+# Include entities to be tracked.
+source.database.monitor.entities=Cluster(.*)Entity,Host(.*)Entity,ExecutionCommandEntity,ServiceComponentDesiredStateEntity,Alert(.*)Entity,StackEntity,StageEntity
+
+# Include some metrics which have the keyword even if they are not part of requested Entities.
+source.database.monitor.query.keywords.include=CacheMisses
+
+# Some examples of keywords that may be useful to include.
+
+# Query Types
+# ReadAllQuery,ReadObjectQuery,UpdateObjectQuery,ReportQuery,InsertObjectQuery,ValueReadQuery,DataModifyQuery,DoesExistQuery
+
+# Query stages
+# ObjectBuilding,QueryPreparation,SqlPrepare,SqlGeneration,StatementExecute,RowFetch,ConnectCalls,UnitOfWorkCommit,ClientSessionReleases,ConnectionManagement,CacheHits
+
+# Sequences
+# host_role_command_id_seq,alert_history_id_seq
+
+################################################################
+
+############## General Metrics Service Configs #################
+
+#ambariserver.hostname.override=
+
+################################################################
 
-#Override Ambari Server hostname for metrics
-#ambariserver.hostname.override=
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/conf/windows/metrics.properties
----------------------------------------------------------------------
diff --git a/ambari-server/conf/windows/metrics.properties b/ambari-server/conf/windows/metrics.properties
index 3ee22d6..5498858 100644
--- a/ambari-server/conf/windows/metrics.properties
+++ b/ambari-server/conf/windows/metrics.properties
@@ -17,12 +17,53 @@
 # limitations under the License.
 
 
-#### Source Configs #####
-# Source interval determines how often the metric is sent to sink. Its unit is in seconds
-metric.sources=jvm
+#################### Metrics Source Configs #####################
 
-source.jvm.interval=10
+metric.sources=jvm,database
+#Valid Values : jvm,database
+
+#### JVM Source Configs ###
 source.jvm.class=org.apache.ambari.server.metrics.system.impl.JvmMetricsSource
+source.jvm.interval=10
+
+#### Database Metrics Source Configs ###
+
+# Note : To enable Database metrics source completely, add the following property to ambari.properties as well
+# server.persistence.properties.eclipselink.profiler=org.apache.ambari.server.metrics.system.impl.AmbariPerformanceMonitor
+
+source.database.class=org.apache.ambari.server.metrics.system.impl.DatabaseMetricsSource
+
+source.database.performance.monitor.query.weight=HEAVY
+#Valid Values : NONE / NORMAL / HEAVY / ALL
+
+#collection interval in milliseconds
+source.database.monitor.dumptime=60000
+
+# Database Metrics Source filter Configs.
+# Note : Aggregate Query stats (Across all entities) will be tracked by default
+
+# Include entities to be tracked.
+source.database.monitor.entities=Cluster(.*)Entity,Host(.*)Entity,ExecutionCommandEntity,ServiceComponentDesiredStateEntity,Alert(.*)Entity,StackEntity,StageEntity
+
+# Include some metrics which have the keyword even if they are not part of requested Entities.
+source.database.monitor.query.keywords.include=CacheMisses
+
+# Some examples of keywords that may be useful to include.
+
+# Query Types
+# ReadAllQuery,ReadObjectQuery,UpdateObjectQuery,ReportQuery,InsertObjectQuery,ValueReadQuery,DataModifyQuery,DoesExistQuery
+
+# Query stages
+# ObjectBuilding,QueryPreparation,SqlPrepare,SqlGeneration,StatementExecute,RowFetch,ConnectCalls,UnitOfWorkCommit,ClientSessionReleases,ConnectionManagement,CacheHits
+
+# Sequences
+# host_role_command_id_seq,alert_history_id_seq
+
+################################################################
+
+############## General Metrics Service Configs #################
+
+#ambariserver.hostname.override=
+
+################################################################
 
-#Override Ambari Server hostname for metrics
-#ambariserver.hostname.override=
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java
index 4a613f0..82e1589 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java
@@ -18,20 +18,13 @@
 
 package org.apache.ambari.server.metrics.system;
 
-import java.util.Collection;
-
-import org.apache.ambari.server.metrics.system.impl.AbstractMetricsSource;
-
-
+/**
+ * Interface for Metrics Service.
+ */
 public interface MetricsService{
   /**
-   * Set up configuration
+   * Start the service
    **/
   void start();
 
-  /**
-   * Get Configured sources
-   * @return
-   */
-  Collection<AbstractMetricsSource> getSources();
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java
index 400dcb6..024ec48 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java
@@ -20,9 +20,15 @@ package org.apache.ambari.server.metrics.system;
 
 import org.apache.ambari.server.metrics.system.impl.MetricsConfiguration;
 
-public interface MetricsSource extends Runnable{
+public interface MetricsSource{
+
   /**
-   * initialize sink
+   * Initialize the source, set up configs etc.
    **/
   void init(MetricsConfiguration configuration, MetricsSink sink);
+
+  /**
+   * Start the source
+   **/
+  void start();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java
index 6bdd0ba..3178c2b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java
@@ -19,24 +19,16 @@ package org.apache.ambari.server.metrics.system.impl;
 
 import org.apache.ambari.server.metrics.system.MetricsSink;
 import org.apache.ambari.server.metrics.system.MetricsSource;
-import org.apache.ambari.server.metrics.system.SingleMetric;
-
-import java.util.List;
 
 public abstract class AbstractMetricsSource implements MetricsSource {
   protected MetricsSink sink;
+  protected MetricsConfiguration configuration;
 
   /**
-   *  Pass metrics sink to metrics source
+   * Pass the metrics sink and the configuration to the metrics source.
+   *
+   * @param configuration configuration of this source, kept in the {@code configuration} field
+   * @param sink sink the source publishes its metrics to
    **/
   @Override
   public void init(MetricsConfiguration configuration, MetricsSink sink) {
-      this.sink = sink;
+    this.sink = sink;
+    // Keep the configuration so subclasses can read it through the protected field.
+    this.configuration = configuration;
   }
-
-  /**
-   *  Get metrics at the instance
-   *  @return a map for metrics that maps metrics name to metrics value
-   **/
-  abstract public List<SingleMetric> getMetrics();
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
index be24988..358b8fa 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
@@ -17,7 +17,6 @@
  */
 package org.apache.ambari.server.metrics.system.impl;
 
-
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -55,6 +54,9 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.springframework.security.core.context.SecurityContextHolder;
 
+/**
+ * Ambari Server Metrics Sink implementation to push collected metrics to AMS.
+ */
 public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements MetricsSink {
   private static final String AMBARI_SERVER_APP_ID = "ambari_server";
   private Collection<String> collectorHosts;
@@ -82,6 +84,12 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements
     authenticationToken.setAuthenticated(true);
     SecurityContextHolder.getContext().setAuthentication(authenticationToken);
     Clusters clusters = ambariManagementController.getClusters();
+
+    if (clusters == null || clusters.getClusters().isEmpty()) {
+      LOG.info("No clusters configured.");
+      return;
+    }
+
     String ambariMetricsServiceName = "AMBARI_METRICS";
     collectorHosts = new HashSet<>();
 
@@ -146,8 +154,9 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements
       }
     }
 
-    collectorUri = getCollectorUri(findPreferredCollectHost());
     hostName = configuration.getProperty("ambariserver.hostname.override", getDefaultLocalHostName());
+    LOG.info("Hostname used for ambari server metrics : " + hostName);
+    collectorUri = getCollectorUri(findPreferredCollectHost());
 
     int maxRowCacheSize = Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE,
       String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT)));
@@ -170,10 +179,13 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements
     return null;
   }
 
+  /**
+   * Publish metrics to AMS.
+   * @param metrics Set of metrics
+   */
   @Override
   public void publish(List<SingleMetric> metrics) {
 
-
     //If Sink not yet initialized, drop the metrics on the floor.
     if (isInitialized) {
       List<TimelineMetric> metricList = getFilteredMetricList(metrics);

http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariPerformanceMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariPerformanceMonitor.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariPerformanceMonitor.java
new file mode 100644
index 0000000..236d8ff
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariPerformanceMonitor.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.metrics.system.impl;
+
+import com.google.inject.Singleton;
+import org.apache.commons.lang.StringUtils;
+import org.eclipse.persistence.sessions.SessionProfiler;
+import org.eclipse.persistence.tools.profiler.PerformanceMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Class to extend the EclipseLink PerformanceMonitor, and dump the collected metrics to the AmbariServer Database Metrics source.
+ */
+@Singleton
+public class AmbariPerformanceMonitor extends PerformanceMonitor {
+  private static Logger LOG = LoggerFactory.getLogger(AmbariPerformanceMonitor.class);
+  private boolean isInitialized = false;
+  private DatabaseMetricsSource metricsSource;
+  private static String entityPackagePrefix = "org.apache"; //Can be made into a set later if needed.
+
+  /**
+   * Creates the monitor and makes a first attempt to bind it to the "database" metrics source
+   * by calling {@code init()}.
+   */
+  public AmbariPerformanceMonitor() {
+    LOG.info("AmbariPerformanceMonitor instantiated");
+    init();
+  }
+
+  /**
+   * Binds this monitor to the "database" metrics source and applies the dump time and
+   * profile weight configured on that source.
+   * <p>
+   * The source is looked up through {@code MetricsServiceImpl.getSource("database")}. While the
+   * lookup returns null the monitor stays uninitialized, so a later call can try again.
+   */
+  private void init() {
+
+    if (metricsSource == null) {
+      metricsSource = (DatabaseMetricsSource) MetricsServiceImpl.getSource("database");
+    }
+
+    if (metricsSource == null) {
+      LOG.info("AmbariPerformanceMonitor not yet initialized.");
+      return;
+    }
+
+    LOG.info("AmbariPerformanceMonitor initialized");
+
+    // A malformed dump time must not escape from the constructor or from dumpResults(),
+    // so fall back to the default instead of propagating the NumberFormatException.
+    long interval = 60000L;
+    String configuredDumpTime = metricsSource.getConfigurationValue("dumptime", "60000");
+    try {
+      interval = Long.parseLong(StringUtils.trim(configuredDumpTime));
+    } catch (NumberFormatException e) {
+      LOG.warn("Invalid dumptime '" + configuredDumpTime + "', using " + interval + " ms instead", e);
+    }
+    this.setDumpTime(interval);
+
+    // NOTE(review): getConfigurationValue() prefixes the key with "monitor.", so this reads
+    // "monitor.query.weight", while the shipped metrics.properties defines
+    // "source.database.performance.monitor.query.weight" - confirm the two keys line up.
+    String profileWeight = metricsSource.getConfigurationValue("query.weight", "HEAVY");
+    this.setProfileWeight(getWeight(profileWeight));
+
+    isInitialized = true;
+  }
+
+  /**
+   * Overridden dump method that hands the collected operation timings to the database
+   * metrics source rather than writing them to the log file.
+   * <p>
+   * Operation names are split on ":". Segments starting with the entity package prefix are
+   * reduced to their simple class name, literal "null" segments are dropped, and the remaining
+   * segments are joined with ".". Only values of type Long are published.
+   */
+  @Override
+  public void dumpResults() {
+
+    lastDumpTime = System.currentTimeMillis();
+    Map<String, Long> metrics = new HashMap<>();
+
+    // Iterate over a sorted snapshot of the operation names.
+    for (String operation : new TreeSet<String>(this.operationTimings.keySet())) {
+
+      Object value = this.operationTimings.get(operation);
+      if (value == null) {
+        value = Long.valueOf(0);
+      }
+      if (!(value instanceof Long)) {
+        continue;
+      }
+
+      //Cleaning up metric names.
+      List<String> nameParts = new ArrayList<>();
+      for (String segment : operation.split(":")) {
+        if (segment.startsWith(entityPackagePrefix)) {
+          //Removing full package paths from Entity names
+          String[] packagePath = segment.split("\\.");
+          nameParts.add(packagePath[packagePath.length - 1]);
+        } else if (!segment.equals("null")) {
+          //Removing nulls in metric names.
+          nameParts.add(segment);
+        }
+      }
+      //Joining metric name portions by "." delimiter.
+      metrics.put(StringUtils.join(nameParts, "."), (Long) value);
+    }
+
+    if (metrics.isEmpty()) {
+      return;
+    }
+
+    // The metrics source may not have been available earlier, so try to bind again.
+    if (!isInitialized) {
+      init();
+    }
+    if (isInitialized) {
+      LOG.debug("Publishing " + metrics.size() + " metrics to sink.");
+      metricsSource.publish(metrics);
+    }
+  }
+
+  /**
+   * Utility method to translate a profiling weight name into the EclipseLink constant.
+   * Matching ignores case and surrounding whitespace, so values such as "heavy" or " ALL "
+   * coming from a properties file are honoured instead of silently becoming HEAVY.
+   *
+   * @param value NONE/NORMAL/HEAVY/ALL; null, empty or blank is treated as NONE
+   * @return SessionProfiler.NONE/NORMAL/HEAVY/ALL; any unrecognized value maps to HEAVY
+   */
+  private int getWeight(String value) {
+
+    String weight = StringUtils.trimToEmpty(value);
+
+    if (weight.isEmpty() || weight.equalsIgnoreCase("NONE")) {
+      return SessionProfiler.NONE;
+    }
+
+    if (weight.equalsIgnoreCase("ALL")) {
+      return SessionProfiler.ALL;
+    }
+
+    if (weight.equalsIgnoreCase("NORMAL")) {
+      return SessionProfiler.NORMAL;
+    }
+
+    if (!weight.equalsIgnoreCase("HEAVY")) {
+      // Keep the historical fallback, but make the misconfiguration visible.
+      LOG.warn("Unrecognized profile weight '" + value + "', falling back to HEAVY");
+    }
+
+    return SessionProfiler.HEAVY;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/DatabaseMetricsSource.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/DatabaseMetricsSource.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/DatabaseMetricsSource.java
new file mode 100644
index 0000000..da5ad70
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/DatabaseMetricsSource.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.metrics.system.impl;
+
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.ambari.server.metrics.system.MetricsSink;
+import org.apache.ambari.server.metrics.system.SingleMetric;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link DatabaseMetricsSource} collects database metrics which are generated through the EclipseLink
+ * PerformanceMonitor, and publishes them to the configured Metric Sink.
+ **/
+public class DatabaseMetricsSource extends AbstractMetricsSource {
+  private static Logger LOG = LoggerFactory.getLogger(DatabaseMetricsSource.class);
+  private static String dbMonitorPrefix = "monitor.";
+  private ExecutorService executor;
+  private MetricsConfiguration configuration;
+  private Set<String> includedMetricKeywords = new HashSet<>();
+  private Set<Pattern> acceptedEntityPatterns = new HashSet<>();
+  private Set<String> acceptedEntities = new HashSet<>();
+  private static String TIMER = "Timer.";
+  private static String COUNTER = "Counter.";
+
+  /**
+   * Initializes the source: keeps the configuration for later lookups, hands the sink to the
+   * parent class and builds the entity/keyword filters from the configuration.
+   *
+   * @param metricsConfig configuration of this source
+   * @param sink sink that accepted metrics are published to
+   */
+  @Override
+  public void init(MetricsConfiguration metricsConfig, MetricsSink sink) {
+    this.configuration = metricsConfig;
+    super.init(metricsConfig, sink);
+    initializeFilterSets();
+    LOG.info("DatabaseMetricsSource initialized.");
+  }
+
+  /**
+   * Initialize filter sets (Entities and keywords) to know which metrics to track vs drop.
+   * <p>
+   * Both properties are comma separated lists. Entries are trimmed and empty entries are
+   * skipped, because an empty keyword or pattern would match every metric name. An entity
+   * entry that is not a valid regular expression is logged and ignored rather than aborting
+   * the initialization of the source.
+   */
+  private void initializeFilterSets() {
+
+    String commaSeparatedValues = configuration.getProperty(dbMonitorPrefix + "query.keywords.include");
+    if (StringUtils.isNotEmpty(commaSeparatedValues)) {
+      for (String keyword : commaSeparatedValues.split(",")) {
+        String trimmedKeyword = keyword.trim();
+        if (!trimmedKeyword.isEmpty()) {
+          includedMetricKeywords.add(trimmedKeyword);
+        }
+      }
+    }
+
+    commaSeparatedValues = configuration.getProperty(dbMonitorPrefix + "entities");
+    if (StringUtils.isNotEmpty(commaSeparatedValues)) {
+      for (String entityPattern : commaSeparatedValues.split(",")) {
+        String trimmedPattern = entityPattern.trim();
+        if (trimmedPattern.isEmpty()) {
+          continue;
+        }
+        try {
+          acceptedEntityPatterns.add(Pattern.compile(trimmedPattern));
+        } catch (IllegalArgumentException e) {
+          // PatternSyntaxException is an IllegalArgumentException; one bad entry must not
+          // disable the remaining filters.
+          LOG.warn("Ignoring invalid entity pattern '" + trimmedPattern + "'", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Starts the source by creating the single-threaded executor that {@link #publish(Map)}
+   * submits its work to. The thread name format is "DatabaseMetricsSource-%d".
+   */
+  @Override
+  public void start() {
+    LOG.info("Starting Database Metrics source...");
+    executor = Executors.newSingleThreadExecutor(
+      new ThreadFactoryBuilder().setNameFormat("DatabaseMetricsSource-%d").build());
+  }
+
+  /**
+   * Method to publish metrics to Sink asynchronously.
+   * <p>
+   * Filtering and conversion run on the executor created by {@link #start()}. The passed-in map
+   * is read on the executor thread but never modified, so unmodifiable maps are fine and the
+   * caller's data is left intact. If the source has not been started, the metrics are dropped.
+   *
+   * @param metricsMap Map of metrics to be published to Sink
+   */
+  public void publish(final Map<String, Long> metricsMap) {
+    if (executor == null) {
+      // start() creates the executor; without it there is nothing to run the task on.
+      LOG.info("Database Metrics source is not started, dropping metrics.");
+      return;
+    }
+    try {
+      executor.submit(new Runnable() {
+        @Override
+        public void run() {
+          long currentTime = System.currentTimeMillis();
+          final List<SingleMetric> metrics = new ArrayList<>();
+
+          for (Map.Entry<String, Long> metricEntry : metricsMap.entrySet()) {
+            String metricName = metricEntry.getKey();
+            if (!acceptMetric(metricName)) {
+              continue;
+            }
+            double value = metricEntry.getValue().doubleValue();
+            metrics.add(new SingleMetric(metricName, value, currentTime));
+
+            /**
+             * Add computed (Timer/Counter) metric.
+             * Example
+             * Counter Metric : Counter.ReadAllQuery.HostRoleCommandEntity = 50
+             * Timer Metric : Timer.ReadAllQuery.HostRoleCommandEntity = 10000
+             * Computed Metric (Avg time for the operation) : ReadAllQuery.HostRoleCommandEntity = 200 (10000 div by 50)
+             */
+
+            if (metricName.startsWith(COUNTER) && value != 0.0) {
+              String baseMetricName = metricName.substring(COUNTER.length());
+              String timerName = TIMER + baseMetricName;
+              Long timerValue = metricsMap.get(timerName);
+              // Only a timer that passes the filter itself takes part in the computation.
+              if (timerValue != null && acceptMetric(timerName)) {
+                metrics.add(new SingleMetric(baseMetricName, timerValue.doubleValue() / value, currentTime));
+              }
+            }
+          }
+          sink.publish(metrics);
+        }
+      });
+    } catch (Exception e) {
+      LOG.info("Exception when publishing Database metrics to sink", e);
+    }
+  }
+
+  /**
+   * Accept a metric to be passed to Sink or not.
+   * <p>
+   * A metric is accepted when it was accepted before, when its name matches one of the
+   * configured entity patterns, when its name contains one of the configured keywords, or
+   * when its name has at most two "."-separated parts. Accepted names are remembered.
+   *
+   * @param metricName name of the metric to check
+   * @return true if the metric is accepted, false otherwise
+   */
+  public boolean acceptMetric(String metricName) {
+
+    // Already accepted earlier.
+    if (acceptedEntities.contains(metricName)) {
+      return true;
+    }
+
+    boolean accepted = false;
+
+    /*
+      Include entities to be tracked.
+      source.database.monitor.entities=Cluster(.*)Entity,Host(.*)Entity,ExecutionCommandEntity
+     */
+    for (Pattern entityPattern : acceptedEntityPatterns) {
+      if (entityPattern.matcher(metricName).find()) {
+        accepted = true;
+        break;
+      }
+    }
+
+    /*
+    Include some metrics which have the keyword even if they are not part of requested Entities.
+    source.database.monitor.query.keywords.include=CacheMisses
+     */
+    if (!accepted) {
+      for (String keyword : includedMetricKeywords) {
+        if (metricName.contains(keyword)) {
+          accepted = true;
+          break;
+        }
+      }
+    }
+
+    //Aggregate Counter metrics are always ok. They are not Entity specific
+    if (!accepted && metricName.split("\\.").length <= 2) {
+      accepted = true;
+    }
+
+    if (accepted) {
+      acceptedEntities.add(metricName);
+    }
+    return accepted;
+  }
+
+  /**
+   * Looks up a configuration value of this source. The database monitor prefix is added to
+   * the key internally before the lookup.
+   *
+   * @param key key without the prefix
+   * @param defaultValue default value handed to the property lookup
+   * @return Value corresponding to key = dbMonitorPrefix + key
+   */
+  public String getConfigurationValue(String key, String defaultValue) {
+    String prefixedKey = dbMonitorPrefix + key;
+    return this.configuration.getProperty(prefixedKey, defaultValue);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java
index cb9f275..dbb51cf 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java
@@ -21,6 +21,9 @@ import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.ambari.server.metrics.system.MetricsSink;
 import org.apache.ambari.server.metrics.system.SingleMetric;
@@ -38,24 +41,42 @@ import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
 import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
 import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
 
+/**
+ * {@link JvmMetricsSource} collects JVM metrics using codahale and publishes them to the Metrics Sink.
+ */
 public class JvmMetricsSource extends AbstractMetricsSource {
   static final MetricRegistry registry = new MetricRegistry();
   private static Logger LOG = LoggerFactory.getLogger(JvmMetricsSource.class);
+  private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+  private static String JVM_PREFIX = "jvm";
+  private int interval = 10;
 
   @Override
   public void init(MetricsConfiguration configuration, MetricsSink sink) {
     super.init(configuration, sink);
-    registerAll("jvm.gc", new GarbageCollectorMetricSet(), registry);
-    registerAll("jvm.buffers", new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer()), registry);
-    registerAll("jvm.memory", new MemoryUsageGaugeSet(), registry);
-    registerAll("jvm.threads", new ThreadStatesGaugeSet(), registry);
-    registry.register("jvm.file.open.descriptor.ratio", new FileDescriptorRatioGauge());
+    registerAll(JVM_PREFIX + ".gc", new GarbageCollectorMetricSet(), registry);
+    registerAll(JVM_PREFIX + ".buffers", new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer()), registry);
+    registerAll(JVM_PREFIX + ".memory", new MemoryUsageGaugeSet(), registry);
+    registerAll(JVM_PREFIX + ".threads", new ThreadStatesGaugeSet(), registry);
+    registry.register(JVM_PREFIX + ".file.open.descriptor.ratio", new FileDescriptorRatioGauge());
+    interval = Integer.parseInt(configuration.getProperty("interval", "10"));
+    LOG.info("JVM Metrics source initialized.");
   }
 
   @Override
-  public void run() {
-    sink.publish(getMetrics());
-    LOG.debug("********* Published system metrics to sink **********");
+  public void start() {
+    LOG.info("Starting JVM Metrics source...");
+    try {
+      executor.scheduleWithFixedDelay(new Runnable() {
+        @Override
+        public void run() {
+          sink.publish(getMetrics());
+          LOG.debug("********* Published JVM metrics to sink **********");
+        }
+      }, interval, interval, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      LOG.info("Throwing exception when starting metric source", e);
+    }
   }
 
   private void registerAll(String prefix, MetricSet metricSet, MetricRegistry registry) {
@@ -68,7 +89,6 @@ public class JvmMetricsSource extends AbstractMetricsSource {
     }
   }
 
-  @Override
   public List<SingleMetric> getMetrics() {
 
     List<SingleMetric> metrics = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsConfiguration.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsConfiguration.java
index ca83a53..540272d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsConfiguration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsConfiguration.java
@@ -20,8 +20,10 @@ package org.apache.ambari.server.metrics.system.impl;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Map;
 import java.util.Properties;
 
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,4 +88,40 @@ public class MetricsConfiguration {
   public String getProperty(String key, String defaultValue) {
     return properties.getProperty(key, defaultValue);
   }
+
+  public Properties getProperties() {
+    return properties;
+  }
+
+  /**
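+   * Get the subset of a configuration whose keys start with the given prefix.
+   *
+   * @param metricsConfiguration the configuration to filter; if null, null is returned
+   * @param prefix the key prefix to match; it is stripped from the returned keys (if empty, all properties are returned)
+   * @return subset configuration containing the key-value pairs whose keys start with the passed-in prefix.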
+   */
+  public static MetricsConfiguration getSubsetConfiguration(MetricsConfiguration metricsConfiguration, String prefix) {
+
+    if (null == metricsConfiguration) {
+      return null;
+    }
+
+    Properties properties = metricsConfiguration.getProperties();
+    if (null == properties || StringUtils.isEmpty(prefix)) {
+      return new MetricsConfiguration(properties);
+    }
+
+    Properties subsetProperties = new Properties();
+
+    for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+      String key = entry.getKey().toString();
+      String val = entry.getValue().toString();
+      if (key.startsWith(prefix)) {
+        key = key.substring(prefix.length());
+        subsetProperties.put(key, val);
+      }
+    }
+
+    return new MetricsConfiguration(subsetProperties);
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
index d0d2e69..d7d905c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,16 +17,15 @@
  */
 package org.apache.ambari.server.metrics.system.impl;
 
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.metrics.system.MetricsService;
 import org.apache.ambari.server.metrics.system.MetricsSink;
+import org.apache.ambari.server.metrics.system.MetricsSource;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,8 +36,8 @@ import com.google.inject.Singleton;
 @Singleton
 public class MetricsServiceImpl implements MetricsService {
   private static Logger LOG = LoggerFactory.getLogger(MetricsServiceImpl.class);
-  private Map<String, AbstractMetricsSource> sources = new HashMap<>();
-  private MetricsSink sink = null;
+  private static Map<String, MetricsSource> sources = new HashMap<>();
+  private static MetricsSink sink = null;
   private MetricsConfiguration configuration = null;
 
   @Inject
@@ -57,7 +56,8 @@ public class MetricsServiceImpl implements MetricsService {
       initializeMetricSources();
 
       if (!sink.isInitialized()) {
-        //If Sink is not initialized, Service will check for every 5 mins.
+        // If the sink is not initialized (say, the cluster has not yet been deployed or AMS has not been installed),
+        // the service will check again every 5 minutes.
         Executors.newScheduledThreadPool(1).scheduleWithFixedDelay(new Runnable() {
           @Override
           public void run() {
@@ -86,41 +86,42 @@ public class MetricsServiceImpl implements MetricsService {
       String commaSeparatedSources = configuration.getProperty("metric.sources");
 
       if (StringUtils.isEmpty(commaSeparatedSources)) {
-        LOG.info("No sources configured.");
+        LOG.info("No metric sources configured.");
         return;
       }
 
       String[] sourceNames = commaSeparatedSources.split(",");
-      for (String sourceName: sourceNames) {
+      for (String sourceName : sourceNames) {
+
+        if (StringUtils.isEmpty(sourceName)) {
+          continue;
+        }
+        sourceName = sourceName.trim();
+
         String className = configuration.getProperty("source." + sourceName + ".class");
-        Class t = Class.forName(className);
-        AbstractMetricsSource src = (AbstractMetricsSource)t.newInstance();
-        src.init(configuration, sink);
+        Class sourceClass;
+        try {
+          sourceClass = Class.forName(className);
+        } catch (ClassNotFoundException ex) {
+          LOG.info("Source class not found for source name :" + sourceName);
+          continue;
+        }
+        AbstractMetricsSource src = (AbstractMetricsSource) sourceClass.newInstance();
+        src.init(MetricsConfiguration.getSubsetConfiguration(configuration, "source." + sourceName + "."), sink);
         sources.put(sourceName, src);
+        src.start();
       }
 
-      final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
-      for (Map.Entry<String, AbstractMetricsSource> entry : sources.entrySet()) {
-        startSource(executor, entry);
-      }
     } catch (Exception e) {
       LOG.error("Error when configuring metric sink and source", e);
     }
   }
 
-  private void startSource(ScheduledExecutorService executor, Map.Entry<String, AbstractMetricsSource> entry) {
-    String className = entry.getKey();
-    AbstractMetricsSource source = entry.getValue();
-    String interval = "source." + className + ".interval";
-    int duration = Integer.parseInt(configuration.getProperty(interval, "10")); // default value 10 seconds
-    try {
-      executor.scheduleWithFixedDelay(source, 0, duration, TimeUnit.SECONDS);
-    } catch (Exception e) {
-      LOG.info("Throwing exception when starting metric source", e);
-    }
+  public static MetricsSource getSource(String type) {
+    return sources.get(type);
   }
 
-  public Collection<AbstractMetricsSource> getSources() {
-    return sources.values();
+  public static MetricsSink getSink() {
+    return sink;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/JvmMetricsSourceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/JvmMetricsSourceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/JvmMetricsSourceTest.java
deleted file mode 100644
index 84b2df4..0000000
--- a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/JvmMetricsSourceTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.server.metric.system.impl;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.ambari.server.metrics.system.MetricsSink;
-import org.apache.ambari.server.metrics.system.impl.JvmMetricsSource;
-import org.apache.ambari.server.metrics.system.impl.MetricsConfiguration;
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.junit.Assume;
-import org.junit.Test;
-
-public class JvmMetricsSourceTest {
-
-  @Test
-  public void testJvmSourceInit_PreJVM1_8() {
-    Assume.assumeThat(System.getProperty("java.version"), new LessThanVersionMatcher("1.8"));
-    testJvmSourceInit(39);
-  }
-
-  @Test
-  public void testJvmSourceInit_JVM1_8() {
-    Assume.assumeThat(System.getProperty("java.version"), new VersionMatcher("1.8"));
-    testJvmSourceInit(40);
-  }
-
-  private void testJvmSourceInit(int metricsSize) {
-    JvmMetricsSource jvmMetricsSource = new JvmMetricsSource();
-    MetricsConfiguration configuration = MetricsConfiguration.getMetricsConfiguration();
-    MetricsSink sink = new TestAmbariMetricsSinkImpl();
-    jvmMetricsSource.init(configuration, sink);
-    org.junit.Assert.assertEquals(jvmMetricsSource.getMetrics().size(), metricsSize);
-  }
-
-  /* ****************************************************************
-   * Matcher classes used in Assume checks
-   * **************************************************************** */
-  private class VersionMatcher extends BaseMatcher<String> {
-    private final float version;
-
-    VersionMatcher(String version) {
-      this.version = Float.parseFloat(version);
-    }
-
-    @Override
-    public boolean matches(Object o) {
-      return parseVersion((String) o) == this.version;
-    }
-
-    float parseVersion(String versionString) {
-      Pattern p = Pattern.compile("(\\d+(?:\\.\\d+)).*");
-      Matcher matcher = p.matcher(versionString);
-      if (matcher.matches()) {
-        return Float.parseFloat(matcher.group(1));
-      } else {
-        return 0f;
-      }
-    }
-
-    @Override
-    public void describeTo(Description description) {
-
-    }
-
-    public float getVersion() {
-      return version;
-    }
-  }
-
-  private class LessThanVersionMatcher extends VersionMatcher {
-
-    LessThanVersionMatcher(String version) {
-      super(version);
-    }
-
-    @Override
-    public boolean matches(Object o) {
-      return parseVersion((String) o) < getVersion();
-    }
-
-    @Override
-    public void describeTo(Description description) {
-
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsServiceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsServiceTest.java
index 4029f25..7344f55 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsServiceTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsServiceTest.java
@@ -18,8 +18,7 @@
 
 package org.apache.ambari.server.metric.system.impl;
 
-import org.apache.ambari.server.metrics.system.MetricsService;
-import org.apache.ambari.server.metrics.system.impl.AbstractMetricsSource;
+import org.apache.ambari.server.metrics.system.MetricsSource;
 import org.apache.ambari.server.metrics.system.impl.JvmMetricsSource;
 import org.apache.ambari.server.metrics.system.impl.MetricsServiceImpl;
 import org.junit.Test;
@@ -30,11 +29,16 @@ public class MetricsServiceTest {
 
   @Test
   public void testMetricsServiceStart() {
-    MetricsService metricsService = new MetricsServiceImpl();
+    MetricsServiceImpl metricsService = new MetricsServiceImpl();
     metricsService.start();
-    Assert.assertTrue(metricsService.getSources().size() == 2);
-    for (AbstractMetricsSource source : metricsService.getSources()) {
-      Assert.assertTrue ( source instanceof JvmMetricsSource || source instanceof TestMetricsSource);
-    }
+
+    MetricsSource source = MetricsServiceImpl.getSource("jvm");
+    Assert.assertNotNull(source);
+    Assert.assertTrue(source instanceof JvmMetricsSource);
+
+    source = MetricsServiceImpl.getSource("testsource");
+    Assert.assertNotNull(source);
+    Assert.assertTrue(source instanceof TestMetricsSource);
   }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsSourceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsSourceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsSourceTest.java
new file mode 100644
index 0000000..067d4fb
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsSourceTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.metric.system.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.ambari.server.metrics.system.MetricsSink;
+import org.apache.ambari.server.metrics.system.SingleMetric;
+import org.apache.ambari.server.metrics.system.impl.AmbariMetricSinkImpl;
+import org.apache.ambari.server.metrics.system.impl.DatabaseMetricsSource;
+import org.apache.ambari.server.metrics.system.impl.JvmMetricsSource;
+import org.apache.ambari.server.metrics.system.impl.MetricsConfiguration;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+import junit.framework.Assert;
+
+@RunWith(EasyMockRunner.class)
+public class MetricsSourceTest {
+
+  @Test
+  public void testJvmSourceInit_PreJVM1_8() {
+    Assume.assumeThat(System.getProperty("java.version"), new LessThanVersionMatcher("1.8"));
+    testJvmSourceInit(39);
+  }
+
+  @Test
+  public void testJvmSourceInit_JVM1_8() {
+    Assume.assumeThat(System.getProperty("java.version"), new VersionMatcher("1.8"));
+    testJvmSourceInit(40);
+  }
+
+  private void testJvmSourceInit(int metricsSize) {
+    JvmMetricsSource jvmMetricsSource = new JvmMetricsSource();
+    MetricsConfiguration configuration = MetricsConfiguration.getMetricsConfiguration();
+    MetricsSink sink = new TestAmbariMetricsSinkImpl();
+    jvmMetricsSource.init(configuration, sink);
+    org.junit.Assert.assertEquals(jvmMetricsSource.getMetrics().size(), metricsSize);
+  }
+
+  /* ****************************************************************
+   * Matcher classes used in Assume checks
+   * **************************************************************** */
+  private class VersionMatcher extends BaseMatcher<String> {
+    private final float version;
+
+    VersionMatcher(String version) {
+      this.version = Float.parseFloat(version);
+    }
+
+    @Override
+    public boolean matches(Object o) {
+      return parseVersion((String) o) == this.version;
+    }
+
+    float parseVersion(String versionString) {
+      Pattern p = Pattern.compile("(\\d+(?:\\.\\d+)).*");
+      Matcher matcher = p.matcher(versionString);
+      if (matcher.matches()) {
+        return Float.parseFloat(matcher.group(1));
+      } else {
+        return 0f;
+      }
+    }
+
+    @Override
+    public void describeTo(Description description) {
+
+    }
+
+    public float getVersion() {
+      return version;
+    }
+  }
+
+  private class LessThanVersionMatcher extends VersionMatcher {
+
+    LessThanVersionMatcher(String version) {
+      super(version);
+    }
+
+    @Override
+    public boolean matches(Object o) {
+      return parseVersion((String) o) < getVersion();
+    }
+
+    @Override
+    public void describeTo(Description description) {
+
+    }
+  }
+
+  @Test(timeout=20000)
+  public void testDatabaseMetricSourcePublish() throws InterruptedException {
+    Map<String, Long> metricsMap = new HashMap<>();
+
+    metricsMap.put("Timer.UpdateObjectQuery.HostRoleCommandEntity", 10000l); // Should be accepted.
+    metricsMap.put("Timer.UpdateObjectQuery.HostRoleCommandEntity.SqlPrepare", 5000l); // Should be accepted.
+    metricsMap.put("Timer.DirectReadQuery", 6000l); // Should be accepted.
+    metricsMap.put("Timer.ReadAllQuery.StackEntity.StackEntity.findByNameAndVersion.SqlPrepare", 15000l); //Should be discarded
+
+    metricsMap.put("Counter.UpdateObjectQuery.HostRoleCommandEntity", 10l); // Should be accepted & should add a computed metric.
+    metricsMap.put("Counter.ReadObjectQuery.RequestEntity.request", 4330l); //Should be discarded
+    metricsMap.put("Counter.ReadObjectQuery.MetainfoEntity.readMetainfoEntity.CacheMisses", 15l); // Should be accepted.
+
+    DatabaseMetricsSource source = new DatabaseMetricsSource();
+
+    MetricsConfiguration configuration = MetricsConfiguration.getSubsetConfiguration(
+      MetricsConfiguration.getMetricsConfiguration(), "source.database.");
+
+    MetricsSink sink = EasyMock.createMock(AmbariMetricSinkImpl.class);
+    Capture<List<SingleMetric>> metricsCapture = EasyMock.newCapture();
+    sink.publish(capture(metricsCapture));
+    expectLastCall().once();
+
+    replay(sink);
+    source.init(configuration, sink);
+    source.start();
+    source.publish(metricsMap);
+    Thread.sleep(5000l);
+    verify(sink);
+
+    Assert.assertTrue(metricsCapture.getValue().size() == 6);
+  }
+
+  @Test
+  public void testDatabaseMetricsSourceAcceptMetric() {
+
+    DatabaseMetricsSource source = new DatabaseMetricsSource();
+    MetricsConfiguration configuration = MetricsConfiguration.getSubsetConfiguration(
+      MetricsConfiguration.getMetricsConfiguration(), "source.database.");
+    MetricsSink sink = new TestAmbariMetricsSinkImpl();
+    source.init(configuration, sink);
+
+    Assert.assertTrue(source.acceptMetric("Timer.UpdateObjectQuery.HostRoleCommandEntity.SqlPrepare"));
+    Assert.assertFalse(source.acceptMetric("Counter.ReadObjectQuery.RequestEntity.request"));
+    Assert.assertTrue(source.acceptMetric("Counter.ReadObjectQuery.MetainfoEntity.readMetainfoEntity.CacheMisses"));
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
index 3565504..36d8fee 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
@@ -30,6 +30,7 @@ public class TestAmbariMetricsSinkImpl extends AbstractTimelineMetricsSink imple
 
   @Override
   public void publish(List<SingleMetric> metrics) {
+    LOG.info("Published " + metrics.size() + " metrics.");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestMetricsSource.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestMetricsSource.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestMetricsSource.java
index acf1586..5f43b07 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestMetricsSource.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestMetricsSource.java
@@ -18,20 +18,11 @@
 
 package org.apache.ambari.server.metric.system.impl;
 
-import java.util.List;
-
-import org.apache.ambari.server.metrics.system.SingleMetric;
 import org.apache.ambari.server.metrics.system.impl.AbstractMetricsSource;
 
 public class TestMetricsSource extends AbstractMetricsSource {
 
   @Override
-  public List<SingleMetric> getMetrics() {
-    return null;
+  public void start() {
   }
-
-  @Override
-  public void run() {
-  }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/ec8809ce/ambari-server/src/test/resources/metrics.properties
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/resources/metrics.properties b/ambari-server/src/test/resources/metrics.properties
index 5eee064..d0ea1f3 100644
--- a/ambari-server/src/test/resources/metrics.properties
+++ b/ambari-server/src/test/resources/metrics.properties
@@ -21,9 +21,8 @@
 # Source interval determines how often the metric is sent to sink. Its unit is in seconds
 metric.sources=jvm,testsource
 
-source.jvm.interval=10
 source.jvm.class=org.apache.ambari.server.metrics.system.impl.JvmMetricsSource
 source.testsource.class=org.apache.ambari.server.metric.system.impl.TestMetricsSource
 
-#### Sink Configs #####
-# Sink frequency determines how often the sink publish the metrics from buffer to AMS.
\ No newline at end of file
+source.database.monitor.entities=Cluster(.*)Entity,Host(.*)Entity,ExecutionCommandEntity
+source.database.monitor.query.keywords.include=CacheMisses
\ No newline at end of file