You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2018/01/08 18:19:50 UTC

[24/28] ambari git commit: AMBARI-22740 : Fix integration test for HBase in branch-3.0-ams due to UUID changes. (avijayan)

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/AggregatorUtils.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/AggregatorUtils.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/AggregatorUtils.java
new file mode 100644
index 0000000..43c1f2b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/AggregatorUtils.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+/**
+ *
+ */
+public class AggregatorUtils {
+
+  private static final Log LOG = LogFactory.getLog(AggregatorUtils.class);
+
+  public static double[] calculateAggregates(Map<Long, Double> metricValues) {
+    double[] values = new double[4];
+    double max = Double.MIN_VALUE;
+    double min = Double.MAX_VALUE;
+    double sum = 0.0;
+    int metricCount = 0;
+
+    if (metricValues != null && !metricValues.isEmpty()) {
+      for (Double value : metricValues.values()) {
+        // TODO: Some nulls in data - need to investigate null values from host
+        if (value != null) {
+          if (value > max) {
+            max = value;
+          }
+          if (value < min) {
+            min = value;
+          }
+          sum += value;
+        }
+      }
+      metricCount = metricValues.values().size();
+    }
+    // BR: WHY ZERO is a good idea?
+    values[0] = sum;
+    values[1] = max != Double.MIN_VALUE ? max : 0.0;
+    values[2] = min != Double.MAX_VALUE ? min : 0.0;
+    values[3] = metricCount;
+
+    return values;
+  }
+
+  public static Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
+      TimelineMetric timelineMetric, List<Long[]> timeSlices, boolean interpolationEnabled) {
+
+    if (timelineMetric.getMetricValues().isEmpty()) {
+      return null;
+    }
+
+    Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
+        new HashMap<>();
+
+    Long prevTimestamp = -1l;
+    TimelineClusterMetric prevMetric = null;
+    int count = 0;
+    double sum = 0.0;
+
+    Map<Long,Double> timeSliceValueMap = new HashMap<>();
+    for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) {
+      if (metric.getValue() == null) {
+        continue;
+      }
+
+      Long timestamp = getSliceTimeForMetric(timeSlices, Long.parseLong(metric.getKey().toString()));
+      if (timestamp != -1) {
+        // Metric is within desired time range
+        TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
+            timelineMetric.getMetricName(),
+            timelineMetric.getAppId(),
+            timelineMetric.getInstanceId(),
+            timestamp);
+
+        if (prevTimestamp < 0 || timestamp.equals(prevTimestamp)) {
+          Double newValue = metric.getValue();
+          if (newValue > 0.0) {
+            sum += newValue;
+            count++;
+          }
+        } else {
+          double metricValue = (count > 0) ? (sum / count) : 0.0;
+          timelineClusterMetricMap.put(prevMetric, metricValue);
+          timeSliceValueMap.put(prevMetric.getTimestamp(), metricValue);
+          sum = metric.getValue();
+          count = sum > 0.0 ? 1 : 0;
+        }
+
+        prevTimestamp = timestamp;
+        prevMetric = clusterMetric;
+      }
+    }
+
+    if (prevTimestamp > 0) {
+      double metricValue = (count > 0) ? (sum / count) : 0.0;
+      timelineClusterMetricMap.put(prevMetric, metricValue);
+      timeSliceValueMap.put(prevTimestamp, metricValue);
+    }
+
+    if (interpolationEnabled) {
+      Map<Long, Double> interpolatedValues = interpolateMissingPeriods(timelineMetric.getMetricValues(), timeSlices, timeSliceValueMap, timelineMetric.getType());
+      for (Map.Entry<Long, Double> entry : interpolatedValues.entrySet()) {
+        TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(), timelineMetric.getInstanceId(), entry.getKey());
+        timelineClusterMetricMap.putIfAbsent(timelineClusterMetric, entry.getValue());
+      }
+    }
+
+    return timelineClusterMetricMap;
+  }
+
+  private static Map<Long, Double> interpolateMissingPeriods(TreeMap<Long, Double> metricValues,
+                                               List<Long[]> timeSlices,
+                                               Map<Long, Double> timeSliceValueMap, String type) {
+    Map<Long, Double> resultClusterMetricMap = new HashMap<>();
+
+    if (StringUtils.isNotEmpty(type) && "COUNTER".equalsIgnoreCase(type)) {
+      //For Counter Based metrics, ok to do interpolation and extrapolation
+
+      List<Long> requiredTimestamps = new ArrayList<>();
+      for (Long[] timeSlice : timeSlices) {
+        if (!timeSliceValueMap.containsKey(timeSlice[1])) {
+          requiredTimestamps.add(timeSlice[1]);
+        }
+      }
+      Map<Long, Double> interpolatedValuesMap = PostProcessingUtil.interpolate(metricValues, requiredTimestamps);
+
+      if (interpolatedValuesMap != null) {
+        for (Map.Entry<Long, Double> entry : interpolatedValuesMap.entrySet()) {
+          Double interpolatedValue = entry.getValue();
+
+          if (interpolatedValue != null) {
+            resultClusterMetricMap.put( entry.getKey(), interpolatedValue);
+          } else {
+            LOG.debug("Cannot compute interpolated value, hence skipping.");
+          }
+        }
+      }
+    } else {
+      //For other metrics, ok to do only interpolation
+
+      Double defaultNextSeenValue = null;
+      if (MapUtils.isEmpty(timeSliceValueMap) && MapUtils.isNotEmpty(metricValues)) {
+        //If no value was found within the start_time based slices, but the metric has value in the server_time range,
+        // use that.
+
+        Map.Entry<Long,Double> firstEntry  = metricValues.firstEntry();
+        defaultNextSeenValue = firstEntry.getValue();
+        LOG.debug("Found a data point outside timeslice range: " + new Date(firstEntry.getKey()) + ": " + defaultNextSeenValue);
+      }
+
+      for (int sliceNum = 0; sliceNum < timeSlices.size(); sliceNum++) {
+        Long[] timeSlice = timeSlices.get(sliceNum);
+
+        if (!timeSliceValueMap.containsKey(timeSlice[1])) {
+          LOG.debug("Found an empty slice : " + new Date(timeSlice[0]) + ", " + new Date(timeSlice[1]));
+
+          Double lastSeenValue = null;
+          int index = sliceNum - 1;
+          Long[] prevTimeSlice = null;
+          while (lastSeenValue == null && index >= 0) {
+            prevTimeSlice = timeSlices.get(index--);
+            lastSeenValue = timeSliceValueMap.get(prevTimeSlice[1]);
+          }
+
+          Double nextSeenValue = null;
+          index = sliceNum + 1;
+          Long[] nextTimeSlice = null;
+          while (nextSeenValue == null && index < timeSlices.size()) {
+            nextTimeSlice = timeSlices.get(index++);
+            nextSeenValue = timeSliceValueMap.get(nextTimeSlice[1]);
+          }
+
+          if (nextSeenValue == null) {
+            nextSeenValue = defaultNextSeenValue;
+          }
+
+          Double interpolatedValue = PostProcessingUtil.interpolate(timeSlice[1],
+              (prevTimeSlice != null ? prevTimeSlice[1] : null), lastSeenValue,
+              (nextTimeSlice != null ? nextTimeSlice[1] : null), nextSeenValue);
+
+          if (interpolatedValue != null) {
+            LOG.debug("Interpolated value : " + interpolatedValue);
+            resultClusterMetricMap.put(timeSlice[1], interpolatedValue);
+          } else {
+            LOG.debug("Cannot compute interpolated value, hence skipping.");
+          }
+        }
+      }
+    }
+    return resultClusterMetricMap;
+  }
+
+  /**
+   * Return end of the time slice into which the metric fits.
+   */
+  public static Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) {
+    for (Long[] timeSlice : timeSlices) {
+      if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) {
+        return timeSlice[1];
+      }
+    }
+    return -1l;
+  }
+
+  /**
+   * Return time slices to normalize the timeseries data.
+   */
+  public static  List<Long[]> getTimeSlices(long startTime, long endTime, long timeSliceIntervalMillis) {
+    List<Long[]> timeSlices = new ArrayList<Long[]>();
+    long sliceStartTime = startTime;
+    while (sliceStartTime < endTime) {
+      timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis });
+      sliceStartTime += timeSliceIntervalMillis;
+    }
+    return timeSlices;
+  }
+
+  public static long getRoundedCheckPointTimeMillis(long referenceTime, long aggregatorPeriod) {
+    return referenceTime - (referenceTime % aggregatorPeriod);
+  }
+
+  public static long getRoundedAggregateTimeMillis(long aggregatorPeriod) {
+    long currentTime = System.currentTimeMillis();
+    return currentTime - (currentTime % aggregatorPeriod);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/CustomDownSampler.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/CustomDownSampler.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/CustomDownSampler.java
new file mode 100644
index 0000000..49e2bf6
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/CustomDownSampler.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import java.util.List;
+
+/**
+ * Interface to add a custom downsampler.
+ * Each configured downsampler will be executed during an aggregation cycle.
+ */
+public interface CustomDownSampler {
+
+  /**
+   * Gatekeeper to check the configs. If this fails, the downsampling is not done.
+   * @return
+   */
+  public boolean validateConfigs();
+
+  /**
+   * Return the set of statements that needs to be executed for the downsampling.
+   * @param startTime
+   * @param endTime
+   * @param tableName
+   * @return
+   */
+  public List<String> prepareDownSamplingStatement(Long startTime, Long endTime, String tableName);
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/DownSamplerUtils.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/DownSamplerUtils.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/DownSamplerUtils.java
new file mode 100644
index 0000000..ad47931
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/DownSamplerUtils.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * DownSampler utility class. Responsible for fetching downsampler configs from Metrics config, and determine if
+ * any downsamplers are configured.
+ */
+
+public class DownSamplerUtils {
+
+  public static final String downSamplerConfigPrefix = "timeline.metrics.downsampler.";
+  public static final String downSamplerMetricPatternsConfig = "metric.patterns";
+  public static final String topNDownSampler = "topn";
+  private static final Log LOG = LogFactory.getLog(DownSamplerUtils.class);
+
+
+
+  /**
+   * Get the list of metrics that are requested to be downsampled.
+   * @param configuration
+   * @return List of metric patterns/names that are to be downsampled.
+   */
+  public static List<String> getDownsampleMetricPatterns(Configuration configuration) {
+    Map<String, String> conf = configuration.getValByRegex(downSamplerConfigPrefix + "*");
+    List<String> metricPatterns = new ArrayList<>();
+    Set<String> keys = conf.keySet();
+    for (String key : keys) {
+      if (key.endsWith(downSamplerMetricPatternsConfig)) {
+        String patternString = conf.get(key);
+        String[] patterns = StringUtils.split(patternString, ",");
+        for (String pattern : patterns) {
+          if (StringUtils.isNotEmpty(pattern)) {
+            String trimmedPattern = pattern.trim();
+            metricPatterns.add(trimmedPattern);
+          }
+        }
+      }
+    }
+    return metricPatterns;
+  }
+
+  /**
+   * Get the list of downsamplers that are configured in ams-site
+   * Sample config
+   <name>timeline.metrics.downsampler.topn.metric.patterns</name>
+   <value>dfs.NNTopUserOpCounts.windowMs=60000.op%,dfs.NNTopUserOpCounts.windowMs=300000.op%</value>
+
+   <name>timeline.metrics.downsampler.topn.value</name>
+   <value>10</value>
+
+   <name>timeline.metrics.downsampler.topn.function</name>
+   <value>max</value>
+   * @param configuration
+   * @return
+   */
+  public static List<CustomDownSampler> getDownSamplers(Configuration configuration) {
+
+    Map<String,String> conf = configuration.getValByRegex(downSamplerConfigPrefix + "*");
+    List<CustomDownSampler> downSamplers = new ArrayList<>();
+    Set<String> keys = conf.keySet();
+
+    try {
+      for (String key : keys) {
+        if (key.startsWith(downSamplerConfigPrefix) && key.endsWith(downSamplerMetricPatternsConfig)) {
+          String type = key.split("\\.")[3];
+          CustomDownSampler downSampler = getDownSamplerByType(type, conf);
+          if (downSampler != null) {
+            downSamplers.add(downSampler);
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception caught while parsing downsampler configs from ams-site : " + e.getMessage());
+    }
+    return downSamplers;
+  }
+
+  public static CustomDownSampler getDownSamplerByType(String type, Map<String, String> conf) {
+    if (type == null) {
+      return null;
+    }
+
+    if (StringUtils.isNotEmpty(type) && type.equalsIgnoreCase(topNDownSampler)) {
+      return TopNDownSampler.fromConfig(conf);
+    }
+
+    LOG.warn("Unknown downsampler requested : " + type);
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/Function.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/Function.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/Function.java
new file mode 100644
index 0000000..dd67b64
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/Function.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import org.apache.ambari.metrics.webapp.TimelineWebServices;
+
+import java.util.Arrays;
+
+/**
+ * Is used to determine metrics aggregate table.
+ *
+ * @see TimelineWebServices#getTimelineMetric
+ * @see TimelineWebServices#getTimelineMetrics
+ */
+public class Function {
+  public static Function DEFAULT_VALUE_FUNCTION = new Function(ReadFunction.VALUE, null);
+  private static final String SUFFIX_SEPARATOR = "\\._";
+
+  private ReadFunction readFunction = ReadFunction.VALUE;
+  private PostProcessingFunction postProcessingFunction = null;
+
+  public Function() {
+  }
+
+  public Function(ReadFunction readFunction,
+                  PostProcessingFunction ppFunction){
+    if (readFunction!=null){
+      this.readFunction = readFunction ;
+    }
+    this.postProcessingFunction = ppFunction;
+  }
+
+  /**
+   * Segregate post processing function eg: rate from aggregate function,
+   * example: avg, in any order
+   * @param metricName metric name from request
+   * @return @Function
+   */
+  public static Function fromMetricName(String metricName) {
+    // gets postprocessing, and aggregation function
+    // ex. Metric._rate._avg
+    String[] parts = metricName.split(SUFFIX_SEPARATOR);
+
+    ReadFunction readFunction = ReadFunction.VALUE;
+    PostProcessingFunction ppFunction = null;
+
+    if (parts.length <= 1) {
+      return new Function(readFunction, null);
+    }
+    if (parts.length > 3) {
+      throw new IllegalArgumentException("Invalid number of functions specified.");
+    }
+
+    // Parse functions
+    boolean isSuccessful = false; // Best effort
+    for (String part : parts) {
+      if (ReadFunction.isPresent(part)) {
+        readFunction = ReadFunction.getFunction(part);
+        isSuccessful = true;
+      }
+      if (PostProcessingFunction.isPresent(part)) {
+        ppFunction = PostProcessingFunction.getFunction(part);
+        isSuccessful = true;
+      }
+    }
+
+    // Throw exception if parsing failed
+    if (!isSuccessful) {
+      throw new FunctionFormatException("Could not parse provided functions: " +
+        "" + Arrays.asList(parts));
+    }
+
+    return new Function(readFunction, ppFunction);
+  }
+
+  public String getSuffix(){
+    return (postProcessingFunction == null)? readFunction.getSuffix() :
+      postProcessingFunction.getSuffix() + readFunction.getSuffix();
+  }
+
+  public ReadFunction getReadFunction() {
+    return readFunction;
+  }
+
+  @Override
+  public String toString() {
+    return "Function{" +
+      "readFunction=" + readFunction +
+      ", postProcessingFunction=" + postProcessingFunction +
+      '}';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof Function)) return false;
+
+    Function function = (Function) o;
+
+    return postProcessingFunction == function.postProcessingFunction
+      && readFunction == function.readFunction;
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = readFunction.hashCode();
+    result = 31 * result + (postProcessingFunction != null ?
+      postProcessingFunction.hashCode() : 0);
+    return result;
+  }
+
+  public enum PostProcessingFunction {
+    NONE(""),
+    RATE("._rate"),
+    DIFF("._diff");
+
+    PostProcessingFunction(String suffix){
+      this.suffix = suffix;
+    }
+
+    private String suffix = "";
+
+    public String getSuffix(){
+      return suffix;
+    }
+
+    public static boolean isPresent(String functionName) {
+      try {
+        PostProcessingFunction.valueOf(functionName.toUpperCase());
+      } catch (IllegalArgumentException e) {
+        return false;
+      }
+      return true;
+    }
+
+    public static PostProcessingFunction getFunction(String functionName) throws FunctionFormatException {
+      if (functionName == null) {
+        return NONE;
+      }
+
+      try {
+        return PostProcessingFunction.valueOf(functionName.toUpperCase());
+      } catch (IllegalArgumentException e) {
+        throw new FunctionFormatException("Function should be ._rate", e);
+      }
+    }
+  }
+
+  public enum ReadFunction {
+    VALUE(""),
+    AVG("._avg"),
+    MIN("._min"),
+    MAX("._max"),
+    SUM("._sum");
+
+    private final String suffix;
+
+    ReadFunction(String suffix){
+      this.suffix = suffix;
+    }
+
+    public String getSuffix() {
+      return suffix;
+    }
+
+    public static boolean isPresent(String functionName) {
+      try {
+        ReadFunction.valueOf(functionName.toUpperCase());
+      } catch (IllegalArgumentException e) {
+        return false;
+      }
+      return true;
+    }
+
+    public static ReadFunction getFunction(String functionName) throws FunctionFormatException {
+      if (functionName == null) {
+        return VALUE;
+      }
+      try {
+        return ReadFunction.valueOf(functionName.toUpperCase());
+      } catch (IllegalArgumentException e) {
+        throw new FunctionFormatException(
+          "Function should be sum, avg, min, max. Got " + functionName, e);
+      }
+    }
+  }
+
+  public static class FunctionFormatException extends IllegalArgumentException {
+    public FunctionFormatException(String message) {
+      super(message);
+    }
+
+    public FunctionFormatException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineClusterMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineClusterMetric.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineClusterMetric.java
new file mode 100644
index 0000000..6e81feb
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineClusterMetric.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+public class TimelineClusterMetric {
+  private String metricName;
+  private String appId;
+  private String instanceId;
+  private long timestamp;
+
+  public TimelineClusterMetric(String metricName, String appId, String instanceId,
+                        long timestamp) {
+    this.metricName = metricName;
+    this.appId = appId;
+    this.instanceId = instanceId;
+    this.timestamp = timestamp;
+  }
+
+  public String getMetricName() {
+    return metricName;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TimelineClusterMetric that = (TimelineClusterMetric) o;
+
+    if (timestamp != that.timestamp) return false;
+    if (appId != null ? !appId.equals(that.appId) : that.appId != null)
+      return false;
+    if (instanceId != null ? !instanceId.equals(that.instanceId) : that.instanceId != null)
+      return false;
+    if (!metricName.equals(that.metricName)) return false;
+
+    return true;
+  }
+
+  public boolean equalsExceptTime(TimelineClusterMetric metric) {
+    if (!metricName.equals(metric.metricName)) return false;
+    if (!appId.equals(metric.appId)) return false;
+    if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
+      return false;
+
+    return true;
+  }
+  @Override
+  public int hashCode() {
+    int result = metricName.hashCode();
+    result = 31 * result + (appId != null ? appId.hashCode() : 0);
+    result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
+    result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "TimelineClusterMetric{" +
+      "metricName='" + metricName + '\'' +
+      ", appId='" + appId + '\'' +
+      ", instanceId='" + instanceId + '\'' +
+      ", timestamp=" + timestamp +
+      '}';
+  }
+
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregator.java
new file mode 100644
index 0000000..3698f1b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregator.java
@@ -0,0 +1,59 @@
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface TimelineMetricAggregator extends Runnable {
+  /**
+   * Aggregate metric data within the time bounds.
+   *
+   * @param startTime start time millis
+   * @param endTime   end time millis
+   * @return success
+   */
+  boolean doWork(long startTime, long endTime);
+
+  /**
+   * Is aggregator is disabled by configuration.
+   *
+   * @return true/false
+   */
+  boolean isDisabled();
+
+  /**
+   * Return aggregator Interval
+   *
+   * @return Interval in Millis
+   */
+  Long getSleepIntervalMillis();
+
+  /**
+   * Get aggregator name
+   * @return @AGGREGATOR_NAME
+   */
+  AGGREGATOR_NAME getName();
+
+  /**
+   * Known aggregator types
+   */
+  enum AGGREGATOR_TYPE {
+    CLUSTER,
+    HOST
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregatorFactory.java
new file mode 100644
index 0000000..b395b39
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregatorFactory.java
@@ -0,0 +1,528 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_DISABLED;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_SLEEP_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_DISABLED;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_DISABLED;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_DISABLED;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_DISABLED;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
+import org.apache.ambari.metrics.core.timeline.TimelineMetricDistributedCache;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+
+/**
+ * Factory class that knows how to create a aggregator instance using
+ * TimelineMetricConfiguration
+ */
+public class TimelineMetricAggregatorFactory {
+  private static final String HOST_AGGREGATE_MINUTE_CHECKPOINT_FILE =
+    "timeline-metrics-host-aggregator-checkpoint";
+  private static final String HOST_AGGREGATE_HOURLY_CHECKPOINT_FILE =
+    "timeline-metrics-host-aggregator-hourly-checkpoint";
+  private static final String HOST_AGGREGATE_DAILY_CHECKPOINT_FILE =
+    "timeline-metrics-host-aggregator-daily-checkpoint";
+  private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE =
+    "timeline-metrics-cluster-aggregator-checkpoint";
+  private static final String CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_FILE =
+    "timeline-metrics-cluster-aggregator-minute-checkpoint";
+  private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE =
+    "timeline-metrics-cluster-aggregator-hourly-checkpoint";
+  private static final String CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_FILE =
+    "timeline-metrics-cluster-aggregator-daily-checkpoint";
+
+  private static boolean useGroupByAggregator(Configuration metricsConf) {
+    return Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"));
+  }
+
+  /**
+   * Minute based aggregation for hosts.
+   * Interval : 5 mins
+   */
+  public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
+    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+     TimelineMetricMetadataManager metadataManager,
+     MetricCollectorHAController haController) {
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+    String checkpointLocation = FilenameUtils.concat(checkpointDir,
+      HOST_AGGREGATE_MINUTE_CHECKPOINT_FILE);
+    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+      (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l));  // 5 mins
+
+    int checkpointCutOffMultiplier = metricsConf.getInt
+      (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3);
+    String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED;
+
+    String inputTableName = METRICS_RECORD_TABLE_NAME;
+    String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.ambari.metrics.core.timeline.aggregators.v2.TimelineMetricHostAggregator(
+        METRIC_RECORD_MINUTE,
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        hostAggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        120000l,
+        haController
+      );
+    }
+
+    return new TimelineMetricHostAggregator(
+      METRIC_RECORD_MINUTE,
+      metadataManager,
+      hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      hostAggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      120000l,
+      haController);
+  }
+
+  /**
+   * Hourly aggregation for hosts.
+   * Interval : 1 hour
+   */
+  public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
+    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+     TimelineMetricMetadataManager metadataManager,
+     MetricCollectorHAController haController) {
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+    String checkpointLocation = FilenameUtils.concat(checkpointDir,
+      HOST_AGGREGATE_HOURLY_CHECKPOINT_FILE);
+    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+      (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
+
+    int checkpointCutOffMultiplier = metricsConf.getInt
+      (HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+    String hostAggregatorDisabledParam = HOST_AGGREGATOR_HOUR_DISABLED;
+
+    String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+    String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.ambari.metrics.core.timeline.aggregators.v2.TimelineMetricHostAggregator(
+        METRIC_RECORD_HOURLY,
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        hostAggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        3600000l,
+        haController
+      );
+    }
+
+    return new TimelineMetricHostAggregator(
+      METRIC_RECORD_HOURLY,
+      metadataManager,
+      hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      hostAggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      3600000l,
+      haController);
+  }
+
+  /**
+   * Daily aggregation for hosts.
+   * Interval : 1 day
+   */
+  public static TimelineMetricAggregator createTimelineMetricAggregatorDaily
+    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+     TimelineMetricMetadataManager metadataManager,
+     MetricCollectorHAController haController) {
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+    String checkpointLocation = FilenameUtils.concat(checkpointDir,
+      HOST_AGGREGATE_DAILY_CHECKPOINT_FILE);
+    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+      (HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL, 86400l));
+
+    int checkpointCutOffMultiplier = metricsConf.getInt
+      (HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER, 1);
+    String hostAggregatorDisabledParam = HOST_AGGREGATOR_DAILY_DISABLED;
+
+    String inputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+    String outputTableName = METRICS_AGGREGATE_DAILY_TABLE_NAME;
+
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.ambari.metrics.core.timeline.aggregators.v2.TimelineMetricHostAggregator(
+        METRIC_RECORD_DAILY,
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        hostAggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        3600000l,
+        haController
+      );
+    }
+
+    return new TimelineMetricHostAggregator(
+      METRIC_RECORD_DAILY,
+      metadataManager,
+      hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      hostAggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      3600000l,
+      haController);
+  }
+
+  /**
+   * Second aggregation for cluster.
+   * Interval : 2 mins
+   * Timeslice : 30 sec
+   */
+  public static TimelineMetricAggregator createTimelineClusterAggregatorSecond(
+    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+    TimelineMetricMetadataManager metadataManager,
+    MetricCollectorHAController haController,
+    TimelineMetricDistributedCache distributedCache) {
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+
+    String checkpointLocation = FilenameUtils.concat(checkpointDir,
+      CLUSTER_AGGREGATOR_CHECKPOINT_FILE);
+
+    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+      (CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL, 120l));
+
+    long timeSliceIntervalMillis = SECONDS.toMillis(metricsConf.getInt
+      (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30));
+
+    int checkpointCutOffMultiplier =
+      metricsConf.getInt(CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+
+    String outputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+    String aggregatorDisabledParam = CLUSTER_AGGREGATOR_SECOND_DISABLED;
+
+    // Second based aggregation have added responsibility of time slicing
+    if (TimelineMetricConfiguration.getInstance().isCollectorInMemoryAggregationEnabled()) {
+      return new TimelineMetricClusterAggregatorSecondWithCacheSource(
+        METRIC_AGGREGATE_SECOND,
+        metadataManager,
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        aggregatorDisabledParam,
+        null,
+        outputTableName,
+        120000l,
+        timeSliceIntervalMillis,
+        haController,
+        distributedCache
+      );
+    }
+
+    String inputTableName = METRICS_RECORD_TABLE_NAME;
+    return new TimelineMetricClusterAggregatorSecond(
+      METRIC_AGGREGATE_SECOND,
+      metadataManager,
+      hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      aggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      120000l,
+      timeSliceIntervalMillis,
+      haController
+    );
+  }
+
+  /**
+   * Minute aggregation for cluster.
+   * Interval : 5 mins
+   */
+  public static TimelineMetricAggregator createTimelineClusterAggregatorMinute(
+    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+    TimelineMetricMetadataManager metadataManager,
+    MetricCollectorHAController haController) {
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+
+    String checkpointLocation = FilenameUtils.concat(checkpointDir,
+      CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_FILE);
+
+    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+      (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l));
+
+    int checkpointCutOffMultiplier = metricsConf.getInt
+      (CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+
+    String inputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+    String outputTableName = METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
+    String aggregatorDisabledParam = CLUSTER_AGGREGATOR_MINUTE_DISABLED;
+
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.ambari.metrics.core.timeline.aggregators.v2.TimelineMetricClusterAggregator(
+        METRIC_AGGREGATE_MINUTE,
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        aggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        120000l,
+        haController
+      );
+    }
+
+    return new TimelineMetricClusterAggregator(
+      METRIC_AGGREGATE_MINUTE,
+      metadataManager,
+      hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      aggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      120000l,
+      haController
+    );
+  }
+
+  /**
+   * Hourly aggregation for cluster.
+   * Interval : 1 hour
+   */
+  public static TimelineMetricAggregator createTimelineClusterAggregatorHourly(
+    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+    TimelineMetricMetadataManager metadataManager,
+    MetricCollectorHAController haController) {
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+
+    String checkpointLocation = FilenameUtils.concat(checkpointDir,
+      CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE);
+
+    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+      (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
+
+    int checkpointCutOffMultiplier = metricsConf.getInt
+      (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+
+    String inputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+    String outputTableName = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+    String aggregatorDisabledParam = CLUSTER_AGGREGATOR_HOUR_DISABLED;
+
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.ambari.metrics.core.timeline.aggregators.v2.TimelineMetricClusterAggregator(
+        METRIC_AGGREGATE_HOURLY,
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        aggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        120000l,
+        haController
+      );
+    }
+
+    return new TimelineMetricClusterAggregator(
+      METRIC_AGGREGATE_HOURLY,
+      metadataManager,
+      hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      aggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      120000l,
+      haController
+    );
+  }
+
+  /**
+   * Daily aggregation for cluster.
+   * Interval : 1 day
+   */
+  public static TimelineMetricAggregator createTimelineClusterAggregatorDaily(
+    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+    TimelineMetricMetadataManager metadataManager,
+    MetricCollectorHAController haController) {
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+
+    String checkpointLocation = FilenameUtils.concat(checkpointDir,
+      CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_FILE);
+
+    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+      (CLUSTER_AGGREGATOR_DAILY_SLEEP_INTERVAL, 86400l));
+
+    int checkpointCutOffMultiplier = metricsConf.getInt
+      (CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER, 1);
+
+    String inputTableName = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+    String outputTableName = METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
+    String aggregatorDisabledParam = CLUSTER_AGGREGATOR_DAILY_DISABLED;
+
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.ambari.metrics.core.timeline.aggregators.v2.TimelineMetricClusterAggregator(
+        METRIC_AGGREGATE_DAILY,
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        aggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        120000l,
+        haController
+      );
+    }
+
+    return new TimelineMetricClusterAggregator(
+      METRIC_AGGREGATE_DAILY,
+      metadataManager,
+      hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      aggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      120000l,
+      haController
+    );
+  }
+
+  public static TimelineMetricAggregator createFilteringTimelineMetricAggregatorMinute(PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, TimelineMetricMetadataManager metricMetadataManager, MetricCollectorHAController haController, ConcurrentHashMap<String, Long> postedAggregatedMap) {
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+    String checkpointLocation = FilenameUtils.concat(checkpointDir,
+      HOST_AGGREGATE_MINUTE_CHECKPOINT_FILE);
+    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+      (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l));  // 5 mins
+
+    int checkpointCutOffMultiplier = metricsConf.getInt
+      (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3);
+    String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED;
+
+    String inputTableName = METRICS_RECORD_TABLE_NAME;
+    String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.ambari.metrics.core.timeline.aggregators.v2.TimelineMetricFilteringHostAggregator(
+        METRIC_RECORD_MINUTE,
+        metricMetadataManager,
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        hostAggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        120000l,
+        haController,
+        postedAggregatedMap
+      );
+    }
+
+    return new TimelineMetricFilteringHostAggregator(
+      METRIC_RECORD_MINUTE,
+      metricMetadataManager,
+      hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      hostAggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      120000l,
+      haController,
+      postedAggregatedMap);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAppAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAppAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAppAggregator.java
new file mode 100644
index 0000000..190ad9a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAppAggregator.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
+import org.apache.ambari.metrics.core.timeline.TimelineMetricsFilter;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricHostMetadata;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+
+/**
+ * Aggregator responsible for providing app level host aggregates. This task
+ * is accomplished without doing a round trip to storage, rather
+ * TimelineMetricClusterAggregators are responsible for lifecycle of
+ * @TimelineMetricAppAggregator and provide the raw data to aggregate.
+ */
+public class TimelineMetricAppAggregator {
+  private static final Log LOG = LogFactory.getLog(TimelineMetricAppAggregator.class);
+  // Lookup to check candidacy of an app
+  private final List<String> appIdsToAggregate;
+  private final Map<String, TimelineMetricHostMetadata> hostMetadata;
+  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = new HashMap<>();
+  TimelineMetricMetadataManager metadataManagerInstance;
+
+  public TimelineMetricAppAggregator(TimelineMetricMetadataManager metadataManager,
+                                     Configuration metricsConf) {
+    appIdsToAggregate = getAppIdsForHostAggregation(metricsConf);
+    hostMetadata = metadataManager.getHostedAppsCache();
+    metadataManagerInstance = metadataManager;
+    LOG.info("AppIds configured for aggregation: " + appIdsToAggregate);
+  }
+
+  /**
+   * Lifecycle method to initialize aggregation cycle.
+   */
+  public void init() {
+    LOG.debug("Initializing aggregation cycle.");
+    aggregateClusterMetrics = new HashMap<>();
+  }
+
+  /**
+   * Lifecycle method to indicate end of aggregation cycle.
+   */
+  public void cleanup() {
+    LOG.debug("Cleanup aggregated data.");
+    aggregateClusterMetrics = null;
+  }
+
+  /**
+   * Calculate aggregates if the clusterMetric is a Host metric for recorded
+   * apps that are housed by this host.
+   *
+   * @param clusterMetric @TimelineClusterMetric Host / App metric
+   * @param hostname This is the hostname from which this clusterMetric originated.
+   * @param metricValue The metric value for this metric.
+   */
+  public void processTimelineClusterMetric(TimelineClusterMetric clusterMetric,
+                                           String hostname, Double metricValue) {
+
+    String appId = clusterMetric.getAppId();
+    if (appId == null) {
+      return; // No real use case except tests
+    }
+
+    // If metric is a host metric and host has apps on it
+    if (appId.equalsIgnoreCase(TimelineMetricConfiguration.HOST_APP_ID)) {
+      // Candidate metric, update app aggregates
+      if (hostMetadata.containsKey(hostname)) {
+        updateAppAggregatesFromHostMetric(clusterMetric, hostname, metricValue);
+      }
+    } else {
+      // Build the hostedapps map if not a host metric
+      // Check app candidacy for host aggregation
+      if (appIdsToAggregate.contains(appId)) {
+        TimelineMetricHostMetadata timelineMetricHostMetadata = hostMetadata.get(hostname);
+        ConcurrentHashMap<String, String> appIds;
+        if (timelineMetricHostMetadata == null) {
+          appIds = new ConcurrentHashMap<>();
+          hostMetadata.put(hostname, new TimelineMetricHostMetadata(appIds));
+        } else {
+          appIds = timelineMetricHostMetadata.getHostedApps();
+        }
+        if (!appIds.containsKey(appId)) {
+          appIds.put(appId, appId);
+          LOG.info("Adding appId to hosted apps: appId = " +
+            clusterMetric.getAppId() + ", hostname = " + hostname);
+        }
+      }
+    }
+  }
+
+  /**
+   * Build a cluster app metric from a host metric
+   */
+  private void updateAppAggregatesFromHostMetric(TimelineClusterMetric clusterMetric,
+                                                 String hostname, Double metricValue) {
+
+    if (aggregateClusterMetrics == null) {
+      LOG.error("Aggregation requested without init call.");
+      return;
+    }
+
+    TimelineMetricMetadataKey appKey =  new TimelineMetricMetadataKey(clusterMetric.getMetricName(), TimelineMetricConfiguration.HOST_APP_ID, clusterMetric.getInstanceId());
+    ConcurrentHashMap<String, String> apps = hostMetadata.get(hostname).getHostedApps();
+    for (String appId : apps.keySet()) {
+      if (appIdsToAggregate.contains(appId)) {
+
+        appKey.setAppId(appId);
+        TimelineMetricMetadata appMetadata = metadataManagerInstance.getMetadataCacheValue(appKey);
+        if (appMetadata == null) {
+          TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), TimelineMetricConfiguration.HOST_APP_ID, clusterMetric.getInstanceId());
+          TimelineMetricMetadata hostMetricMetadata = metadataManagerInstance.getMetadataCacheValue(key);
+
+          if (hostMetricMetadata != null) {
+            TimelineMetricMetadata timelineMetricMetadata = new TimelineMetricMetadata(clusterMetric.getMetricName(),
+              appId, clusterMetric.getInstanceId(), hostMetricMetadata.getUnits(), hostMetricMetadata.getType(), hostMetricMetadata.getSeriesStartTime(),
+              hostMetricMetadata.isSupportsAggregates(), TimelineMetricsFilter.acceptMetric(clusterMetric.getMetricName(), appId));
+            metadataManagerInstance.putIfModifiedTimelineMetricMetadata(timelineMetricMetadata);
+          }
+        }
+
+        // Add a new cluster aggregate metric if none exists
+        TimelineClusterMetric appTimelineClusterMetric =
+          new TimelineClusterMetric(clusterMetric.getMetricName(),
+            appId,
+            clusterMetric.getInstanceId(),
+            clusterMetric.getTimestamp());
+
+        MetricClusterAggregate clusterAggregate = aggregateClusterMetrics.get(appTimelineClusterMetric);
+
+        if (clusterAggregate == null) {
+          clusterAggregate = new MetricClusterAggregate(metricValue, 1, null, metricValue, metricValue);
+          aggregateClusterMetrics.put(appTimelineClusterMetric, clusterAggregate);
+        } else {
+          clusterAggregate.updateSum(metricValue);
+          clusterAggregate.updateNumberOfHosts(1);
+          clusterAggregate.updateMax(metricValue);
+          clusterAggregate.updateMin(metricValue);
+        }
+      }
+
+    }
+  }
+
+  /**
+   * Return current copy of aggregated data.
+   */
+  public Map<TimelineClusterMetric, MetricClusterAggregate> getAggregateClusterMetrics() {
+    return aggregateClusterMetrics;
+  }
+
+  private List<String> getAppIdsForHostAggregation(Configuration metricsConf) {
+    String appIds = metricsConf.get(TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS);
+    if (!StringUtils.isEmpty(appIds)) {
+      return Arrays.asList(StringUtils.stripAll(appIds.split(",")));
+    }
+    return Collections.emptyList();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java
new file mode 100644
index 0000000..2ea5309
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_TIME_SQL;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
+import org.apache.ambari.metrics.core.timeline.query.DefaultCondition;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.ambari.metrics.core.timeline.query.Condition;
+
+public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
+  private final TimelineMetricReadHelper readHelper;
+  private final boolean isClusterPrecisionInputTable;
+
+  public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName,
+                                         TimelineMetricMetadataManager metricMetadataManager,
+                                         PhoenixHBaseAccessor hBaseAccessor,
+                                         Configuration metricsConf,
+                                         String checkpointLocation,
+                                         Long sleepIntervalMillis,
+                                         Integer checkpointCutOffMultiplier,
+                                         String hostAggregatorDisabledParam,
+                                         String inputTableName,
+                                         String outputTableName,
+                                         Long nativeTimeRangeDelay,
+                                         MetricCollectorHAController haController) {
+    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
+      sleepIntervalMillis, checkpointCutOffMultiplier,
+      hostAggregatorDisabledParam, inputTableName, outputTableName,
+      nativeTimeRangeDelay, haController);
+    isClusterPrecisionInputTable = inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
+    readHelper = new TimelineMetricReadHelper(metricMetadataManager, true);
+  }
+
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+      endTime, null, null, true);
+    condition.setNoLimit();
+    condition.setFetchSize(resultsetFetchSize);
+    String sqlStr = String.format(GET_CLUSTER_AGGREGATE_TIME_SQL, tableName);
+    // HOST_COUNT vs METRIC_COUNT
+    if (isClusterPrecisionInputTable) {
+      sqlStr = String.format(GET_CLUSTER_AGGREGATE_SQL, tableName);
+    }
+
+    condition.setStatement(sqlStr);
+    condition.addOrderByColumn("UUID");
+    condition.addOrderByColumn("SERVER_TIME");
+    return condition;
+  }
+
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
+    Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime);
+
+    LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
+    hBaseAccessor.saveClusterAggregateRecordsSecond(hostAggregateMap, outputTableName);
+  }
+
+  private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs, long endTime)
+    throws IOException, SQLException {
+
+    TimelineClusterMetric existingMetric = null;
+    MetricHostAggregate hostAggregate = null;
+    Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
+      new HashMap<TimelineClusterMetric, MetricHostAggregate>();
+    int perMetricCount = 0;
+
+    while (rs.next()) {
+      TimelineClusterMetric currentMetric = readHelper.fromResultSet(rs);
+
+      MetricClusterAggregate currentHostAggregate =
+        isClusterPrecisionInputTable ?
+          readHelper.getMetricClusterAggregateFromResultSet(rs) :
+          readHelper.getMetricClusterTimeAggregateFromResultSet(rs);
+
+      if (existingMetric == null) {
+        // First row
+        existingMetric = currentMetric;
+        currentMetric.setTimestamp(endTime);
+        hostAggregate = new MetricHostAggregate();
+        hostAggregateMap.put(currentMetric, hostAggregate);
+        perMetricCount++;
+      }
+
+      if (existingMetric.equalsExceptTime(currentMetric)) {
+        // Recalculate totals with current metric
+        updateAggregatesFromHost(hostAggregate, currentHostAggregate);
+        perMetricCount++;
+      } else {
+        // Switched over to a new metric - save new metric
+
+        hostAggregate.setSum(hostAggregate.getSum() / (perMetricCount - 1));
+        hostAggregate.setNumberOfSamples(Math.round((float)hostAggregate.getNumberOfSamples() / (float)(perMetricCount - 1)));
+        perMetricCount = 1;
+
+        hostAggregate = new MetricHostAggregate();
+        currentMetric.setTimestamp(endTime);
+        updateAggregatesFromHost(hostAggregate, currentHostAggregate);
+        hostAggregateMap.put(currentMetric, hostAggregate);
+        existingMetric = currentMetric;
+      }
+
+    }
+
+    if (existingMetric != null) {
+      hostAggregate.setSum(hostAggregate.getSum() / perMetricCount);
+      hostAggregate.setNumberOfSamples(Math.round((float)hostAggregate.getNumberOfSamples() / (float)perMetricCount));
+    }
+
+    return hostAggregateMap;
+  }
+
+  private void updateAggregatesFromHost(MetricHostAggregate agg, MetricClusterAggregate currentClusterAggregate) {
+    agg.updateMax(currentClusterAggregate.getMax());
+    agg.updateMin(currentClusterAggregate.getMin());
+    agg.updateSum(currentClusterAggregate.getSum());
+    agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts());
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
new file mode 100644
index 0000000..46c82d6
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
+import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices;
+import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.sliceFromTimelineMetric;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.ambari.metrics.core.timeline.query.Condition;
+import org.apache.ambari.metrics.core.timeline.query.DefaultCondition;
+
+/**
+ * Aggregates a metric across all hosts in the cluster. Reads metrics from
+ * the precision table and saves into the aggregate.
+ */
+public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggregator {
+  public Long timeSliceIntervalMillis;
+  private TimelineMetricReadHelper timelineMetricReadHelper;
+  // Aggregator to perform app-level aggregates for host metrics
+  private final TimelineMetricAppAggregator appAggregator;
+  // 1 minute client side buffering adjustment
+  protected final Long serverTimeShiftAdjustment;
+  protected final boolean interpolationEnabled;
+  private TimelineMetricMetadataManager metadataManagerInstance;
+  private String skipAggrPatternStrings;
+  private final static String liveHostsMetricName = "live_hosts";
+
+  public TimelineMetricClusterAggregatorSecond(AGGREGATOR_NAME aggregatorName,
+                                               TimelineMetricMetadataManager metadataManager,
+                                               PhoenixHBaseAccessor hBaseAccessor,
+                                               Configuration metricsConf,
+                                               String checkpointLocation,
+                                               Long sleepIntervalMillis,
+                                               Integer checkpointCutOffMultiplier,
+                                               String aggregatorDisabledParam,
+                                               String tableName,
+                                               String outputTableName,
+                                               Long nativeTimeRangeDelay,
+                                               Long timeSliceInterval,
+                                               MetricCollectorHAController haController) {
+    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
+      sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam,
+      tableName, outputTableName, nativeTimeRangeDelay, haController);
+
+    this.metadataManagerInstance = metadataManager;
+    appAggregator = new TimelineMetricAppAggregator(metadataManager, metricsConf);
+    this.timeSliceIntervalMillis = timeSliceInterval;
+    this.serverTimeShiftAdjustment = Long.parseLong(metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000"));
+    this.interpolationEnabled = Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED, "true"));
+    this.skipAggrPatternStrings = metricsConf.get(TIMELINE_METRIC_AGGREGATION_SQL_FILTERS);
+    this.timelineMetricReadHelper = new TimelineMetricReadHelper(metadataManager, true);
+  }
+
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime) throws SQLException, IOException {
+    // Account for time shift due to client side buffering by shifting the
+    // timestamps with the difference between server time and series start time
+    // Also, we do not want to look at the shift time period from the end as well since we can interpolate those points
+    // that come earlier than the expected, during the next run.
+    List<Long[]> timeSlices = getTimeSlices(startTime - serverTimeShiftAdjustment, endTime - serverTimeShiftAdjustment, timeSliceIntervalMillis);
+    // Initialize app aggregates for host metrics
+    appAggregator.init();
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
+      aggregateMetricsFromResultSet(rs, timeSlices);
+
+    LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
+    hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
+    appAggregator.cleanup();
+  }
+
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+
+    List<String> metricNames = new ArrayList<>();
+    boolean metricNamesNotCondition = false;
+
+    if (!StringUtils.isEmpty(skipAggrPatternStrings)) {
+      LOG.info("Skipping aggregation for metric patterns : " + skipAggrPatternStrings);
+      metricNames.addAll(Arrays.asList(skipAggrPatternStrings.split(",")));
+      metricNamesNotCondition = true;
+    }
+
+    Condition condition = new DefaultCondition(metricNames, null, null, null, startTime - serverTimeShiftAdjustment,
+      endTime, null, null, true);
+    condition.setMetricNamesNotCondition(metricNamesNotCondition);
+    condition.setNoLimit();
+    condition.setFetchSize(resultsetFetchSize);
+    condition.setStatement(String.format(GET_METRIC_SQL,
+      METRICS_RECORD_TABLE_NAME));
+    // Retaining order of the row-key avoids client side merge sort.
+    condition.addOrderByColumn("UUID");
+    condition.addOrderByColumn("SERVER_TIME");
+    return condition;
+  }
+
+  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
+      throws SQLException, IOException {
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
+      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+    TimelineMetric metric = null;
+    Map<String, MutableInt> hostedAppCounter = new HashMap<>();
+    if (rs.next()) {
+      metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
+
+      // Call slice after all rows for a host are read
+      while (rs.next()) {
+        TimelineMetric nextMetric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
+        // If rows belong to same host combine them before slicing. This
+        // avoids issues across rows that belong to same hosts but get
+        // counted as coming from different ones.
+        if (metric.equalsExceptTime(nextMetric)) {
+          metric.addMetricValues(nextMetric.getMetricValues());
+        } else {
+          // Process the current metric
+          int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
+          if (!hostedAppCounter.containsKey(metric.getAppId())) {
+            hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
+          } else {
+            int currentHostCount = hostedAppCounter.get(metric.getAppId()).intValue();
+            if (currentHostCount < numHosts) {
+              hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
+            }
+          }
+          metric = nextMetric;
+        }
+      }
+    }
+    // Process last metric
+    if (metric != null) {
+      int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
+      if (!hostedAppCounter.containsKey(metric.getAppId())) {
+        hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
+      } else {
+        int currentHostCount = hostedAppCounter.get(metric.getAppId()).intValue();
+        if (currentHostCount < numHosts) {
+          hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
+        }
+      }
+    }
+
+    // Add app level aggregates to save
+    aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());
+
+    // Add liveHosts per AppId metrics.
+    long timestamp = timeSlices.get(timeSlices.size() - 1)[1];
+    processLiveAppCountMetrics(aggregateClusterMetrics, hostedAppCounter, timestamp);
+
+    return aggregateClusterMetrics;
+  }
+
+  /**
+   * Slice metric values into interval specified by :
+   * timeline.metrics.cluster.aggregator.minute.timeslice.interval
+   * Normalize value by averaging them within the interval
+   */
+  protected int processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
+                                              TimelineMetric metric, List<Long[]> timeSlices) {
+    // Create time slices
+    TimelineMetricMetadataKey appKey =  new TimelineMetricMetadataKey(metric.getMetricName(), metric.getAppId(), metric.getInstanceId());
+    TimelineMetricMetadata metricMetadata = metadataManagerInstance.getMetadataCacheValue(appKey);
+
+    if (metricMetadata != null && !metricMetadata.isSupportsAggregates()) {
+      LOG.debug("Skipping cluster aggregation for " + metric.getMetricName());
+      return 0;
+    }
+
+    Map<TimelineClusterMetric, Double> clusterMetrics = sliceFromTimelineMetric(metric, timeSlices, interpolationEnabled);
+
+    return aggregateClusterMetricsFromSlices(clusterMetrics, aggregateClusterMetrics, metric.getHostName());
+  }
+
+  protected int aggregateClusterMetricsFromSlices(Map<TimelineClusterMetric, Double> clusterMetrics,
+                                                  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
+                                                  String hostname) {
+
+    int numHosts = 0;
+    if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
+      for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry : clusterMetrics.entrySet()) {
+
+        TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
+        Double avgValue = clusterMetricEntry.getValue();
+
+        MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
+
+        if (aggregate == null) {
+          aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue);
+          aggregateClusterMetrics.put(clusterMetric, aggregate);
+        } else {
+          aggregate.updateSum(avgValue);
+          aggregate.updateNumberOfHosts(1);
+          aggregate.updateMax(avgValue);
+          aggregate.updateMin(avgValue);
+        }
+
+        numHosts = aggregate.getNumberOfHosts();
+        // Update app level aggregates
+        appAggregator.processTimelineClusterMetric(clusterMetric, hostname, avgValue);
+      }
+    }
+    return numHosts;
+  }
+
+  /* Add cluster metric for number of hosts that are hosting an appId */
+  protected void processLiveAppCountMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
+      Map<String, MutableInt> appHostsCount, long timestamp) {
+
+    for (Map.Entry<String, MutableInt> appHostsEntry : appHostsCount.entrySet()) {
+      TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(
+        liveHostsMetricName, appHostsEntry.getKey(), null, timestamp);
+
+      Integer numOfHosts = appHostsEntry.getValue().intValue();
+
+      MetricClusterAggregate metricClusterAggregate = new MetricClusterAggregate(
+        (double) numOfHosts, 1, null, (double) numOfHosts, (double) numOfHosts);
+
+      metadataManagerInstance.getUuid(timelineClusterMetric);
+
+      aggregateClusterMetrics.put(timelineClusterMetric, metricClusterAggregate);
+    }
+
+  }
+}