You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2018/01/08 18:19:50 UTC
[24/28] ambari git commit: AMBARI-22740 : Fix integration test for
HBase in branch-3.0-ams due to UUID changes. (avijayan)
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/AggregatorUtils.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/AggregatorUtils.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/AggregatorUtils.java
new file mode 100644
index 0000000..43c1f2b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/AggregatorUtils.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+/**
+ * Static helpers shared by the metric aggregators: computing sum/max/min/count
+ * over a series, bucketing raw datapoints into time slices (optionally filling
+ * empty slices by interpolation) and rounding timestamps to aggregation
+ * boundaries.
+ */
+public class AggregatorUtils {
+
+  private static final Log LOG = LogFactory.getLog(AggregatorUtils.class);
+
+  /**
+   * Computes the aggregates of a metric series.
+   *
+   * @param metricValues timestamp to value map; may be {@code null} or empty,
+   *                     and may contain {@code null} values
+   * @return a four element array: {@code [sum, max, min, count]}. Max and min
+   *         are reported as {@code 0.0} when no non-null value was seen. The
+   *         count is the number of map entries, including entries whose value
+   *         is {@code null}.
+   */
+  public static double[] calculateAggregates(Map<Long, Double> metricValues) {
+    double[] values = new double[4];
+    // Infinities are used as "nothing seen yet" sentinels. Double.MIN_VALUE is
+    // the smallest POSITIVE double, so it cannot serve as a lower bound: with
+    // it, a series of negative values would never update max.
+    double max = Double.NEGATIVE_INFINITY;
+    double min = Double.POSITIVE_INFINITY;
+    double sum = 0.0;
+    int metricCount = 0;
+
+    if (metricValues != null && !metricValues.isEmpty()) {
+      for (Double value : metricValues.values()) {
+        // TODO: Some nulls in data - need to investigate null values from host
+        if (value == null) {
+          continue;
+        }
+        if (value > max) {
+          max = value;
+        }
+        if (value < min) {
+          min = value;
+        }
+        sum += value;
+      }
+      // Null entries are still counted, as they always have been.
+      metricCount = metricValues.size();
+    }
+    // BR: WHY ZERO is a good idea?
+    values[0] = sum;
+    values[1] = max != Double.NEGATIVE_INFINITY ? max : 0.0;
+    values[2] = min != Double.POSITIVE_INFINITY ? min : 0.0;
+    values[3] = metricCount;
+
+    return values;
+  }
+
+  /**
+   * Buckets the datapoints of a metric into the given time slices and returns
+   * one value per slice that received data.
+   * <p>
+   * Datapoints are visited in the iteration order of
+   * {@code timelineMetric.getMetricValues()}. Consecutive datapoints that fall
+   * into the same slice are averaged; only values strictly greater than zero
+   * take part in the average, and a slice with no such value gets {@code 0.0}.
+   * Datapoints with a {@code null} value or outside every slice are ignored.
+   * The key of each result entry carries the END timestamp of its slice.
+   *
+   * @param timelineMetric       metric whose datapoints are sliced
+   * @param timeSlices           {@code [start, end]} pairs, see {@link #getTimeSlices}
+   * @param interpolationEnabled when {@code true}, slices without data are
+   *                             filled with interpolated values where possible
+   * @return slice-end keyed values, or {@code null} when the metric has no
+   *         datapoints at all
+   */
+  public static Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
+      TimelineMetric timelineMetric, List<Long[]> timeSlices, boolean interpolationEnabled) {
+
+    if (timelineMetric.getMetricValues().isEmpty()) {
+      return null;
+    }
+
+    Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
+        new HashMap<>();
+
+    Long prevTimestamp = -1L;
+    TimelineClusterMetric prevMetric = null;
+    int count = 0;
+    double sum = 0.0;
+
+    // Slice end timestamp -> averaged value, used later to find empty slices.
+    Map<Long, Double> timeSliceValueMap = new HashMap<>();
+    for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) {
+      if (metric.getValue() == null) {
+        continue;
+      }
+
+      Long timestamp = getSliceTimeForMetric(timeSlices, Long.parseLong(metric.getKey().toString()));
+      if (timestamp != -1) {
+        // Metric is within desired time range
+        TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
+            timelineMetric.getMetricName(),
+            timelineMetric.getAppId(),
+            timelineMetric.getInstanceId(),
+            timestamp);
+
+        if (prevTimestamp < 0 || timestamp.equals(prevTimestamp)) {
+          // Still inside the current slice (or this is the first datapoint).
+          Double newValue = metric.getValue();
+          if (newValue > 0.0) {
+            sum += newValue;
+            count++;
+          }
+        } else {
+          // Crossed into a new slice: flush the previous one, then restart
+          // the running average with the current datapoint.
+          double metricValue = (count > 0) ? (sum / count) : 0.0;
+          timelineClusterMetricMap.put(prevMetric, metricValue);
+          timeSliceValueMap.put(prevMetric.getTimestamp(), metricValue);
+          sum = metric.getValue();
+          count = sum > 0.0 ? 1 : 0;
+        }
+
+        prevTimestamp = timestamp;
+        prevMetric = clusterMetric;
+      }
+    }
+
+    // Flush the last open slice, if any datapoint matched a slice at all.
+    if (prevTimestamp > 0) {
+      double metricValue = (count > 0) ? (sum / count) : 0.0;
+      timelineClusterMetricMap.put(prevMetric, metricValue);
+      timeSliceValueMap.put(prevTimestamp, metricValue);
+    }
+
+    if (interpolationEnabled) {
+      Map<Long, Double> interpolatedValues = interpolateMissingPeriods(timelineMetric.getMetricValues(), timeSlices, timeSliceValueMap, timelineMetric.getType());
+      for (Map.Entry<Long, Double> entry : interpolatedValues.entrySet()) {
+        TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(), timelineMetric.getInstanceId(), entry.getKey());
+        // Never overwrite a value that came from real datapoints.
+        timelineClusterMetricMap.putIfAbsent(timelineClusterMetric, entry.getValue());
+      }
+    }
+
+    return timelineClusterMetricMap;
+  }
+
+  /**
+   * Produces values for the slices that received no datapoint.
+   * <p>
+   * For metrics whose type is {@code COUNTER} (case-insensitive) the raw
+   * series is handed to {@code PostProcessingUtil.interpolate} together with
+   * the missing slice-end timestamps. For every other type, each empty slice
+   * is interpolated between the nearest earlier and later slices that do have
+   * a value.
+   *
+   * @param metricValues      raw datapoints of the metric
+   * @param timeSlices        {@code [start, end]} pairs
+   * @param timeSliceValueMap slice-end timestamp to value, for slices with data
+   * @param type              metric type, may be {@code null} or empty
+   * @return slice-end timestamp to interpolated value; slices for which no
+   *         value could be computed are absent
+   */
+  private static Map<Long, Double> interpolateMissingPeriods(TreeMap<Long, Double> metricValues,
+                                                             List<Long[]> timeSlices,
+                                                             Map<Long, Double> timeSliceValueMap, String type) {
+    Map<Long, Double> resultClusterMetricMap = new HashMap<>();
+
+    if (StringUtils.isNotEmpty(type) && "COUNTER".equalsIgnoreCase(type)) {
+      //For Counter Based metrics, ok to do interpolation and extrapolation
+
+      List<Long> requiredTimestamps = new ArrayList<>();
+      for (Long[] timeSlice : timeSlices) {
+        if (!timeSliceValueMap.containsKey(timeSlice[1])) {
+          requiredTimestamps.add(timeSlice[1]);
+        }
+      }
+      Map<Long, Double> interpolatedValuesMap = PostProcessingUtil.interpolate(metricValues, requiredTimestamps);
+
+      if (interpolatedValuesMap != null) {
+        for (Map.Entry<Long, Double> entry : interpolatedValuesMap.entrySet()) {
+          Double interpolatedValue = entry.getValue();
+
+          if (interpolatedValue != null) {
+            resultClusterMetricMap.put(entry.getKey(), interpolatedValue);
+          } else {
+            LOG.debug("Cannot compute interpolated value, hence skipping.");
+          }
+        }
+      }
+    } else {
+      //For other metrics, ok to do only interpolation
+
+      Double defaultNextSeenValue = null;
+      if (MapUtils.isEmpty(timeSliceValueMap) && MapUtils.isNotEmpty(metricValues)) {
+        //If no value was found within the start_time based slices, but the metric has value in the server_time range,
+        // use that.
+
+        Map.Entry<Long, Double> firstEntry = metricValues.firstEntry();
+        defaultNextSeenValue = firstEntry.getValue();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Found a data point outside timeslice range: " + new Date(firstEntry.getKey()) + ": " + defaultNextSeenValue);
+        }
+      }
+
+      for (int sliceNum = 0; sliceNum < timeSlices.size(); sliceNum++) {
+        Long[] timeSlice = timeSlices.get(sliceNum);
+
+        if (!timeSliceValueMap.containsKey(timeSlice[1])) {
+          // Guarded so the Date objects are only built when they are logged.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Found an empty slice : " + new Date(timeSlice[0]) + ", " + new Date(timeSlice[1]));
+          }
+
+          // Walk backwards to the closest earlier slice that has a value.
+          Double lastSeenValue = null;
+          int index = sliceNum - 1;
+          Long[] prevTimeSlice = null;
+          while (lastSeenValue == null && index >= 0) {
+            prevTimeSlice = timeSlices.get(index--);
+            lastSeenValue = timeSliceValueMap.get(prevTimeSlice[1]);
+          }
+
+          // Walk forwards to the closest later slice that has a value.
+          Double nextSeenValue = null;
+          index = sliceNum + 1;
+          Long[] nextTimeSlice = null;
+          while (nextSeenValue == null && index < timeSlices.size()) {
+            nextTimeSlice = timeSlices.get(index++);
+            nextSeenValue = timeSliceValueMap.get(nextTimeSlice[1]);
+          }
+
+          if (nextSeenValue == null) {
+            nextSeenValue = defaultNextSeenValue;
+          }
+
+          Double interpolatedValue = PostProcessingUtil.interpolate(timeSlice[1],
+              (prevTimeSlice != null ? prevTimeSlice[1] : null), lastSeenValue,
+              (nextTimeSlice != null ? nextTimeSlice[1] : null), nextSeenValue);
+
+          if (interpolatedValue != null) {
+            LOG.debug("Interpolated value : " + interpolatedValue);
+            resultClusterMetricMap.put(timeSlice[1], interpolatedValue);
+          } else {
+            LOG.debug("Cannot compute interpolated value, hence skipping.");
+          }
+        }
+      }
+    }
+    return resultClusterMetricMap;
+  }
+
+  /**
+   * Return end of the time slice into which the metric fits.
+   *
+   * @param timeSlices {@code [start, end]} pairs; start is inclusive, end is exclusive
+   * @param timestamp  datapoint timestamp
+   * @return the end of the first matching slice, or {@code -1} when the
+   *         timestamp lies outside every slice
+   */
+  public static Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) {
+    for (Long[] timeSlice : timeSlices) {
+      if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) {
+        return timeSlice[1];
+      }
+    }
+    return -1L;
+  }
+
+  /**
+   * Return time slices to normalize the timeseries data.
+   * <p>
+   * Slices are contiguous {@code [start, start + interval]} pairs beginning at
+   * {@code startTime}; the last slice may end after {@code endTime}.
+   *
+   * @param startTime               start of the first slice
+   * @param endTime                 slices are generated while their start is before this
+   * @param timeSliceIntervalMillis length of each slice, must be positive when
+   *                                {@code startTime < endTime}
+   * @return the slices, empty when {@code startTime >= endTime}
+   * @throws IllegalArgumentException if slices are required but the interval
+   *                                  is not positive (the loop could otherwise
+   *                                  never terminate)
+   */
+  public static List<Long[]> getTimeSlices(long startTime, long endTime, long timeSliceIntervalMillis) {
+    List<Long[]> timeSlices = new ArrayList<Long[]>();
+    if (startTime < endTime && timeSliceIntervalMillis <= 0) {
+      throw new IllegalArgumentException(
+          "Time slice interval must be positive, got " + timeSliceIntervalMillis);
+    }
+    long sliceStartTime = startTime;
+    while (sliceStartTime < endTime) {
+      timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis });
+      sliceStartTime += timeSliceIntervalMillis;
+    }
+    return timeSlices;
+  }
+
+  /**
+   * Rounds {@code referenceTime} down to a multiple of {@code aggregatorPeriod}.
+   *
+   * @param referenceTime    time to round, in the same unit as the period
+   * @param aggregatorPeriod rounding granularity, must be non-zero
+   * @return the rounded-down time
+   */
+  public static long getRoundedCheckPointTimeMillis(long referenceTime, long aggregatorPeriod) {
+    return referenceTime - (referenceTime % aggregatorPeriod);
+  }
+
+  /**
+   * Rounds the current wall-clock time down to a multiple of
+   * {@code aggregatorPeriod}.
+   *
+   * @param aggregatorPeriod rounding granularity in milliseconds, must be non-zero
+   * @return the rounded-down current time in milliseconds
+   */
+  public static long getRoundedAggregateTimeMillis(long aggregatorPeriod) {
+    return getRoundedCheckPointTimeMillis(System.currentTimeMillis(), aggregatorPeriod);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/CustomDownSampler.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/CustomDownSampler.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/CustomDownSampler.java
new file mode 100644
index 0000000..49e2bf6
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/CustomDownSampler.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import java.util.List;
+
+/**
+ * Contract for a pluggable downsampler.
+ * Each configured downsampler will be executed during an aggregation cycle.
+ */
+public interface CustomDownSampler {
+
+  /**
+   * Gatekeeper for the downsampler's configuration. If this fails, the
+   * downsampling is not done.
+   *
+   * @return {@code true} when the configuration is usable
+   */
+  boolean validateConfigs();
+
+  /**
+   * Builds the statements that need to be executed for the downsampling.
+   *
+   * @param startTime start of the period to downsample
+   * @param endTime   end of the period to downsample
+   * @param tableName name of the table the statements refer to
+   * @return the set of statements to execute
+   */
+  List<String> prepareDownSamplingStatement(Long startTime, Long endTime, String tableName);
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/DownSamplerUtils.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/DownSamplerUtils.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/DownSamplerUtils.java
new file mode 100644
index 0000000..ad47931
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/DownSamplerUtils.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * DownSampler utility class. Responsible for fetching downsampler configs from Metrics config, and determine if
+ * any downsamplers are configured.
+ */
+
+public class DownSamplerUtils {
+
+  public static final String downSamplerConfigPrefix = "timeline.metrics.downsampler.";
+  public static final String downSamplerMetricPatternsConfig = "metric.patterns";
+  public static final String topNDownSampler = "topn";
+  private static final Log LOG = LogFactory.getLog(DownSamplerUtils.class);
+
+  /**
+   * Get the list of metrics that are requested to be downsampled.
+   * <p>
+   * Every config key that starts with {@link #downSamplerConfigPrefix} and
+   * ends with {@link #downSamplerMetricPatternsConfig} contributes its
+   * comma separated value. Patterns are trimmed, and blank ones are dropped.
+   *
+   * @param configuration metrics configuration to read from
+   * @return List of metric patterns/names that are to be downsampled.
+   */
+  public static List<String> getDownsampleMetricPatterns(Configuration configuration) {
+    // The trailing "." of the prefix plus "*" forms the regex ".*".
+    Map<String, String> conf = configuration.getValByRegex(downSamplerConfigPrefix + "*");
+    List<String> metricPatterns = new ArrayList<>();
+    for (Map.Entry<String, String> entry : conf.entrySet()) {
+      String key = entry.getKey();
+      // The dots in the lookup regex are unescaped, so re-check the literal
+      // prefix here, exactly as getDownSamplers does.
+      if (!key.startsWith(downSamplerConfigPrefix) || !key.endsWith(downSamplerMetricPatternsConfig)) {
+        continue;
+      }
+      // StringUtils.split returns null for a null input.
+      String[] patterns = StringUtils.split(entry.getValue(), ",");
+      if (patterns == null) {
+        continue;
+      }
+      for (String pattern : patterns) {
+        // Trim before the emptiness check so " " is not added as "".
+        String trimmedPattern = pattern.trim();
+        if (!trimmedPattern.isEmpty()) {
+          metricPatterns.add(trimmedPattern);
+        }
+      }
+    }
+    return metricPatterns;
+  }
+
+  /**
+   * Get the list of downsamplers that are configured in ams-site
+   * Sample config
+   <name>timeline.metrics.downsampler.topn.metric.patterns</name>
+   <value>dfs.NNTopUserOpCounts.windowMs=60000.op%,dfs.NNTopUserOpCounts.windowMs=300000.op%</value>
+
+   <name>timeline.metrics.downsampler.topn.value</name>
+   <value>10</value>
+
+   <name>timeline.metrics.downsampler.topn.function</name>
+   <value>max</value>
+   * <p>
+   * A downsampler whose configuration cannot be parsed is logged and skipped;
+   * the remaining downsamplers are still returned.
+   *
+   * @param configuration metrics configuration to read from
+   * @return the downsamplers that could be created, possibly empty
+   */
+  public static List<CustomDownSampler> getDownSamplers(Configuration configuration) {
+
+    Map<String, String> conf = configuration.getValByRegex(downSamplerConfigPrefix + "*");
+    List<CustomDownSampler> downSamplers = new ArrayList<>();
+    Set<String> keys = conf.keySet();
+
+    for (String key : keys) {
+      if (!key.startsWith(downSamplerConfigPrefix) || !key.endsWith(downSamplerMetricPatternsConfig)) {
+        continue;
+      }
+      // Handle failures per key so one broken downsampler does not hide the others.
+      try {
+        // timeline.metrics.downsampler.<type>.metric.patterns -> <type>
+        String type = key.split("\\.")[3];
+        CustomDownSampler downSampler = getDownSamplerByType(type, conf);
+        if (downSampler != null) {
+          downSamplers.add(downSampler);
+        }
+      } catch (Exception e) {
+        LOG.warn("Exception caught while parsing downsampler configs from ams-site : " + e.getMessage(), e);
+      }
+    }
+    return downSamplers;
+  }
+
+  /**
+   * Creates the downsampler registered under the given type name.
+   *
+   * @param type downsampler type, compared case-insensitively; may be {@code null}
+   * @param conf downsampler related configuration entries
+   * @return the downsampler, or {@code null} when the type is {@code null} or unknown
+   */
+  public static CustomDownSampler getDownSamplerByType(String type, Map<String, String> conf) {
+    if (type == null) {
+      return null;
+    }
+
+    if (topNDownSampler.equalsIgnoreCase(type)) {
+      return TopNDownSampler.fromConfig(conf);
+    }
+
+    LOG.warn("Unknown downsampler requested : " + type);
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/Function.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/Function.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/Function.java
new file mode 100644
index 0000000..dd67b64
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/Function.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import org.apache.ambari.metrics.webapp.TimelineWebServices;
+
+import java.util.Arrays;
+
+/**
+ * Is used to determine metrics aggregate table.
+ * <p>
+ * A function is the combination of a {@link ReadFunction} (which aggregate
+ * column to read) and an optional {@link PostProcessingFunction}, both of
+ * which are requested as {@code ._xxx} suffixes on a metric name.
+ *
+ * @see TimelineWebServices#getTimelineMetric
+ * @see TimelineWebServices#getTimelineMetrics
+ */
+public class Function {
+  /** Plain value read with no post processing. */
+  public static final Function DEFAULT_VALUE_FUNCTION = new Function(ReadFunction.VALUE, null);
+  private static final String SUFFIX_SEPARATOR = "\\._";
+
+  private final ReadFunction readFunction;
+  private final PostProcessingFunction postProcessingFunction;
+
+  /** Creates the default function: {@link ReadFunction#VALUE}, no post processing. */
+  public Function() {
+    this(ReadFunction.VALUE, null);
+  }
+
+  /**
+   * @param readFunction read function, {@code null} means {@link ReadFunction#VALUE}
+   * @param ppFunction   post processing function, may be {@code null}
+   */
+  public Function(ReadFunction readFunction,
+                  PostProcessingFunction ppFunction) {
+    this.readFunction = (readFunction != null) ? readFunction : ReadFunction.VALUE;
+    this.postProcessingFunction = ppFunction;
+  }
+
+  /**
+   * Segregate post processing function eg: rate from aggregate function,
+   * example: avg, in any order
+   * <p>
+   * Only the suffixes that follow the metric's base name are interpreted, so
+   * a metric that happens to be called {@code max} is not read as the MAX
+   * function. Parsing is best effort: it succeeds as soon as one suffix is a
+   * known function.
+   *
+   * @param metricName metric name from request, e.g. {@code Metric._rate._avg}
+   * @return the parsed function; the default value function when the name
+   *         carries no suffix
+   * @throws FunctionFormatException if more than two suffixes are given or
+   *                                 none of them is a known function
+   */
+  public static Function fromMetricName(String metricName) {
+    // gets postprocessing, and aggregation function
+    // ex. Metric._rate._avg
+    String[] parts = metricName.split(SUFFIX_SEPARATOR);
+
+    ReadFunction readFunction = ReadFunction.VALUE;
+    PostProcessingFunction ppFunction = null;
+
+    if (parts.length <= 1) {
+      return new Function(readFunction, null);
+    }
+    if (parts.length > 3) {
+      // Still an IllegalArgumentException, but callers that handle
+      // FunctionFormatException now see this failure as well.
+      throw new FunctionFormatException("Invalid number of functions specified.");
+    }
+
+    // Parse functions; parts[0] is the metric's base name and is skipped.
+    boolean isSuccessful = false; // Best effort
+    for (int i = 1; i < parts.length; i++) {
+      String part = parts[i];
+      if (ReadFunction.isPresent(part)) {
+        readFunction = ReadFunction.getFunction(part);
+        isSuccessful = true;
+      }
+      if (PostProcessingFunction.isPresent(part)) {
+        ppFunction = PostProcessingFunction.getFunction(part);
+        isSuccessful = true;
+      }
+    }
+
+    // Throw exception if parsing failed
+    if (!isSuccessful) {
+      throw new FunctionFormatException("Could not parse provided functions: " +
+          "" + Arrays.asList(parts));
+    }
+
+    return new Function(readFunction, ppFunction);
+  }
+
+  /**
+   * @return the metric name suffix for this function: the post processing
+   *         suffix (if any) followed by the read function suffix
+   */
+  public String getSuffix() {
+    return (postProcessingFunction == null) ? readFunction.getSuffix() :
+        postProcessingFunction.getSuffix() + readFunction.getSuffix();
+  }
+
+  public ReadFunction getReadFunction() {
+    return readFunction;
+  }
+
+  @Override
+  public String toString() {
+    return "Function{" +
+        "readFunction=" + readFunction +
+        ", postProcessingFunction=" + postProcessingFunction +
+        '}';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof Function)) return false;
+
+    Function function = (Function) o;
+
+    return postProcessingFunction == function.postProcessingFunction
+        && readFunction == function.readFunction;
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = readFunction.hashCode();
+    result = 31 * result + (postProcessingFunction != null ?
+        postProcessingFunction.hashCode() : 0);
+    return result;
+  }
+
+  /**
+   * Upper-cases a function name independently of the default locale, so that
+   * e.g. "min" resolves to MIN under a Turkish locale as well.
+   */
+  private static String toEnumName(String functionName) {
+    return functionName.toUpperCase(java.util.Locale.ROOT);
+  }
+
+  /** Transformations applied to the values after they have been read. */
+  public enum PostProcessingFunction {
+    NONE(""),
+    RATE("._rate"),
+    DIFF("._diff");
+
+    PostProcessingFunction(String suffix) {
+      this.suffix = suffix;
+    }
+
+    private final String suffix;
+
+    public String getSuffix() {
+      return suffix;
+    }
+
+    /**
+     * @param functionName name to look up, case-insensitive; may be {@code null}
+     * @return {@code true} if the name denotes one of the constants
+     */
+    public static boolean isPresent(String functionName) {
+      if (functionName == null) {
+        return false;
+      }
+      try {
+        PostProcessingFunction.valueOf(toEnumName(functionName));
+      } catch (IllegalArgumentException e) {
+        return false;
+      }
+      return true;
+    }
+
+    /**
+     * @param functionName name to look up, case-insensitive
+     * @return the matching constant, {@link #NONE} for {@code null}
+     * @throws FunctionFormatException if the name is not a known function
+     */
+    public static PostProcessingFunction getFunction(String functionName) throws FunctionFormatException {
+      if (functionName == null) {
+        return NONE;
+      }
+
+      try {
+        return PostProcessingFunction.valueOf(toEnumName(functionName));
+      } catch (IllegalArgumentException e) {
+        throw new FunctionFormatException(
+            "Function should be rate, diff. Got " + functionName, e);
+      }
+    }
+  }
+
+  /** Aggregate column that is read for a metric. */
+  public enum ReadFunction {
+    VALUE(""),
+    AVG("._avg"),
+    MIN("._min"),
+    MAX("._max"),
+    SUM("._sum");
+
+    private final String suffix;
+
+    ReadFunction(String suffix) {
+      this.suffix = suffix;
+    }
+
+    public String getSuffix() {
+      return suffix;
+    }
+
+    /**
+     * @param functionName name to look up, case-insensitive; may be {@code null}
+     * @return {@code true} if the name denotes one of the constants
+     */
+    public static boolean isPresent(String functionName) {
+      if (functionName == null) {
+        return false;
+      }
+      try {
+        ReadFunction.valueOf(toEnumName(functionName));
+      } catch (IllegalArgumentException e) {
+        return false;
+      }
+      return true;
+    }
+
+    /**
+     * @param functionName name to look up, case-insensitive
+     * @return the matching constant, {@link #VALUE} for {@code null}
+     * @throws FunctionFormatException if the name is not a known function
+     */
+    public static ReadFunction getFunction(String functionName) throws FunctionFormatException {
+      if (functionName == null) {
+        return VALUE;
+      }
+      try {
+        return ReadFunction.valueOf(toEnumName(functionName));
+      } catch (IllegalArgumentException e) {
+        throw new FunctionFormatException(
+            "Function should be sum, avg, min, max. Got " + functionName, e);
+      }
+    }
+  }
+
+  /** Signals that a requested function name could not be parsed. */
+  public static class FunctionFormatException extends IllegalArgumentException {
+    public FunctionFormatException(String message) {
+      super(message);
+    }
+
+    public FunctionFormatException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineClusterMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineClusterMetric.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineClusterMetric.java
new file mode 100644
index 0000000..6e81feb
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineClusterMetric.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+/**
+ * Identifies one cluster level datapoint: a metric name, app id and instance
+ * id together with a timestamp.
+ * <p>
+ * Instances are used as hash map keys. The timestamp takes part in
+ * {@link #equals(Object)} and {@link #hashCode()}, so {@link #setTimestamp}
+ * must not be called on an instance that is currently stored as a key.
+ */
+public class TimelineClusterMetric {
+  private final String metricName;
+  private final String appId;
+  private final String instanceId;
+  private long timestamp;
+
+  public TimelineClusterMetric(String metricName, String appId, String instanceId,
+                               long timestamp) {
+    this.metricName = metricName;
+    this.appId = appId;
+    this.instanceId = instanceId;
+    this.timestamp = timestamp;
+  }
+
+  public String getMetricName() {
+    return metricName;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  /** Null-tolerant string comparison shared by the equality methods. */
+  private static boolean same(String left, String right) {
+    return left != null ? left.equals(right) : right == null;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TimelineClusterMetric that = (TimelineClusterMetric) o;
+
+    return timestamp == that.timestamp && equalsExceptTime(that);
+  }
+
+  /**
+   * Compares metric name, app id and instance id, ignoring the timestamp.
+   * Any of the three may be {@code null} on either side.
+   *
+   * @param metric metric to compare with, may be {@code null}
+   * @return {@code true} if the three identifying fields are equal
+   */
+  public boolean equalsExceptTime(TimelineClusterMetric metric) {
+    if (metric == null) return false;
+
+    return same(metricName, metric.metricName)
+        && same(appId, metric.appId)
+        && same(instanceId, metric.instanceId);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = (metricName != null ? metricName.hashCode() : 0);
+    result = 31 * result + (appId != null ? appId.hashCode() : 0);
+    result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
+    result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "TimelineClusterMetric{" +
+        "metricName='" + metricName + '\'' +
+        ", appId='" + appId + '\'' +
+        ", instanceId='" + instanceId + '\'' +
+        ", timestamp=" + timestamp +
+        '}';
+  }
+
+  /**
+   * Changes the timestamp. This alters the hash code; see the class comment.
+   *
+   * @param timestamp new timestamp
+   */
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregator.java
new file mode 100644
index 0000000..3698f1b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregator.java
@@ -0,0 +1,59 @@
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Contract of a metric aggregator. It extends {@link Runnable} and exposes
+ * the aggregation step, its enabled/disabled state, its interval and its name.
+ */
+public interface TimelineMetricAggregator extends Runnable {
+ /**
+ * Aggregate metric data within the time bounds.
+ *
+ * @param startTime start time millis
+ * @param endTime end time millis
+ * @return success
+ */
+ boolean doWork(long startTime, long endTime);
+
+ /**
+ * Whether the aggregator is disabled by configuration.
+ *
+ * @return true if the aggregator is disabled, false otherwise
+ */
+ boolean isDisabled();
+
+ /**
+ * Return the aggregator interval.
+ *
+ * @return Interval in Millis
+ */
+ Long getSleepIntervalMillis();
+
+ /**
+ * Get the aggregator name.
+ *
+ * @return the {@code AGGREGATOR_NAME} constant of this aggregator
+ */
+ AGGREGATOR_NAME getName();
+
+ /**
+ * Known aggregator types
+ */
+ enum AGGREGATOR_TYPE {
+ CLUSTER,
+ HOST
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregatorFactory.java
new file mode 100644
index 0000000..b395b39
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregatorFactory.java
@@ -0,0 +1,528 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_DISABLED;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_SLEEP_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_DISABLED;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_DISABLED;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_DISABLED;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_DISABLED;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
+import org.apache.ambari.metrics.core.timeline.TimelineMetricDistributedCache;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+
+/**
+ * Factory class that knows how to create an aggregator instance using
+ * TimelineMetricConfiguration.
+ * <p>
+ * Each factory method reads its checkpoint directory, sleep interval and
+ * checkpoint cut-off multiplier from the supplied {@link Configuration},
+ * falling back to the defaults coded in the method. For the methods that
+ * consult {@code USE_GROUPBY_AGGREGATOR_QUERIES} (default {@code "true"}),
+ * a {@code true} value selects the implementation from the {@code v2}
+ * sub-package instead of the one in this package.
+ */
+public class TimelineMetricAggregatorFactory {
+  private static final String HOST_AGGREGATE_MINUTE_CHECKPOINT_FILE =
+    "timeline-metrics-host-aggregator-checkpoint";
+  private static final String HOST_AGGREGATE_HOURLY_CHECKPOINT_FILE =
+    "timeline-metrics-host-aggregator-hourly-checkpoint";
+  private static final String HOST_AGGREGATE_DAILY_CHECKPOINT_FILE =
+    "timeline-metrics-host-aggregator-daily-checkpoint";
+  private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE =
+    "timeline-metrics-cluster-aggregator-checkpoint";
+  private static final String CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_FILE =
+    "timeline-metrics-cluster-aggregator-minute-checkpoint";
+  private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE =
+    "timeline-metrics-cluster-aggregator-hourly-checkpoint";
+  private static final String CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_FILE =
+    "timeline-metrics-cluster-aggregator-daily-checkpoint";
+
+  // Fixed millisecond values handed to the aggregator constructors. Their exact
+  // meaning is defined by those constructors, which are not part of this class.
+  private static final long TWO_MINUTES_MILLIS = 120000L;
+  private static final long ONE_HOUR_MILLIS = 3600000L;
+
+  /**
+   * @return the boolean value of {@code USE_GROUPBY_AGGREGATOR_QUERIES},
+   * treated as {@code true} when the key is not set
+   */
+  private static boolean useGroupByAggregator(Configuration metricsConf) {
+    return Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"));
+  }
+
+  /**
+   * Joins the configured checkpoint directory (or the default location when
+   * none is configured) with the given checkpoint file name.
+   */
+  private static String resolveCheckpointLocation(Configuration metricsConf,
+                                                  String checkpointFile) {
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+    return FilenameUtils.concat(checkpointDir, checkpointFile);
+  }
+
+  /**
+   * Reads a sleep interval that is configured in seconds and converts it to
+   * milliseconds.
+   */
+  private static long readSleepIntervalMillis(Configuration metricsConf,
+                                              String intervalKey,
+                                              long defaultSeconds) {
+    return SECONDS.toMillis(metricsConf.getLong(intervalKey, defaultSeconds));
+  }
+
+  /**
+   * Minute based aggregation for hosts.
+   * Default sleep interval : 5 mins
+   * <p>
+   * Reads from {@code METRICS_RECORD_TABLE_NAME} and writes to
+   * {@code METRICS_AGGREGATE_MINUTE_TABLE_NAME}.
+   *
+   * @return the v2 host aggregator when group-by queries are enabled,
+   * otherwise the host aggregator of this package
+   */
+  public static TimelineMetricAggregator createTimelineMetricAggregatorMinute(
+      PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+      TimelineMetricMetadataManager metadataManager,
+      MetricCollectorHAController haController) {
+
+    String checkpointLocation =
+      resolveCheckpointLocation(metricsConf, HOST_AGGREGATE_MINUTE_CHECKPOINT_FILE);
+    long sleepIntervalMillis = readSleepIntervalMillis(
+      metricsConf, HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300L); // 5 mins
+    int checkpointCutOffMultiplier = metricsConf.getInt(
+      HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3);
+
+    String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED;
+    String inputTableName = METRICS_RECORD_TABLE_NAME;
+    String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.ambari.metrics.core.timeline.aggregators.v2.TimelineMetricHostAggregator(
+        METRIC_RECORD_MINUTE,
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        hostAggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        TWO_MINUTES_MILLIS,
+        haController
+      );
+    }
+
+    return new TimelineMetricHostAggregator(
+      METRIC_RECORD_MINUTE,
+      metadataManager,
+      hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      hostAggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      TWO_MINUTES_MILLIS,
+      haController);
+  }
+
+  /**
+   * Hourly aggregation for hosts.
+   * Default sleep interval : 1 hour
+   * <p>
+   * Reads from {@code METRICS_AGGREGATE_MINUTE_TABLE_NAME} and writes to
+   * {@code METRICS_AGGREGATE_HOURLY_TABLE_NAME}.
+   *
+   * @return the v2 host aggregator when group-by queries are enabled,
+   * otherwise the host aggregator of this package
+   */
+  public static TimelineMetricAggregator createTimelineMetricAggregatorHourly(
+      PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+      TimelineMetricMetadataManager metadataManager,
+      MetricCollectorHAController haController) {
+
+    String checkpointLocation =
+      resolveCheckpointLocation(metricsConf, HOST_AGGREGATE_HOURLY_CHECKPOINT_FILE);
+    long sleepIntervalMillis = readSleepIntervalMillis(
+      metricsConf, HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600L);
+    int checkpointCutOffMultiplier = metricsConf.getInt(
+      HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+
+    String hostAggregatorDisabledParam = HOST_AGGREGATOR_HOUR_DISABLED;
+    String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+    String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.ambari.metrics.core.timeline.aggregators.v2.TimelineMetricHostAggregator(
+        METRIC_RECORD_HOURLY,
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        hostAggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        ONE_HOUR_MILLIS,
+        haController
+      );
+    }
+
+    return new TimelineMetricHostAggregator(
+      METRIC_RECORD_HOURLY,
+      metadataManager,
+      hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      hostAggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      ONE_HOUR_MILLIS,
+      haController);
+  }
+
+  /**
+   * Daily aggregation for hosts.
+   * Default sleep interval : 1 day
+   * <p>
+   * Reads from {@code METRICS_AGGREGATE_HOURLY_TABLE_NAME} and writes to
+   * {@code METRICS_AGGREGATE_DAILY_TABLE_NAME}.
+   *
+   * @return the v2 host aggregator when group-by queries are enabled,
+   * otherwise the host aggregator of this package
+   */
+  public static TimelineMetricAggregator createTimelineMetricAggregatorDaily(
+      PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+      TimelineMetricMetadataManager metadataManager,
+      MetricCollectorHAController haController) {
+
+    String checkpointLocation =
+      resolveCheckpointLocation(metricsConf, HOST_AGGREGATE_DAILY_CHECKPOINT_FILE);
+    long sleepIntervalMillis = readSleepIntervalMillis(
+      metricsConf, HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL, 86400L);
+    int checkpointCutOffMultiplier = metricsConf.getInt(
+      HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER, 1);
+
+    String hostAggregatorDisabledParam = HOST_AGGREGATOR_DAILY_DISABLED;
+    String inputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+    String outputTableName = METRICS_AGGREGATE_DAILY_TABLE_NAME;
+
+    // NOTE(review): the daily aggregator passes the same one-hour value as the
+    // hourly aggregator - confirm this is intended.
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.ambari.metrics.core.timeline.aggregators.v2.TimelineMetricHostAggregator(
+        METRIC_RECORD_DAILY,
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        hostAggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        ONE_HOUR_MILLIS,
+        haController
+      );
+    }
+
+    return new TimelineMetricHostAggregator(
+      METRIC_RECORD_DAILY,
+      metadataManager,
+      hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      hostAggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      ONE_HOUR_MILLIS,
+      haController);
+  }
+
+  /**
+   * Second aggregation for cluster.
+   * Default sleep interval : 2 mins
+   * Default timeslice : 30 sec
+   * <p>
+   * Writes to {@code METRICS_CLUSTER_AGGREGATE_TABLE_NAME}. When collector
+   * in-memory aggregation is enabled, the cache-backed aggregator is returned
+   * and no input table is passed; otherwise the aggregator reads from
+   * {@code METRICS_RECORD_TABLE_NAME}.
+   */
+  public static TimelineMetricAggregator createTimelineClusterAggregatorSecond(
+      PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+      TimelineMetricMetadataManager metadataManager,
+      MetricCollectorHAController haController,
+      TimelineMetricDistributedCache distributedCache) {
+
+    String checkpointLocation =
+      resolveCheckpointLocation(metricsConf, CLUSTER_AGGREGATOR_CHECKPOINT_FILE);
+    long sleepIntervalMillis = readSleepIntervalMillis(
+      metricsConf, CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL, 120L);
+
+    // The timeslice is configured as an int number of seconds.
+    long timeSliceIntervalMillis = SECONDS.toMillis(
+      metricsConf.getInt(CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30));
+
+    int checkpointCutOffMultiplier = metricsConf.getInt(
+      CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+
+    String outputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+    String aggregatorDisabledParam = CLUSTER_AGGREGATOR_SECOND_DISABLED;
+
+    // Second based aggregation have added responsibility of time slicing
+    if (TimelineMetricConfiguration.getInstance().isCollectorInMemoryAggregationEnabled()) {
+      return new TimelineMetricClusterAggregatorSecondWithCacheSource(
+        METRIC_AGGREGATE_SECOND,
+        metadataManager,
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        aggregatorDisabledParam,
+        null,
+        outputTableName,
+        TWO_MINUTES_MILLIS,
+        timeSliceIntervalMillis,
+        haController,
+        distributedCache
+      );
+    }
+
+    String inputTableName = METRICS_RECORD_TABLE_NAME;
+    return new TimelineMetricClusterAggregatorSecond(
+      METRIC_AGGREGATE_SECOND,
+      metadataManager,
+      hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      aggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      TWO_MINUTES_MILLIS,
+      timeSliceIntervalMillis,
+      haController
+    );
+  }
+
+  /**
+   * Minute aggregation for cluster.
+   * Default sleep interval : 5 mins
+   * <p>
+   * Reads from {@code METRICS_CLUSTER_AGGREGATE_TABLE_NAME} and writes to
+   * {@code METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME}.
+   *
+   * @return the v2 cluster aggregator when group-by queries are enabled,
+   * otherwise the cluster aggregator of this package
+   */
+  public static TimelineMetricAggregator createTimelineClusterAggregatorMinute(
+      PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+      TimelineMetricMetadataManager metadataManager,
+      MetricCollectorHAController haController) {
+
+    String checkpointLocation =
+      resolveCheckpointLocation(metricsConf, CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_FILE);
+    long sleepIntervalMillis = readSleepIntervalMillis(
+      metricsConf, CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300L);
+    int checkpointCutOffMultiplier = metricsConf.getInt(
+      CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+
+    String inputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+    String outputTableName = METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
+    String aggregatorDisabledParam = CLUSTER_AGGREGATOR_MINUTE_DISABLED;
+
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.ambari.metrics.core.timeline.aggregators.v2.TimelineMetricClusterAggregator(
+        METRIC_AGGREGATE_MINUTE,
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        aggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        TWO_MINUTES_MILLIS,
+        haController
+      );
+    }
+
+    return new TimelineMetricClusterAggregator(
+      METRIC_AGGREGATE_MINUTE,
+      metadataManager,
+      hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      aggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      TWO_MINUTES_MILLIS,
+      haController
+    );
+  }
+
+  /**
+   * Hourly aggregation for cluster.
+   * Default sleep interval : 1 hour
+   * <p>
+   * Reads from {@code METRICS_CLUSTER_AGGREGATE_TABLE_NAME} and writes to
+   * {@code METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME}.
+   *
+   * @return the v2 cluster aggregator when group-by queries are enabled,
+   * otherwise the cluster aggregator of this package
+   */
+  public static TimelineMetricAggregator createTimelineClusterAggregatorHourly(
+      PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+      TimelineMetricMetadataManager metadataManager,
+      MetricCollectorHAController haController) {
+
+    String checkpointLocation =
+      resolveCheckpointLocation(metricsConf, CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE);
+    long sleepIntervalMillis = readSleepIntervalMillis(
+      metricsConf, CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600L);
+    int checkpointCutOffMultiplier = metricsConf.getInt(
+      CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+
+    String inputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+    String outputTableName = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+    String aggregatorDisabledParam = CLUSTER_AGGREGATOR_HOUR_DISABLED;
+
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.ambari.metrics.core.timeline.aggregators.v2.TimelineMetricClusterAggregator(
+        METRIC_AGGREGATE_HOURLY,
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        aggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        TWO_MINUTES_MILLIS,
+        haController
+      );
+    }
+
+    return new TimelineMetricClusterAggregator(
+      METRIC_AGGREGATE_HOURLY,
+      metadataManager,
+      hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      aggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      TWO_MINUTES_MILLIS,
+      haController
+    );
+  }
+
+  /**
+   * Daily aggregation for cluster.
+   * Default sleep interval : 1 day
+   * <p>
+   * Reads from {@code METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME} and writes
+   * to {@code METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME}.
+   *
+   * @return the v2 cluster aggregator when group-by queries are enabled,
+   * otherwise the cluster aggregator of this package
+   */
+  public static TimelineMetricAggregator createTimelineClusterAggregatorDaily(
+      PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+      TimelineMetricMetadataManager metadataManager,
+      MetricCollectorHAController haController) {
+
+    String checkpointLocation =
+      resolveCheckpointLocation(metricsConf, CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_FILE);
+    long sleepIntervalMillis = readSleepIntervalMillis(
+      metricsConf, CLUSTER_AGGREGATOR_DAILY_SLEEP_INTERVAL, 86400L);
+    int checkpointCutOffMultiplier = metricsConf.getInt(
+      CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER, 1);
+
+    String inputTableName = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+    String outputTableName = METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
+    String aggregatorDisabledParam = CLUSTER_AGGREGATOR_DAILY_DISABLED;
+
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.ambari.metrics.core.timeline.aggregators.v2.TimelineMetricClusterAggregator(
+        METRIC_AGGREGATE_DAILY,
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        aggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        TWO_MINUTES_MILLIS,
+        haController
+      );
+    }
+
+    return new TimelineMetricClusterAggregator(
+      METRIC_AGGREGATE_DAILY,
+      metadataManager,
+      hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      aggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      TWO_MINUTES_MILLIS,
+      haController
+    );
+  }
+
+  /**
+   * Minute based aggregation for hosts using the filtering host aggregator.
+   * Default sleep interval : 5 mins
+   * <p>
+   * Uses the same checkpoint file, configuration keys, defaults and tables as
+   * {@link #createTimelineMetricAggregatorMinute}; the difference visible here
+   * is the aggregator class and the extra {@code postedAggregatedMap}
+   * argument that is handed to its constructor.
+   *
+   * @return the v2 filtering host aggregator when group-by queries are
+   * enabled, otherwise the filtering host aggregator of this package
+   */
+  public static TimelineMetricAggregator createFilteringTimelineMetricAggregatorMinute(
+      PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+      TimelineMetricMetadataManager metricMetadataManager,
+      MetricCollectorHAController haController,
+      ConcurrentHashMap<String, Long> postedAggregatedMap) {
+
+    String checkpointLocation =
+      resolveCheckpointLocation(metricsConf, HOST_AGGREGATE_MINUTE_CHECKPOINT_FILE);
+    long sleepIntervalMillis = readSleepIntervalMillis(
+      metricsConf, HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300L); // 5 mins
+    int checkpointCutOffMultiplier = metricsConf.getInt(
+      HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3);
+
+    String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED;
+    String inputTableName = METRICS_RECORD_TABLE_NAME;
+    String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.ambari.metrics.core.timeline.aggregators.v2.TimelineMetricFilteringHostAggregator(
+        METRIC_RECORD_MINUTE,
+        metricMetadataManager,
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        hostAggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        TWO_MINUTES_MILLIS,
+        haController,
+        postedAggregatedMap
+      );
+    }
+
+    return new TimelineMetricFilteringHostAggregator(
+      METRIC_RECORD_MINUTE,
+      metricMetadataManager,
+      hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      hostAggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      TWO_MINUTES_MILLIS,
+      haController,
+      postedAggregatedMap);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAppAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAppAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAppAggregator.java
new file mode 100644
index 0000000..190ad9a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAppAggregator.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
+import org.apache.ambari.metrics.core.timeline.TimelineMetricsFilter;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricHostMetadata;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+
+/**
+ * Aggregator responsible for providing app level host aggregates. This task
+ * is accomplished without doing a round trip to storage, rather
+ * TimelineMetricClusterAggregators are responsible for lifecycle of
+ * {@link TimelineMetricAppAggregator} and provide the raw data to aggregate.
+ * <p>
+ * Expected call sequence per aggregation cycle: {@link #init()}, any number
+ * of {@link #processTimelineClusterMetric} calls, then
+ * {@link #getAggregateClusterMetrics()} and finally {@link #cleanup()}.
+ */
+public class TimelineMetricAppAggregator {
+  private static final Log LOG = LogFactory.getLog(TimelineMetricAppAggregator.class);
+  // Lookup to check candidacy of an app
+  private final List<String> appIdsToAggregate;
+  // Hosted-apps cache obtained from the metadata manager, keyed by hostname
+  private final Map<String, TimelineMetricHostMetadata> hostMetadata;
+  // Aggregates of the current cycle; null between cleanup() and the next init()
+  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = new HashMap<>();
+  TimelineMetricMetadataManager metadataManagerInstance;
+
+  /**
+   * @param metadataManager source of the hosted-apps cache and of the metric
+   *                        metadata cache used during aggregation
+   * @param metricsConf configuration holding the comma separated list of
+   *                    appIds that take part in host aggregation
+   */
+  public TimelineMetricAppAggregator(TimelineMetricMetadataManager metadataManager,
+                                     Configuration metricsConf) {
+    appIdsToAggregate = getAppIdsForHostAggregation(metricsConf);
+    hostMetadata = metadataManager.getHostedAppsCache();
+    metadataManagerInstance = metadataManager;
+    LOG.info("AppIds configured for aggregation: " + appIdsToAggregate);
+  }
+
+  /**
+   * Lifecycle method to initialize aggregation cycle.
+   */
+  public void init() {
+    LOG.debug("Initializing aggregation cycle.");
+    aggregateClusterMetrics = new HashMap<>();
+  }
+
+  /**
+   * Lifecycle method to indicate end of aggregation cycle.
+   */
+  public void cleanup() {
+    LOG.debug("Cleanup aggregated data.");
+    aggregateClusterMetrics = null;
+  }
+
+  /**
+   * Calculate aggregates if the clusterMetric is a Host metric for recorded
+   * apps that are housed by this host. For a non-host metric whose appId is
+   * configured for aggregation, the appId is recorded against the host
+   * instead.
+   *
+   * @param clusterMetric {@link TimelineClusterMetric} Host / App metric
+   * @param hostname This is the hostname from which this clusterMetric originated.
+   * @param metricValue The metric value for this metric.
+   */
+  public void processTimelineClusterMetric(TimelineClusterMetric clusterMetric,
+                                           String hostname, Double metricValue) {
+
+    String appId = clusterMetric.getAppId();
+    if (appId == null) {
+      return; // No real use case except tests
+    }
+
+    // If metric is a host metric and host has apps on it
+    if (appId.equalsIgnoreCase(TimelineMetricConfiguration.HOST_APP_ID)) {
+      // Single lookup: a missing (or null) entry means there is nothing to
+      // aggregate, and avoids dereferencing an entry that is not there.
+      TimelineMetricHostMetadata hostEntry = hostMetadata.get(hostname);
+      if (hostEntry != null) {
+        // Candidate metric, update app aggregates
+        updateAppAggregatesFromHostMetric(clusterMetric, hostEntry, metricValue);
+      }
+      return;
+    }
+
+    // Build the hostedapps map if not a host metric
+    // Check app candidacy for host aggregation
+    if (!appIdsToAggregate.contains(appId)) {
+      return;
+    }
+
+    TimelineMetricHostMetadata hostEntry = hostMetadata.get(hostname);
+    ConcurrentHashMap<String, String> hostedApps;
+    if (hostEntry == null) {
+      hostedApps = new ConcurrentHashMap<>();
+      hostMetadata.put(hostname, new TimelineMetricHostMetadata(hostedApps));
+    } else {
+      hostedApps = hostEntry.getHostedApps();
+    }
+
+    // putIfAbsent returns null only when this call inserted the mapping, so
+    // the check and the insert happen as one operation.
+    if (hostedApps.putIfAbsent(appId, appId) == null) {
+      LOG.info("Adding appId to hosted apps: appId = " +
+        appId + ", hostname = " + hostname);
+    }
+  }
+
+  /**
+   * Build a cluster app metric from a host metric: for every hosted app of
+   * the host that is configured for aggregation, fold the value into that
+   * app's aggregate for the metric's name, instance and timestamp.
+   *
+   * @param clusterMetric the host metric being processed
+   * @param hostEntry non-null hosted-apps metadata of the originating host
+   * @param metricValue the value to fold into the aggregates
+   */
+  private void updateAppAggregatesFromHostMetric(TimelineClusterMetric clusterMetric,
+                                                 TimelineMetricHostMetadata hostEntry,
+                                                 Double metricValue) {
+
+    if (aggregateClusterMetrics == null) {
+      LOG.error("Aggregation requested without init call.");
+      return;
+    }
+
+    // Reused across iterations; only its appId changes per hosted app.
+    TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(clusterMetric.getMetricName(),
+      TimelineMetricConfiguration.HOST_APP_ID, clusterMetric.getInstanceId());
+    ConcurrentHashMap<String, String> apps = hostEntry.getHostedApps();
+
+    for (String appId : apps.keySet()) {
+      if (!appIdsToAggregate.contains(appId)) {
+        continue;
+      }
+
+      appKey.setAppId(appId);
+      TimelineMetricMetadata appMetadata = metadataManagerInstance.getMetadataCacheValue(appKey);
+      if (appMetadata == null) {
+        // No metadata for the app-level metric yet: derive it from the
+        // host-level metric's metadata, when that is available.
+        TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(clusterMetric.getMetricName(),
+          TimelineMetricConfiguration.HOST_APP_ID, clusterMetric.getInstanceId());
+        TimelineMetricMetadata hostMetricMetadata = metadataManagerInstance.getMetadataCacheValue(key);
+
+        if (hostMetricMetadata != null) {
+          TimelineMetricMetadata timelineMetricMetadata = new TimelineMetricMetadata(clusterMetric.getMetricName(),
+            appId, clusterMetric.getInstanceId(), hostMetricMetadata.getUnits(), hostMetricMetadata.getType(), hostMetricMetadata.getSeriesStartTime(),
+            hostMetricMetadata.isSupportsAggregates(), TimelineMetricsFilter.acceptMetric(clusterMetric.getMetricName(), appId));
+          metadataManagerInstance.putIfModifiedTimelineMetricMetadata(timelineMetricMetadata);
+        }
+      }
+
+      // Add a new cluster aggregate metric if none exists
+      TimelineClusterMetric appTimelineClusterMetric =
+        new TimelineClusterMetric(clusterMetric.getMetricName(),
+          appId,
+          clusterMetric.getInstanceId(),
+          clusterMetric.getTimestamp());
+
+      MetricClusterAggregate clusterAggregate = aggregateClusterMetrics.get(appTimelineClusterMetric);
+
+      if (clusterAggregate == null) {
+        clusterAggregate = new MetricClusterAggregate(metricValue, 1, null, metricValue, metricValue);
+        aggregateClusterMetrics.put(appTimelineClusterMetric, clusterAggregate);
+      } else {
+        clusterAggregate.updateSum(metricValue);
+        clusterAggregate.updateNumberOfHosts(1);
+        clusterAggregate.updateMax(metricValue);
+        clusterAggregate.updateMin(metricValue);
+      }
+    }
+  }
+
+  /**
+   * Return the aggregated data of the current cycle. This is the internal
+   * map itself, not a copy, and it is {@code null} after {@link #cleanup()}
+   * until {@link #init()} is called again.
+   */
+  public Map<TimelineClusterMetric, MetricClusterAggregate> getAggregateClusterMetrics() {
+    return aggregateClusterMetrics;
+  }
+
+  /**
+   * Reads the comma separated appId list from configuration.
+   *
+   * @return the stripped appIds, or an empty list when the setting is empty
+   * or missing
+   */
+  private List<String> getAppIdsForHostAggregation(Configuration metricsConf) {
+    String appIds = metricsConf.get(TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS);
+    if (!StringUtils.isEmpty(appIds)) {
+      return Arrays.asList(StringUtils.stripAll(appIds.split(",")));
+    }
+    return Collections.emptyList();
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java
new file mode 100644
index 0000000..2ea5309
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_TIME_SQL;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
+import org.apache.ambari.metrics.core.timeline.query.DefaultCondition;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.ambari.metrics.core.timeline.query.Condition;
+
+public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
+ private final TimelineMetricReadHelper readHelper;
+ private final boolean isClusterPrecisionInputTable;
+
+ /**
+  * Creates the aggregator.
+  * <p>
+  * Every argument except {@code metricMetadataManager} is forwarded unchanged to
+  * the {@link AbstractTimelineAggregator} constructor. In addition this
+  * constructor remembers whether {@code inputTableName} equals
+  * {@code METRICS_CLUSTER_AGGREGATE_TABLE_NAME} and creates the
+  * {@link TimelineMetricReadHelper} used to decode result set rows.
+  *
+  * @param metricMetadataManager handed to the {@link TimelineMetricReadHelper}
+  * @param inputTableName table this aggregator reads from; must not be
+  *                       {@code null} because {@code equals} is invoked on it
+  */
+ public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName,
+     TimelineMetricMetadataManager metricMetadataManager,
+     PhoenixHBaseAccessor hBaseAccessor,
+     Configuration metricsConf,
+     String checkpointLocation,
+     Long sleepIntervalMillis,
+     Integer checkpointCutOffMultiplier,
+     String hostAggregatorDisabledParam,
+     String inputTableName,
+     String outputTableName,
+     Long nativeTimeRangeDelay,
+     MetricCollectorHAController haController) {
+   super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis,
+       checkpointCutOffMultiplier, hostAggregatorDisabledParam, inputTableName, outputTableName,
+       nativeTimeRangeDelay, haController);
+
+   // Decides later which SQL template and which row decoder are used.
+   this.isClusterPrecisionInputTable = inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
+   // NOTE(review): the meaning of the boolean flag is defined by TimelineMetricReadHelper - confirm there.
+   this.readHelper = new TimelineMetricReadHelper(metricMetadataManager, true);
+ }
+
+ /**
+  * Builds the query condition used to read the input table for one aggregation run.
+  * <p>
+  * The condition covers {@code startTime} to {@code endTime}, has no row limit,
+  * uses {@code resultsetFetchSize} as fetch size and orders rows by UUID and
+  * then SERVER_TIME.
+  *
+  * @param startTime start of the time range passed to the condition
+  * @param endTime end of the time range passed to the condition
+  * @return the fully prepared condition
+  */
+ @Override
+ protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+   // HOST_COUNT vs METRIC_COUNT: the cluster precision table is read with
+   // GET_CLUSTER_AGGREGATE_SQL, any other input table with the TIME variant.
+   String sqlTemplate = isClusterPrecisionInputTable
+       ? GET_CLUSTER_AGGREGATE_SQL
+       : GET_CLUSTER_AGGREGATE_TIME_SQL;
+
+   Condition queryCondition = new DefaultCondition(null, null, null, null, startTime,
+       endTime, null, null, true);
+   queryCondition.setNoLimit();
+   queryCondition.setFetchSize(resultsetFetchSize);
+   queryCondition.setStatement(String.format(sqlTemplate, tableName));
+   queryCondition.addOrderByColumn("UUID");
+   queryCondition.addOrderByColumn("SERVER_TIME");
+   return queryCondition;
+ }
+
+ @Override
+ protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
+ Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime);
+
+ LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
+ hBaseAccessor.saveClusterAggregateRecordsSecond(hostAggregateMap, outputTableName);
+ }
+
+ /**
+  * Collapses the rows of {@code rs} into one {@link MetricHostAggregate} per metric.
+  * <p>
+  * Consecutive rows that are equal except for their timestamp form a run. Each
+  * run is folded into a single aggregate that is keyed by the first row of the
+  * run, re-stamped with {@code endTime}. Max and min are tracked across the run;
+  * sum and number of samples are accumulated and, once the run is complete,
+  * divided by the number of rows in the run.
+  * <p>
+  * {@code rowsInRun} always holds exactly the number of rows folded into the
+  * aggregate in progress, so the first, a middle, the last and an only run are
+  * all divided by their real row count (which is at least one, so no division
+  * by zero can occur).
+  *
+  * @param rs rows to aggregate; expected to deliver all rows of one metric consecutively
+  * @param endTime timestamp assigned to every key of the returned map
+  * @return aggregates keyed by metric; empty when {@code rs} has no rows
+  * @throws IOException propagated from decoding a row
+  * @throws SQLException propagated from reading {@code rs}
+  */
+ private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs, long endTime)
+     throws IOException, SQLException {
+
+   Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
+       new HashMap<TimelineClusterMetric, MetricHostAggregate>();
+   TimelineClusterMetric existingMetric = null;
+   MetricHostAggregate hostAggregate = null;
+   // Number of rows folded into hostAggregate so far.
+   int rowsInRun = 0;
+
+   while (rs.next()) {
+     TimelineClusterMetric currentMetric = readHelper.fromResultSet(rs);
+
+     MetricClusterAggregate currentHostAggregate =
+         isClusterPrecisionInputTable ?
+             readHelper.getMetricClusterAggregateFromResultSet(rs) :
+             readHelper.getMetricClusterTimeAggregateFromResultSet(rs);
+
+     boolean startsNewRun = existingMetric == null || !existingMetric.equalsExceptTime(currentMetric);
+     if (startsNewRun) {
+       if (existingMetric != null) {
+         // Switched over to a new metric - finish the previous one.
+         averageOverRows(hostAggregate, rowsInRun);
+       }
+
+       // The timestamp is set before the metric is used as a map key.
+       currentMetric.setTimestamp(endTime);
+       hostAggregate = new MetricHostAggregate();
+       hostAggregateMap.put(currentMetric, hostAggregate);
+       existingMetric = currentMetric;
+       rowsInRun = 0;
+     }
+
+     // Recalculate totals with the current row.
+     updateAggregatesFromHost(hostAggregate, currentHostAggregate);
+     rowsInRun++;
+   }
+
+   if (existingMetric != null) {
+     // Finish the last run.
+     averageOverRows(hostAggregate, rowsInRun);
+   }
+
+   return hostAggregateMap;
+ }
+
+ /**
+  * Replaces the accumulated sum and number of samples of {@code aggregate} by
+  * their per-row average (the sample count is rounded to the nearest integer).
+  *
+  * @param aggregate aggregate holding the totals of one run
+  * @param rowCount number of rows that were folded into {@code aggregate}; at least one
+  */
+ private static void averageOverRows(MetricHostAggregate aggregate, int rowCount) {
+   aggregate.setSum(aggregate.getSum() / rowCount);
+   aggregate.setNumberOfSamples(Math.round((float)aggregate.getNumberOfSamples() / (float)rowCount));
+ }
+
+ /**
+  * Folds one cluster aggregate row into {@code agg}: max, min and sum are
+  * passed to the matching {@code update*} methods, and the row's number of
+  * hosts is added through {@code updateNumberOfSamples}.
+  *
+  * @param agg aggregate being built
+  * @param currentClusterAggregate values of the row to fold in
+  */
+ private void updateAggregatesFromHost(MetricHostAggregate agg, MetricClusterAggregate currentClusterAggregate) {
+   Double rowMax = currentClusterAggregate.getMax();
+   agg.updateMax(rowMax);
+
+   Double rowMin = currentClusterAggregate.getMin();
+   agg.updateMin(rowMin);
+
+   Double rowSum = currentClusterAggregate.getSum();
+   agg.updateSum(rowSum);
+
+   int rowHostCount = currentClusterAggregate.getNumberOfHosts();
+   agg.updateNumberOfSamples(rowHostCount);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
new file mode 100644
index 0000000..46c82d6
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
+import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices;
+import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.sliceFromTimelineMetric;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.ambari.metrics.core.timeline.query.Condition;
+import org.apache.ambari.metrics.core.timeline.query.DefaultCondition;
+
+/**
+ * Aggregates a metric across all hosts in the cluster. Reads metrics from
+ * the precision table and saves into the aggregate.
+ */
+public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggregator {
+ public Long timeSliceIntervalMillis;
+ private TimelineMetricReadHelper timelineMetricReadHelper;
+ // Aggregator to perform app-level aggregates for host metrics
+ private final TimelineMetricAppAggregator appAggregator;
+ // 1 minute client side buffering adjustment
+ protected final Long serverTimeShiftAdjustment;
+ protected final boolean interpolationEnabled;
+ private TimelineMetricMetadataManager metadataManagerInstance;
+ private String skipAggrPatternStrings;
+ private final static String liveHostsMetricName = "live_hosts";
+
+ /**
+  * Creates the aggregator.
+  * <p>
+  * Every argument except {@code metadataManager} and {@code timeSliceInterval}
+  * is forwarded unchanged to the {@link AbstractTimelineAggregator}
+  * constructor. The remaining state is set up here:
+  * <ul>
+  * <li>{@code serverTimeShiftAdjustment} is parsed from
+  * {@code SERVER_SIDE_TIMESIFT_ADJUSTMENT}, default "90000";</li>
+  * <li>{@code interpolationEnabled} is parsed from
+  * {@code TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED}, default "true";</li>
+  * <li>{@code skipAggrPatternStrings} is read from
+  * {@code TIMELINE_METRIC_AGGREGATION_SQL_FILTERS} without a default.</li>
+  * </ul>
+  *
+  * @param metadataManager kept for metadata lookups and handed to the app
+  *                        aggregator and the read helper
+  * @param timeSliceInterval stored as {@code timeSliceIntervalMillis}
+  * @throws NumberFormatException if the configured time shift adjustment is not a valid long
+  */
+ public TimelineMetricClusterAggregatorSecond(AGGREGATOR_NAME aggregatorName,
+     TimelineMetricMetadataManager metadataManager,
+     PhoenixHBaseAccessor hBaseAccessor,
+     Configuration metricsConf,
+     String checkpointLocation,
+     Long sleepIntervalMillis,
+     Integer checkpointCutOffMultiplier,
+     String aggregatorDisabledParam,
+     String tableName,
+     String outputTableName,
+     Long nativeTimeRangeDelay,
+     Long timeSliceInterval,
+     MetricCollectorHAController haController) {
+   super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis,
+       checkpointCutOffMultiplier, aggregatorDisabledParam, tableName, outputTableName,
+       nativeTimeRangeDelay, haController);
+
+   this.metadataManagerInstance = metadataManager;
+   this.appAggregator = new TimelineMetricAppAggregator(metadataManager, metricsConf);
+   this.timeSliceIntervalMillis = timeSliceInterval;
+
+   String configuredShift = metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000");
+   this.serverTimeShiftAdjustment = Long.parseLong(configuredShift);
+
+   String configuredInterpolation =
+       metricsConf.get(TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED, "true");
+   this.interpolationEnabled = Boolean.parseBoolean(configuredInterpolation);
+
+   this.skipAggrPatternStrings = metricsConf.get(TIMELINE_METRIC_AGGREGATION_SQL_FILTERS);
+   // NOTE(review): the meaning of the boolean flag is defined by TimelineMetricReadHelper - confirm there.
+   this.timelineMetricReadHelper = new TimelineMetricReadHelper(metadataManager, true);
+ }
+
+ /**
+  * Aggregates the rows of {@code rs} across hosts and saves the result.
+  * <p>
+  * Both ends of the window are moved back by {@code serverTimeShiftAdjustment}
+  * before it is cut into slices. This accounts for the time shift caused by
+  * client side buffering. The shifted-off period at the end is left out on
+  * purpose: points that arrive earlier than expected can be interpolated
+  * during the next run.
+  *
+  * @param rs rows produced by the prepared query condition
+  * @param startTime start of the aggregation window, before shifting
+  * @param endTime end of the aggregation window, before shifting
+  * @throws SQLException propagated from reading or saving
+  * @throws IOException propagated from reading or saving
+  */
+ @Override
+ protected void aggregate(ResultSet rs, long startTime, long endTime) throws SQLException, IOException {
+   long shiftedStart = startTime - serverTimeShiftAdjustment;
+   long shiftedEnd = endTime - serverTimeShiftAdjustment;
+   List<Long[]> timeSlices = getTimeSlices(shiftedStart, shiftedEnd, timeSliceIntervalMillis);
+
+   // Initialize app aggregates for host metrics
+   appAggregator.init();
+
+   Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregates =
+       aggregateMetricsFromResultSet(rs, timeSlices);
+   int aggregateCount = clusterAggregates.size();
+
+   LOG.info("Saving " + aggregateCount + " metric aggregates.");
+   hBaseAccessor.saveClusterAggregateRecords(clusterAggregates);
+   appAggregator.cleanup();
+ }
+
+ /**
+  * Builds the query condition used to read {@code METRICS_RECORD_TABLE_NAME}
+  * for one aggregation run.
+  * <p>
+  * The queried range starts at {@code startTime - serverTimeShiftAdjustment}
+  * and ends at the unshifted {@code endTime}. When {@code skipAggrPatternStrings}
+  * is not empty its comma separated entries become the metric names of the
+  * condition and the condition is flagged through
+  * {@code setMetricNamesNotCondition(true)}; otherwise the name list is empty
+  * and the flag is {@code false}.
+  *
+  * @param startTime start of the aggregation window, before shifting
+  * @param endTime end of the queried range
+  * @return the fully prepared condition, without limit, ordered by UUID and SERVER_TIME
+  */
+ @Override
+ protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+   boolean hasSkipPatterns = !StringUtils.isEmpty(skipAggrPatternStrings);
+
+   List<String> skippedMetricPatterns = new ArrayList<>();
+   if (hasSkipPatterns) {
+     LOG.info("Skipping aggregation for metric patterns : " + skipAggrPatternStrings);
+     String[] configuredPatterns = skipAggrPatternStrings.split(",");
+     skippedMetricPatterns.addAll(Arrays.asList(configuredPatterns));
+   }
+
+   long shiftedStart = startTime - serverTimeShiftAdjustment;
+   Condition queryCondition = new DefaultCondition(skippedMetricPatterns, null, null, null, shiftedStart,
+       endTime, null, null, true);
+   queryCondition.setMetricNamesNotCondition(hasSkipPatterns);
+   queryCondition.setNoLimit();
+   queryCondition.setFetchSize(resultsetFetchSize);
+   queryCondition.setStatement(String.format(GET_METRIC_SQL, METRICS_RECORD_TABLE_NAME));
+
+   // Retaining order of the row-key avoids client side merge sort.
+   queryCondition.addOrderByColumn("UUID");
+   queryCondition.addOrderByColumn("SERVER_TIME");
+   return queryCondition;
+ }
+
+ /**
+  * Aggregates the rows of {@code rs} into cluster level aggregates.
+  * <p>
+  * Consecutive rows that are equal except for their time are merged into one
+  * metric before slicing, so that rows belonging to the same host are not
+  * counted as coming from different hosts. Each merged metric is then handed
+  * to {@link #processAggregateClusterMetrics}, and the largest value it
+  * returned per app id is remembered. After all rows are consumed the app
+  * level aggregates of {@code appAggregator} are added, followed by one live
+  * hosts metric per app id, stamped with the end of the last time slice.
+  *
+  * @param rs rows to aggregate
+  * @param timeSlices slices to aggregate into; the last element is read
+  *                   unconditionally, so the list must not be empty
+  * @return all aggregates that have to be saved
+  * @throws SQLException propagated from reading {@code rs}
+  * @throws IOException propagated from decoding a row
+  */
+ Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
+     throws SQLException, IOException {
+   Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregates =
+       new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+   Map<String, MutableInt> hostCountByAppId = new HashMap<>();
+
+   TimelineMetric pendingMetric = null;
+   if (rs.next()) {
+     pendingMetric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
+
+     // Slice only after all rows of a host have been read.
+     while (rs.next()) {
+       TimelineMetric rowMetric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
+       if (!pendingMetric.equalsExceptTime(rowMetric)) {
+         // A different metric starts: process the one collected so far.
+         aggregateAndTrackHostCount(clusterAggregates, hostCountByAppId, pendingMetric, timeSlices);
+         pendingMetric = rowMetric;
+         continue;
+       }
+       // Same metric and host: combine the values before slicing.
+       pendingMetric.addMetricValues(rowMetric.getMetricValues());
+     }
+   }
+
+   // Process last metric
+   if (pendingMetric != null) {
+     aggregateAndTrackHostCount(clusterAggregates, hostCountByAppId, pendingMetric, timeSlices);
+   }
+
+   // Add app level aggregates to save
+   clusterAggregates.putAll(appAggregator.getAggregateClusterMetrics());
+
+   // Add liveHosts per AppId metrics.
+   int lastSliceIndex = timeSlices.size() - 1;
+   long lastSliceEnd = timeSlices.get(lastSliceIndex)[1];
+   processLiveAppCountMetrics(clusterAggregates, hostCountByAppId, lastSliceEnd);
+
+   return clusterAggregates;
+ }
+
+ /**
+  * Processes one merged metric and keeps, per app id, the largest host count
+  * reported by {@link #processAggregateClusterMetrics} so far.
+  */
+ private void aggregateAndTrackHostCount(Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregates,
+     Map<String, MutableInt> hostCountByAppId,
+     TimelineMetric metric,
+     List<Long[]> timeSlices) {
+   int numHosts = processAggregateClusterMetrics(clusterAggregates, metric, timeSlices);
+
+   String appId = metric.getAppId();
+   MutableInt knownHostCount = hostCountByAppId.get(appId);
+   if (knownHostCount == null || knownHostCount.intValue() < numHosts) {
+     hostCountByAppId.put(appId, new MutableInt(numHosts));
+   }
+ }
+
+ /**
+  * Slices the values of {@code metric} into the intervals given by
+  * {@code timeSlices} (interval length is configured through
+  * timeline.metrics.cluster.aggregator.minute.timeslice.interval), with the
+  * values normalized by averaging them within each interval, and merges the
+  * slices into {@code aggregateClusterMetrics}.
+  * <p>
+  * A metric whose cached metadata exists and reports that aggregates are not
+  * supported is skipped.
+  *
+  * @param aggregateClusterMetrics aggregates collected so far; updated in place
+  * @param metric metric to slice
+  * @param timeSlices slices to distribute the values over
+  * @return 0 when the metric is skipped, otherwise the value returned by
+  *         {@link #aggregateClusterMetricsFromSlices}
+  */
+ protected int processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
+     TimelineMetric metric, List<Long[]> timeSlices) {
+   TimelineMetricMetadataKey metadataKey =
+       new TimelineMetricMetadataKey(metric.getMetricName(), metric.getAppId(), metric.getInstanceId());
+   TimelineMetricMetadata cachedMetadata = metadataManagerInstance.getMetadataCacheValue(metadataKey);
+
+   // Missing metadata does not disable aggregation.
+   boolean aggregationUnsupported = cachedMetadata != null && !cachedMetadata.isSupportsAggregates();
+   if (aggregationUnsupported) {
+     LOG.debug("Skipping cluster aggregation for " + metric.getMetricName());
+     return 0;
+   }
+
+   // Create time slices
+   Map<TimelineClusterMetric, Double> slicedValues =
+       sliceFromTimelineMetric(metric, timeSlices, interpolationEnabled);
+   String hostname = metric.getHostName();
+   return aggregateClusterMetricsFromSlices(slicedValues, aggregateClusterMetrics, hostname);
+ }
+
+ /**
+  * Merges the sliced values of one host into the cluster aggregates.
+  * <p>
+  * For every slice an aggregate is either created (sum, max and min set to the
+  * slice value, one host) or updated (value added to the sum, host count
+  * raised by one, max and min refreshed). Every slice is also reported to
+  * {@code appAggregator}.
+  *
+  * @param clusterMetrics slice values keyed by cluster metric; may be {@code null} or empty
+  * @param aggregateClusterMetrics aggregates collected so far; updated in place
+  * @param hostname host the slices belong to, passed on to {@code appAggregator}
+  * @return 0 when there are no slices, otherwise the number of hosts of the
+  *         aggregate that belongs to the slice iterated last
+  */
+ protected int aggregateClusterMetricsFromSlices(Map<TimelineClusterMetric, Double> clusterMetrics,
+     Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
+     String hostname) {
+
+   if (clusterMetrics == null || clusterMetrics.isEmpty()) {
+     return 0;
+   }
+
+   int hostsOfLastSlice = 0;
+   for (Map.Entry<TimelineClusterMetric, Double> sliceEntry : clusterMetrics.entrySet()) {
+     TimelineClusterMetric sliceMetric = sliceEntry.getKey();
+     Double sliceValue = sliceEntry.getValue();
+
+     MetricClusterAggregate sliceAggregate = aggregateClusterMetrics.get(sliceMetric);
+     if (sliceAggregate != null) {
+       sliceAggregate.updateSum(sliceValue);
+       sliceAggregate.updateNumberOfHosts(1);
+       sliceAggregate.updateMax(sliceValue);
+       sliceAggregate.updateMin(sliceValue);
+     } else {
+       sliceAggregate = new MetricClusterAggregate(sliceValue, 1, null, sliceValue, sliceValue);
+       aggregateClusterMetrics.put(sliceMetric, sliceAggregate);
+     }
+
+     hostsOfLastSlice = sliceAggregate.getNumberOfHosts();
+     // Update app level aggregates
+     appAggregator.processTimelineClusterMetric(sliceMetric, hostname, sliceValue);
+   }
+   return hostsOfLastSlice;
+ }
+
+ /**
+  * Adds one cluster metric per app id that carries the number of hosts
+  * hosting that app id.
+  * <p>
+  * The metric is named {@code live_hosts}, has no instance id and uses the
+  * given timestamp. Its aggregate has the host count as sum, max and min and a
+  * number of hosts of one. {@code getUuid} is invoked on the metadata manager
+  * for every metric before it is stored; the returned value is not used here.
+  *
+  * @param aggregateClusterMetrics aggregates collected so far; updated in place
+  * @param appHostsCount host count per app id
+  * @param timestamp timestamp of the created metrics
+  */
+ protected void processLiveAppCountMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
+     Map<String, MutableInt> appHostsCount, long timestamp) {
+
+   for (Map.Entry<String, MutableInt> appEntry : appHostsCount.entrySet()) {
+     String appId = appEntry.getKey();
+     TimelineClusterMetric liveHostsMetric =
+         new TimelineClusterMetric(liveHostsMetricName, appId, null, timestamp);
+
+     double hostCount = appEntry.getValue().intValue();
+     MetricClusterAggregate liveHostsAggregate =
+         new MetricClusterAggregate(hostCount, 1, null, hostCount, hostCount);
+
+     metadataManagerInstance.getUuid(liveHostsMetric);
+
+     aggregateClusterMetrics.put(liveHostsMetric, liveHostsAggregate);
+   }
+
+ }
+}