You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2016/12/22 20:09:49 UTC

ambari git commit: AMBARI-19273 : Refine AmbariServer Metrics service and enable JVM metrics source by default.

Repository: ambari
Updated Branches:
  refs/heads/trunk c695e50a7 -> 6d196db0d


AMBARI-19273 : Refine AmbariServer Metrics service and enable JVM metrics source by default.


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6d196db0
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6d196db0
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6d196db0

Branch: refs/heads/trunk
Commit: 6d196db0d80a145bc323a03aff74533938836392
Parents: c695e50
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Thu Dec 22 12:09:27 2016 -0800
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Thu Dec 22 12:09:27 2016 -0800

----------------------------------------------------------------------
 ambari-server/conf/unix/metrics.properties      |  15 +-
 ambari-server/conf/windows/metrics.properties   |  28 +++
 .../src/main/assemblies/server-windows.xml      |   4 +
 ambari-server/src/main/assemblies/server.xml    |   4 +
 .../server/configuration/Configuration.java     |   8 +
 .../ambari/server/controller/AmbariServer.java  |  19 +-
 .../server/metrics/system/AmbariMetricSink.java |  34 ---
 .../server/metrics/system/MetricsService.java   |  16 +-
 .../server/metrics/system/MetricsSink.java      |  43 ++++
 .../server/metrics/system/MetricsSource.java    |   5 +-
 .../server/metrics/system/SingleMetric.java     |  44 ++++
 .../system/impl/AbstractMetricsSource.java      |  15 +-
 .../system/impl/AmbariMetricSinkImpl.java       | 239 ++++++++++++++-----
 .../metrics/system/impl/Configuration.java      |  83 -------
 .../metrics/system/impl/JvmMetricsSource.java   |  54 +++--
 .../system/impl/MetricsConfiguration.java       |  89 +++++++
 .../metrics/system/impl/MetricsServiceImpl.java | 154 +++++-------
 .../system/impl/JvmMetricsSourceTest.java       |  36 +++
 .../metric/system/impl/MetricsServiceTest.java  |  40 ++++
 .../system/impl/TestAmbariMetricsSinkImpl.java  |  79 ++++++
 .../metric/system/impl/TestMetricsSource.java   |  37 +++
 .../src/test/resources/metrics.properties       |  29 +++
 22 files changed, 749 insertions(+), 326 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/conf/unix/metrics.properties
----------------------------------------------------------------------
diff --git a/ambari-server/conf/unix/metrics.properties b/ambari-server/conf/unix/metrics.properties
index 5f01e39..3ee22d6 100644
--- a/ambari-server/conf/unix/metrics.properties
+++ b/ambari-server/conf/unix/metrics.properties
@@ -17,15 +17,12 @@
 # limitations under the License.
 
 
-# Metrics sources info
-metrics.sources=jvm
-
+#### Source Configs #####
 # Source interval determines how often the metric is sent to sink. Its unit is in seconds
-source.jvm.interval=5
-source.jvm.class=org.apache.ambari.server.metrics.system.impl.JvmMetricsSource
+metric.sources=jvm
 
-#source.database.interval=10
-#source.database.class=org.apache.ambari.server.metrics.system.impl.DbMetricSource
+source.jvm.interval=10
+source.jvm.class=org.apache.ambari.server.metrics.system.impl.JvmMetricsSource
 
-# Sink frequency determines how often the sink publish the metrics from buffer to AMS.
-sink.frequency=10
\ No newline at end of file
+#Override Ambari Server hostname for metrics
+#ambariserver.hostname.override=
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/conf/windows/metrics.properties
----------------------------------------------------------------------
diff --git a/ambari-server/conf/windows/metrics.properties b/ambari-server/conf/windows/metrics.properties
new file mode 100644
index 0000000..3ee22d6
--- /dev/null
+++ b/ambari-server/conf/windows/metrics.properties
@@ -0,0 +1,28 @@
+# Copyright 2011 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+#### Source Configs #####
+# Source interval determines how often the metric is sent to sink. Its unit is in seconds
+metric.sources=jvm
+
+source.jvm.interval=10
+source.jvm.class=org.apache.ambari.server.metrics.system.impl.JvmMetricsSource
+
+#Override Ambari Server hostname for metrics
+#ambariserver.hostname.override=
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/assemblies/server-windows.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/assemblies/server-windows.xml b/ambari-server/src/main/assemblies/server-windows.xml
index 191fcfb..ce1e270 100644
--- a/ambari-server/src/main/assemblies/server-windows.xml
+++ b/ambari-server/src/main/assemblies/server-windows.xml
@@ -45,6 +45,10 @@
       <outputDirectory>/ambari-server-${project.version}/conf</outputDirectory>
     </file>
     <file>
+      <source>${basedir}/conf/windows/metrics.properties</source>
+      <outputDirectory>/ambari-server-${project.version}/conf</outputDirectory>
+    </file>
+    <file>
       <source>${basedir}/conf/windows/ca.config</source>
       <outputDirectory>/ambari-server-${project.version}/keystore</outputDirectory>
     </file>

http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/assemblies/server.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/assemblies/server.xml b/ambari-server/src/main/assemblies/server.xml
index 3a61477..5055d46 100644
--- a/ambari-server/src/main/assemblies/server.xml
+++ b/ambari-server/src/main/assemblies/server.xml
@@ -219,6 +219,10 @@
       <outputDirectory>/etc/ambari-server/conf</outputDirectory>
     </file>
     <file>
+      <source>conf/unix/metrics.properties</source>
+      <outputDirectory>/etc/ambari-server/conf</outputDirectory>
+    </file>
+    <file>
       <source>conf/unix/krb5JAASLogin.conf</source>
       <outputDirectory>/etc/ambari-server/conf</outputDirectory>
     </file>

http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index 03af391..d2f7934 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -2586,6 +2586,14 @@ public class Configuration {
   public static final ConfigurationProperty<Integer> LOGSEARCH_PORTAL_READ_TIMEOUT = new ConfigurationProperty<>(
     "logsearch.portal.read.timeout", 5000);
 
+
+  /**
+   * Global disable flag for AmbariServer Metrics.
+   */
+  @Markdown(description = "Global disable flag for AmbariServer Metrics.")
+  public static final ConfigurationProperty<Boolean> AMBARISERVER_METRICS_DISABLE = new ConfigurationProperty<>(
+    "ambariserver.metrics.disable", false);
+
   private static final Logger LOG = LoggerFactory.getLogger(
     Configuration.class);
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
index 65a4b1c..1704546 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
@@ -28,8 +28,6 @@ import java.net.URL;
 import java.util.EnumSet;
 import java.util.Enumeration;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.logging.LogManager;
 
 import javax.crypto.BadPaddingException;
@@ -542,6 +540,9 @@ public class AmbariServer {
       ExecutionScheduleManager executionScheduleManager = injector
           .getInstance(ExecutionScheduleManager.class);
 
+      MetricsService metricsService = injector.getInstance(
+        MetricsService.class);
+
       clusterController = controller;
 
       StateRecoveryManager recoveryManager = injector.getInstance(
@@ -554,14 +555,6 @@ public class AmbariServer {
        */
       server.start();
 
-      // TODO, start every other tread.
-      final ExecutorService executor = Executors.newSingleThreadExecutor();
-      MetricsService metricsService = injector.getInstance(
-              MetricsService.class);
-      metricsService.init();
-      executor.submit(metricsService);
-      LOG.info("********* Started Ambari Metrics **********");
-
       serverForAgent.start();
       LOG.info("********* Started Server **********");
 
@@ -574,6 +567,12 @@ public class AmbariServer {
       serviceManager.startAsync();
       LOG.info("********* Started Services **********");
 
+      if (!Configuration.AMBARISERVER_METRICS_DISABLE.equals(true)) {
+        metricsService.start();
+      } else {
+        LOG.info("AmbariServer Metrics disabled.");
+      }
+
       server.join();
       LOG.info("Joined the Server");
     } catch (BadPaddingException bpe) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/AmbariMetricSink.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/AmbariMetricSink.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/AmbariMetricSink.java
deleted file mode 100644
index 809176be..0000000
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/AmbariMetricSink.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-//package org.apache.hadoop.metrics2.sink.ambari;
-
-package org.apache.ambari.server.metrics.system;
-
-import java.util.Map;
-
-public interface AmbariMetricSink {
-  /**
-   * initialize Collector URI and sink frequency to publish the metrics to AMS
-   **/
-  void init(String protocol, String collectorUri, int frequency);
-
-  /**
-  *  Publish metrics to Collector
-  **/
-  void publish(Map<String, Number> metricsMap);
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java
index 23845c9..29db6a8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java
@@ -15,12 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.ambari.server.metrics.system;
 
-public interface MetricsService extends Runnable {
+import java.util.Collection;
+
+import org.apache.ambari.server.metrics.system.impl.AbstractMetricsSource;
+
+
+public interface MetricsService{
   /**
    * Set up configuration
    **/
-  void init();
+  void start();
+
+  /**
+   * Get Configured sources
+   * @return the configured metrics sources
+   */
+  Collection<AbstractMetricsSource> getSources();
 }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSink.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSink.java
new file mode 100644
index 0000000..803e6e0
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSink.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.metrics.system;
+
+import java.util.List;
+
+import org.apache.ambari.server.metrics.system.impl.MetricsConfiguration;
+
+
+public interface MetricsSink {
+
+  /**
+   * Initialize the sink with the given configuration.
+   **/
+  void init(MetricsConfiguration configuration);
+
+  /**
+  *  Publish metrics to Collector
+  **/
+  void publish(List<SingleMetric> metrics);
+
+
+  /**
+   * Returns whether the sink has been initialized.
+   */
+  boolean isInitialized();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java
index cf10408..b0ccb3c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java
@@ -15,11 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.ambari.server.metrics.system;
 
+import org.apache.ambari.server.metrics.system.impl.MetricsConfiguration;
+
 public interface MetricsSource extends Runnable{
   /**
    * initialize sink
    **/
-  void init(AmbariMetricSink sink);
+  void init(MetricsConfiguration configuration, MetricsSink sink);
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/SingleMetric.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/SingleMetric.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/SingleMetric.java
new file mode 100644
index 0000000..6626c8ab
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/SingleMetric.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.metrics.system;
+
+public class SingleMetric {
+
+  String metricName;
+  double value;
+  long timestamp;
+
+  public SingleMetric (String metricName, double value, long timestamp) {
+    this.metricName = metricName;
+    this.value = value;
+    this.timestamp = timestamp;
+  }
+
+  public String getMetricName() {
+    return metricName;
+  }
+
+  public double getValue() {
+    return value;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java
index cb90a13..34ac872 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java
@@ -17,25 +17,26 @@
  */
 package org.apache.ambari.server.metrics.system.impl;
 
-import java.util.Map;
+import java.util.List;
 
-import org.apache.ambari.server.metrics.system.AmbariMetricSink;
+import org.apache.ambari.server.metrics.system.MetricsSink;
 import org.apache.ambari.server.metrics.system.MetricsSource;
+import org.apache.ambari.server.metrics.system.SingleMetric;
 
 public abstract class AbstractMetricsSource implements MetricsSource {
-  protected AmbariMetricSink sink;
+  protected MetricsSink sink;
 
   /**
    *  Pass metrics sink to metrics source
    **/
   @Override
-  public void init(AmbariMetricSink sink) {
-      this.sink = sink;
+  public void init(MetricsConfiguration configuration, MetricsSink sink) {
+    this.sink = sink;
   }
 
   /**
    *  Get metrics at the instance
    *  @return a map for metrics that maps metrics name to metrics value
    **/
-  abstract public Map<String, Number> getMetrics();
-}
+  abstract public List<SingleMetric> getMetrics();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
index 391fe11..ba5f983 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,73 +17,177 @@
  */
 package org.apache.ambari.server.metrics.system.impl;
 
-import java.io.IOException;
+
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
 
-import org.apache.ambari.server.metrics.system.AmbariMetricSink;
-import org.apache.commons.lang.ClassUtils;
+import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.internal.AbstractControllerResourceProvider;
+import org.apache.ambari.server.controller.internal.ServiceConfigVersionResourceProvider;
+import org.apache.ambari.server.controller.spi.Predicate;
+import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.ResourceProvider;
+import org.apache.ambari.server.controller.utilities.PredicateBuilder;
+import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.metrics.system.MetricsSink;
+import org.apache.ambari.server.metrics.system.SingleMetric;
+import org.apache.ambari.server.security.authorization.internal.InternalAuthenticationToken;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponent;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 
-import jline.internal.Log;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.springframework.security.core.context.SecurityContextHolder;
+
+public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements MetricsSink {
+  private static final String AMBARI_SERVER_APP_ID = "ambari_server";
+  private Collection<String> collectorHosts;
 
-public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements AmbariMetricSink {
-  private static final String APP_ID = "ambari_server";
-  private int timeoutSeconds = 10;
-  private String collectorProtocol;
   private String collectorUri;
+  private String port;
+  private String protocol;
   private String hostName;
-  private int counter = 0;
-  private int frequency;
-  private List<TimelineMetric> buffer = new ArrayList<>();
+  private AmbariManagementController ambariManagementController;
+  private TimelineMetricsCache timelineMetricsCache;
+  private boolean isInitialized = false;
+
+  public AmbariMetricSinkImpl(AmbariManagementController amc) {
+    this.ambariManagementController = amc;
+  }
+
   @Override
-  public void init(String protocol, String collectorUri, int frequency) {
+  public void init(MetricsConfiguration configuration) {
+
+    if (ambariManagementController == null) {
+      return;
+    }
+
+    InternalAuthenticationToken authenticationToken = new InternalAuthenticationToken("admin");
+    authenticationToken.setAuthenticated(true);
+    SecurityContextHolder.getContext().setAuthentication(authenticationToken);
+    Clusters clusters = ambariManagementController.getClusters();
+    String ambariMetricsServiceName = "AMBARI_METRICS";
+    collectorHosts = new HashSet<>();
+
+    for (Map.Entry<String, Cluster> kv : clusters.getClusters().entrySet()) {
+      String clusterName = kv.getKey();
+      Cluster c = kv.getValue();
+      Resource.Type type = Resource.Type.ServiceConfigVersion;
+
+      Set<String> propertyIds = new HashSet<String>();
+      propertyIds.add(ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_CONFIGURATIONS_PROPERTY_ID);
+
+      Predicate predicate = new PredicateBuilder().property(
+        ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_CLUSTER_NAME_PROPERTY_ID).equals(clusterName).and().property(
+        ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_SERVICE_NAME_PROPERTY_ID).equals(ambariMetricsServiceName).and().property(
+        ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_IS_CURRENT_PROPERTY_ID).equals("true").toPredicate();
+
+      Request request = PropertyHelper.getReadRequest(propertyIds);
+
+      ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider(
+        type,
+        PropertyHelper.getPropertyIds(type),
+        PropertyHelper.getKeyPropertyIds(type),
+        ambariManagementController);
+
+      try {
+        //get collector host(s)
+        Service service = c.getService(ambariMetricsServiceName);
+        if (service != null) {
+          for (String component : service.getServiceComponents().keySet()) {
+            ServiceComponent sc = service.getServiceComponents().get(component);
+            for (ServiceComponentHost serviceComponentHost : sc.getServiceComponentHosts().values()) {
+              if (serviceComponentHost.getServiceComponentName().equals("METRICS_COLLECTOR")) {
+                collectorHosts.add(serviceComponentHost.getHostName());
+              }
+            }
+          }
+        }
+
+        // get collector port and protocol
+        Set<Resource> resources = provider.getResources(request, predicate);
+
+        for (Resource resource : resources) {
+          if (resource != null) {
+            ArrayList<LinkedHashMap<Object, Object>> configs = (ArrayList<LinkedHashMap<Object, Object>>)
+              resource.getPropertyValue(ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_CONFIGURATIONS_PROPERTY_ID);
+            for (LinkedHashMap<Object, Object> config : configs) {
+              if (config != null && config.get("type").equals("ams-site")) {
+                TreeMap<Object, Object> properties = (TreeMap<Object, Object>) config.get("properties");
+                String timelineWebappAddress = (String) properties.get("timeline.metrics.service.webapp.address");
+                if (StringUtils.isNotEmpty(timelineWebappAddress) && timelineWebappAddress.contains(":")) {
+                  port = timelineWebappAddress.split(":")[1];
+                }
+                String httpPolicy = (String) properties.get("timeline.metrics.service.http.policy");
+                protocol = httpPolicy.equals("HTTP_ONLY") ? "http" : "https";
+                break;
+              }
+            }
+          }
+        }
+      } catch (Exception e) {
+        LOG.info("Exception caught when retrieving Collector URI", e);
+      }
+    }
+
+    collectorUri = getCollectorUri(findPreferredCollectHost());
+    hostName = configuration.getProperty("ambariserver.hostname.override", getDefaultLocalHostName());
+
+    int maxRowCacheSize = Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE,
+      String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT)));
+    int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
+      String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
 
-    /**
-     * Protocol is either HTTP or HTTPS, and the collectorURI is the domain name of the collector
-     * An example of the complete collector URI might be: http://c6403.ambari.org/ws/v1/timeline/metrics
-     */
-    this.frequency = frequency;
-    this.collectorProtocol = protocol;
-    this.collectorUri = getCollectorUri(collectorUri);
+    timelineMetricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
 
+    if (CollectionUtils.isNotEmpty(collectorHosts)) {
+      isInitialized = true;
+    }
+  }
+
+  private String getDefaultLocalHostName() {
     try {
-      hostName = InetAddress.getLocalHost().getHostName();
+      return InetAddress.getLocalHost().getCanonicalHostName();
     } catch (UnknownHostException e) {
-      Log.info("Error getting host address");
+      LOG.info("Error getting host address");
     }
+    return null;
   }
 
   @Override
-  public void publish(Map<String, Number> metricsMap) {
-    List<TimelineMetric> metricsList =  createMetricsList(metricsMap);
+  public void publish(List<SingleMetric> metrics) {
 
-    if(counter > frequency) {
-      TimelineMetrics timelineMetrics = new TimelineMetrics();
-      timelineMetrics.setMetrics(buffer);
 
-      String connectUrl = collectorUri;
-      String jsonData = null;
-      try {
-        jsonData = mapper.writeValueAsString(timelineMetrics);
-      } catch (IOException e) {
-        LOG.error("Unable to parse metrics", e);
-      }
-      if (jsonData != null) {
-        emitMetricsJson(connectUrl, jsonData);
+    // If the sink is not yet initialized, drop the metrics on the floor.
+    if (isInitialized) {
+      List<TimelineMetric> metricList = getFilteredMetricList(metrics);
+      if (!metricList.isEmpty()) {
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        timelineMetrics.setMetrics(metricList);
+        emitMetrics(timelineMetrics);
       }
-      counter = 0;
-    } else {
-      buffer.addAll(metricsList);
-      counter++;
     }
+  }
 
+  @Override
+  public boolean isInitialized() {
+    return isInitialized;
   }
 
 
@@ -94,22 +198,22 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements
    */
   @Override
   protected String getCollectorUri(String host) {
-    return getCollectorProtocol() + "://" + host + WS_V1_TIMELINE_METRICS;
+    return constructContainerMetricUri(protocol, host, port);
   }
 
   @Override
   protected String getCollectorProtocol() {
-    return collectorProtocol;
+    return protocol;
   }
 
   @Override
   protected String getCollectorPort() {
-    return null;
+    return port;
   }
 
   @Override
   protected int getTimeoutSeconds() {
-      return timeoutSeconds;
+    return 10;
   }
 
   /**
@@ -119,6 +223,7 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements
    */
   @Override
   protected String getZookeeperQuorum() {
+    //Ignoring Zk Fallback.
     return null;
   }
 
@@ -129,7 +234,7 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements
    */
   @Override
   protected Collection<String> getConfiguredCollectorHosts() {
-    return null;
+    return collectorHosts;
   }
 
   /**
@@ -142,27 +247,33 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements
     return hostName;
   }
 
-  private List<TimelineMetric> createMetricsList(Map<String, Number> metricsMap) {
-    final List<TimelineMetric> metricsList = new ArrayList<>();
-    for (Map.Entry<String, Number> entry : metricsMap.entrySet()) {
-      final long currentTimeMillis = System.currentTimeMillis();
-      String metricsName = entry.getKey();
-      Number value = entry.getValue();
-      TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, metricsName, value);
-      metricsList.add(metric);
+  private List<TimelineMetric> getFilteredMetricList(List<SingleMetric> metrics) {
+    final List<TimelineMetric> metricList = new ArrayList<>();
+    for (SingleMetric metric : metrics) {
+
+      String metricName = metric.getMetricName();
+      Double value = metric.getValue();
+
+      TimelineMetric timelineMetric = createTimelineMetric(metric.getTimestamp(), AMBARI_SERVER_APP_ID, metricName, value);
+      timelineMetricsCache.putTimelineMetric(timelineMetric, false);
+      TimelineMetric cachedMetric = timelineMetricsCache.getTimelineMetric(metricName);
+
+      if (cachedMetric != null) {
+        metricList.add(cachedMetric);
+      }
     }
-    return metricsList;
+    return metricList;
   }
 
   private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName,
                                               Number attributeValue) {
-      TimelineMetric timelineMetric = new TimelineMetric();
-      timelineMetric.setMetricName(attributeName);
-      timelineMetric.setHostName(hostName);
-      timelineMetric.setAppId(component);
-      timelineMetric.setStartTime(currentTimeMillis);
-      timelineMetric.setType(ClassUtils.getShortCanonicalName(attributeValue, "Number"));
-      timelineMetric.getMetricValues().put(currentTimeMillis, attributeValue.doubleValue());
-      return timelineMetric;
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setMetricName(attributeName);
+    timelineMetric.setHostName(hostName);
+    timelineMetric.setAppId(component);
+    timelineMetric.setStartTime(currentTimeMillis);
+
+    timelineMetric.getMetricValues().put(currentTimeMillis, attributeValue.doubleValue());
+    return timelineMetric;
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/Configuration.java
deleted file mode 100644
index 44a3de0..0000000
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/Configuration.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.server.metrics.system.impl;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Properties;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class Configuration {
-  public static final String CONFIG_FILE = "metrics.properties";
-
-  private static Logger LOG = LoggerFactory.getLogger(Configuration.class);
-  private Properties properties;
-
-  public Configuration() {
-      this(readConfigFile());
-  }
-
-  public Configuration(Properties properties) {
-      this.properties = properties;
-  }
-
-  private static Properties readConfigFile() {
-      Properties properties = new Properties();
-
-      //Get property file stream from classpath
-      InputStream inputStream = Configuration.class.getClassLoader().getResourceAsStream(CONFIG_FILE);
-
-      if (inputStream == null) {
-          throw new RuntimeException(CONFIG_FILE + " not found in classpath");
-      }
-
-      // load the properties
-      try {
-          properties.load(inputStream);
-          inputStream.close();
-      } catch (FileNotFoundException fnf) {
-          LOG.info("No configuration file " + CONFIG_FILE + " found in classpath.", fnf);
-      } catch (IOException ie) {
-          throw new IllegalArgumentException("Can't read configuration file " +
-                  CONFIG_FILE, ie);
-      }
-
-      return properties;
-  }
-
-  /**
-   * Get the property value for the given key.
-   *
-   * @return the property value
-   */
-  public String getProperty(String key) {
-    return properties.getProperty(key);
-  }
-
-  /**
-   * Get the property value for the given key.
-   *
-   * @return the property value
-   */
-  public String getProperty(String key, String defaultValue) {
-    return properties.getProperty(key, defaultValue);
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java
index ef07deb..2e4747b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java
@@ -18,17 +18,22 @@
 package org.apache.ambari.server.metrics.system.impl;
 
 import java.lang.management.ManagementFactory;
-import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
-import org.apache.ambari.server.metrics.system.AmbariMetricSink;
+import org.apache.ambari.server.metrics.system.MetricsSink;
+import org.apache.ambari.server.metrics.system.SingleMetric;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.MetricSet;
 import com.codahale.metrics.jvm.BufferPoolMetricSet;
+import com.codahale.metrics.jvm.FileDescriptorRatioGauge;
 import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
 import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
 import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
@@ -38,21 +43,21 @@ public class JvmMetricsSource extends AbstractMetricsSource {
   private static Logger LOG = LoggerFactory.getLogger(JvmMetricsSource.class);
 
   @Override
-  public void init(AmbariMetricSink sink) {
-    super.init(sink);
-    registerAll("gc", new GarbageCollectorMetricSet(), registry);
-    registerAll("buffers", new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer()), registry);
-    registerAll("memory", new MemoryUsageGaugeSet(), registry);
-    registerAll("threads", new ThreadStatesGaugeSet(), registry);
+  public void init(MetricsConfiguration configuration, MetricsSink sink) {
+    super.init(configuration, sink);
+    registerAll("jvm.gc", new GarbageCollectorMetricSet(), registry);
+    registerAll("jvm.buffers", new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer()), registry);
+    registerAll("jvm.memory", new MemoryUsageGaugeSet(), registry);
+    registerAll("jvm.threads", new ThreadStatesGaugeSet(), registry);
+    registry.register("jvm.file.open.descriptor.ratio", new FileDescriptorRatioGauge());
   }
 
   @Override
   public void run() {
-    this.sink.publish(getMetrics());
-    LOG.info("********* Published system metrics to sink **********");
+    sink.publish(getMetrics());
+    LOG.debug("********* Published system metrics to sink **********");
   }
 
-
   private void registerAll(String prefix, MetricSet metricSet, MetricRegistry registry) {
     for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) {
       if (entry.getValue() instanceof MetricSet) {
@@ -64,13 +69,26 @@ public class JvmMetricsSource extends AbstractMetricsSource {
   }
 
   @Override
-  public Map<String, Number> getMetrics() {
-    Map<String, Number> map = new HashMap<>();
-    for (String metricName : registry.getGauges().keySet()) {
-      if (metricName.equals("threads.deadlocks") ) continue;
-      Number value = (Number)registry.getGauges().get(metricName).getValue();
-      map.put(metricName, value);
+  public List<SingleMetric> getMetrics() {
+    // Snapshot each gauge accepted by NonNumericMetricFilter as a timestamped SingleMetric.
+    List<SingleMetric> metrics = new ArrayList<>();
+    for (Map.Entry<String, Gauge> gauge : registry.getGauges(new NonNumericMetricFilter()).entrySet()) {
+      Object value = gauge.getValue().getValue();
+      if (value instanceof Number) { // the filter matches by name only, so guard the cast (also skips null)
+        metrics.add(new SingleMetric(gauge.getKey(), ((Number) value).doubleValue(), System.currentTimeMillis()));
+      }
+    }
+    return metrics;
+  }
+
+  public class NonNumericMetricFilter implements MetricFilter {
+
+    @Override
+    public boolean matches(String name, Metric metric) {
+      if (name.equalsIgnoreCase("jvm.threads.deadlocks")) {
+        return false;
+      }
+      return true;
     }
-    return map;
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsConfiguration.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsConfiguration.java
new file mode 100644
index 0000000..ca83a53
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsConfiguration.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.metrics.system.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Read-only access to the entries of {@code metrics.properties}, loaded from the classpath. */
+public class MetricsConfiguration {
+  public static final String CONFIG_FILE = "metrics.properties";
+
+  private static Logger LOG = LoggerFactory.getLogger(MetricsConfiguration.class);
+  private Properties properties;
+
+  /**
+   * Loads {@link #CONFIG_FILE} from the classpath.
+   *
+   * @return the configuration, or null if the file is missing, unreadable or empty
+   */
+  public static MetricsConfiguration getMetricsConfiguration() {
+    Properties properties = readConfigFile();
+    if (properties == null || properties.isEmpty()) {
+      return null;
+    }
+    return new MetricsConfiguration(properties);
+  }
+
+  public MetricsConfiguration(Properties properties) {
+    this.properties = properties;
+  }
+
+  /** Reads the properties file; logs and returns null when it cannot be found or read. */
+  private static Properties readConfigFile() {
+    Properties properties = new Properties();
+    //Get property file stream from classpath; try-with-resources closes it even if load() fails
+    try (InputStream inputStream = MetricsConfiguration.class.getClassLoader().getResourceAsStream(CONFIG_FILE)) {
+      if (inputStream == null) {
+        LOG.info(CONFIG_FILE + " not found in classpath");
+        return null;
+      }
+      properties.load(inputStream);
+    } catch (FileNotFoundException fnf) {
+      LOG.info("No configuration file " + CONFIG_FILE + " found in classpath.");
+      return null;
+    } catch (IOException ie) {
+      LOG.error("Can't read configuration file " + CONFIG_FILE, ie);
+      return null;
+    }
+    return properties;
+  }
+
+  /**
+   * Get the property value for the given key.
+   *
+   * @return the property value, or null if the key is not set
+   */
+  public String getProperty(String key) {
+    return properties.getProperty(key);
+  }
+
+  /**
+   * Get the property value for the given key, falling back to the supplied default.
+   *
+   * @return the property value, or defaultValue if the key is not set
+   */
+  public String getProperty(String key, String defaultValue) {
+    return properties.getProperty(key, defaultValue);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
index 44b6204..96a51f3 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
@@ -17,35 +17,19 @@
  */
 package org.apache.ambari.server.metrics.system.impl;
 
-
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.ambari.server.controller.AmbariManagementController;
-import org.apache.ambari.server.controller.internal.AbstractControllerResourceProvider;
-import org.apache.ambari.server.controller.internal.ServiceConfigVersionResourceProvider;
-import org.apache.ambari.server.controller.spi.Predicate;
-import org.apache.ambari.server.controller.spi.Request;
-import org.apache.ambari.server.controller.spi.Resource;
-import org.apache.ambari.server.controller.spi.ResourceProvider;
-import org.apache.ambari.server.controller.utilities.PredicateBuilder;
-import org.apache.ambari.server.controller.utilities.PropertyHelper;
-import org.apache.ambari.server.metrics.system.AmbariMetricSink;
 import org.apache.ambari.server.metrics.system.MetricsService;
-import org.apache.ambari.server.security.authorization.internal.InternalAuthenticationToken;
-import org.apache.ambari.server.state.Cluster;
-import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.metrics.system.MetricsSink;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.security.core.context.SecurityContextHolder;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -54,115 +38,89 @@ import com.google.inject.Singleton;
 public class MetricsServiceImpl implements MetricsService {
   private static Logger LOG = LoggerFactory.getLogger(MetricsServiceImpl.class);
   private Map<String, AbstractMetricsSource> sources = new HashMap<>();
-  private AmbariMetricSink sink = new AmbariMetricSinkImpl();
-  private String collectorUri = "";
-  private String collectorProtocol = "";
-  private Configuration configuration;
+  private MetricsSink sink = null;
+  private MetricsConfiguration configuration = null;
 
   @Inject
   AmbariManagementController amc;
 
   @Override
-  public void init() {
+  public void start() {
+    LOG.info("********* Initializing AmbariServer Metrics Service **********");
     try {
-      configuration = new Configuration();
-      if (collectorUri.isEmpty() || collectorProtocol.isEmpty()) {
-        setCollectorUri();
+      configuration = MetricsConfiguration.getMetricsConfiguration();
+      if (configuration == null) {
+        return;
+      }
+      sink = new AmbariMetricSinkImpl(amc);
+      initializeMetricsSink();
+      initializeMetricSources();
+      // Metric sources were set up above; if the sink is not ready yet, keep retrying its init.
+      if (!sink.isInitialized()) {
+        //If Sink is not initialized, Service will re-check every 5 mins.
+        Executors.newScheduledThreadPool(1).scheduleWithFixedDelay(new Runnable() {
+          @Override
+          public void run() {
+            LOG.info("Checking for metrics sink initialization");
+            if (!sink.isInitialized()) {
+              initializeMetricsSink();
+            }
+          }
+        }, 5, 5, TimeUnit.MINUTES);
       }
-      configureSourceAndSink();
     } catch (Exception e) {
-      LOG.info("Error initializing MetricsService", e);
-    }
-
-  }
-  @Override
-  public void run() {
-    final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
-    for (Map.Entry<String, AbstractMetricsSource> entry : sources.entrySet()) {
-      publishMetrics(executor, entry);
+      LOG.info("Unable to initialize MetricsService : {}", e.getMessage()); // "{}" is required, else SLF4J drops the argument
     }
   }
 
+  private void initializeMetricsSink() {
 
-  private void setCollectorUri() {
-    InternalAuthenticationToken authenticationToken = new InternalAuthenticationToken("admin");
-    authenticationToken.setAuthenticated(true);
-    SecurityContextHolder.getContext().setAuthentication(authenticationToken);
-    Clusters clusters = amc.getClusters();
-    for (Map.Entry<String, Cluster> kv : clusters.getClusters().entrySet()) {
-      String clusterName = kv.getKey();
-      Resource.Type type = Resource.Type.ServiceConfigVersion;
-
-      Set<String> propertyIds = new HashSet<String>();
-      propertyIds.add(ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_CONFIGURATIONS_PROPERTY_ID);
-
-      Predicate predicate = new PredicateBuilder().property(
-        ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_CLUSTER_NAME_PROPERTY_ID).equals(clusterName).and().property(
-        ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_SERVICE_NAME_PROPERTY_ID).equals("AMBARI_METRICS").and().property(
-        ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_IS_CURRENT_PROPERTY_ID).equals("true").toPredicate();
-
-      Request request = PropertyHelper.getReadRequest(propertyIds);
-
-      ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider(
-        type,
-        PropertyHelper.getPropertyIds(type),
-        PropertyHelper.getKeyPropertyIds(type),
-        amc);
-
-      try {
-        Set<Resource> resources = provider.getResources(request, predicate);
-
-        // get collector uri
-        for (Resource resource : resources) {
-          if (resource != null) {
-            ArrayList<LinkedHashMap<Object, Object>> configs = (ArrayList<LinkedHashMap<Object, Object>>)
-              resource.getPropertyValue(ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_CONFIGURATIONS_PROPERTY_ID);
-            for (LinkedHashMap<Object, Object> config : configs) {
-              if (config != null && config.get("type").equals("ams-site")) {
-                TreeMap<Object, Object> properties = (TreeMap<Object, Object>) config.get("properties");
-                collectorUri = (String) properties.get("timeline.metrics.service.webapp.address");
-                String which_protocol = (String) properties.get("timeline.metrics.service.http.policy");
-                collectorProtocol = which_protocol.equals("HTTP_ONLY") ? "http" : "https";
-                break;
-              }
-            }
-          }
-        }
-      } catch (Exception e) {
-        LOG.info("Throwing exception when retrieving Collector URI", e);
-      }
-    }
+    LOG.info("********* Configuring Metric Sink **********");
+    sink.init(configuration);
   }
 
-  private void configureSourceAndSink() {
+  private void initializeMetricSources() {
     try {
-      LOG.info("********* Configuring Ambari Metrics Sink and Source**********");
-      int frequency = Integer.parseInt(configuration.getProperty("sink.frequency", "10")); // default value 10
-      sink.init(collectorProtocol, collectorUri, frequency);
-      String[] sourceNames = configuration.getProperty("metrics.sources").split(",");
+
+      LOG.info("********* Configuring Metric Sources **********");
+      String commaSeparatedSources = configuration.getProperty("metric.sources");
+
+      if (StringUtils.isEmpty(commaSeparatedSources)) {
+        LOG.info("No sources configured.");
+        return;
+      }
+
+      String[] sourceNames = commaSeparatedSources.split(",");
       for (String sourceName: sourceNames) {
         String className = configuration.getProperty("source." + sourceName + ".class");
         Class t = Class.forName(className);
         AbstractMetricsSource src = (AbstractMetricsSource)t.newInstance();
-        src.init(sink);
+        src.init(configuration, sink);
         sources.put(sourceName, src);
       }
-    }
-    catch (Exception e) {
-      LOG.info("Throwing exception when registering metric sink and source", e);
+
+      final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+      for (Map.Entry<String, AbstractMetricsSource> entry : sources.entrySet()) {
+        startSource(executor, entry);
+      }
+    } catch (Exception e) {
+      LOG.error("Error when configuring metric sink and source", e);
     }
   }
 
-  private void publishMetrics(ScheduledExecutorService executor, Map.Entry<String, AbstractMetricsSource> entry) {
+  private void startSource(ScheduledExecutorService executor, Map.Entry<String, AbstractMetricsSource> entry) {
     String className = entry.getKey();
     AbstractMetricsSource source = entry.getValue();
     String interval = "source." + className + ".interval";
-    int duration = Integer.parseInt(configuration.getProperty(interval, "5")); // default value 5
+    int duration = Integer.parseInt(configuration.getProperty(interval, "10")); // default value 10 seconds
     try {
       executor.scheduleWithFixedDelay(source, 0, duration, TimeUnit.SECONDS);
-
     } catch (Exception e) {
-      LOG.info("Throwing exception when failing scheduling source", e);
+      LOG.info("Throwing exception when starting metric source", e);
     }
   }
+  /** Returns the values view of the {@code sources} map (keyed by source name), not a copy. */
+  public Collection<AbstractMetricsSource> getSources() {
+    return sources.values();
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/JvmMetricsSourceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/JvmMetricsSourceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/JvmMetricsSourceTest.java
new file mode 100644
index 0000000..a6aa5d5
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/JvmMetricsSourceTest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.metric.system.impl;
+
+import org.apache.ambari.server.metrics.system.MetricsSink;
+import org.apache.ambari.server.metrics.system.impl.JvmMetricsSource;
+import org.apache.ambari.server.metrics.system.impl.MetricsConfiguration;
+import org.junit.Test;
+
+/** Checks how many metrics {@link JvmMetricsSource} reports after init against a stub sink. */
+public class JvmMetricsSourceTest {
+  @Test
+  public void testJvmSourceInit() {
+    JvmMetricsSource jvmMetricsSource = new JvmMetricsSource();
+    MetricsConfiguration configuration = MetricsConfiguration.getMetricsConfiguration();
+    MetricsSink sink = new TestAmbariMetricsSinkImpl();
+    jvmMetricsSource.init(configuration, sink);
+    org.junit.Assert.assertEquals(39, jvmMetricsSource.getMetrics().size()); // (expected, actual); NOTE(review): 39 may vary by JVM - confirm
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsServiceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsServiceTest.java
new file mode 100644
index 0000000..67d80ee
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsServiceTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.metric.system.impl;
+
+import org.apache.ambari.server.metrics.system.MetricsService;
+import org.apache.ambari.server.metrics.system.impl.AbstractMetricsSource;
+import org.apache.ambari.server.metrics.system.impl.JvmMetricsSource;
+import org.apache.ambari.server.metrics.system.impl.MetricsServiceImpl;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+/** Starts a {@link MetricsServiceImpl} and checks the number and types of the sources it reports. */
+public class MetricsServiceTest {
+  @Test
+  public void testMetricsServiceStart() {
+    MetricsService metricsService = new MetricsServiceImpl();
+    metricsService.start();
+    Assert.assertEquals(2, metricsService.getSources().size()); // reports the actual count on failure
+    for (AbstractMetricsSource source : metricsService.getSources()) {
+      Assert.assertTrue("Unexpected source type " + source.getClass(), source instanceof JvmMetricsSource || source instanceof TestMetricsSource);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
new file mode 100644
index 0000000..c4ba97e
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.metric.system.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.ambari.server.metrics.system.MetricsSink;
+import org.apache.ambari.server.metrics.system.SingleMetric;
+import org.apache.ambari.server.metrics.system.impl.MetricsConfiguration;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
+
+public class TestAmbariMetricsSinkImpl extends AbstractTimelineMetricsSink implements MetricsSink {
+  // Test stub: always reports itself as initialized and discards whatever is published to it.
+  @Override
+  public void publish(List<SingleMetric> metrics) {
+  }
+
+  @Override
+  public boolean isInitialized() {
+    return true;
+  }
+  // NOTE(review): built with constructContainerMetricUri; confirm this is the intended collector endpoint.
+  @Override
+  protected String getCollectorUri(String host) {
+    return constructContainerMetricUri(getCollectorProtocol(), host, getCollectorPort());
+  }
+
+  @Override
+  protected String getCollectorProtocol() {
+    return "http";
+  }
+
+  @Override
+  protected String getCollectorPort() {
+    return "6188";
+  }
+  // NOTE(review): the method name says seconds, so 1000 would be roughly 16 minutes; confirm intent.
+  @Override
+  protected int getTimeoutSeconds() {
+    return 1000;
+  }
+  // Returns null: this stub supplies no ZooKeeper quorum.
+  @Override
+  protected String getZookeeperQuorum() {
+    return null;
+  }
+
+  @Override
+  protected Collection<String> getConfiguredCollectorHosts() {
+    return Collections.singletonList("localhost");
+  }
+
+  @Override
+  protected String getHostname() {
+    return "localhost";
+  }
+
+  @Override
+  public void init(MetricsConfiguration configuration) {
+    // No-op: this stub reads nothing from the configuration.
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestMetricsSource.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestMetricsSource.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestMetricsSource.java
new file mode 100644
index 0000000..c6ce90e
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestMetricsSource.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.metric.system.impl;
+
+import java.util.List;
+
+import org.apache.ambari.server.metrics.system.SingleMetric;
+import org.apache.ambari.server.metrics.system.impl.AbstractMetricsSource;
+
+public class TestMetricsSource extends AbstractMetricsSource {
+  /** No-op: this stub source performs no work when run. */
+  @Override
+  public void run() {
+  }
+
+  /** Always returns {@code null}; this stub source produces no metrics. */
+  @Override
+  public List<SingleMetric> getMetrics() {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/test/resources/metrics.properties
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/resources/metrics.properties b/ambari-server/src/test/resources/metrics.properties
new file mode 100644
index 0000000..5eee064
--- /dev/null
+++ b/ambari-server/src/test/resources/metrics.properties
@@ -0,0 +1,29 @@
+# Copyright 2011 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+#### Source Configs #####
+# Source interval determines how often a source's metrics are sent to the sink. The unit is seconds.
+metric.sources=jvm,testsource
+
+source.jvm.interval=10
+source.jvm.class=org.apache.ambari.server.metrics.system.impl.JvmMetricsSource
+source.testsource.class=org.apache.ambari.server.metric.system.impl.TestMetricsSource
+
+#### Sink Configs #####
+# Sink frequency determines how often the sink publishes the metrics from its buffer to AMS.
\ No newline at end of file