You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/12/02 18:28:22 UTC

[06/30] ambari git commit: AMBARI-5707. Replace Ganglia with high performant and pluggable Metrics System. (swagle)

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ganglia/GangliaPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ganglia/GangliaPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ganglia/GangliaPropertyProvider.java
new file mode 100644
index 0000000..63533c6
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ganglia/GangliaPropertyProvider.java
@@ -0,0 +1,618 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.controller.metrics.ganglia;
+
+import org.apache.ambari.server.controller.internal.AbstractPropertyProvider;
+import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
+import org.apache.ambari.server.controller.internal.PropertyInfo;
+import org.apache.ambari.server.controller.metrics.MetricHostProvider;
+import org.apache.ambari.server.controller.metrics.MetricsPropertyProvider;
+import org.apache.ambari.server.controller.spi.*;
+import org.apache.ambari.server.controller.utilities.StreamProvider;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.ambari.server.controller.metrics.MetricsPropertyProvider.MetricsService.*;
+
+/**
+ * Abstract property provider implementation for a Ganglia source.
+ */
+public abstract class GangliaPropertyProvider extends MetricsPropertyProvider {
+
+  /**
+   * Map of Ganglia cluster names keyed by component type.
+   * <p>
+   * Each value lists the Ganglia cluster names associated with the component.
+   * Several components list the shared "HDPSlaves" cluster in addition to
+   * their own dedicated cluster.
+   */
+  static final Map<String, List<String>> GANGLIA_CLUSTER_NAME_MAP = new HashMap<String, List<String>>();
+
+  
+  // Fill the map once, when the class is initialized.
+  // NOTE(review): the map is a plain HashMap with package visibility, so it is
+  // not protected against later modification - confirm that no caller relies on that.
+  static {
+    GANGLIA_CLUSTER_NAME_MAP.put("NAMENODE",           Collections.singletonList("HDPNameNode"));
+    GANGLIA_CLUSTER_NAME_MAP.put("DATANODE",           Arrays.asList("HDPDataNode", "HDPSlaves"));
+    GANGLIA_CLUSTER_NAME_MAP.put("JOBTRACKER",         Collections.singletonList("HDPJobTracker"));
+    GANGLIA_CLUSTER_NAME_MAP.put("TASKTRACKER",        Arrays.asList("HDPTaskTracker", "HDPSlaves"));
+    GANGLIA_CLUSTER_NAME_MAP.put("RESOURCEMANAGER",    Collections.singletonList("HDPResourceManager"));
+    GANGLIA_CLUSTER_NAME_MAP.put("NODEMANAGER",        Arrays.asList("HDPNodeManager", "HDPSlaves"));
+    GANGLIA_CLUSTER_NAME_MAP.put("HISTORYSERVER",      Collections.singletonList("HDPHistoryServer"));
+    GANGLIA_CLUSTER_NAME_MAP.put("HBASE_MASTER",       Collections.singletonList("HDPHBaseMaster"));
+    GANGLIA_CLUSTER_NAME_MAP.put("HBASE_REGIONSERVER", Arrays.asList("HDPHBaseRegionServer", "HDPSlaves"));
+    GANGLIA_CLUSTER_NAME_MAP.put("FLUME_HANDLER",      Arrays.asList("HDPFlumeServer", "HDPSlaves"));
+    GANGLIA_CLUSTER_NAME_MAP.put("JOURNALNODE",        Arrays.asList("HDPJournalNode", "HDPSlaves"));
+    GANGLIA_CLUSTER_NAME_MAP.put("NIMBUS",             Collections.singletonList("HDPNimbus"));
+    GANGLIA_CLUSTER_NAME_MAP.put("SUPERVISOR",         Collections.singletonList("HDPSupervisor"));
+  }
+
+  protected final static Logger LOG =
+      LoggerFactory.getLogger(GangliaPropertyProvider.class);
+
+  // ----- Constructors ------------------------------------------------------
+
+  /**
+   * Construct a Ganglia property provider. All arguments are handed on,
+   * unchanged and in the same order, to the super class constructor.
+   *
+   * @param componentPropertyInfoMap  the property info maps, one per component
+   * @param streamProvider            the stream provider
+   * @param configuration             the SSL configuration
+   * @param hostProvider              the metric host provider
+   * @param clusterNamePropertyId     the cluster name property id
+   * @param hostNamePropertyId        the host name property id
+   * @param componentNamePropertyId   the component name property id
+   */
+  public GangliaPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
+                                 StreamProvider streamProvider,
+                                 ComponentSSLConfiguration configuration,
+                                 MetricHostProvider hostProvider,
+                                 String clusterNamePropertyId,
+                                 String hostNamePropertyId,
+                                 String componentNamePropertyId) {
+
+    super(componentPropertyInfoMap,
+        streamProvider,
+        configuration,
+        hostProvider,
+        clusterNamePropertyId,
+        hostNamePropertyId,
+        componentNamePropertyId);
+  }
+
+
+  // ----- PropertyProvider --------------------------------------------------
+
+  /**
+   * Populate the given resources with the requested Ganglia metrics.
+   * The resources are updated in place and the same set is returned; nothing
+   * is done when no requested property id is relevant to this provider.
+   *
+   * @param resources  the resources to populate
+   * @param request    the request
+   * @param predicate  the predicate
+   *
+   * @return the given set of resources
+   *
+   * @throws SystemException if an rrd request can not be completed
+   */
+  @Override
+  public Set<Resource> populateResources(Set<Resource> resources, Request request, Predicate predicate)
+      throws SystemException {
+
+    Set<String> requestedIds = getRequestPropertyIds(request, predicate);
+
+    if (!requestedIds.isEmpty()) {
+      Map<String, Map<TemporalInfo, RRDRequest>> requestsByCluster =
+          getRRDRequests(resources, request, requestedIds);
+
+      // For each cluster...
+      for (Map<TemporalInfo, RRDRequest> clusterRequests : requestsByCluster.values()) {
+        // For each request ...
+        for (RRDRequest rrdRequest : clusterRequests.values()) {
+          //todo: property provider can reduce set of resources
+          rrdRequest.populateResources();
+        }
+      }
+    }
+    return resources;
+  }
+
+
+  // ----- GangliaPropertyProvider -------------------------------------------
+
+
+  /**
+   * Get the Ganglia cluster names for the given resource.
+   *
+   * @param resource     the resource
+   * @param clusterName  the cluster name read from the resource's cluster name property
+   *
+   * @return the set of Ganglia cluster names
+   */
+  protected abstract Set<String> getGangliaClusterNames(Resource resource, String clusterName);
+
+
+  /**
+   * Get the component name property id.
+   * NOTE(review): the backing field is not declared in this class; presumably
+   * it is inherited from the super class - confirm.
+   *
+   * @return the component name property id
+   */
+  protected String getComponentNamePropertyId() {
+    return componentNamePropertyId;
+  }
+
+  /**
+   * Get the host name property id.
+   * NOTE(review): the backing field is not declared in this class; presumably
+   * it is inherited from the super class - confirm.
+   *
+   * @return the host name property id
+   */
+  protected String getHostNamePropertyId() {
+    return hostNamePropertyId;
+  }
+
+  /**
+   * Get the stream provider.
+   * Within this class it is used to read the response of an rrd request.
+   *
+   * @return the stream provider
+   */
+  public StreamProvider getStreamProvider() {
+    return streamProvider;
+  }
+
+
+  // ----- helper methods ----------------------------------------------------
+
+  /**
+   * Build the request objects that hold everything needed to query the
+   * Ganglia rrd script.
+   * One request exists per cluster name / temporal information pair; a
+   * single request may cover several resources and several metrics.
+   *
+   * @param resources  the resources being populated
+   * @param request    the request
+   * @param ids        the relevant property ids
+   *
+   * @return a map of maps of rrd requests keyed by cluster name / temporal info
+   */
+  private Map<String, Map<TemporalInfo, RRDRequest>> getRRDRequests(Set<Resource> resources,
+                                                                    Request request,
+                                                                    Set<String> ids) {
+
+    Map<String, Map<TemporalInfo, RRDRequest>> requestsByCluster =
+        new HashMap<String, Map<TemporalInfo, RRDRequest>>();
+
+    for (Resource resource : resources) {
+      String clusterName = (String) resource.getPropertyValue(clusterNamePropertyId);
+
+      Map<TemporalInfo, RRDRequest> clusterRequests = requestsByCluster.get(clusterName);
+      if (clusterRequests == null) {
+        clusterRequests = new HashMap<TemporalInfo, RRDRequest>();
+        requestsByCluster.put(clusterName, clusterRequests);
+      }
+
+      for (String gangliaClusterName : getGangliaClusterNames(resource, clusterName)) {
+        ResourceKey resourceKey = new ResourceKey(getHostName(resource), gangliaClusterName);
+
+        for (String id : ids) {
+          Map<String, PropertyInfo> matchedInfos = new HashMap<String, PropertyInfo>();
+
+          Map<String, PropertyInfo> componentMetrics =
+              getComponentMetrics().get(getComponentName(resource));
+
+          // Not all components have metrics
+          if (componentMetrics != null && !componentMetrics.containsKey(id)) {
+            updateComponentMetricMap(componentMetrics, id);
+          }
+
+          getPropertyInfoMap(getComponentName(resource), id, matchedInfos);
+
+          for (Map.Entry<String, PropertyInfo> infoEntry : matchedInfos.entrySet()) {
+            String propertyId = infoEntry.getKey();
+            PropertyInfo info = infoEntry.getValue();
+
+            TemporalInfo temporalInfo = request.getTemporalInfo(id);
+
+            // A request without temporal info can only serve point in time
+            // properties; a request with temporal info only temporal ones.
+            boolean applicable = (temporalInfo == null) ? info.isPointInTime() : info.isTemporal();
+            if (!applicable) {
+              continue;
+            }
+
+            RRDRequest rrdRequest = clusterRequests.get(temporalInfo);
+            if (rrdRequest == null) {
+              rrdRequest = new RRDRequest(clusterName, temporalInfo);
+              clusterRequests.put(temporalInfo, rrdRequest);
+            }
+            rrdRequest.putResource(resourceKey, resource);
+            rrdRequest.putPropertyId(info.getPropertyId(), propertyId);
+          }
+        }
+      }
+    }
+    return requestsByCluster;
+  }
+
+  /**
+   * Build the spec (URL including query parameters) used to locate the
+   * Ganglia stream for the given request info.
+   * <p>
+   * Parameters are added in this order: "c" (clusters), "h" (hosts, only when
+   * not empty), "m" (metrics, ".*" when empty), and then either "s" / "e" / "r"
+   * (start, end and step of the temporal info, each only when not -1) or,
+   * without temporal info, "e=now" and "pt=true".
+   *
+   * @param clusterName   the cluster name
+   * @param clusterSet    the set of ganglia cluster names
+   * @param hostSet       the set of host names
+   * @param metricSet     the set of metric names
+   * @param temporalInfo  the temporal information; may be null
+   *
+   * @return the spec, like http://example.com/path?param1=val1&paramn=valn
+   *
+   * @throws org.apache.ambari.server.controller.spi.SystemException if unable to get the Ganglia Collector host name
+   */
+  private String getSpec(String clusterName,
+                         Set<String> clusterSet,
+                         Set<String> hostSet,
+                         Set<String> metricSet,
+                         TemporalInfo temporalInfo) throws SystemException {
+
+    String clusterParam = getSetString(clusterSet, -1);
+    String hostParam    = getSetString(hostSet, -1);
+    String metricParam  = getSetString(metricSet, -1);
+
+    URIBuilder builder = new URIBuilder();
+    builder.setScheme(configuration.isGangliaSSL() ? "https" : "http");
+    builder.setHost(hostProvider.getCollectorHostName(clusterName, GANGLIA));
+    builder.setPath("/cgi-bin/rrd.py");
+
+    builder.setParameter("c", clusterParam);
+
+    if (!hostParam.isEmpty()) {
+      builder.setParameter("h", hostParam);
+    }
+
+    // an empty metric list means: get all metrics
+    builder.setParameter("m", metricParam.isEmpty() ? ".*" : metricParam);
+
+    if (temporalInfo == null) {
+      builder.setParameter("e", "now");
+      builder.setParameter("pt", "true");
+    } else {
+      // -1 marks a value that is not set and is therefore left out of the spec
+      long startTime = temporalInfo.getStartTime();
+      if (startTime != -1) {
+        builder.setParameter("s", String.valueOf(startTime));
+      }
+
+      long endTime = temporalInfo.getEndTime();
+      if (endTime != -1) {
+        builder.setParameter("e", String.valueOf(endTime));
+      }
+
+      long step = temporalInfo.getStep();
+      if (step != -1) {
+        builder.setParameter("r", String.valueOf(step));
+      }
+    }
+
+    return builder.toString();
+  }
+  
+
+  /**
+   * Extract the value to report from the given metric.
+   *
+   * @param metric      the metric
+   * @param isTemporal  indicates whether or not this a temporal metric
+   *
+   * @return for a temporal metric, all data points, or null if there are none;
+   *         otherwise the value of the last data point, or 0 if there are none
+   */
+  private static Object getValue(GangliaMetric metric, boolean isTemporal) {
+    Number[][] points = metric.getDatapoints();
+    boolean noData = points.length == 0;
+
+    if (isTemporal) {
+      if (noData) {
+        return null;
+      }
+      return points;
+    }
+
+    if (noData) {
+      return 0;
+    }
+    // the value of the last data point
+    return points[points.length - 1][0];
+  }
+
+  // ----- inner classes -----------------------------------------------------
+
+
+  // ----- RRDRequest ----------------------------------------------------
+
+  /**
+   * The information required to make a single RRD request.
+   */
+  private class RRDRequest {
+    private static final int POPULATION_TIME_UPPER_LIMIT = 5;
+    private final String clusterName;
+    private final TemporalInfo temporalInfo;
+    private final Map<ResourceKey, Set<Resource>> resources = new HashMap<ResourceKey, Set<Resource>>();
+    private final Map<String, Set<String>> metrics = new HashMap<String, Set<String>>();
+    private final Set<String> clusterSet = new HashSet<String>();
+    private final Set<String> hostSet = new HashSet<String>();
+
+
+    /**
+     * Create a request for the given cluster and temporal information.
+     *
+     * @param clusterName   the cluster name
+     * @param temporalInfo  the temporal information; null for a point in time request
+     */
+    private RRDRequest(String clusterName, TemporalInfo temporalInfo) {
+      this.clusterName  = clusterName;
+      this.temporalInfo = temporalInfo;
+    }
+
+    /**
+     * Register a resource with this request under the given key. The key's
+     * Ganglia cluster name and host name are added to the sets that later
+     * form the request parameters.
+     *
+     * @param key       the host name / Ganglia cluster name key
+     * @param resource  the resource to populate from metrics matching the key
+     */
+    public void putResource(ResourceKey key, Resource resource) {
+      clusterSet.add(key.getClusterName());
+      hostSet.add(key.getHostName());
+
+      Set<Resource> keyedResources = resources.get(key);
+      if (keyedResources != null) {
+        keyedResources.add(resource);
+        return;
+      }
+
+      keyedResources = new HashSet<Resource>();
+      resources.put(key, keyedResources);
+      keyedResources.add(resource);
+    }
+
+    /**
+     * Associate a property id with a metric name for this request.
+     *
+     * @param metric  the metric name
+     * @param id      the property id to set when that metric is received
+     */
+    public void putPropertyId(String metric, String id) {
+      Set<String> idsForMetric = metrics.get(metric);
+      if (idsForMetric != null) {
+        idsForMetric.add(id);
+        return;
+      }
+
+      idsForMetric = new HashSet<String>();
+      metrics.put(metric, idsForMetric);
+      idsForMetric.add(id);
+    }
+
+    /**
+     * Populate the associated resources by making the rrd request.
+     * <p>
+     * The spec is split into URL and parameters using
+     * {@code questionMarkPattern}, and both are passed to the stream provider
+     * together with the "POST" method. The response is read line by line:
+     * a start time, then one record per metric (data source name, cluster
+     * name, host name, metric name, time, step and values up to a "[~EOM]"
+     * line), then "[~EOF]" followed by an end time. Within a record "[~n]"
+     * marks a value that is skipped and "[~r]N" means that the previous value
+     * occurs N times in total, so it is added N - 1 more times.
+     * <p>
+     * An IOException is logged and not rethrown.
+     *
+     * @return always an empty collection; the resources are updated in place
+     *         (see the todo at the end of the method)
+     *
+     * @throws org.apache.ambari.server.controller.spi.SystemException if unable to populate the resources
+     */
+    public Collection<Resource> populateResources() throws SystemException {
+
+      //Get full url with parameters
+      String specWithParams = getSpec(clusterName, clusterSet, hostSet, metrics.keySet(), temporalInfo);
+
+      // Separate the URL from the parameters. A split with limit 2 always
+      // yields at least one token, so only the second token needs a check.
+      String[] tokens = questionMarkPattern.split(specWithParams, 2);
+      //URL
+      String spec = tokens[0];
+      //Parameters
+      String params = null;
+      if (tokens.length > 1) {
+        params = tokens[1];
+      } else {
+        LOG.info("No parameters found in Ganglia spec => " + specWithParams);
+      }
+
+      BufferedReader reader = null;
+      try {
+
+        //Check if host is live
+        if (!hostProvider.isCollectorHostLive(clusterName, GANGLIA)) {
+          LOG.info("Ganglia host is not live");
+          return Collections.emptySet();
+        }
+
+        //Check if Ganglia server component is live
+        if (!hostProvider.isCollectorComponentLive(clusterName, GANGLIA)) {
+          LOG.info("Ganglia server component is not live");
+          return Collections.emptySet();
+        }
+
+        reader = new BufferedReader(new InputStreamReader(
+            getStreamProvider().readFrom(spec, "POST", params)));
+
+        String feedStart = reader.readLine();
+        if (feedStart == null || feedStart.isEmpty()) {
+          LOG.info("Empty feed while getting ganglia metrics for spec => "+
+            spec);
+          return Collections.emptySet();
+        }
+        int startTime = convertToNumber(feedStart).intValue();
+
+        String dsName = reader.readLine();
+        if (dsName == null || dsName.isEmpty()) {
+          LOG.info("Feed without body while reading ganglia metrics for spec " +
+            "=> " + spec);
+          return Collections.emptySet();
+        }
+
+        while(!"[~EOF]".equals(dsName)) {
+          GangliaMetric metric = new GangliaMetric();
+          List<GangliaMetric.TemporalMetric> listTemporalMetrics =
+              new ArrayList<GangliaMetric.TemporalMetric>();
+
+          metric.setDs_name(dsName);
+          metric.setCluster_name(reader.readLine());
+          metric.setHost_name(reader.readLine());
+          metric.setMetric_name(reader.readLine());
+
+          String timeStr = reader.readLine();
+          String stepStr = reader.readLine();
+          if (timeStr == null || timeStr.isEmpty() || stepStr == null
+              || stepStr.isEmpty()) {
+            LOG.info("Unexpected end of stream reached while getting ganglia " +
+                "metrics for spec => " + spec);
+            return Collections.emptySet();
+          }
+          int time = convertToNumber(timeStr).intValue();
+          int step = convertToNumber(stepStr).intValue();
+
+          String val     = reader.readLine();
+          String lastVal = null;
+
+          while(val!=null && !"[~EOM]".equals(val)) {
+            if (val.startsWith("[~r]")) {
+              // the previous value has already been handled once
+              int repeat = Integer.parseInt(val.substring(4)) - 1;
+              for (int i = 0; i < repeat; ++i) {
+                if (! "[~n]".equals(lastVal)) {
+                  GangliaMetric.TemporalMetric tm = new GangliaMetric.TemporalMetric(lastVal, time);
+                  if (tm.isValid()) listTemporalMetrics.add(tm);
+                }
+                time += step;
+              }
+            } else {
+              if (! "[~n]".equals(val)) {
+                GangliaMetric.TemporalMetric tm = new GangliaMetric.TemporalMetric(val, time);
+                if (tm.isValid()) listTemporalMetrics.add(tm);
+              }
+              time += step;
+            }
+            lastVal = val;
+            val = reader.readLine();
+          }
+
+          metric.setDatapointsFromList(listTemporalMetrics);
+
+          // Metrics for a host / Ganglia cluster that was not registered are ignored
+          ResourceKey key = new ResourceKey(metric.getHost_name(), metric.getCluster_name());
+          Set<Resource> resourceSet = resources.get(key);
+          if (resourceSet != null) {
+            for (Resource resource : resourceSet) {
+              populateResource(resource, metric);
+            }
+          }
+
+          dsName = reader.readLine();
+          if (dsName == null || dsName.isEmpty()) {
+            LOG.info("Unexpected end of stream reached while getting ganglia " +
+              "metrics for spec => " + spec);
+            return Collections.emptySet();
+          }
+        }
+        String feedEnd = reader.readLine();
+        if (feedEnd == null || feedEnd.isEmpty()) {
+          LOG.info("Error reading end of feed while getting ganglia metrics " +
+            "for spec => " + spec);
+        } else {
+
+          int endTime = convertToNumber(feedEnd).intValue();
+          int totalTime = endTime - startTime;
+          if (LOG.isInfoEnabled() && totalTime > POPULATION_TIME_UPPER_LIMIT) {
+            LOG.info("Ganglia resource population time: " + totalTime);
+          }
+        }
+      } catch (IOException e) {
+        if (LOG.isErrorEnabled()) {
+          // hand the exception to the logger so that cause and stack trace are kept
+          LOG.error("Caught exception getting Ganglia metrics : spec=" + spec, e);
+        }
+      } finally {
+        if (reader != null) {
+          try {
+            reader.close();
+          } catch (IOException e) {
+            if (LOG.isWarnEnabled()) {
+              LOG.warn("Unable to close http input steam : spec=" + spec, e);
+            }
+          }
+        }
+      }
+      //todo: filter out resources and return keepers
+      return Collections.emptySet();
+    }
+
+
+    /**
+     * Populate the given resource with the given Ganglia metric.
+     * <p>
+     * The property ids are looked up by the exact metric name first. If that
+     * fails, each requested metric name is tried as a regular expression
+     * against the received name; the first match wins and its capture groups
+     * replace the "$1", "$2", ... arguments of the property id.
+     *
+     * @param resource       the resource
+     * @param gangliaMetric  the Ganglia metrics
+     */
+    private void populateResource(Resource resource, GangliaMetric gangliaMetric) {
+      String metricName = gangliaMetric.getMetric_name();
+
+      Set<String> propertyIds = metrics.get(metricName);
+      List<String> regexArguments = new LinkedList<String>();
+
+      if (propertyIds == null) {
+        for (Map.Entry<String, Set<String>> candidate : metrics.entrySet()) {
+          Matcher matcher = Pattern.compile(candidate.getKey()).matcher(metricName);
+          if (!matcher.matches()) {
+            continue;
+          }
+
+          propertyIds = candidate.getValue();
+          // get parameters
+          for (int group = 1; group <= matcher.groupCount(); ++group) {
+            regexArguments.add(matcher.group(group));
+          }
+          break;
+        }
+      }
+
+      if (propertyIds == null) {
+        return;
+      }
+
+      Map<String, PropertyInfo> componentMetrics = getComponentMetrics().get(getComponentName(resource));
+      if (componentMetrics == null) {
+        return;
+      }
+
+      for (String propertyId : propertyIds) {
+        if (propertyId == null || !componentMetrics.containsKey(propertyId)) {
+          continue;
+        }
+
+        String resolvedId = propertyId;
+        if (containsArguments(propertyId)) {
+          int argumentIndex = 1;
+          for (String argument : regexArguments) {
+            resolvedId = substituteArgument(resolvedId, "$" + argumentIndex, argument);
+            ++argumentIndex;
+          }
+        }
+
+        Object value = getValue(gangliaMetric, temporalInfo != null);
+        if (value != null) {
+          resource.setProperty(resolvedId, value);
+        }
+      }
+    }
+
+    /**
+     * Parse the given string as a number.
+     *
+     * @param s  the string to parse; must not be null
+     *
+     * @return a Double if the string contains a '.', otherwise a Long
+     *
+     * @throws NumberFormatException if the string is not a valid number
+     */
+    private Number convertToNumber(String s) {
+      // An if/else is required here. With "cond ? double : long" the
+      // conditional operator promotes both operands to double, so the result
+      // would always be a Double, even for integral input.
+      if (s.contains(".")) {
+        return Double.valueOf(s);
+      }
+      return Long.valueOf(s);
+    }
+  }
+
+
+  // ----- ResourceKey ---------------------------------------------------
+
+  /**
+   * Key used to associate information from a Ganglia metric to a resource.
+   * Two keys are equal when both the host name and the Ganglia cluster name
+   * are equal; either part may be null.
+   */
+  private static class ResourceKey {
+    private final String hostName;
+    private final String gangliaClusterName;
+
+    private ResourceKey(String hostName, String gangliaClusterName) {
+      this.hostName           = hostName;
+      this.gangliaClusterName = gangliaClusterName;
+    }
+
+    public String getHostName() {
+      return hostName;
+    }
+
+    /**
+     * @return the Ganglia cluster name part of the key
+     */
+    public String getClusterName() {
+      return gangliaClusterName;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      ResourceKey other = (ResourceKey) o;
+
+      boolean sameCluster = (gangliaClusterName == null)
+          ? other.gangliaClusterName == null
+          : gangliaClusterName.equals(other.gangliaClusterName);
+      if (!sameCluster) {
+        return false;
+      }
+
+      return (hostName == null)
+          ? other.hostName == null
+          : hostName.equals(other.hostName);
+    }
+
+    @Override
+    public int hashCode() {
+      int hostHash    = (hostName == null) ? 0 : hostName.hashCode();
+      int clusterHash = (gangliaClusterName == null) ? 0 : gangliaClusterName.hashCode();
+      return 31 * hostHash + clusterHash;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ganglia/GangliaReportPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ganglia/GangliaReportPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ganglia/GangliaReportPropertyProvider.java
new file mode 100644
index 0000000..09ea31c
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ganglia/GangliaReportPropertyProvider.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.controller.metrics.ganglia;
+
+import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
+import org.apache.ambari.server.controller.internal.PropertyInfo;
+import org.apache.ambari.server.controller.metrics.MetricHostProvider;
+import org.apache.ambari.server.controller.metrics.MetricsReportPropertyProvider;
+import org.apache.ambari.server.controller.spi.Predicate;
+import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.SystemException;
+import org.apache.ambari.server.controller.spi.TemporalInfo;
+import org.apache.ambari.server.controller.utilities.StreamProvider;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.ambari.server.controller.metrics.MetricsPropertyProvider.MetricsService.GANGLIA;
+
+/**
+ * Property provider implementation for a Ganglia source. This provider is specialized
+ * to pull metrics from existing Ganglia reports.
+ */
+public class GangliaReportPropertyProvider extends MetricsReportPropertyProvider {
+  // ----- Constants --------------------------------------------------------
+
+  protected final static Logger LOG =
+      LoggerFactory.getLogger(GangliaReportPropertyProvider.class);
+
+
+  // ----- Constructors ------------------------------------------------------
+
+  /**
+   * Construct a Ganglia report property provider. All arguments are handed
+   * on, unchanged and in the same order, to the super class constructor.
+   *
+   * @param componentPropertyInfoMap  the property info maps, one per component
+   * @param streamProvider            the stream provider
+   * @param configuration             the SSL configuration
+   * @param hostProvider              the metric host provider
+   * @param clusterNamePropertyId     the cluster name property id
+   */
+  public GangliaReportPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
+                                       StreamProvider streamProvider,
+                                       ComponentSSLConfiguration configuration,
+                                       MetricHostProvider hostProvider,
+                                       String clusterNamePropertyId) {
+    super(componentPropertyInfoMap,
+        streamProvider,
+        configuration,
+        hostProvider,
+        clusterNamePropertyId);
+  }
+
+
+  // ----- PropertyProvider --------------------------------------------------
+
+  @Override
+  public Set<Resource> populateResources(Set<Resource> resources, Request request, Predicate predicate)
+      throws SystemException {
+
+    Set<Resource> keepers = new HashSet<Resource>();
+    for (Resource resource : resources) {
+      if (populateResource(resource, request, predicate)) {
+        keepers.add(resource);
+      }
+    }
+    return keepers;
+  }
+
+
+  // ----- helper methods ----------------------------------------------------
+
+  /**
+   * Populate a resource by obtaining the requested Ganglia RESOURCE_METRICS.
+   * Nothing is fetched when this provider has no property ids or when the
+   * Ganglia collector host of the resource's cluster is unknown; the latter
+   * case is logged as a warning.
+   *
+   * @param resource  the resource to be populated
+   * @param request   the request
+   * @param predicate the predicate
+   *
+   * @return always true in the current implementation; the result of setting
+   *         the properties is not taken into account
+   *
+   * @throws SystemException if unable to populate the resource
+   */
+  private boolean populateResource(Resource resource, Request request, Predicate predicate)
+      throws SystemException {
+
+    if (getPropertyIds().isEmpty()) {
+      return true;
+    }
+
+    String clusterName = (String) resource.getPropertyValue(clusterNamePropertyId);
+
+    if (hostProvider.getCollectorHostName(clusterName, GANGLIA) != null) {
+      setProperties(resource, clusterName, request, getRequestPropertyIds(request, predicate));
+    } else if (LOG.isWarnEnabled()) {
+      LOG.warn("Attempting to get metrics but the Ganglia server is unknown. Resource=" + resource +
+          " : Cluster=" + clusterName);
+    }
+
+    return true;
+  }
+
+  /**
+   * Read every Ganglia report needed for the given property ids and set the
+   * datapoints of each matching metric on the resource.
+   *
+   * @param resource     the resource to be populated
+   * @param clusterName  the cluster name
+   * @param request      the request
+   * @param ids          the relevant property ids
+   *
+   * @return true if all reports were read; false as soon as reading one
+   *         report fails with an IOException, in which case the remaining
+   *         reports are not read
+   *
+   * @throws SystemException if unable to get the Ganglia Collector host name
+   */
+  private boolean setProperties(Resource resource, String clusterName, Request request, Set<String> ids)
+      throws SystemException {
+
+    Map<String, Map<String, String>> propertyIdMaps = getPropertyIdMaps(request, ids);
+
+    // one mapper is enough for all reports
+    ObjectMapper mapper = new ObjectMapper();
+
+    for (Map.Entry<String, Map<String, String>> entry : propertyIdMaps.entrySet()) {
+      // metric name -> property id, for the metrics of this report
+      Map<String, String>  map = entry.getValue();
+      String report = entry.getKey();
+
+      String spec = getSpec(clusterName, report);
+
+      try {
+        List<GangliaMetric> gangliaMetrics = mapper.readValue(streamProvider.readFrom(spec),
+            new TypeReference<List<GangliaMetric>>() {});
+
+        if (gangliaMetrics != null) {
+          for (GangliaMetric gangliaMetric : gangliaMetrics) {
+
+            String propertyId = map.get(gangliaMetric.getMetric_name());
+            if (propertyId != null) {
+              resource.setProperty(propertyId, getValue(gangliaMetric));
+            }
+          }
+        }
+      } catch (IOException e) {
+        if (LOG.isErrorEnabled()) {
+          // the exception is passed as well so that its stack trace is logged
+          LOG.error("Caught exception getting Ganglia metrics : " + e + " : spec=" + spec, e);
+        }
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Group the requested temporal properties by Ganglia report.
+   * The property name of a property info has the format
+   * report_name.metric_name, split at the last '.'; names without a '.' and
+   * properties that are not requested temporally are left out.
+   *
+   * @param request  the request
+   * @param ids      the relevant property ids
+   *
+   * @return a map keyed by report name whose values map metric names to property ids
+   */
+  private Map<String, Map<String, String>> getPropertyIdMaps(Request request, Set<String> ids) {
+    Map<String, Map<String, String>> reportMaps = new HashMap<String, Map<String, String>>();
+
+    for (String id : ids) {
+      for (Map.Entry<String, PropertyInfo> infoEntry : getPropertyInfoMap("*", id).entrySet()) {
+        String propertyId = infoEntry.getKey();
+        PropertyInfo info = infoEntry.getValue();
+
+        TemporalInfo temporalInfo = request.getTemporalInfo(id);
+        if (temporalInfo == null || !info.isTemporal()) {
+          continue;
+        }
+
+        // format : report_name.metric_name
+        String qualifiedName = info.getPropertyId();
+        int dotIndex = qualifiedName.lastIndexOf('.');
+        if (dotIndex == -1) {
+          continue;
+        }
+        String report = qualifiedName.substring(0, dotIndex);
+        String metricName = qualifiedName.substring(dotIndex + 1);
+
+        Map<String, String> metricToPropertyId = reportMaps.get(report);
+        if (metricToPropertyId == null) {
+          metricToPropertyId = new HashMap<String, String>();
+          reportMaps.put(report, metricToPropertyId);
+        }
+        metricToPropertyId.put(metricName, propertyId);
+      }
+    }
+    return reportMaps;
+  }
+
+  /**
+   * Get value from the given metric.
+   *
+   * @param metric     the metric
+   *
+   * @return the datapoints of the metric, exactly as the metric returns them
+   */
+  private Object getValue(GangliaMetric metric) {
+      return metric.getDatapoints();
+  }
+
+  /**
+   * Build the spec used to locate the Ganglia stream of a report, in the form
+   * scheme://collector-host/ganglia/graph.php?g=report&json=1, where the
+   * scheme is https when Ganglia SSL is configured and http otherwise.
+   * The report name is inserted as given.
+   *
+   * @param clusterName     the cluster name
+   * @param report          the report
+   *
+   * @return the spec
+   *
+   * @throws SystemException if unable to get the Ganglia Collector host name
+   */
+  protected String getSpec(String clusterName, String report) throws SystemException {
+
+    String scheme = configuration.isGangliaSSL() ? "https://" : "http://";
+    String collectorHost = hostProvider.getCollectorHostName(clusterName, GANGLIA);
+
+    return scheme + collectorHost + "/ganglia/graph.php?g=" + report + "&json=1";
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSComponentPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSComponentPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSComponentPropertyProvider.java
new file mode 100644
index 0000000..944ec5c
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSComponentPropertyProvider.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics.timeline;
+
+import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
+import org.apache.ambari.server.controller.internal.PropertyInfo;
+import org.apache.ambari.server.controller.metrics.MetricHostProvider;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.utilities.StreamProvider;
+import java.util.Map;
+
+/**
+ * Timeline metrics property provider that resolves metrics by component
+ * only: it hands {@code null} to the parent as the host name property id
+ * and never reports a host name for a resource.
+ */
+public class AMSComponentPropertyProvider extends AMSPropertyProvider {
+
+  /**
+   * Create the provider. All arguments are forwarded to
+   * {@link AMSPropertyProvider}; the host name property id is {@code null}.
+   */
+  public AMSComponentPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
+                                          StreamProvider streamProvider,
+                                          ComponentSSLConfiguration configuration,
+                                          MetricHostProvider hostProvider,
+                                          String clusterNamePropertyId,
+                                          String componentNamePropertyId) {
+
+    super(componentPropertyInfoMap,
+      streamProvider,
+      configuration,
+      hostProvider,
+      clusterNamePropertyId,
+      null,
+      componentNamePropertyId);
+  }
+
+  /**
+   * @return always {@code null}; this provider does not use a host name
+   */
+  @Override
+  protected String getHostName(Resource resource) {
+    return null;
+  }
+
+  /**
+   * Read the component name property of the resource and, when
+   * {@code TIMLINE_APPID_MAP} has an entry for it, return the mapped
+   * value instead.
+   */
+  @Override
+  protected String getComponentName(Resource resource) {
+    final String name = (String) resource.getPropertyValue(componentNamePropertyId);
+
+    return TIMLINE_APPID_MAP.containsKey(name) ? TIMLINE_APPID_MAP.get(name) : name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostComponentPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostComponentPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostComponentPropertyProvider.java
new file mode 100644
index 0000000..09836e3
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostComponentPropertyProvider.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics.timeline;
+
+import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
+import org.apache.ambari.server.controller.internal.PropertyInfo;
+import org.apache.ambari.server.controller.metrics.MetricHostProvider;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.utilities.StreamProvider;
+import java.util.Map;
+
+/**
+ * Timeline metrics property provider that resolves metrics using both the
+ * host name property and the component name property of a resource.
+ */
+public class AMSHostComponentPropertyProvider extends AMSPropertyProvider {
+
+  /**
+   * Create the provider. All arguments are forwarded unchanged to
+   * {@link AMSPropertyProvider}.
+   */
+  public AMSHostComponentPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
+                                 StreamProvider streamProvider,
+                                 ComponentSSLConfiguration configuration,
+                                 MetricHostProvider hostProvider,
+                                 String clusterNamePropertyId,
+                                 String hostNamePropertyId,
+                                 String componentNamePropertyId) {
+
+    super(componentPropertyInfoMap,
+      streamProvider,
+      configuration,
+      hostProvider,
+      clusterNamePropertyId,
+      hostNamePropertyId,
+      componentNamePropertyId);
+  }
+
+  /**
+   * @return the value of the resource's host name property
+   */
+  @Override
+  protected String getHostName(Resource resource) {
+    final Object hostName = resource.getPropertyValue(hostNamePropertyId);
+    return (String) hostName;
+  }
+
+  /**
+   * Read the component name property of the resource and, when
+   * {@code TIMLINE_APPID_MAP} has an entry for it, return the mapped
+   * value instead.
+   */
+  @Override
+  protected String getComponentName(Resource resource) {
+    final String name = (String) resource.getPropertyValue(componentNamePropertyId);
+
+    return TIMLINE_APPID_MAP.containsKey(name) ? TIMLINE_APPID_MAP.get(name) : name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostPropertyProvider.java
new file mode 100644
index 0000000..ca9d685
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostPropertyProvider.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics.timeline;
+
+import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
+import org.apache.ambari.server.controller.internal.PropertyInfo;
+import org.apache.ambari.server.controller.metrics.MetricHostProvider;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.utilities.StreamProvider;
+
+import java.util.Map;
+
+/**
+ * Timeline metrics property provider for host-level metrics: it hands
+ * {@code null} to the parent as the component name property id and
+ * reports the fixed component name "HOST" for every resource.
+ */
+public class AMSHostPropertyProvider extends AMSPropertyProvider {
+
+  /**
+   * Create the provider. All arguments are forwarded to
+   * {@link AMSPropertyProvider}; the component name property id is
+   * {@code null}.
+   */
+  public AMSHostPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
+                                 StreamProvider streamProvider,
+                                 ComponentSSLConfiguration configuration,
+                                 MetricHostProvider hostProvider,
+                                 String clusterNamePropertyId,
+                                 String hostNamePropertyId) {
+
+    super(componentPropertyInfoMap,
+      streamProvider,
+      configuration,
+      hostProvider,
+      clusterNamePropertyId,
+      hostNamePropertyId,
+      null);
+  }
+
+  /**
+   * @return the value of the resource's host name property
+   */
+  @Override
+  protected String getHostName(Resource resource) {
+    final Object hostName = resource.getPropertyValue(hostNamePropertyId);
+    return (String) hostName;
+  }
+
+  /**
+   * @return the constant "HOST", regardless of the resource
+   */
+  @Override
+  protected String getComponentName(Resource resource) {
+    return "HOST";
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
new file mode 100644
index 0000000..d75c982
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics.timeline;
+
+import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
+import org.apache.ambari.server.controller.internal.PropertyInfo;
+import org.apache.ambari.server.controller.metrics.MetricHostProvider;
+import org.apache.ambari.server.controller.metrics.MetricsPropertyProvider;
+import org.apache.ambari.server.controller.spi.Predicate;
+import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.SystemException;
+import org.apache.ambari.server.controller.spi.TemporalInfo;
+import org.apache.ambari.server.controller.utilities.StreamProvider;
+import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.params.HttpMethodParams;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.utils.URIBuilder;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.DecimalFormat;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.ambari.server.controller.metrics.MetricsPropertyProvider.MetricsService.TIMELINE_METRICS;
+import static org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+
+public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
+  static final Map<String, String> TIMLINE_APPID_MAP = new HashMap<String, String>();
+  private static ObjectMapper mapper;
+  //private final HttpClient httpClient = new HttpClient();
+  private final static ObjectReader timelineObjectReader;
+  private static final DecimalFormat decimalFormat = new DecimalFormat("#.00");
+
+  private static final Set<String> PERCENTAGE_METRIC;
+
+  static {
+    // Metric names whose values getGangliaLikeDatapoints divides by 100.
+    Set<String> percentageNames = new HashSet<String>();
+    for (String name : new String[] {
+        "cpu_wio", "cpu_idle", "cpu_nice", "cpu_aidle", "cpu_system", "cpu_user"}) {
+      percentageNames.add(name);
+    }
+    PERCENTAGE_METRIC = Collections.unmodifiableSet(percentageNames);
+
+    // Both HBase component names are replaced by the single app id "HBASE".
+    TIMLINE_APPID_MAP.put("HBASE_REGIONSERVER", "HBASE");
+    TIMLINE_APPID_MAP.put("HBASE_MASTER", "HBASE");
+
+    // JSON mapper driven by JAXB annotations; the reader built from it is
+    // what parses Metrics service responses into TimelineMetrics.
+    AnnotationIntrospector jaxbIntrospector = new JaxbAnnotationIntrospector();
+    mapper = new ObjectMapper();
+    mapper.setAnnotationIntrospector(jaxbIntrospector);
+    //noinspection deprecation
+    mapper.getSerializationConfig().setSerializationInclusion(Inclusion.NON_NULL);
+    timelineObjectReader = mapper.reader(TimelineMetrics.class);
+  }
+
+  /**
+   * Create the provider. Every argument is forwarded unchanged, in the
+   * same order, to the {@link MetricsPropertyProvider} constructor.
+   *
+   * @param componentPropertyInfoMap  metric definitions, passed to the parent
+   * @param streamProvider            stream provider, passed to the parent
+   * @param configuration             SSL configuration, passed to the parent
+   * @param hostProvider              metric host provider, passed to the parent
+   * @param clusterNamePropertyId     cluster name property id
+   * @param hostNamePropertyId        host name property id (subclasses may pass null)
+   * @param componentNamePropertyId   component name property id (subclasses may pass null)
+   */
+  public AMSPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
+                             StreamProvider streamProvider,
+                             ComponentSSLConfiguration configuration,
+                             MetricHostProvider hostProvider,
+                             String clusterNamePropertyId,
+                             String hostNamePropertyId,
+                             String componentNamePropertyId) {
+
+    super(componentPropertyInfoMap,
+      streamProvider,
+      configuration,
+      hostProvider,
+      clusterNamePropertyId,
+      hostNamePropertyId,
+      componentNamePropertyId);
+  }
+
+  /**
+   * Get the component name used as the key into the component metrics map.
+   * The component name "HOST" is replaced by the wildcard key "*"; any
+   * other name is returned as-is.
+   *
+   * @param resource  the resource whose component name is resolved
+   *
+   * @return "*" when the component name is "HOST", otherwise the value of
+   *         {@link #getComponentName(Resource)}, which may be null
+   */
+  protected String getOverridenComponentName(Resource resource) {
+    String componentName = getComponentName(resource);
+    // Hack: To allow host queries to succeed
+    // Constant-first comparison: getComponentName returns null when the
+    // component name property is not set on the resource, and that must
+    // not fail with a NullPointerException here.
+    if ("HOST".equals(componentName)) {
+      return "*";
+    }
+    return componentName;
+  }
+
+  /**
+   * The information required to make a single call to the Metrics service.
+   */
+  class MetricsRequest {
+    private final TemporalInfo temporalInfo;
+    private final Map<String, Set<Resource>> resources = new HashMap<String, Set<Resource>>();
+    private final Map<String, Set<String>> metrics = new HashMap<String, Set<String>>();
+    private final URIBuilder uriBuilder;
+    private final String dummyHostName = "__SummaryInfo__";
+
+    /**
+     * Create a request for one temporal range.
+     *
+     * @param temporalInfo  the time range of the request; may be null
+     * @param uriBuilder    the builder reused for every call of this request
+     */
+    private MetricsRequest(TemporalInfo temporalInfo, URIBuilder uriBuilder) {
+      this.uriBuilder = uriBuilder;
+      this.temporalInfo = temporalInfo;
+    }
+
+    /**
+     * Register a resource under its host name. Resources without a host
+     * name are grouped under the placeholder {@code dummyHostName}.
+     *
+     * @param hostname  the host name, or null if the resource has none
+     * @param resource  the resource to populate later
+     */
+    public void putResource(String hostname, Resource resource) {
+      final String key = (hostname != null) ? hostname : dummyHostName;
+
+      Set<Resource> hostResources = resources.get(key);
+      if (hostResources == null) {
+        // first resource seen for this host
+        hostResources = new HashSet<Resource>();
+        resources.put(key, hostResources);
+      }
+      hostResources.add(resource);
+    }
+
+    /**
+     * Associate a property id with a metric name. One metric name can
+     * accumulate several property ids.
+     *
+     * @param metric  the metric name used as the key
+     * @param id      the property id to add for that metric
+     */
+    public void putPropertyId(String metric, String id) {
+      Set<String> idsForMetric = metrics.get(metric);
+      if (idsForMetric == null) {
+        // first property id seen for this metric
+        idsForMetric = new HashSet<String>();
+        metrics.put(metric, idsForMetric);
+      }
+      idsForMetric.add(id);
+    }
+
+    /**
+     * Populate the associated resources by making one call to the Metrics
+     * service per resource and copying the returned metric values onto that
+     * resource. Nothing is requested unless the temporal info carries both a
+     * start time and an end time. An I/O failure on one call is logged and
+     * the remaining resources are still processed.
+     *
+     * @return an empty collection in every case; resources are updated in place
+     * @throws SystemException if unable to populate the resources
+     */
+    public Collection<Resource> populateResources() throws SystemException {
+      // No open ended query support.
+      if (temporalInfo == null || temporalInfo.getStartTime() == null ||
+          temporalInfo.getEndTime() == null) {
+        return Collections.emptySet();
+      }
+
+      // The requested metric names do not vary per resource, so the
+      // parameter is built once rather than once per resource.
+      String metricsParam = getSetString(metrics.keySet(), -1);
+
+      for (Map.Entry<String, Set<Resource>> resourceEntry : resources.entrySet()) {
+        String hostname = resourceEntry.getKey();
+        Set<Resource> resourceSet = resourceEntry.getValue();
+
+        for (Resource resource : resourceSet) {
+          // Reuse uriBuilder
+          uriBuilder.removeQuery();
+
+          if (metricsParam.length() > 0) {
+            uriBuilder.setParameter("metricNames", metricsParam);
+          }
+
+          // The placeholder host name only groups host-less resources and
+          // must never be sent to the service.
+          if (hostname != null && !hostname.isEmpty() && !hostname.equals(dummyHostName)) {
+            uriBuilder.setParameter("hostname", hostname);
+          }
+
+          String componentName = getComponentName(resource);
+          if (componentName != null && !componentName.isEmpty()) {
+            if (TIMLINE_APPID_MAP.containsKey(componentName)) {
+              componentName = TIMLINE_APPID_MAP.get(componentName);
+            }
+            uriBuilder.setParameter("appId", componentName);
+          }
+
+          long startTime = temporalInfo.getStartTime();
+          if (startTime != -1) {
+            uriBuilder.setParameter("startTime", String.valueOf(startTime));
+          }
+
+          long endTime = temporalInfo.getEndTime();
+          if (endTime != -1) {
+            uriBuilder.setParameter("endTime", String.valueOf(endTime));
+          }
+
+          BufferedReader reader = null;
+          String spec = uriBuilder.toString();
+          try {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Metrics request url =" + spec);
+            }
+            // Decode explicitly as UTF-8 instead of the platform default
+            // charset, so parsing does not depend on the server's locale
+            // settings. An unsupported charset name surfaces as an
+            // IOException and is handled below.
+            reader = new BufferedReader(
+              new InputStreamReader(streamProvider.readFrom(spec), "UTF-8"));
+
+            TimelineMetrics timelineMetrics = timelineObjectReader.readValue(reader);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Timeline metrics response => " + timelineMetrics);
+            }
+
+            for (TimelineMetric metric : timelineMetrics.getMetrics()) {
+              if (metric.getMetricName() != null && metric.getMetricValues() != null) {
+                populateResource(resource, metric);
+              }
+            }
+
+          } catch (IOException io) {
+            // Best effort: a failed call must not prevent the other
+            // resources from being populated.
+            LOG.warn("Error getting timeline metrics.", io);
+          } finally {
+            if (reader != null) {
+              try {
+                reader.close();
+              } catch (IOException e) {
+                if (LOG.isWarnEnabled()) {
+                  LOG.warn("Unable to close http input steam : spec=" + spec, e);
+                }
+              }
+            }
+          }
+        }
+      }
+
+      return Collections.emptySet();
+    }
+
+    /**
+     * Copy the value of one returned metric onto the given resource.
+     * The property ids for the metric are found by exact metric name first;
+     * failing that, each registered metric key is tried as a regular
+     * expression that must match the whole metric name, and the first match
+     * wins. Capture groups of that match replace the positional arguments
+     * $1, $2, ... in property ids that contain arguments.
+     *
+     * @param resource  the resource to set properties on
+     * @param metric    the metric returned by the Metrics service
+     */
+    private void populateResource(Resource resource, TimelineMetric metric) {
+      final String metricName = metric.getMetricName();
+      final List<String> regexArgs = new LinkedList<String>();
+
+      Set<String> targetIds = metrics.get(metricName);
+      if (targetIds == null) {
+        for (Map.Entry<String, Set<String>> candidate : metrics.entrySet()) {
+          Matcher matcher = Pattern.compile(candidate.getKey()).matcher(metricName);
+          if (!matcher.matches()) {
+            continue;
+          }
+          targetIds = candidate.getValue();
+          // get parameters
+          for (int group = 1; group <= matcher.groupCount(); group++) {
+            regexArgs.add(matcher.group(group));
+          }
+          break;
+        }
+      }
+      if (targetIds == null) {
+        // the service returned a metric nobody asked for
+        return;
+      }
+
+      Map<String, PropertyInfo> componentMetrics =
+        getComponentMetrics().get(getOverridenComponentName(resource));
+      if (componentMetrics == null) {
+        return;
+      }
+
+      for (String id : targetIds) {
+        if (id == null || !componentMetrics.containsKey(id)) {
+          continue;
+        }
+
+        String resolvedId = id;
+        if (containsArguments(resolvedId)) {
+          int position = 0;
+          for (String arg : regexArgs) {
+            position++;
+            resolvedId = substituteArgument(resolvedId, "$" + position, arg);
+          }
+        }
+
+        Object value = getValue(metric, temporalInfo != null);
+        if (value != null) {
+          resource.setProperty(resolvedId, value);
+        }
+      }
+    }
+  }
+
+  /**
+   * Convert the values of a timeline metric into Ganglia-style datapoints.
+   * Each row of the result is {@code [value, time]}, in the iteration order
+   * of the metric's value map.
+   * <p>
+   * Normalize percent values: Copied over from Ganglia Metric. Values of
+   * the metrics listed in {@code PERCENTAGE_METRIC} are divided by 100 and
+   * rounded to two decimal places. Null, NaN and infinite values are passed
+   * through unchanged.
+   *
+   * @param metric  the metric whose values are converted
+   *
+   * @return one {@code [value, time]} row per entry of the metric's values
+   */
+  private static Number[][] getGangliaLikeDatapoints(TimelineMetric metric) {
+    Map<Long, Double> metricValues = metric.getMetricValues();
+    Number[][] datapointsArray = new Number[metricValues.size()][2];
+    // The metric name is the same for every datapoint, so test it once.
+    boolean isPercentage = PERCENTAGE_METRIC.contains(metric.getMetricName());
+    int cnt = 0;
+
+    for (Map.Entry<Long, Double> metricEntry : metricValues.entrySet()) {
+      Double value = metricEntry.getValue();
+      Long time = metricEntry.getKey();
+      // NOTE(review): timestamps above this threshold are presumably in
+      // milliseconds and are reduced to seconds - confirm against the sink.
+      if (time > 9999999999L) {
+        time = time / 1000;
+      }
+
+      if (isPercentage && value != null && !value.isNaN() && !value.isInfinite()) {
+        // Round numerically instead of formatting with DecimalFormat and
+        // parsing the text back: that round trip breaks under locales whose
+        // decimal separator is not '.', and a shared DecimalFormat is not
+        // safe for concurrent use. HALF_EVEN is DecimalFormat's default
+        // rounding mode.
+        value = java.math.BigDecimal.valueOf(value / 100)
+          .setScale(2, java.math.RoundingMode.HALF_EVEN)
+          .doubleValue();
+      }
+
+      datapointsArray[cnt][0] = value;
+      datapointsArray[cnt][1] = time;
+      cnt++;
+    }
+
+    return datapointsArray;
+  }
+
+  /**
+   * Turn a timeline metric into the value stored on a resource.
+   *
+   * @param metric      the metric
+   * @param isTemporal  true to return the whole series, false to return a
+   *                    single point in time value
+   *
+   * @return for temporal requests the full array of datapoints, or null
+   *         when there are none; otherwise the value of the last datapoint,
+   *         or 0 when there are none
+   */
+  private static Object getValue(TimelineMetric metric, boolean isTemporal) {
+    final Number[][] series = getGangliaLikeDatapoints(metric);
+    final boolean empty = series.length == 0;
+
+    if (isTemporal) {
+      if (empty) {
+        return null;
+      }
+      return series;
+    }
+
+    if (empty) {
+      return 0;
+    }
+    // return the value of the last data point
+    return series[series.length - 1][0];
+  }
+
+  /**
+   * Create a URI builder that points at the timeline metrics endpoint
+   * {@code /ws/v1/timeline/metrics} of the given collector over http.
+   *
+   * @param hostname  the collector host name
+   * @param port      the collector port
+   *
+   * @return a new builder with scheme, host, port and path set
+   */
+  protected static URIBuilder getUriBuilder(String hostname, int port) {
+    return new URIBuilder()
+      .setScheme("http")
+      .setHost(hostname)
+      .setPort(port)
+      .setPath("/ws/v1/timeline/metrics");
+  }
+
+  /**
+   * Populate the given resources with timeline metrics. The resources are
+   * grouped into Metrics service requests, and each request is executed in
+   * turn. When no property ids apply to the request, the resources are
+   * returned untouched.
+   *
+   * @return the same set of resources that was passed in
+   */
+  @Override
+  public Set<Resource> populateResources(Set<Resource> resources,
+              Request request, Predicate predicate) throws SystemException {
+
+    Set<String> requestedIds = getRequestPropertyIds(request, predicate);
+
+    if (!requestedIds.isEmpty()) {
+      Collection<Map<TemporalInfo, MetricsRequest>> requestsPerCluster =
+        getMetricsRequests(resources, request, requestedIds).values();
+
+      // For each cluster
+      for (Map<TemporalInfo, MetricsRequest> clusterRequests : requestsPerCluster) {
+        // For each request
+        for (MetricsRequest pending : clusterRequests.values()) {
+          pending.populateResources();
+        }
+      }
+    }
+
+    return resources;
+  }
+
+  /**
+   * Group the given resources into Metrics service requests, keyed first by
+   * cluster name and then by the temporal info of the requested property.
+   * A resource is added to a request for every requested property whose
+   * definition matches the kind of query: point in time when no temporal
+   * info was requested, temporal otherwise.
+   *
+   * @param resources  the resources to populate
+   * @param request    the request, consulted for the temporal info per property id
+   * @param ids        the requested property ids
+   *
+   * @return the requests, by cluster name and temporal info
+   *
+   * @throws SystemException if the collector host or port cannot be resolved
+   */
+  private Map<String, Map<TemporalInfo, MetricsRequest>> getMetricsRequests(
+              Set<Resource> resources, Request request, Set<String> ids) throws SystemException {
+
+    Map<String, Map<TemporalInfo, MetricsRequest>> requestMap =
+      new HashMap<String, Map<TemporalInfo, MetricsRequest>>();
+
+    // The collector location is resolved per cluster: a host or port found
+    // for one cluster must not be reused for resources of another cluster.
+    Map<String, String> collectorHostNames = new HashMap<String, String>();
+    Map<String, String> collectorPorts = new HashMap<String, String>();
+
+    for (Resource resource : resources) {
+      String clusterName = (String) resource.getPropertyValue(clusterNamePropertyId);
+      Map<TemporalInfo, MetricsRequest> requests = requestMap.get(clusterName);
+      if (requests == null) {
+        requests = new HashMap<TemporalInfo, MetricsRequest>();
+        requestMap.put(clusterName, requests);
+      }
+
+      // A null lookup result reads back as null, so it is retried for the
+      // next resource of the same cluster.
+      String collectorHostName = collectorHostNames.get(clusterName);
+      if (collectorHostName == null) {
+        collectorHostName = hostProvider.getCollectorHostName(clusterName, TIMELINE_METRICS);
+        collectorHostNames.put(clusterName, collectorHostName);
+      }
+
+      String collectorPort = collectorPorts.get(clusterName);
+      if (collectorPort == null) {
+        collectorPort = hostProvider.getCollectorPortName(clusterName, TIMELINE_METRICS);
+        collectorPorts.put(clusterName, collectorPort);
+      }
+
+      // The component name depends only on the resource, not on the
+      // property id, so resolve it once per resource.
+      String componentName = getOverridenComponentName(resource);
+
+      for (String id : ids) {
+        Map<String, PropertyInfo> propertyInfoMap = new HashMap<String, PropertyInfo>();
+
+        Map<String, PropertyInfo> componentMetricMap = getComponentMetrics().get(componentName);
+
+        // Not all components have metrics
+        if (componentMetricMap != null &&
+            !componentMetricMap.containsKey(id)) {
+          updateComponentMetricMap(componentMetricMap, id);
+        }
+
+        getPropertyInfoMap(componentName, id, propertyInfoMap);
+
+        for (Map.Entry<String, PropertyInfo> entry : propertyInfoMap.entrySet()) {
+          String propertyId = entry.getKey();
+          PropertyInfo propertyInfo = entry.getValue();
+
+          TemporalInfo temporalInfo = request.getTemporalInfo(id);
+
+          if ((temporalInfo == null && propertyInfo.isPointInTime()) ||
+            (temporalInfo != null && propertyInfo.isTemporal())) {
+
+            MetricsRequest metricsRequest = requests.get(temporalInfo);
+            if (metricsRequest == null) {
+              // 8188 is the port used when the host provider reports none
+              metricsRequest = new MetricsRequest(temporalInfo,
+                getUriBuilder(collectorHostName,
+                  collectorPort != null ? Integer.parseInt(collectorPort) : 8188));
+              requests.put(temporalInfo, metricsRequest);
+            }
+            metricsRequest.putResource(getHostName(resource), resource);
+            metricsRequest.putPropertyId(propertyInfo.getPropertyId(), propertyId);
+          }
+        }
+      }
+    }
+
+    return requestMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
new file mode 100644
index 0000000..0b3be3c
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics.timeline;
+
+import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
+import org.apache.ambari.server.controller.internal.PropertyInfo;
+import org.apache.ambari.server.controller.metrics.MetricHostProvider;
+import org.apache.ambari.server.controller.metrics.MetricsReportPropertyProvider;
+import org.apache.ambari.server.controller.spi.Predicate;
+import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.SystemException;
+import org.apache.ambari.server.controller.utilities.StreamProvider;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Report property provider for the timeline metrics service. It currently
+ * populates nothing: resources are handed back exactly as received.
+ */
+public class AMSReportPropertyProvider extends MetricsReportPropertyProvider {
+
+  /**
+   * Create the provider. All arguments are forwarded unchanged to
+   * {@link MetricsReportPropertyProvider}.
+   */
+  public AMSReportPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
+                                 StreamProvider streamProvider,
+                                 ComponentSSLConfiguration configuration,
+                                 MetricHostProvider hostProvider,
+                                 String clusterNamePropertyId) {
+
+    super(componentPropertyInfoMap,
+      streamProvider,
+      configuration,
+      hostProvider,
+      clusterNamePropertyId);
+  }
+
+  /**
+   * No-op: no report metrics are fetched.
+   *
+   * @return the same set of resources that was passed in, unmodified
+   */
+  @Override
+  public Set<Resource> populateResources(Set<Resource> resources,
+               Request request, Predicate predicate) throws SystemException {
+    return resources;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java
index 5c01500..f8131a8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java
@@ -112,7 +112,7 @@ public class PropertyHelper {
     return propertyIds;
   }
 
-  public static Map<String, Map<String, PropertyInfo>> getGangliaPropertyIds(Resource.Type resourceType) {
+  public static Map<String, Map<String, PropertyInfo>> getMetricPropertyIds(Resource.Type resourceType) {
     return GANGLIA_PROPERTY_IDS.get(resourceType.getInternalType());
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/java/org/apache/ambari/server/state/Service.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Service.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Service.java
index 552ccee..be5c6d8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/Service.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Service.java
@@ -132,6 +132,7 @@ public interface Service {
     PIG,
     FLUME,
     YARN,
-    MAPREDUCE2
+    MAPREDUCE2,
+    AMS
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-server/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
new file mode 100644
index 0000000..843aecd
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.text.DecimalFormat;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * A single metric series identified by metric name, application id,
+ * instance id and host name, carrying a map of {@code Long} keys to
+ * {@code Double} values (the keys are presumably sample timestamps --
+ * confirm against the producer).
+ * <p>
+ * Only the annotated getters are bound by JAXB
+ * ({@code XmlAccessType.NONE}); the element names are lower-case.
+ * <p>
+ * All identity fields may be {@code null} (e.g. on a freshly constructed
+ * instance); {@link #equals(Object)}, {@link #hashCode()},
+ * {@link #equalsExceptTime(TimelineMetric)} and
+ * {@link #compareTo(TimelineMetric)} tolerate that.
+ * <p>
+ * Note: {@link #compareTo(TimelineMetric)} looks only at timestamp and
+ * metric name, so the natural ordering is NOT consistent with
+ * {@link #equals(Object)}. Be careful when using instances as elements of a
+ * sorted set or keys of a sorted map.
+ */
+@XmlRootElement(name = "metric")
+@XmlAccessorType(XmlAccessType.NONE)
+public class TimelineMetric implements Comparable<TimelineMetric> {
+
+  private String metricName;
+  private String appId;
+  private String instanceId;
+  private String hostName;
+  private long timestamp;
+  private long startTime;
+  private String type;
+  // TreeMap keeps the values sorted by their Long key.
+  private Map<Long, Double> metricValues = new TreeMap<Long, Double>();
+
+  /** @return the metric name, or {@code null} if not set */
+  @XmlElement(name = "metricname")
+  public String getMetricName() {
+    return metricName;
+  }
+
+  public void setMetricName(String metricName) {
+    this.metricName = metricName;
+  }
+
+  /** @return the application id, or {@code null} if not set */
+  @XmlElement(name = "appid")
+  public String getAppId() {
+    return appId;
+  }
+
+  public void setAppId(String appId) {
+    this.appId = appId;
+  }
+
+  /** @return the instance id, or {@code null} if not set */
+  @XmlElement(name = "instanceid")
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public void setInstanceId(String instanceId) {
+    this.instanceId = instanceId;
+  }
+
+  /** @return the host name, or {@code null} if not set */
+  @XmlElement(name = "hostname")
+  public String getHostName() {
+    return hostName;
+  }
+
+  public void setHostName(String hostName) {
+    this.hostName = hostName;
+  }
+
+  @XmlElement(name = "timestamp")
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  @XmlElement(name = "starttime")
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  @XmlElement(name = "type")
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  /**
+   * @return the live (not copied) map of metric values; changes made by the
+   *         caller are visible to this object
+   */
+  @XmlElement(name = "metrics")
+  public Map<Long, Double> getMetricValues() {
+    return metricValues;
+  }
+
+  /**
+   * Replaces the value map. The given map is stored as-is (no copy).
+   *
+   * @param metricValues the new value map
+   */
+  public void setMetricValues(Map<Long, Double> metricValues) {
+    this.metricValues = metricValues;
+  }
+
+  /**
+   * Adds all given values to this metric, overwriting values that share a
+   * key. A {@code null} argument is a no-op; if the value map was previously
+   * set to {@code null} a fresh {@link TreeMap} is created first.
+   *
+   * @param metricValues values to merge in, may be {@code null}
+   */
+  public void addMetricValues(Map<Long, Double> metricValues) {
+    if (metricValues == null) {
+      return;
+    }
+    if (this.metricValues == null) {
+      this.metricValues = new TreeMap<Long, Double>();
+    }
+    this.metricValues.putAll(metricValues);
+  }
+
+  /**
+   * Two metrics are equal when metric name, host name, app id, instance id,
+   * timestamp and start time all match. The type and the metric values are
+   * not part of equality.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TimelineMetric metric = (TimelineMetric) o;
+
+    if (!sameIdentity(metric)) return false;
+    if (timestamp != metric.timestamp) return false;
+    if (startTime != metric.startTime) return false;
+
+    return true;
+  }
+
+  /**
+   * Compares metric name, host name, app id and instance id only, ignoring
+   * both timestamp and start time.
+   *
+   * @param metric the metric to compare with, may be {@code null}
+   * @return {@code true} if the four identity fields match;
+   *         {@code false} otherwise or when {@code metric} is {@code null}
+   */
+  public boolean equalsExceptTime(TimelineMetric metric) {
+    return metric != null && sameIdentity(metric);
+  }
+
+  /**
+   * Hash over metric name, app id, instance id, host name and timestamp.
+   * Start time is deliberately left out, which is still consistent with
+   * {@link #equals(Object)} (equal objects always hash alike).
+   */
+  @Override
+  public int hashCode() {
+    int result = hashOf(metricName);
+    result = 31 * result + hashOf(appId);
+    result = 31 * result + hashOf(instanceId);
+    result = 31 * result + hashOf(hostName);
+    result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+    return result;
+  }
+
+  /**
+   * Orders by timestamp descending (larger timestamp first), then by metric
+   * name ascending; a {@code null} metric name sorts before any non-null
+   * name. Not consistent with {@link #equals(Object)}.
+   */
+  @Override
+  public int compareTo(TimelineMetric other) {
+    if (timestamp > other.timestamp) {
+      return -1;
+    } else if (timestamp < other.timestamp) {
+      return 1;
+    } else if (metricName == null) {
+      return other.metricName == null ? 0 : -1;
+    } else if (other.metricName == null) {
+      return 1;
+    } else {
+      return metricName.compareTo(other.metricName);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "TimelineMetric{" +
+      "metricName='" + metricName + '\'' +
+      ", appId='" + appId + '\'' +
+      ", instanceId='" + instanceId + '\'' +
+      ", hostName='" + hostName + '\'' +
+      ", timestamp=" + timestamp +
+      ", startTime=" + startTime +
+      ", type='" + type + '\'' +
+      ", metricValues=" + metricValues +
+      '}';
+  }
+
+  /** Null-safe comparison of the four identity fields with {@code metric}. */
+  private boolean sameIdentity(TimelineMetric metric) {
+    return areEqual(metricName, metric.metricName)
+      && areEqual(hostName, metric.hostName)
+      && areEqual(appId, metric.appId)
+      && areEqual(instanceId, metric.instanceId);
+  }
+
+  /** Null-safe equality (hand-rolled to stay Java 6 compatible). */
+  private static boolean areEqual(Object a, Object b) {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  /** Null-safe hash code: 0 for {@code null}. */
+  private static int hashOf(Object o) {
+    return o == null ? 0 : o.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java b/ambari-server/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
new file mode 100644
index 0000000..0448fdb
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Container for a list of {@link TimelineMetric} objects, bound by JAXB
+ * under the root element {@code metrics}.
+ */
+@XmlRootElement(name = "metrics")
+@XmlAccessorType(XmlAccessType.NONE)
+public class TimelineMetrics {
+
+  private List<TimelineMetric> allMetrics = new ArrayList<TimelineMetric>();
+
+  public TimelineMetrics() {}
+
+  /**
+   * @return the live (not copied) list of metrics held by this container
+   */
+  @XmlElement(name = "metrics")
+  public List<TimelineMetric> getMetrics() {
+    return allMetrics;
+  }
+
+  /**
+   * Replaces the metric list. The given list is stored as-is (no copy).
+   *
+   * @param allMetrics the new list of metrics
+   */
+  public void setMetrics(List<TimelineMetric> allMetrics) {
+    this.allMetrics = allMetrics;
+  }
+
+  /**
+   * Loose comparison of two metrics: the metric names must match, and the
+   * host name / app id must match whenever {@code metric1} has them set
+   * (a {@code null} on {@code metric1} is not compared).
+   * <p>
+   * Each mismatch returns immediately so that a host name mismatch can no
+   * longer be masked by a matching app id.
+   */
+  private boolean isEqualTimelineMetrics(TimelineMetric metric1,
+                                         TimelineMetric metric2) {
+
+    if (!metric1.getMetricName().equals(metric2.getMetricName())) {
+      return false;
+    }
+
+    if (metric1.getHostName() != null
+        && !metric1.getHostName().equals(metric2.getHostName())) {
+      return false;
+    }
+
+    if (metric1.getAppId() != null
+        && !metric1.getAppId().equals(metric2.getAppId())) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "TimelineMetrics{" +
+      "allMetrics=" + allMetrics +
+      '}';
+  }
+
+  /**
+   * Adds the given metric, or merges it into the first existing metric for
+   * which {@link TimelineMetric#equalsExceptTime(TimelineMetric)} holds,
+   * i.e. one that matches on everything except timestamp and startTime.
+   * <p>
+   * On merge the metric values are added to the existing metric (values
+   * with the same key are overwritten) and the existing metric keeps the
+   * smaller of the two timestamps and the smaller of the two start times.
+   * Otherwise the metric object itself is appended to the list.
+   *
+   * @param metric {@link TimelineMetric}
+   */
+  public void addOrMergeTimelineMetric(TimelineMetric metric) {
+    TimelineMetric metricToMerge = null;
+
+    // Iterating an empty list is a no-op, so no isEmpty() guard is needed.
+    for (TimelineMetric timelineMetric : allMetrics) {
+      if (timelineMetric.equalsExceptTime(metric)) {
+        metricToMerge = timelineMetric;
+        break;
+      }
+    }
+
+    if (metricToMerge == null) {
+      allMetrics.add(metric);
+      return;
+    }
+
+    metricToMerge.addMetricValues(metric.getMetricValues());
+    // Keep the earliest timestamp / start time seen for the merged series.
+    if (metricToMerge.getTimestamp() > metric.getTimestamp()) {
+      metricToMerge.setTimestamp(metric.getTimestamp());
+    }
+    if (metricToMerge.getStartTime() > metric.getStartTime()) {
+      metricToMerge.setStartTime(metric.getStartTime());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/resources/ganglia_properties.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/ganglia_properties.json b/ambari-server/src/main/resources/ganglia_properties.json
index e64309a..2401546 100644
--- a/ambari-server/src/main/resources/ganglia_properties.json
+++ b/ambari-server/src/main/resources/ganglia_properties.json
@@ -336,7 +336,7 @@
         "temporal":true
       },
       "metrics/rpc/RpcQueueTime_avg_time":{
-        "metric":"rpc.metrics.RpcQueueTime_avg_time",
+        "metric":"rpc.rpc.RpcQueueTimeAvgTime",
         "pointInTime":true,
         "temporal":true
       },
@@ -2010,7 +2010,7 @@
         "temporal":true
       },
       "metrics/rpc/RpcQueueTime_avg_time":{
-        "metric":"rpc.rpc.RpcQueueTime_avg_time",
+        "metric":"rpc.rpc.RpcQueueTimeAvgTime",
         "pointInTime":false,
         "temporal":true
       },

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
index 0cfe41a..3152a1d 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
@@ -67,6 +67,7 @@ jtnode_host = default("/clusterHostInfo/jtnode_host", [])
 namenode_host = default("/clusterHostInfo/namenode_host", [])
 zk_hosts = default("/clusterHostInfo/zookeeper_hosts", [])
 ganglia_server_hosts = default("/clusterHostInfo/ganglia_server_host", [])
+ams_collector_hosts = default("/clusterHostInfo/metric_collector_hosts", [])
 
 has_namenode = not len(namenode_host) == 0
 has_resourcemanager = not len(rm_host) == 0
@@ -77,6 +78,7 @@ has_hive_server_host = not len(hive_server_host)  == 0
 has_hbase_masters = not len(hbase_master_hosts) == 0
 has_zk_host = not len(zk_hosts) == 0
 has_ganglia_server = not len(ganglia_server_hosts) == 0
+has_metric_collector = not len(ams_collector_hosts) == 0
 
 is_namenode_master = hostname in namenode_host
 is_jtnode_master = hostname in jtnode_host
@@ -86,6 +88,9 @@ is_hbase_master = hostname in hbase_master_hosts
 is_slave = hostname in slave_hosts
 if has_ganglia_server:
   ganglia_server_host = ganglia_server_hosts[0]
+if has_metric_collector:
+  metric_collector_host = ams_collector_hosts[0]
+
 #hadoop params
 
 if has_namenode:

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2 b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
index c4759f4..31ca5c3 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
@@ -63,3 +63,21 @@ supervisor.sink.ganglia.servers={{ganglia_server_host}}:8650
 resourcemanager.sink.ganglia.tagsForPrefix.yarn=Queue
 
 {% endif %}
+
+{% if has_metric_collector %}
+
+*.period=60
+*.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+*.sink.timeline.period=10
+datanode.sink.timeline.collector={{metric_collector_host}}:8188
+namenode.sink.timeline.collector={{metric_collector_host}}:8188
+resourcemanager.sink.timeline.collector={{metric_collector_host}}:8188
+nodemanager.sink.timeline.collector={{metric_collector_host}}:8188
+historyserver.sink.timeline.collector={{metric_collector_host}}:8188
+journalnode.sink.timeline.collector={{metric_collector_host}}:8188
+nimbus.sink.timeline.collector={{metric_collector_host}}:8188
+supervisor.sink.timeline.collector={{metric_collector_host}}:8188
+maptask.sink.timeline.collector={{metric_collector_host}}:8188
+reducetask.sink.timeline.collector={{metric_collector_host}}:8188
+
+{% endif %}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/scripts/params.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/scripts/params.py
index 6b625d5..a5b8549 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/scripts/params.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/scripts/params.py
@@ -84,6 +84,11 @@ regionserver_jaas_config_file = format("{hbase_conf_dir}/hbase_regionserver_jaas
 ganglia_server_hosts = default('/clusterHostInfo/ganglia_server_host', []) # is not passed when ganglia is not present
 ganglia_server_host = '' if len(ganglia_server_hosts) == 0 else ganglia_server_hosts[0]
 
+ams_collector_hosts = default("/clusterHostInfo/metric_collector_hosts", [])
+has_metric_collector = not len(ams_collector_hosts) == 0
+if has_metric_collector:
+  metric_collector_host = ams_collector_hosts[0]
+
 # if hbase is selected the hbase_rs_hosts, should not be empty, but still default just in case
 if 'slave_hosts' in config['clusterHostInfo']:
   rs_hosts = default('/clusterHostInfo/hbase_rs_hosts', '/clusterHostInfo/slave_hosts') #if hbase_rs_hosts not given it is assumed that region servers on same nodes as slaves

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2 b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2
index 0513104..fc033a7 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2
@@ -79,3 +79,27 @@ hbase.sink.ganglia.period=10
 .sink.ganglia.dmax=jvm.metrics.threadsBlocked=70,jvm.metrics.memHeapUsedM=40
 
 hbase.sink.ganglia.servers={{ganglia_server_host}}:8663
+
+{% if has_metric_collector %}
+
+# HBase-specific configuration to reset long-running stats (e.g. compactions)
+# If this variable is left out, then the default is no expiration.
+hbase.extendedperiod = 3600
+
+hbase.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+hbase.period=10
+hbase.collector={{metric_collector_host}}:8188
+
+jvm.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+jvm.period=10
+jvm.collector={{metric_collector_host}}:8188
+
+rpc.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+rpc.period=10
+rpc.collector={{metric_collector_host}}:8188
+
+hbase.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+hbase.sink.timeline.period=10
+hbase.sink.timeline.collector={{metric_collector_host}}:8188
+
+{% endif %}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-RS.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-RS.j2 b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-RS.j2
index 55e8461..97e76d8 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-RS.j2
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-RS.j2
@@ -78,3 +78,27 @@ hbase.sink.ganglia.period=10
 .sink.ganglia.dmax=jvm.metrics.threadsBlocked=70,jvm.metrics.memHeapUsedM=40
 
 hbase.sink.ganglia.servers={{ganglia_server_host}}:8656
+
+{% if has_metric_collector %}
+
+# HBase-specific configuration to reset long-running stats (e.g. compactions)
+# If this variable is left out, then the default is no expiration.
+hbase.extendedperiod = 3600
+
+hbase.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+hbase.period=10
+hbase.collector={{metric_collector_host}}:8188
+
+jvm.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+jvm.period=10
+jvm.collector={{metric_collector_host}}:8188
+
+rpc.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+rpc.period=10
+rpc.collector={{metric_collector_host}}:8188
+
+hbase.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+hbase.sink.timeline.period=10
+hbase.sink.timeline.collector={{metric_collector_host}}:8188
+
+{% endif %}

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/resources/stacks/HDP/2.2/services/AMS/configuration/ams-env.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/services/AMS/configuration/ams-env.xml b/ambari-server/src/main/resources/stacks/HDP/2.2/services/AMS/configuration/ams-env.xml
new file mode 100644
index 0000000..fda1df0
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/services/AMS/configuration/ams-env.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+  <property>
+    <name>ams_user</name>
+    <value>root</value>
+    <property-type>USER</property-type>
+    <description>AMS User Name.</description>
+  </property>
+
+  <property>
+    <name>content</name>
+    <value>
+      # Set environment variables here.
+
+      # The java implementation to use. Java 1.6 required.
+      export JAVA_HOME={{java64_home}}
+
+      #TODO
+    </value>
+  </property>
+
+</configuration>