You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/12/02 18:28:22 UTC
[06/30] ambari git commit: AMBARI-5707. Replace Ganglia with high
performant and pluggable Metrics System. (swagle)
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ganglia/GangliaPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ganglia/GangliaPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ganglia/GangliaPropertyProvider.java
new file mode 100644
index 0000000..63533c6
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ganglia/GangliaPropertyProvider.java
@@ -0,0 +1,618 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.controller.metrics.ganglia;
+
+import org.apache.ambari.server.controller.internal.AbstractPropertyProvider;
+import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
+import org.apache.ambari.server.controller.internal.PropertyInfo;
+import org.apache.ambari.server.controller.metrics.MetricHostProvider;
+import org.apache.ambari.server.controller.metrics.MetricsPropertyProvider;
+import org.apache.ambari.server.controller.spi.*;
+import org.apache.ambari.server.controller.utilities.StreamProvider;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.ambari.server.controller.metrics.MetricsPropertyProvider.MetricsService.*;
+
+/**
+ * Abstract property provider implementation for a Ganglia source.
+ */
+public abstract class GangliaPropertyProvider extends MetricsPropertyProvider {
+
+ /**
+ * Map of candidate Ganglia cluster (grid) names keyed by component type.
+ * Components that run on slave hosts also report into the shared
+ * "HDPSlaves" grid, so they map to more than one candidate name.
+ *
+ * NOTE(review): this map is mutable and package-visible; it should be
+ * treated as read-only after the static initializer runs.
+ */
+ static final Map<String, List<String>> GANGLIA_CLUSTER_NAME_MAP = new HashMap<String, List<String>>();
+
+
+ static {
+ GANGLIA_CLUSTER_NAME_MAP.put("NAMENODE", Collections.singletonList("HDPNameNode"));
+ GANGLIA_CLUSTER_NAME_MAP.put("DATANODE", Arrays.asList("HDPDataNode", "HDPSlaves"));
+ GANGLIA_CLUSTER_NAME_MAP.put("JOBTRACKER", Collections.singletonList("HDPJobTracker"));
+ GANGLIA_CLUSTER_NAME_MAP.put("TASKTRACKER", Arrays.asList("HDPTaskTracker", "HDPSlaves"));
+ GANGLIA_CLUSTER_NAME_MAP.put("RESOURCEMANAGER", Collections.singletonList("HDPResourceManager"));
+ GANGLIA_CLUSTER_NAME_MAP.put("NODEMANAGER", Arrays.asList("HDPNodeManager", "HDPSlaves"));
+ GANGLIA_CLUSTER_NAME_MAP.put("HISTORYSERVER", Collections.singletonList("HDPHistoryServer"));
+ GANGLIA_CLUSTER_NAME_MAP.put("HBASE_MASTER", Collections.singletonList("HDPHBaseMaster"));
+ GANGLIA_CLUSTER_NAME_MAP.put("HBASE_REGIONSERVER", Arrays.asList("HDPHBaseRegionServer", "HDPSlaves"));
+ GANGLIA_CLUSTER_NAME_MAP.put("FLUME_HANDLER", Arrays.asList("HDPFlumeServer", "HDPSlaves"));
+ GANGLIA_CLUSTER_NAME_MAP.put("JOURNALNODE", Arrays.asList("HDPJournalNode", "HDPSlaves"));
+ GANGLIA_CLUSTER_NAME_MAP.put("NIMBUS", Collections.singletonList("HDPNimbus"));
+ GANGLIA_CLUSTER_NAME_MAP.put("SUPERVISOR", Collections.singletonList("HDPSupervisor"));
+ }
+
+ // Logger shared with subclasses (hence protected rather than private).
+ protected final static Logger LOG =
+ LoggerFactory.getLogger(GangliaPropertyProvider.class);
+
+ // ----- Constructors ------------------------------------------------------
+
+ /**
+ * Construct a Ganglia property provider.
+ *
+ * @param componentPropertyInfoMap map of supported metrics, keyed by component name
+ * @param streamProvider the stream provider used to read the Ganglia endpoint
+ * @param configuration SSL configuration (determines http vs https)
+ * @param hostProvider provider of the Ganglia collector host for a cluster
+ * @param clusterNamePropertyId property id of the cluster name on a resource
+ * @param hostNamePropertyId property id of the host name on a resource
+ * @param componentNamePropertyId property id of the component name on a resource
+ */
+ public GangliaPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
+ StreamProvider streamProvider,
+ ComponentSSLConfiguration configuration,
+ MetricHostProvider hostProvider,
+ String clusterNamePropertyId,
+ String hostNamePropertyId,
+ String componentNamePropertyId) {
+
+ super(componentPropertyInfoMap, streamProvider,configuration,
+ hostProvider, clusterNamePropertyId, hostNamePropertyId,
+ componentNamePropertyId);
+ }
+
+
+ // ----- PropertyProvider --------------------------------------------------
+
+ /**
+ * Populate the given resources with Ganglia metric values for the
+ * requested property ids.  Work is batched into one RRD request per
+ * (cluster, temporal info) pair before being executed.
+ */
+ @Override
+ public Set<Resource> populateResources(Set<Resource> resources, Request request, Predicate predicate)
+ throws SystemException {
+
+ Set<String> requestedIds = getRequestPropertyIds(request, predicate);
+ if (requestedIds.isEmpty()) {
+ return resources;
+ }
+
+ // Build the batched requests, then execute each one.
+ Map<String, Map<TemporalInfo, RRDRequest>> requestMap =
+ getRRDRequests(resources, request, requestedIds);
+
+ for (Map<TemporalInfo, RRDRequest> requestsForCluster : requestMap.values()) {
+ for (RRDRequest rrdRequest : requestsForCluster.values()) {
+ //todo: property provider can reduce set of resources
+ rrdRequest.populateResources();
+ }
+ }
+ return resources;
+ }
+
+
+ // ----- GangliaPropertyProvider -------------------------------------------
+
+
+ /**
+ * Get the candidate ganglia cluster (grid) names for the given resource.
+ *
+ * @param resource the resource
+ * @param clusterName the name of the Ambari cluster the resource belongs to
+ *
+ * @return the set of candidate ganglia cluster names
+ */
+ protected abstract Set<String> getGangliaClusterNames(Resource resource, String clusterName);
+
+
+ /**
+ * Get the component name property id.
+ *
+ * @return the component name property id
+ */
+ protected String getComponentNamePropertyId() {
+ return componentNamePropertyId;
+ }
+
+ /**
+ * Get the host name property id.
+ *
+ * @return the host name property id
+ */
+ protected String getHostNamePropertyId() {
+ return hostNamePropertyId;
+ }
+
+ /**
+ * Get the stream provider.
+ *
+ * @return the stream provider
+ */
+ public StreamProvider getStreamProvider() {
+ return streamProvider;
+ }
+
+
+ // ----- helper methods ----------------------------------------------------
+
+ /**
+ * Get the request objects containing all the information required to
+ * make single requests to the Ganglia rrd script.
+ * Requests are created per cluster name / temporal information but
+ * can span multiple resources and metrics.
+ *
+ * @param resources the resources being populated
+ * @param request the request
+ * @param ids the relevant property ids
+ *
+ * @return a map of maps of rrd requests keyed by cluster name / temporal info
+ */
+ private Map<String, Map<TemporalInfo, RRDRequest>> getRRDRequests(Set<Resource> resources,
+ Request request,
+ Set<String> ids) {
+
+ Map<String, Map<TemporalInfo, RRDRequest>> requestMap =
+ new HashMap<String, Map<TemporalInfo, RRDRequest>>();
+
+ for (Resource resource : resources) {
+ String clusterName = (String) resource.getPropertyValue(clusterNamePropertyId);
+ // Get (or lazily create) the request bucket for this cluster.
+ Map<TemporalInfo, RRDRequest> requests = requestMap.get(clusterName);
+ if (requests == null) {
+ requests = new HashMap<TemporalInfo, RRDRequest>();
+ requestMap.put(clusterName, requests);
+ }
+
+ Set<String> gangliaClusterNames = getGangliaClusterNames(resource, clusterName);
+
+ for (String gangliaClusterName : gangliaClusterNames) {
+ // Key identifying this resource in the Ganglia response (host + grid).
+ ResourceKey key =
+ new ResourceKey(getHostName(resource), gangliaClusterName);
+
+ for (String id : ids) {
+ Map<String, PropertyInfo> propertyInfoMap = new HashMap<String, PropertyInfo>();
+
+ Map<String, PropertyInfo> componentMetricMap =
+ getComponentMetrics().get(getComponentName(resource));
+
+ // Not all components have metrics
+ if (componentMetricMap != null &&
+ !componentMetricMap.containsKey(id)) {
+ updateComponentMetricMap(componentMetricMap, id);
+ }
+
+ getPropertyInfoMap(getComponentName(resource), id, propertyInfoMap);
+
+ for (Map.Entry<String, PropertyInfo> entry : propertyInfoMap.entrySet()) {
+ String propertyId = entry.getKey();
+ PropertyInfo propertyInfo = entry.getValue();
+
+ TemporalInfo temporalInfo = request.getTemporalInfo(id);
+
+ // A property is only requested when its kind matches the request:
+ // point-in-time ids without temporal info, temporal ids with it.
+ if ((temporalInfo == null && propertyInfo.isPointInTime()) || (temporalInfo != null && propertyInfo.isTemporal())) {
+ // One RRDRequest is shared per (cluster, temporal info) pair.
+ RRDRequest rrdRequest = requests.get(temporalInfo);
+ if (rrdRequest == null) {
+ rrdRequest = new RRDRequest(clusterName, temporalInfo);
+ requests.put(temporalInfo, rrdRequest);
+ }
+ rrdRequest.putResource(key, resource);
+ rrdRequest.putPropertyId(propertyInfo.getPropertyId(), propertyId);
+ }
+ }
+ }
+ }
+ }
+ return requestMap;
+ }
+
+ /**
+ * Build the spec (URL) used to query the Ganglia rrd script for the
+ * given request information.
+ *
+ * @param clusterName the cluster name
+ * @param clusterSet the set of ganglia cluster names
+ * @param hostSet the set of host names
+ * @param metricSet the set of metric names
+ * @param temporalInfo the temporal information; null for point-in-time
+ *
+ * @return the spec, like http://example.com/path?param1=val1&paramn=valn
+ *
+ * @throws org.apache.ambari.server.controller.spi.SystemException if unable to get the Ganglia Collector host name
+ */
+ private String getSpec(String clusterName,
+ Set<String> clusterSet,
+ Set<String> hostSet,
+ Set<String> metricSet,
+ TemporalInfo temporalInfo) throws SystemException {
+
+ URIBuilder uri = new URIBuilder();
+
+ // Scheme follows the SSL configuration of the Ganglia endpoint.
+ uri.setScheme(configuration.isGangliaSSL() ? "https" : "http");
+ uri.setHost(hostProvider.getCollectorHostName(clusterName, GANGLIA));
+ uri.setPath("/cgi-bin/rrd.py");
+
+ // -1 means no limit on the number of entries rendered into the string.
+ uri.setParameter("c", getSetString(clusterSet, -1));
+
+ String hosts = getSetString(hostSet, -1);
+ if (!hosts.isEmpty()) {
+ uri.setParameter("h", hosts);
+ }
+
+ // An empty metric set means "all metrics".
+ String metrics = getSetString(metricSet, -1);
+ uri.setParameter("m", metrics.isEmpty() ? ".*" : metrics);
+
+ if (temporalInfo == null) {
+ // Point-in-time request: latest value only.
+ uri.setParameter("e", "now");
+ uri.setParameter("pt", "true");
+ } else {
+ // Temporal request: -1 marks an unset start / end / step.
+ long startTime = temporalInfo.getStartTime();
+ if (startTime != -1) {
+ uri.setParameter("s", String.valueOf(startTime));
+ }
+ long endTime = temporalInfo.getEndTime();
+ if (endTime != -1) {
+ uri.setParameter("e", String.valueOf(endTime));
+ }
+ long step = temporalInfo.getStep();
+ if (step != -1) {
+ uri.setParameter("r", String.valueOf(step));
+ }
+ }
+
+ return uri.toString();
+ }
+
+
+ /**
+ * Extract a value from the given Ganglia metric.
+ *
+ * @param metric the metric
+ * @param isTemporal whether a time series (rather than a single value) is wanted
+ *
+ * @return the full datapoint array for temporal requests (null when empty);
+ * otherwise the value of the most recent datapoint (0 when empty)
+ */
+ private static Object getValue(GangliaMetric metric, boolean isTemporal) {
+ Number[][] dataPoints = metric.getDatapoints();
+
+ if (dataPoints.length == 0) {
+ // Temporal callers expect null for "no data"; point-in-time callers expect 0.
+ return isTemporal ? null : 0;
+ }
+ return isTemporal ? dataPoints : dataPoints[dataPoints.length - 1][0];
+ }
+
+ // ----- inner classes -----------------------------------------------------
+
+
+ // ----- RRDRequest ----------------------------------------------------
+
+ /**
+ * The information required to make a single RRD request.  Collects the
+ * resources, metric names, Ganglia cluster names and host names that can
+ * be satisfied by one call to the rrd script.
+ */
+ private class RRDRequest {
+ // Log when population takes longer than this many seconds.
+ private static final int POPULATION_TIME_UPPER_LIMIT = 5;
+ private final String clusterName;
+ // Temporal specification for the request; null means point-in-time.
+ private final TemporalInfo temporalInfo;
+ // Resources to populate, keyed by (host name, ganglia cluster name).
+ private final Map<ResourceKey, Set<Resource>> resources = new HashMap<ResourceKey, Set<Resource>>();
+ // Requested property ids, keyed by metric name (the name may be a regex).
+ private final Map<String, Set<String>> metrics = new HashMap<String, Set<String>>();
+ private final Set<String> clusterSet = new HashSet<String>();
+ private final Set<String> hostSet = new HashSet<String>();
+
+
+ private RRDRequest(String clusterName, TemporalInfo temporalInfo) {
+ this.clusterName = clusterName;
+ this.temporalInfo = temporalInfo;
+ }
+
+ /**
+ * Register a resource under the given key, recording its Ganglia cluster
+ * and host names so they are included in the batched request.
+ */
+ public void putResource(ResourceKey key, Resource resource) {
+ clusterSet.add(key.getClusterName());
+ hostSet.add(key.getHostName());
+
+ // Lazily create the bucket of resources sharing this key.
+ Set<Resource> bucket = resources.get(key);
+ if (bucket == null) {
+ bucket = new HashSet<Resource>();
+ resources.put(key, bucket);
+ }
+ bucket.add(resource);
+ }
+
+ /**
+ * Associate a requested property id with the given metric name.
+ */
+ public void putPropertyId(String metric, String id) {
+ // Lazily create the id set for this metric, then record the id.
+ Set<String> idsForMetric = metrics.get(metric);
+ if (idsForMetric == null) {
+ idsForMetric = new HashSet<String>();
+ metrics.put(metric, idsForMetric);
+ }
+ idsForMetric.add(id);
+ }
+
+ /**
+ * Populate the associated resources by making the rrd request.
+ *
+ * The rrd feed is line oriented: a start timestamp, one or more metric
+ * sections (ds name, cluster name, host name, metric name, start time,
+ * step, values terminated by [~EOM]), a [~EOF] marker and an end
+ * timestamp.  "[~r]N" encodes a run of N repeated values and "[~n]"
+ * encodes a missing value.
+ *
+ * @return a collection of populated resources
+ *
+ * @throws org.apache.ambari.server.controller.spi.SystemException if unable to populate the resources
+ */
+ public Collection<Resource> populateResources() throws SystemException {
+
+ //Get full url with parameters
+ String specWithParams = getSpec(clusterName, clusterSet, hostSet, metrics.keySet(), temporalInfo);
+
+ //URL
+ String spec = null;
+ //Parameters
+ String params = null;
+
+ // Split the URL from its query string; the query is sent as POST data.
+ String[] tokens = questionMarkPattern.split(specWithParams, 2);
+
+ // NOTE(review): exception used as control flow for a spec without a
+ // query string; params remains null in that case.
+ try {
+ spec = tokens[0];
+ params = tokens[1];
+ } catch (ArrayIndexOutOfBoundsException e) {
+ LOG.info(e.toString());
+ }
+
+
+ BufferedReader reader = null;
+ try {
+
+ //Check if host is live
+ if (!hostProvider.isCollectorHostLive(clusterName, GANGLIA)) {
+ LOG.info("Ganglia host is not live");
+ return Collections.emptySet();
+ }
+
+ //Check if Ganglia server component is live
+ if (!hostProvider.isCollectorComponentLive(clusterName, GANGLIA)) {
+ LOG.info("Ganglia server component is not live");
+ return Collections.emptySet();
+ }
+
+ reader = new BufferedReader(new InputStreamReader(
+ getStreamProvider().readFrom(spec, "POST", params)));
+
+ // First line of the feed is the population start timestamp (seconds).
+ String feedStart = reader.readLine();
+ if (feedStart == null || feedStart.isEmpty()) {
+ LOG.info("Empty feed while getting ganglia metrics for spec => "+
+ spec);
+ return Collections.emptySet();
+ }
+ int startTime = convertToNumber(feedStart).intValue();
+
+ String dsName = reader.readLine();
+ if (dsName == null || dsName.isEmpty()) {
+ LOG.info("Feed without body while reading ganglia metrics for spec " +
+ "=> " + spec);
+ return Collections.emptySet();
+ }
+
+ // One iteration per metric section until the end-of-feed marker.
+ while(!"[~EOF]".equals(dsName)) {
+ GangliaMetric metric = new GangliaMetric();
+ List<GangliaMetric.TemporalMetric> listTemporalMetrics =
+ new ArrayList<GangliaMetric.TemporalMetric>();
+
+ metric.setDs_name(dsName);
+ metric.setCluster_name(reader.readLine());
+ metric.setHost_name(reader.readLine());
+ metric.setMetric_name(reader.readLine());
+
+ String timeStr = reader.readLine();
+ String stepStr = reader.readLine();
+ if (timeStr == null || timeStr.isEmpty() || stepStr == null
+ || stepStr.isEmpty()) {
+ LOG.info("Unexpected end of stream reached while getting ganglia " +
+ "metrics for spec => " + spec);
+ return Collections.emptySet();
+ }
+ int time = convertToNumber(timeStr).intValue();
+ int step = convertToNumber(stepStr).intValue();
+
+ String val = reader.readLine();
+ String lastVal = null;
+
+ // Read datapoints until the end-of-metric marker.
+ while(val!=null && !"[~EOM]".equals(val)) {
+ if (val.startsWith("[~r]")) {
+ // Run-length encoding: repeat the previous value N-1 more times,
+ // advancing the timestamp by one step per repetition.
+ Integer repeat = Integer.valueOf(val.substring(4)) - 1;
+ for (int i = 0; i < repeat; ++i) {
+ if (! "[~n]".equals(lastVal)) {
+ GangliaMetric.TemporalMetric tm = new GangliaMetric.TemporalMetric(lastVal, time);
+ if (tm.isValid()) listTemporalMetrics.add(tm);
+ }
+ time += step;
+ }
+ } else {
+ // [~n] marks a missing value; the timestamp still advances.
+ if (! "[~n]".equals(val)) {
+ GangliaMetric.TemporalMetric tm = new GangliaMetric.TemporalMetric(val, time);
+ if (tm.isValid()) listTemporalMetrics.add(tm);
+ }
+ time += step;
+ }
+ lastVal = val;
+ val = reader.readLine();
+ }
+
+ metric.setDatapointsFromList(listTemporalMetrics);
+
+ // Apply the metric to every resource registered under its key.
+ ResourceKey key = new ResourceKey(metric.getHost_name(), metric.getCluster_name());
+ Set<Resource> resourceSet = resources.get(key);
+ if (resourceSet != null) {
+ for (Resource resource : resourceSet) {
+ populateResource(resource, metric);
+ }
+ }
+
+ dsName = reader.readLine();
+ if (dsName == null || dsName.isEmpty()) {
+ LOG.info("Unexpected end of stream reached while getting ganglia " +
+ "metrics for spec => " + spec);
+ return Collections.emptySet();
+ }
+ }
+ // Last line of the feed is the population end timestamp (seconds).
+ String feedEnd = reader.readLine();
+ if (feedEnd == null || feedEnd.isEmpty()) {
+ LOG.info("Error reading end of feed while getting ganglia metrics " +
+ "for spec => " + spec);
+ } else {
+
+ int endTime = convertToNumber(feedEnd).intValue();
+ int totalTime = endTime - startTime;
+ // Flag unusually slow population for diagnostics.
+ if (LOG.isInfoEnabled() && totalTime > POPULATION_TIME_UPPER_LIMIT) {
+ LOG.info("Ganglia resource population time: " + totalTime);
+ }
+ }
+ } catch (IOException e) {
+ // NOTE(review): the exception itself is not logged (no stack trace).
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Caught exception getting Ganglia metrics : spec=" + spec);
+ }
+ } finally {
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Unable to close http input steam : spec=" + spec, e);
+ }
+ }
+ }
+ }
+ //todo: filter out resources and return keepers
+ return Collections.emptySet();
+ }
+
+
+ /**
+ * Populate the given resource with the given Ganglia metric.
+ *
+ * Requested metric names may be literal or regular expressions; regex
+ * capture groups are substituted into "$n" placeholders in the property id.
+ *
+ * @param resource the resource
+ * @param gangliaMetric the Ganglia metrics
+ */
+ private void populateResource(Resource resource, GangliaMetric gangliaMetric) {
+ String metric_name = gangliaMetric.getMetric_name();
+
+ Set<String> propertyIdSet = metrics.get(metric_name);
+ List<String> parameterList = new LinkedList<String>();
+
+ if (propertyIdSet == null) {
+ // No exact match: treat the requested metric names as regular
+ // expressions and remember the capture groups as arguments.
+ for (Map.Entry<String, Set<String>> entry : metrics.entrySet()) {
+
+ String key = entry.getKey();
+
+ Pattern pattern = Pattern.compile(key);
+ Matcher matcher = pattern.matcher(metric_name);
+
+ if (matcher.matches()) {
+ propertyIdSet = entry.getValue();
+ // get parameters
+ for (int i = 0; i < matcher.groupCount(); ++i) {
+ parameterList.add(matcher.group(i + 1));
+ }
+ break;
+ }
+ }
+ }
+ if (propertyIdSet != null) {
+ Map<String, PropertyInfo> metricsMap = getComponentMetrics().get(getComponentName(resource));
+ if (metricsMap != null) {
+ for (String propertyId : propertyIdSet) {
+ if (propertyId != null) {
+ if (metricsMap.containsKey(propertyId)){
+ // Substitute captured regex groups for $1, $2, ... arguments.
+ if (containsArguments(propertyId)) {
+ int i = 1;
+ for (String param : parameterList) {
+ propertyId = substituteArgument(propertyId, "$" + i, param);
+ ++i;
+ }
+ }
+ // Temporal requests get the series; otherwise the latest value.
+ Object value = getValue(gangliaMetric, temporalInfo != null);
+ if (value != null) {
+ resource.setProperty(propertyId, value);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Convert a numeric string from the rrd feed into a Number.
+ *
+ * @param s the string to convert; must be a parsable decimal number
+ *
+ * @return a Double for fractional or scientific-notation values,
+ * otherwise a Long
+ *
+ * @throws NumberFormatException if the string is not a parsable number
+ */
+ private Number convertToNumber(String s) {
+ String value = s.trim();
+ // Scientific notation (e.g. "1E4") may contain no '.' but is not a
+ // valid long, so route it through Double.parseDouble as well.
+ return (value.indexOf('.') >= 0 || value.indexOf('e') >= 0 || value.indexOf('E') >= 0)
+ ? (Number) Double.parseDouble(value)
+ : (Number) Long.parseLong(value);
+ }
+ }
+
+
+ // ----- ResourceKey ---------------------------------------------------
+
+ /**
+ * Key used to associate information from a Ganglia metric to a resource.
+ * A key is the (host name, Ganglia cluster name) pair; equality and hash
+ * code are null-safe over both fields.
+ */
+ private static class ResourceKey {
+ private final String hostName;
+ private final String gangliaClusterName;
+
+ private ResourceKey(String hostName, String gangliaClusterName) {
+ this.hostName = hostName;
+ this.gangliaClusterName = gangliaClusterName;
+ }
+
+ public String getHostName() {
+ return hostName;
+ }
+
+ public String getClusterName() {
+ return gangliaClusterName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ResourceKey that = (ResourceKey) o;
+
+ // Null-safe comparison of both fields.
+ if (hostName == null ? that.hostName != null : !hostName.equals(that.hostName)) {
+ return false;
+ }
+ return gangliaClusterName == null
+ ? that.gangliaClusterName == null
+ : gangliaClusterName.equals(that.gangliaClusterName);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = hostName == null ? 0 : hostName.hashCode();
+ return 31 * result + (gangliaClusterName == null ? 0 : gangliaClusterName.hashCode());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ganglia/GangliaReportPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ganglia/GangliaReportPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ganglia/GangliaReportPropertyProvider.java
new file mode 100644
index 0000000..09ea31c
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/ganglia/GangliaReportPropertyProvider.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.controller.metrics.ganglia;
+
+import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
+import org.apache.ambari.server.controller.internal.PropertyInfo;
+import org.apache.ambari.server.controller.metrics.MetricHostProvider;
+import org.apache.ambari.server.controller.metrics.MetricsReportPropertyProvider;
+import org.apache.ambari.server.controller.spi.Predicate;
+import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.SystemException;
+import org.apache.ambari.server.controller.spi.TemporalInfo;
+import org.apache.ambari.server.controller.utilities.StreamProvider;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.ambari.server.controller.metrics.MetricsPropertyProvider.MetricsService.GANGLIA;
+
+/**
+ * Property provider implementation for a Ganglia source. This provider is specialized
+ * to pull metrics from existing Ganglia reports.
+ */
+public class GangliaReportPropertyProvider extends MetricsReportPropertyProvider {
+ // ----- Constants --------------------------------------------------------
+
+ protected final static Logger LOG =
+ LoggerFactory.getLogger(GangliaReportPropertyProvider.class);
+
+
+ // ----- Constructors ------------------------------------------------------
+
+ /**
+ * Construct a property provider backed by Ganglia reports.
+ *
+ * @param componentPropertyInfoMap map of supported metrics, keyed by component name
+ * @param streamProvider the stream provider used to read the report endpoint
+ * @param configuration SSL configuration (determines http vs https)
+ * @param hostProvider provider of the Ganglia collector host for a cluster
+ * @param clusterNamePropertyId property id of the cluster name on a resource
+ */
+ public GangliaReportPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
+ StreamProvider streamProvider,
+ ComponentSSLConfiguration configuration,
+ MetricHostProvider hostProvider,
+ String clusterNamePropertyId) {
+ super(componentPropertyInfoMap, streamProvider, configuration,
+ hostProvider, clusterNamePropertyId);
+ }
+
+
+ // ----- PropertyProvider --------------------------------------------------
+
+ @Override
+ public Set<Resource> populateResources(Set<Resource> resources, Request request, Predicate predicate)
+ throws SystemException {
+
+ Set<Resource> keepers = new HashSet<Resource>();
+ for (Resource resource : resources) {
+ if (populateResource(resource, request, predicate)) {
+ keepers.add(resource);
+ }
+ }
+ return keepers;
+ }
+
+
+ // ----- helper methods ----------------------------------------------------
+
+ /**
+ * Populate a resource by obtaining the requested Ganglia RESOURCE_METRICS.
+ *
+ * @param resource the resource to be populated
+ * @param request the request
+ * @param predicate the predicate
+ *
+ * @return true if the resource was successfully populated with the requested properties
+ *
+ * @throws SystemException if unable to populate the resource
+ */
+ private boolean populateResource(Resource resource, Request request, Predicate predicate)
+ throws SystemException {
+
+ Set<String> propertyIds = getPropertyIds();
+
+ // Nothing to do if this provider supports no properties.
+ if (propertyIds.isEmpty()) {
+ return true;
+ }
+ String clusterName = (String) resource.getPropertyValue(clusterNamePropertyId);
+
+ // Without a known collector host the resource is kept but not populated.
+ if (hostProvider.getCollectorHostName(clusterName, GANGLIA) == null) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Attempting to get metrics but the Ganglia server is unknown. Resource=" + resource +
+ " : Cluster=" + clusterName);
+ }
+ return true;
+ }
+
+ setProperties(resource, clusterName, request, getRequestPropertyIds(request, predicate));
+
+ // NOTE(review): the result of setProperties is ignored; this method
+ // currently always returns true.
+ return true;
+ }
+
+ /**
+ * Read the requested Ganglia report metrics and set the corresponding
+ * properties on the resource.
+ *
+ * @param resource the resource to populate
+ * @param clusterName the name of the cluster
+ * @param request the request
+ * @param ids the requested property ids
+ *
+ * @return false if an I/O error occurred reading any report; true otherwise
+ *
+ * @throws SystemException if unable to build the spec for a report
+ */
+ private boolean setProperties(Resource resource, String clusterName, Request request, Set<String> ids)
+ throws SystemException {
+
+ Map<String, Map<String, String>> propertyIdMaps = getPropertyIdMaps(request, ids);
+
+ // The mapper has no per-report state; create it once per call instead
+ // of once per report inside the loop.
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ for (Map.Entry<String, Map<String, String>> entry : propertyIdMaps.entrySet()) {
+ Map<String, String> map = entry.getValue();
+ String report = entry.getKey();
+
+ String spec = getSpec(clusterName, report);
+
+ try {
+ List<GangliaMetric> gangliaMetrics = objectMapper.readValue(streamProvider.readFrom(spec),
+ new TypeReference<List<GangliaMetric>>() {});
+
+ if (gangliaMetrics != null) {
+ for (GangliaMetric gangliaMetric : gangliaMetrics) {
+
+ // Only set properties that were actually requested for this report.
+ String propertyId = map.get(gangliaMetric.getMetric_name());
+ if (propertyId != null) {
+ resource.setProperty(propertyId, getValue(gangliaMetric));
+ }
+ }
+ }
+ } catch (IOException e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Caught exception getting Ganglia metrics : " + e + " : spec=" + spec);
+ }
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Build a map of report name to (metric name to property id) for the
+ * requested temporal property ids.  Property names are expected in the
+ * format report_name.metric_name; names without a report prefix are
+ * skipped.
+ */
+ private Map<String, Map<String, String>> getPropertyIdMaps(Request request, Set<String> ids) {
+ Map<String, Map<String, String>> propertyMap = new HashMap<String, Map<String, String>>();
+
+ for (String id : ids) {
+ Map<String, PropertyInfo> propertyInfoMap = getPropertyInfoMap("*", id);
+
+ for (Map.Entry<String, PropertyInfo> entry : propertyInfoMap.entrySet()) {
+ String propertyId = entry.getKey();
+ PropertyInfo propertyInfo = entry.getValue();
+
+ TemporalInfo temporalInfo = request.getTemporalInfo(id);
+
+ // Only temporal metrics are served by Ganglia reports.
+ if (temporalInfo != null && propertyInfo.isTemporal()) {
+ String propertyName = propertyInfo.getPropertyId();
+ String report = null;
+ // format : report_name.metric_name
+ int dotIndex = propertyName.lastIndexOf('.');
+ if (dotIndex != -1){
+ report = propertyName.substring(0, dotIndex);
+ propertyName = propertyName.substring(dotIndex + 1);
+ }
+ if (report != null) {
+ // Lazily create the per-report bucket.
+ Map<String, String> map = propertyMap.get(report);
+ if (map == null) {
+ map = new HashMap<String, String>();
+ propertyMap.put(report, map);
+ }
+ map.put(propertyName, propertyId);
+ }
+ }
+ }
+ }
+ return propertyMap;
+ }
+
+ /**
+ * Get value from the given metric.
+ *
+ * @param metric the metric
+ *
+ * @return the metric's datapoints
+ */
+ private Object getValue(GangliaMetric metric) {
+ return metric.getDatapoints();
+ }
+
+ /**
+ * Build the spec (URL) used to fetch the given Ganglia report as JSON.
+ *
+ * @param clusterName the cluster name
+ * @param report the report name
+ *
+ * @return the spec
+ *
+ * @throws SystemException if unable to get the Ganglia Collector host name
+ */
+ protected String getSpec(String clusterName, String report) throws SystemException {
+
+ // Scheme follows the SSL configuration of the Ganglia endpoint.
+ String scheme = configuration.isGangliaSSL() ? "https://" : "http://";
+
+ return scheme +
+ hostProvider.getCollectorHostName(clusterName, GANGLIA) +
+ "/ganglia/graph.php?g=" + report + "&json=1";
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSComponentPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSComponentPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSComponentPropertyProvider.java
new file mode 100644
index 0000000..944ec5c
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSComponentPropertyProvider.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics.timeline;
+
+import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
+import org.apache.ambari.server.controller.internal.PropertyInfo;
+import org.apache.ambari.server.controller.metrics.MetricHostProvider;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.utilities.StreamProvider;
+import java.util.Map;
+
+/**
+ * AMS property provider for component level resources.  Component level
+ * metrics are not host specific, so no host name property id is used.
+ */
+public class AMSComponentPropertyProvider extends AMSPropertyProvider {
+
+ public AMSComponentPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
+ StreamProvider streamProvider,
+ ComponentSSLConfiguration configuration,
+ MetricHostProvider hostProvider,
+ String clusterNamePropertyId,
+ String componentNamePropertyId) {
+
+ super(componentPropertyInfoMap, streamProvider, configuration, hostProvider,
+ clusterNamePropertyId, null, componentNamePropertyId);
+ }
+
+ /**
+ * Component level metrics have no associated host.
+ */
+ @Override
+ protected String getHostName(Resource resource) {
+ return null;
+ }
+
+ /**
+ * Map the Ambari component name to the application id used by the
+ * timeline metrics service, when such a mapping exists.
+ */
+ @Override
+ protected String getComponentName(Resource resource) {
+ String componentName = (String) resource.getPropertyValue(componentNamePropertyId);
+
+ return TIMLINE_APPID_MAP.containsKey(componentName)
+ ? TIMLINE_APPID_MAP.get(componentName)
+ : componentName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostComponentPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostComponentPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostComponentPropertyProvider.java
new file mode 100644
index 0000000..09836e3
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostComponentPropertyProvider.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics.timeline;
+
+import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
+import org.apache.ambari.server.controller.internal.PropertyInfo;
+import org.apache.ambari.server.controller.metrics.MetricHostProvider;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.utilities.StreamProvider;
+import java.util.Map;
+
+/**
+ * Property provider for host-component metrics served by the Ambari
+ * Metrics Service (AMS): queries are scoped to both a host and a
+ * component.
+ */
+public class AMSHostComponentPropertyProvider extends AMSPropertyProvider {
+
+  /**
+   * @param componentPropertyInfoMap map of component name to (property id -> metric info)
+   * @param streamProvider           opens the HTTP stream to the metrics collector
+   * @param configuration            SSL configuration for metrics endpoints
+   * @param hostProvider             resolves the metrics collector host and port
+   * @param clusterNamePropertyId    property id holding the cluster name
+   * @param hostNamePropertyId       property id holding the host name
+   * @param componentNamePropertyId  property id holding the component name
+   */
+  public AMSHostComponentPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
+                                          StreamProvider streamProvider,
+                                          ComponentSSLConfiguration configuration,
+                                          MetricHostProvider hostProvider,
+                                          String clusterNamePropertyId,
+                                          String hostNamePropertyId,
+                                          String componentNamePropertyId) {
+    super(componentPropertyInfoMap, streamProvider, configuration, hostProvider,
+      clusterNamePropertyId, hostNamePropertyId, componentNamePropertyId);
+  }
+
+  @Override
+  protected String getHostName(Resource resource) {
+    return (String) resource.getPropertyValue(hostNamePropertyId);
+  }
+
+  @Override
+  protected String getComponentName(Resource resource) {
+    // Translate the Ambari component name to the timeline appId where a
+    // mapping exists (e.g. HBASE_REGIONSERVER -> HBASE).
+    String name = (String) resource.getPropertyValue(componentNamePropertyId);
+    return TIMLINE_APPID_MAP.containsKey(name) ? TIMLINE_APPID_MAP.get(name) : name;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostPropertyProvider.java
new file mode 100644
index 0000000..ca9d685
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostPropertyProvider.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics.timeline;
+
+import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
+import org.apache.ambari.server.controller.internal.PropertyInfo;
+import org.apache.ambari.server.controller.metrics.MetricHostProvider;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.utilities.StreamProvider;
+
+import java.util.Map;
+
+/**
+ * Property provider for host-level metrics served by the Ambari Metrics
+ * Service (AMS). Host metrics are reported by the collector under the
+ * fixed appId "HOST".
+ */
+public class AMSHostPropertyProvider extends AMSPropertyProvider {
+
+  /** The appId under which the metrics collector stores host metrics. */
+  private static final String HOST_APP_ID = "HOST";
+
+  /**
+   * @param componentPropertyInfoMap map of component name to (property id -> metric info)
+   * @param streamProvider           opens the HTTP stream to the metrics collector
+   * @param configuration            SSL configuration for metrics endpoints
+   * @param hostProvider             resolves the metrics collector host and port
+   * @param clusterNamePropertyId    property id holding the cluster name
+   * @param hostNamePropertyId       property id holding the host name
+   */
+  public AMSHostPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
+                                 StreamProvider streamProvider,
+                                 ComponentSSLConfiguration configuration,
+                                 MetricHostProvider hostProvider,
+                                 String clusterNamePropertyId,
+                                 String hostNamePropertyId) {
+    // Pass null for the component name property id: host metrics are not
+    // scoped to a component.
+    super(componentPropertyInfoMap, streamProvider, configuration, hostProvider,
+      clusterNamePropertyId, hostNamePropertyId, null);
+  }
+
+  @Override
+  protected String getHostName(Resource resource) {
+    return (String) resource.getPropertyValue(hostNamePropertyId);
+  }
+
+  @Override
+  protected String getComponentName(Resource resource) {
+    return HOST_APP_ID;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
new file mode 100644
index 0000000..d75c982
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics.timeline;
+
+import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
+import org.apache.ambari.server.controller.internal.PropertyInfo;
+import org.apache.ambari.server.controller.metrics.MetricHostProvider;
+import org.apache.ambari.server.controller.metrics.MetricsPropertyProvider;
+import org.apache.ambari.server.controller.spi.Predicate;
+import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.SystemException;
+import org.apache.ambari.server.controller.spi.TemporalInfo;
+import org.apache.ambari.server.controller.utilities.StreamProvider;
+import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.params.HttpMethodParams;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.utils.URIBuilder;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.text.DecimalFormat;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.ambari.server.controller.metrics.MetricsPropertyProvider.MetricsService.TIMELINE_METRICS;
+import static org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+
+/**
+ * Base property provider for metrics served by the Ambari Metrics Service
+ * (AMS) timeline endpoint (/ws/v1/timeline/metrics). Subclasses decide how
+ * a resource maps to a host name and a component name (appId); this class
+ * groups the requested metrics per cluster and temporal window, issues the
+ * HTTP requests and copies the returned datapoints onto the resources.
+ */
+public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
+  // Maps Ambari component names to the appId used by the timeline service.
+  // NOTE(review): identifier is misspelled ("TIMLINE") but kept as-is since
+  // it is package-visible and referenced by the AMS provider subclasses.
+  static final Map<String, String> TIMLINE_APPID_MAP = new HashMap<String, String>();
+  private static final ObjectMapper mapper;
+  private final static ObjectReader timelineObjectReader;
+
+  // Metrics reported by the sink as 0-100 percentages; their values are
+  // normalized to the 0-1 range (see getGangliaLikeDatapoints) to match
+  // what the Ganglia-based providers returned.
+  private static final Set<String> PERCENTAGE_METRIC;
+
+  static {
+    TIMLINE_APPID_MAP.put("HBASE_MASTER", "HBASE");
+    TIMLINE_APPID_MAP.put("HBASE_REGIONSERVER", "HBASE");
+    mapper = new ObjectMapper();
+    AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+    mapper.setAnnotationIntrospector(introspector);
+    //noinspection deprecation
+    mapper.getSerializationConfig().setSerializationInclusion(Inclusion.NON_NULL);
+    timelineObjectReader = mapper.reader(TimelineMetrics.class);
+
+    Set<String> temp = new HashSet<String>();
+    temp.add("cpu_wio");
+    temp.add("cpu_idle");
+    temp.add("cpu_nice");
+    temp.add("cpu_aidle");
+    temp.add("cpu_system");
+    temp.add("cpu_user");
+    PERCENTAGE_METRIC = Collections.unmodifiableSet(temp);
+  }
+
+  /**
+   * @param componentPropertyInfoMap map of component name to (property id -> metric info)
+   * @param streamProvider           opens the HTTP stream to the metrics collector
+   * @param configuration            SSL configuration for metrics endpoints
+   * @param hostProvider             resolves the metrics collector host and port
+   * @param clusterNamePropertyId    property id holding the cluster name
+   * @param hostNamePropertyId       property id holding the host name (may be null)
+   * @param componentNamePropertyId  property id holding the component name (may be null)
+   */
+  public AMSPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
+                             StreamProvider streamProvider,
+                             ComponentSSLConfiguration configuration,
+                             MetricHostProvider hostProvider,
+                             String clusterNamePropertyId,
+                             String hostNamePropertyId,
+                             String componentNamePropertyId) {
+
+    super(componentPropertyInfoMap, streamProvider, configuration,
+      hostProvider, clusterNamePropertyId, hostNamePropertyId,
+      componentNamePropertyId);
+  }
+
+  /**
+   * Component name to use for metric-definition lookups. Host providers
+   * report "HOST", which is widened to "*" so host queries succeed.
+   */
+  protected String getOverridenComponentName(Resource resource) {
+    String componentName = getComponentName(resource);
+    // Hack: To allow host queries to succeed
+    if (componentName.equals("HOST")) {
+      return "*";
+    }
+    return componentName;
+  }
+
+  /**
+   * The information required to make a single call to the Metrics service.
+   */
+  class MetricsRequest {
+    private final TemporalInfo temporalInfo;
+    // hostname -> resources whose metrics come from that host
+    private final Map<String, Set<Resource>> resources = new HashMap<String, Set<Resource>>();
+    // metric name (possibly a regex pattern) -> property ids populated from it
+    private final Map<String, Set<String>> metrics = new HashMap<String, Set<String>>();
+    private final URIBuilder uriBuilder;
+    // Placeholder map key for resources that are not host-scoped.
+    private final String dummyHostName = "__SummaryInfo__";
+
+    private MetricsRequest(TemporalInfo temporalInfo, URIBuilder uriBuilder) {
+      this.temporalInfo = temporalInfo;
+      this.uriBuilder = uriBuilder;
+    }
+
+    /** Register a resource under the host it should be queried for. */
+    public void putResource(String hostname, Resource resource) {
+      if (hostname == null) {
+        hostname = dummyHostName;
+      }
+      Set<Resource> resourceSet = resources.get(hostname);
+      if (resourceSet == null) {
+        resourceSet = new HashSet<Resource>();
+        resources.put(hostname, resourceSet);
+      }
+      resourceSet.add(resource);
+    }
+
+    /** Associate a property id with the metric name it is populated from. */
+    public void putPropertyId(String metric, String id) {
+      Set<String> propertyIds = metrics.get(metric);
+
+      if (propertyIds == null) {
+        propertyIds = new HashSet<String>();
+        metrics.put(metric, propertyIds);
+      }
+      propertyIds.add(id);
+    }
+
+    /**
+     * Populate the associated resources by making a call to the Metrics
+     * service. Resources are populated in place; the returned collection
+     * is always empty.
+     *
+     * @return an empty collection (resources are mutated in place)
+     * @throws SystemException if unable to populate the resources
+     */
+    public Collection<Resource> populateResources() throws SystemException {
+      // No open ended query support.
+      if (temporalInfo == null || temporalInfo.getStartTime() == null ||
+          temporalInfo.getEndTime() == null) {
+        return Collections.emptySet();
+      }
+
+      for (Map.Entry<String, Set<Resource>> resourceEntry : resources.entrySet()) {
+        String hostname = resourceEntry.getKey();
+        Set<Resource> resourceSet = resourceEntry.getValue();
+
+        for (Resource resource : resourceSet) {
+          String metricsParam = getSetString(metrics.keySet(), -1);
+          // Reuse uriBuilder, discarding any previous query parameters.
+          uriBuilder.removeQuery();
+
+          if (metricsParam.length() > 0) {
+            uriBuilder.setParameter("metricNames", metricsParam);
+          }
+
+          if (hostname != null && !hostname.isEmpty() && !hostname.equals(dummyHostName)) {
+            uriBuilder.setParameter("hostname", hostname);
+          }
+
+          String componentName = getComponentName(resource);
+          if (componentName != null && !componentName.isEmpty()) {
+            // Subclasses may already have mapped the name; re-applying the
+            // map is harmless because mapped values are not themselves keys.
+            if (TIMLINE_APPID_MAP.containsKey(componentName)) {
+              componentName = TIMLINE_APPID_MAP.get(componentName);
+            }
+            uriBuilder.setParameter("appId", componentName);
+          }
+
+          long startTime = temporalInfo.getStartTime();
+          if (startTime != -1) {
+            uriBuilder.setParameter("startTime", String.valueOf(startTime));
+          }
+
+          long endTime = temporalInfo.getEndTime();
+          if (endTime != -1) {
+            uriBuilder.setParameter("endTime", String.valueOf(endTime));
+          }
+
+          BufferedReader reader = null;
+          String spec = uriBuilder.toString();
+          try {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Metrics request url =" + spec);
+            }
+            reader = new BufferedReader(new InputStreamReader(streamProvider.readFrom(spec)));
+
+            TimelineMetrics timelineMetrics = timelineObjectReader.readValue(reader);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Timeline metrics response => " + timelineMetrics);
+            }
+
+            for (TimelineMetric metric : timelineMetrics.getMetrics()) {
+              if (metric.getMetricName() != null && metric.getMetricValues() != null) {
+                populateResource(resource, metric);
+              }
+            }
+
+          } catch (IOException io) {
+            // Best effort: a failed collector call must not fail the whole
+            // resource request.
+            LOG.warn("Error getting timeline metrics.", io);
+          } finally {
+            if (reader != null) {
+              try {
+                reader.close();
+              } catch (IOException e) {
+                if (LOG.isWarnEnabled()) {
+                  LOG.warn("Unable to close http input steam : spec=" + spec, e);
+                }
+              }
+            }
+          }
+        }
+      }
+
+      return Collections.emptySet();
+    }
+
+    /** Copy the datapoints of one returned metric onto the resource. */
+    private void populateResource(Resource resource, TimelineMetric metric) {
+      String metric_name = metric.getMetricName();
+      Set<String> propertyIdSet = metrics.get(metric_name);
+      List<String> parameterList = new LinkedList<String>();
+
+      if (propertyIdSet == null) {
+        // Requested metric names may be regex patterns; use the first one
+        // that matches and capture its groups as substitution arguments.
+        for (Map.Entry<String, Set<String>> entry : metrics.entrySet()) {
+          String key = entry.getKey();
+          Pattern pattern = Pattern.compile(key);
+          Matcher matcher = pattern.matcher(metric_name);
+
+          if (matcher.matches()) {
+            propertyIdSet = entry.getValue();
+            // get parameters
+            for (int i = 0; i < matcher.groupCount(); ++i) {
+              parameterList.add(matcher.group(i + 1));
+            }
+            break;
+          }
+        }
+      }
+      if (propertyIdSet != null) {
+        Map<String, PropertyInfo> metricsMap = getComponentMetrics().get(getOverridenComponentName(resource));
+        if (metricsMap != null) {
+          for (String propertyId : propertyIdSet) {
+            if (propertyId != null) {
+              if (metricsMap.containsKey(propertyId)){
+                if (containsArguments(propertyId)) {
+                  // Substitute captured regex groups into $1, $2, ...
+                  int i = 1;
+                  for (String param : parameterList) {
+                    propertyId = substituteArgument(propertyId, "$" + i, param);
+                    ++i;
+                  }
+                }
+                Object value = getValue(metric, temporalInfo != null);
+                if (value != null) {
+                  resource.setProperty(propertyId, value);
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  // Normalize values into Ganglia-style datapoints: [value, timestamp]
+  // pairs with second-resolution timestamps and percentage metrics scaled
+  // to the 0-1 range. Copied over from the Ganglia metric handling.
+  private static Number[][] getGangliaLikeDatapoints(TimelineMetric metric) {
+    Number[][] datapointsArray = new Number[metric.getMetricValues().size()][2];
+    int cnt = 0;
+
+    for (Map.Entry<Long, Double> metricEntry : metric.getMetricValues().entrySet()) {
+      Double value = metricEntry.getValue();
+      Long time = metricEntry.getKey();
+      // Timestamps may arrive in milliseconds; convert to seconds.
+      if (time > 9999999999L) {
+        time = time / 1000;
+      }
+
+      if (PERCENTAGE_METRIC.contains(metric.getMetricName())) {
+        // Round to 2 decimal places with HALF_EVEN, matching the previous
+        // DecimalFormat("#.00") behavior. BigDecimal replaces the shared
+        // DecimalFormat because DecimalFormat is not thread-safe and its
+        // output is locale-dependent (a comma decimal separator would have
+        // made the Double re-parse fail).
+        value = BigDecimal.valueOf(value / 100)
+          .setScale(2, RoundingMode.HALF_EVEN).doubleValue();
+      }
+
+      datapointsArray[cnt][0] = value;
+      datapointsArray[cnt][1] = time;
+      cnt++;
+    }
+
+    return datapointsArray;
+  }
+
+  /**
+   * Get value from the given metric.
+   *
+   * @param metric the metric
+   * @param isTemporal indicates whether or not this a temporal metric
+   *
+   * @return a range of temporal data or a point in time value if not temporal
+   */
+  private static Object getValue(TimelineMetric metric, boolean isTemporal) {
+    Number[][] dataPoints = getGangliaLikeDatapoints(metric);
+
+    int length = dataPoints.length;
+    if (isTemporal) {
+      return length > 0 ? dataPoints : null;
+    } else {
+      // return the value of the last data point
+      return length > 0 ? dataPoints[length - 1][0] : 0;
+    }
+  }
+
+  /** Build a URI builder for the collector's timeline metrics endpoint. */
+  protected static URIBuilder getUriBuilder(String hostname, int port) {
+    URIBuilder uriBuilder = new URIBuilder();
+    uriBuilder.setScheme("http");
+    uriBuilder.setHost(hostname);
+    uriBuilder.setPort(port);
+    uriBuilder.setPath("/ws/v1/timeline/metrics");
+    return uriBuilder;
+  }
+
+  @Override
+  public Set<Resource> populateResources(Set<Resource> resources,
+      Request request, Predicate predicate) throws SystemException {
+
+    Set<String> ids = getRequestPropertyIds(request, predicate);
+    if (ids.isEmpty()) {
+      return resources;
+    }
+
+    Map<String, Map<TemporalInfo, MetricsRequest>> requestMap =
+      getMetricsRequests(resources, request, ids);
+
+    // For each cluster
+    for (Map.Entry<String, Map<TemporalInfo, MetricsRequest>> clusterEntry : requestMap.entrySet()) {
+      // For each request
+      for (MetricsRequest metricsRequest : clusterEntry.getValue().values() ) {
+        metricsRequest.populateResources();
+      }
+    }
+
+    return resources;
+  }
+
+  // Group resources and property ids into one MetricsRequest per
+  // (cluster, temporal window) so the collector is hit as few times as
+  // possible.
+  private Map<String, Map<TemporalInfo, MetricsRequest>> getMetricsRequests(
+      Set<Resource> resources, Request request, Set<String> ids) throws SystemException {
+
+    Map<String, Map<TemporalInfo, MetricsRequest>> requestMap =
+      new HashMap<String, Map<TemporalInfo, MetricsRequest>>();
+
+    // NOTE(review): the collector host/port are resolved once and reused
+    // for every cluster in the request — confirm a single collector per
+    // request is intended.
+    String collectorHostName = null;
+    String collectorPort = null;
+
+    for (Resource resource : resources) {
+      String clusterName = (String) resource.getPropertyValue(clusterNamePropertyId);
+      Map<TemporalInfo, MetricsRequest> requests = requestMap.get(clusterName);
+      if (requests == null) {
+        requests = new HashMap<TemporalInfo, MetricsRequest>();
+        requestMap.put(clusterName, requests);
+      }
+
+      if (collectorHostName == null) {
+        collectorHostName = hostProvider.getCollectorHostName(clusterName, TIMELINE_METRICS);
+      }
+
+      if (collectorPort == null) {
+        collectorPort = hostProvider.getCollectorPortName(clusterName, TIMELINE_METRICS);
+      }
+
+      for (String id : ids) {
+        Map<String, PropertyInfo> propertyInfoMap = new HashMap<String, PropertyInfo>();
+
+        String componentName = getOverridenComponentName(resource);
+
+        Map<String, PropertyInfo> componentMetricMap = getComponentMetrics().get(componentName);
+
+        // Not all components have metrics
+        if (componentMetricMap != null &&
+            !componentMetricMap.containsKey(id)) {
+          updateComponentMetricMap(componentMetricMap, id);
+        }
+
+        getPropertyInfoMap(componentName, id, propertyInfoMap);
+
+        for (Map.Entry<String, PropertyInfo> entry : propertyInfoMap.entrySet()) {
+          String propertyId = entry.getKey();
+          PropertyInfo propertyInfo = entry.getValue();
+
+          TemporalInfo temporalInfo = request.getTemporalInfo(id);
+
+          if ((temporalInfo == null && propertyInfo.isPointInTime()) ||
+              (temporalInfo != null && propertyInfo.isTemporal())) {
+
+            MetricsRequest metricsRequest = requests.get(temporalInfo);
+            if (metricsRequest == null) {
+              // Fall back to the default collector port (8188) if none was
+              // resolved for the cluster.
+              metricsRequest = new MetricsRequest(temporalInfo,
+                getUriBuilder(collectorHostName,
+                  collectorPort != null ? Integer.parseInt(collectorPort) : 8188));
+              requests.put(temporalInfo, metricsRequest);
+            }
+            metricsRequest.putResource(getHostName(resource), resource);
+            metricsRequest.putPropertyId(propertyInfo.getPropertyId(), propertyId);
+          }
+        }
+      }
+    }
+
+    return requestMap;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
new file mode 100644
index 0000000..0b3be3c
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics.timeline;
+
+import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
+import org.apache.ambari.server.controller.internal.PropertyInfo;
+import org.apache.ambari.server.controller.metrics.MetricHostProvider;
+import org.apache.ambari.server.controller.metrics.MetricsReportPropertyProvider;
+import org.apache.ambari.server.controller.spi.Predicate;
+import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.SystemException;
+import org.apache.ambari.server.controller.utilities.StreamProvider;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Report-level (cluster aggregate) metrics provider for the Ambari
+ * Metrics Service. Aggregate report queries are not implemented here;
+ * the resources are returned untouched.
+ */
+public class AMSReportPropertyProvider extends MetricsReportPropertyProvider {
+
+  /**
+   * @param componentPropertyInfoMap map of component name to (property id -> metric info)
+   * @param streamProvider           opens the HTTP stream to the metrics collector
+   * @param configuration            SSL configuration for metrics endpoints
+   * @param hostProvider             resolves the metrics collector host and port
+   * @param clusterNamePropertyId    property id holding the cluster name
+   */
+  public AMSReportPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
+                                   StreamProvider streamProvider,
+                                   ComponentSSLConfiguration configuration,
+                                   MetricHostProvider hostProvider,
+                                   String clusterNamePropertyId) {
+    super(componentPropertyInfoMap, streamProvider, configuration,
+      hostProvider, clusterNamePropertyId);
+  }
+
+  @Override
+  public Set<Resource> populateResources(Set<Resource> resources,
+      Request request, Predicate predicate) throws SystemException {
+    // No-op: report metrics are not yet served from AMS.
+    return resources;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java
index 5c01500..f8131a8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java
@@ -112,7 +112,7 @@ public class PropertyHelper {
return propertyIds;
}
- public static Map<String, Map<String, PropertyInfo>> getGangliaPropertyIds(Resource.Type resourceType) {
+ public static Map<String, Map<String, PropertyInfo>> getMetricPropertyIds(Resource.Type resourceType) {
return GANGLIA_PROPERTY_IDS.get(resourceType.getInternalType());
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/java/org/apache/ambari/server/state/Service.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Service.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Service.java
index 552ccee..be5c6d8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/Service.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Service.java
@@ -132,6 +132,7 @@ public interface Service {
PIG,
FLUME,
YARN,
- MAPREDUCE2
+ MAPREDUCE2,
+ AMS
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-server/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
new file mode 100644
index 0000000..843aecd
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.text.DecimalFormat;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+@XmlRootElement(name = "metric")
+@XmlAccessorType(XmlAccessType.NONE)
+/**
+ * A single named timeline metric: identity fields (metricName, appId,
+ * instanceId, hostName, timestamp, startTime) plus a time-ordered map of
+ * timestamp -> value datapoints. Serialized via the JAXB annotations on
+ * the getters.
+ */
+public class TimelineMetric implements Comparable<TimelineMetric> {
+
+  private String metricName;
+  private String appId;
+  private String instanceId;
+  private String hostName;
+  private long timestamp;
+  private long startTime;
+  private String type;
+  // TreeMap keeps the datapoints sorted by timestamp.
+  private Map<Long, Double> metricValues = new TreeMap<Long, Double>();
+
+  @XmlElement(name = "metricname")
+  public String getMetricName() {
+    return metricName;
+  }
+
+  public void setMetricName(String metricName) {
+    this.metricName = metricName;
+  }
+
+  @XmlElement(name = "appid")
+  public String getAppId() {
+    return appId;
+  }
+
+  public void setAppId(String appId) {
+    this.appId = appId;
+  }
+
+  @XmlElement(name = "instanceid")
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public void setInstanceId(String instanceId) {
+    this.instanceId = instanceId;
+  }
+
+  @XmlElement(name = "hostname")
+  public String getHostName() {
+    return hostName;
+  }
+
+  public void setHostName(String hostName) {
+    this.hostName = hostName;
+  }
+
+  @XmlElement(name = "timestamp")
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  @XmlElement(name = "starttime")
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  @XmlElement(name = "type")
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  @XmlElement(name = "metrics")
+  public Map<Long, Double> getMetricValues() {
+    return metricValues;
+  }
+
+  public void setMetricValues(Map<Long, Double> metricValues) {
+    this.metricValues = metricValues;
+  }
+
+  /** Merge the given datapoints into this metric's series. */
+  public void addMetricValues(Map<Long, Double> metricValues) {
+    this.metricValues.putAll(metricValues);
+  }
+
+  /**
+   * Identity comparison over metricName, appId, instanceId, hostName,
+   * timestamp and startTime; datapoint values are ignored.
+   * NOTE(review): assumes metricName and appId are non-null — an unset
+   * metric name or appId causes a NullPointerException here.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TimelineMetric metric = (TimelineMetric) o;
+
+    if (!metricName.equals(metric.metricName)) return false;
+    if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null)
+      return false;
+    if (!appId.equals(metric.appId)) return false;
+    if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
+      return false;
+    if (timestamp != metric.timestamp) return false;
+    if (startTime != metric.startTime) return false;
+
+    return true;
+  }
+
+  /** Same as {@link #equals(Object)} but ignoring timestamp and startTime. */
+  public boolean equalsExceptTime(TimelineMetric metric) {
+    if (!metricName.equals(metric.metricName)) return false;
+    if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null)
+      return false;
+    if (!appId.equals(metric.appId)) return false;
+    if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
+      return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = metricName.hashCode();
+    result = 31 * result + appId.hashCode();
+    result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
+    result = 31 * result + (hostName != null ? hostName.hashCode() : 0);
+    result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+    // Include startTime: equals() compares it, so it belongs in the hash
+    // (Effective Java — hash every field used in equals).
+    result = 31 * result + (int) (startTime ^ (startTime >>> 32));
+    return result;
+  }
+
+  /**
+   * Orders metrics newest-first by timestamp, then by metric name.
+   * NOTE(review): this ordering is inconsistent with equals(), which also
+   * compares appId/instanceId/hostName/startTime — avoid using this type
+   * in sorted sets/maps that assume compareTo-consistency.
+   */
+  @Override
+  public int compareTo(TimelineMetric other) {
+    if (timestamp > other.timestamp) {
+      return -1;
+    } else if (timestamp < other.timestamp) {
+      return 1;
+    } else {
+      return metricName.compareTo(other.metricName);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "TimelineMetric{" +
+      "metricName='" + metricName + '\'' +
+      ", appId='" + appId + '\'' +
+      ", instanceId='" + instanceId + '\'' +
+      ", hostName='" + hostName + '\'' +
+      ", timestamp=" + timestamp +
+      ", startTime=" + startTime +
+      ", type='" + type + '\'' +
+      ", metricValues=" + metricValues +
+      '}';
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java b/ambari-server/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
new file mode 100644
index 0000000..0448fdb
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * JAXB-serializable container that hosts a list of {@link TimelineMetric}
+ * objects (XML/JSON root element "metrics"), with helper logic to merge
+ * metrics that differ only in their time fields.
+ */
+@XmlRootElement(name = "metrics")
+@XmlAccessorType(XmlAccessType.NONE)
+public class TimelineMetrics {
+
+ // Backing list; getMetrics() exposes it directly (no defensive copy).
+ private List<TimelineMetric> allMetrics = new ArrayList<TimelineMetric>();
+
+ public TimelineMetrics() {}
+
+ @XmlElement(name = "metrics")
+ public List<TimelineMetric> getMetrics() {
+ return allMetrics;
+ }
+
+ public void setMetrics(List<TimelineMetric> allMetrics) {
+ this.allMetrics = allMetrics;
+ }
+
+ /**
+ * Compares two metrics on metricName, hostName and appId (instanceId is
+ * not considered); a null hostName/appId on metric1 is treated as equal.
+ * NOTE(review): each if-block overwrites isEqual, so a hostName mismatch
+ * is silently discarded when the subsequent appId comparison succeeds —
+ * this looks like a bug. No caller is visible in this file (merging uses
+ * TimelineMetric#equalsExceptTime) — confirm whether it is still needed.
+ */
+ private boolean isEqualTimelineMetrics(TimelineMetric metric1,
+ TimelineMetric metric2) {
+
+ boolean isEqual = true;
+
+ if (!metric1.getMetricName().equals(metric2.getMetricName())) {
+ return false;
+ }
+
+ if (metric1.getHostName() != null) {
+ isEqual = metric1.getHostName().equals(metric2.getHostName());
+ }
+
+ if (metric1.getAppId() != null) {
+ isEqual = metric1.getAppId().equals(metric2.getAppId());
+ }
+
+ return isEqual;
+ }
+
+ @Override
+ public String toString() {
+ return "TimelineMetrics{" +
+ "allMetrics=" + allMetrics +
+ '}';
+ }
+
+ /**
+ * Merge with an existing TimelineMetric if everything except timestamp
+ * and startTime is the same (see TimelineMetric#equalsExceptTime);
+ * otherwise append the metric to the list. On a merge, the value maps
+ * are combined and the earlier timestamp/startTime of the two wins.
+ * @param metric {@link TimelineMetric}
+ */
+ public void addOrMergeTimelineMetric(TimelineMetric metric) {
+ TimelineMetric metricToMerge = null;
+
+ if (!allMetrics.isEmpty()) {
+ for (TimelineMetric timelineMetric : allMetrics) {
+ if (timelineMetric.equalsExceptTime(metric)) {
+ metricToMerge = timelineMetric;
+ break;
+ }
+ }
+ }
+
+ if (metricToMerge != null) {
+ metricToMerge.addMetricValues(metric.getMetricValues());
+ if (metricToMerge.getTimestamp() > metric.getTimestamp()) {
+ metricToMerge.setTimestamp(metric.getTimestamp());
+ }
+ if (metricToMerge.getStartTime() > metric.getStartTime()) {
+ metricToMerge.setStartTime(metric.getStartTime());
+ }
+ } else {
+ allMetrics.add(metric);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/resources/ganglia_properties.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/ganglia_properties.json b/ambari-server/src/main/resources/ganglia_properties.json
index e64309a..2401546 100644
--- a/ambari-server/src/main/resources/ganglia_properties.json
+++ b/ambari-server/src/main/resources/ganglia_properties.json
@@ -336,7 +336,7 @@
"temporal":true
},
"metrics/rpc/RpcQueueTime_avg_time":{
- "metric":"rpc.metrics.RpcQueueTime_avg_time",
+ "metric":"rpc.rpc.RpcQueueTimeAvgTime",
"pointInTime":true,
"temporal":true
},
@@ -2010,7 +2010,7 @@
"temporal":true
},
"metrics/rpc/RpcQueueTime_avg_time":{
- "metric":"rpc.rpc.RpcQueueTime_avg_time",
+ "metric":"rpc.rpc.RpcQueueTimeAvgTime",
"pointInTime":false,
"temporal":true
},
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
index 0cfe41a..3152a1d 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
@@ -67,6 +67,7 @@ jtnode_host = default("/clusterHostInfo/jtnode_host", [])
namenode_host = default("/clusterHostInfo/namenode_host", [])
zk_hosts = default("/clusterHostInfo/zookeeper_hosts", [])
ganglia_server_hosts = default("/clusterHostInfo/ganglia_server_host", [])
+ams_collector_hosts = default("/clusterHostInfo/metric_collector_hosts", [])
has_namenode = not len(namenode_host) == 0
has_resourcemanager = not len(rm_host) == 0
@@ -77,6 +78,7 @@ has_hive_server_host = not len(hive_server_host) == 0
has_hbase_masters = not len(hbase_master_hosts) == 0
has_zk_host = not len(zk_hosts) == 0
has_ganglia_server = not len(ganglia_server_hosts) == 0
+has_metric_collector = not len(ams_collector_hosts) == 0
is_namenode_master = hostname in namenode_host
is_jtnode_master = hostname in jtnode_host
@@ -86,6 +88,9 @@ is_hbase_master = hostname in hbase_master_hosts
is_slave = hostname in slave_hosts
if has_ganglia_server:
ganglia_server_host = ganglia_server_hosts[0]
+if has_metric_collector:
+ metric_collector_host = ams_collector_hosts[0]
+
#hadoop params
if has_namenode:
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2 b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
index c4759f4..31ca5c3 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
@@ -63,3 +63,21 @@ supervisor.sink.ganglia.servers={{ganglia_server_host}}:8650
resourcemanager.sink.ganglia.tagsForPrefix.yarn=Queue
{% endif %}
+
+{% if has_metric_collector %}
+
+*.period=60
+*.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+*.sink.timeline.period=10
+datanode.sink.timeline.collector={{metric_collector_host}}:8188
+namenode.sink.timeline.collector={{metric_collector_host}}:8188
+resourcemanager.sink.timeline.collector={{metric_collector_host}}:8188
+nodemanager.sink.timeline.collector={{metric_collector_host}}:8188
+historyserver.sink.timeline.collector={{metric_collector_host}}:8188
+journalnode.sink.timeline.collector={{metric_collector_host}}:8188
+nimbus.sink.timeline.collector={{metric_collector_host}}:8188
+supervisor.sink.timeline.collector={{metric_collector_host}}:8188
+maptask.sink.timeline.collector={{metric_collector_host}}:8188
+reducetask.sink.timeline.collector={{metric_collector_host}}:8188
+
+{% endif %}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/scripts/params.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/scripts/params.py
index 6b625d5..a5b8549 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/scripts/params.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/scripts/params.py
@@ -84,6 +84,11 @@ regionserver_jaas_config_file = format("{hbase_conf_dir}/hbase_regionserver_jaas
ganglia_server_hosts = default('/clusterHostInfo/ganglia_server_host', []) # is not passed when ganglia is not present
ganglia_server_host = '' if len(ganglia_server_hosts) == 0 else ganglia_server_hosts[0]
+ams_collector_hosts = default("/clusterHostInfo/metric_collector_hosts", [])
+has_metric_collector = not len(ams_collector_hosts) == 0
+if has_metric_collector:
+ metric_collector_host = ams_collector_hosts[0]
+
# if hbase is selected the hbase_rs_hosts, should not be empty, but still default just in case
if 'slave_hosts' in config['clusterHostInfo']:
rs_hosts = default('/clusterHostInfo/hbase_rs_hosts', '/clusterHostInfo/slave_hosts') #if hbase_rs_hosts not given it is assumed that region servers on same nodes as slaves
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2 b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2
index 0513104..fc033a7 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2
@@ -79,3 +79,27 @@ hbase.sink.ganglia.period=10
.sink.ganglia.dmax=jvm.metrics.threadsBlocked=70,jvm.metrics.memHeapUsedM=40
hbase.sink.ganglia.servers={{ganglia_server_host}}:8663
+
+{% if has_metric_collector %}
+
+# HBase-specific configuration to reset long-running stats (e.g. compactions)
+# If this variable is left out, then the default is no expiration.
+hbase.extendedperiod = 3600
+
+hbase.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+hbase.period=10
+hbase.collector={{metric_collector_host}}:8188
+
+jvm.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+jvm.period=10
+jvm.collector={{metric_collector_host}}:8188
+
+rpc.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+rpc.period=10
+rpc.collector={{metric_collector_host}}:8188
+
+hbase.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+hbase.sink.timeline.period=10
+hbase.sink.timeline.collector={{metric_collector_host}}:8188
+
+{% endif %}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-RS.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-RS.j2 b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-RS.j2
index 55e8461..97e76d8 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-RS.j2
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-RS.j2
@@ -78,3 +78,27 @@ hbase.sink.ganglia.period=10
.sink.ganglia.dmax=jvm.metrics.threadsBlocked=70,jvm.metrics.memHeapUsedM=40
hbase.sink.ganglia.servers={{ganglia_server_host}}:8656
+
+{% if has_metric_collector %}
+
+# HBase-specific configuration to reset long-running stats (e.g. compactions)
+# If this variable is left out, then the default is no expiration.
+hbase.extendedperiod = 3600
+
+hbase.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+hbase.period=10
+hbase.collector={{metric_collector_host}}:8188
+
+jvm.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+jvm.period=10
+jvm.collector={{metric_collector_host}}:8188
+
+rpc.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+rpc.period=10
+rpc.collector={{metric_collector_host}}:8188
+
+hbase.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink
+hbase.sink.timeline.period=10
+hbase.sink.timeline.collector={{metric_collector_host}}:8188
+
+{% endif %}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-server/src/main/resources/stacks/HDP/2.2/services/AMS/configuration/ams-env.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/services/AMS/configuration/ams-env.xml b/ambari-server/src/main/resources/stacks/HDP/2.2/services/AMS/configuration/ams-env.xml
new file mode 100644
index 0000000..fda1df0
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/services/AMS/configuration/ams-env.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <property>
+ <name>ams_user</name>
+ <value>root</value>
+ <property-type>USER</property-type>
+ <description>AMS User Name.</description>
+ </property>
+
+ <property>
+ <name>content</name>
+ <value>
+ # Set environment variables here.
+
+ # The java implementation to use. Java 1.6 required.
+ export JAVA_HOME={{java64_home}}
+
+ #TODO
+ </value>
+ </property>
+
+</configuration>