You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2018/01/08 18:19:49 UTC

[23/28] ambari git commit: AMBARI-22740 : Fix integration test for HBase in branch-3.0-ams due to UUID changes. (avijayan)

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java
new file mode 100644
index 0000000..69122f9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.metrics.core.timeline.TimelineMetricDistributedCache;
+import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner;
+
+/**
+ * Second-level cluster aggregator whose input is taken from a
+ * {@link TimelineMetricDistributedCache}: each cycle evicts the cached aggregates for the
+ * requested window, post-processes them and stores the outcome through the HBase accessor.
+ */
+public class TimelineMetricClusterAggregatorSecondWithCacheSource extends TimelineMetricClusterAggregatorSecond {
+  private TimelineMetricDistributedCache distributedCache;
+
+  /**
+   * Hands every argument except {@code distributedCache} to the parent constructor unchanged and
+   * remembers the cache that {@link #doWork(long, long)} evicts from.
+   */
+  public TimelineMetricClusterAggregatorSecondWithCacheSource(
+      AggregationTaskRunner.AGGREGATOR_NAME metricAggregateSecond,
+      TimelineMetricMetadataManager metricMetadataManager,
+      PhoenixHBaseAccessor hBaseAccessor,
+      Configuration metricsConf,
+      String checkpointLocation,
+      long sleepIntervalMillis,
+      int checkpointCutOffMultiplier,
+      String aggregatorDisabledParam,
+      String inputTableName,
+      String outputTableName,
+      Long nativeTimeRangeDelay,
+      Long timeSliceInterval,
+      MetricCollectorHAController haController,
+      TimelineMetricDistributedCache distributedCache) {
+    super(metricAggregateSecond,
+      metricMetadataManager,
+      hBaseAccessor,
+      metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      aggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      nativeTimeRangeDelay,
+      timeSliceInterval,
+      haController);
+    this.distributedCache = distributedCache;
+  }
+
+  /**
+   * Runs one aggregation cycle: evicts the aggregates for the (time-shift adjusted) window from
+   * the cache, post-processes them and saves the resulting cluster aggregate records.
+   *
+   * @param startTime start of the window, before {@code serverTimeShiftAdjustment} is subtracted
+   * @param endTime end of the window, before {@code serverTimeShiftAdjustment} is subtracted
+   * @return {@code true} if the cycle ran to completion, {@code false} if any exception was
+   *         thrown (the exception is logged, not propagated)
+   */
+  @Override
+  public boolean doWork(long startTime, long endTime) {
+    final Date cycleStart = new Date();
+    LOG.info("Start aggregation cycle @ " + cycleStart + ", " + "startTime = " + new Date(startTime)
+      + ", endTime = " + new Date(endTime));
+
+    boolean succeeded;
+    try {
+      logCacheMetricsIfDebug("Ignite metrics before eviction : ");
+
+      LOG.info("Trying to evict elements from cache");
+      Map<TimelineClusterMetric, MetricClusterAggregate> evicted =
+        distributedCache.evictMetricAggregates(
+          startTime - serverTimeShiftAdjustment,
+          endTime - serverTimeShiftAdjustment);
+      LOG.info(String.format("Evicted %s elements from cache.", evicted.size()));
+
+      logCacheMetricsIfDebug("Ignite metrics after eviction : ");
+
+      List<Long[]> slices = getTimeSlices(
+        startTime - serverTimeShiftAdjustment,
+        endTime - serverTimeShiftAdjustment,
+        timeSliceIntervalMillis);
+      Map<TimelineClusterMetric, MetricClusterAggregate> aggregates =
+        aggregateMetricsFromMetricClusterAggregates(evicted, slices);
+
+      LOG.info("Saving " + aggregates.size() + " metric aggregates.");
+      hBaseAccessor.saveClusterAggregateRecords(aggregates);
+      LOG.info("End aggregation cycle @ " + new Date());
+      succeeded = true;
+    } catch (Exception e) {
+      LOG.error("Exception during aggregation. ", e);
+      succeeded = false;
+    }
+    return succeeded;
+  }
+
+  /**
+   * When debug logging is enabled, fetches the cache's point-in-time metrics and logs them
+   * appended to {@code label}; does nothing (and does not query the cache) otherwise.
+   */
+  private void logCacheMetricsIfDebug(String label) {
+    if (LOG.isDebugEnabled()) {
+      Map<String, Double> cacheMetrics = distributedCache.getPointInTimeCacheMetrics();
+      LOG.debug(label + cacheMetrics);
+    }
+  }
+
+  /**
+   * Slices in cache could be different from aggregate slices, so need to recalculate.
+   * Counts hosted apps: for every appId the largest host count found among the cached aggregates
+   * is recorded, and that counter is passed to {@code processLiveAppCountMetrics} together with
+   * the second element of the last time slice.
+   *
+   * @param metricsFromCache aggregates evicted from the cache; this same map instance is returned
+   * @param timeSlices slices of the current window; the last one must exist
+   * @return {@code metricsFromCache}
+   */
+  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromMetricClusterAggregates(Map<TimelineClusterMetric, MetricClusterAggregate> metricsFromCache, List<Long[]> timeSlices) {
+    //TODO add basic interpolation
+    //TODO investigate if needed, maybe add config to disable/enable
+    Map<String, MutableInt> maxHostsPerApp = new HashMap<>();
+    for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> cached : metricsFromCache.entrySet()) {
+      int hostCount = cached.getValue().getNumberOfHosts();
+      String appId = cached.getKey().getAppId();
+      MutableInt known = maxHostsPerApp.get(appId);
+      // Keep only the highest host count observed for each application.
+      if (known == null || known.intValue() < hostCount) {
+        maxHostsPerApp.put(appId, new MutableInt(hostCount));
+      }
+    }
+
+    // Add liveHosts per AppId metrics.
+    Long[] lastSlice = timeSlices.get(timeSlices.size() - 1);
+    processLiveAppCountMetrics(metricsFromCache, maxHostsPerApp, lastSlice[1]);
+
+    return metricsFromCache;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricFilteringHostAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricFilteringHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricFilteringHostAggregator.java
new file mode 100644
index 0000000..371d9fa
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricFilteringHostAggregator.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner;
+import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.ambari.metrics.core.timeline.query.Condition;
+import org.apache.ambari.metrics.core.timeline.query.DefaultCondition;
+import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+
+/**
+ * Host aggregator that narrows its query using {@code postedAggregatedMap}: entries of that map
+ * whose value lies inside the window being processed are treated as already aggregated, and the
+ * query is restricted to the UUIDs resolved for the remaining hostnames.
+ */
+public class TimelineMetricFilteringHostAggregator extends TimelineMetricHostAggregator {
+  private static final Log LOG = LogFactory.getLog(TimelineMetricFilteringHostAggregator.class);
+  private TimelineMetricMetadataManager metricMetadataManager;
+  private ConcurrentHashMap<String, Long> postedAggregatedMap;
+
+  /**
+   * Passes everything except {@code postedAggregatedMap} on to the parent constructor and keeps
+   * references to the metadata manager and the map for building the query condition.
+   */
+  public TimelineMetricFilteringHostAggregator(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName,
+                                               TimelineMetricMetadataManager metricMetadataManager,
+                                               PhoenixHBaseAccessor hBaseAccessor,
+                                               Configuration metricsConf,
+                                               String checkpointLocation,
+                                               Long sleepIntervalMillis,
+                                               Integer checkpointCutOffMultiplier,
+                                               String hostAggregatorDisabledParam,
+                                               String tableName,
+                                               String outputTableName,
+                                               Long nativeTimeRangeDelay,
+                                               MetricCollectorHAController haController,
+                                               ConcurrentHashMap<String, Long> postedAggregatedMap) {
+    super(aggregatorName, metricMetadataManager, hBaseAccessor, metricsConf, checkpointLocation,
+      sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam,
+      tableName, outputTableName, nativeTimeRangeDelay, haController);
+    this.postedAggregatedMap = postedAggregatedMap;
+    this.metricMetadataManager = metricMetadataManager;
+  }
+
+  /**
+   * Builds the read condition for the window {@code (startTime, endTime]}.
+   *
+   * @param startTime window start; map entries with a value equal to it are not counted as aggregated
+   * @param endTime window end; map entries with a value equal to it are counted as aggregated
+   * @return an unlimited condition over the resolved UUIDs, ordered by UUID then SERVER_TIME
+   */
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+    List<String> alreadyAggregated = new ArrayList<>();
+    for (Map.Entry<String, Long> posted : postedAggregatedMap.entrySet()) {
+      long postedAt = posted.getValue();
+      if (postedAt <= startTime || postedAt > endTime) {
+        // Recorded value is outside this window, so the key does not count as aggregated.
+        continue;
+      }
+      alreadyAggregated.add(posted.getKey());
+    }
+
+    // NOTE(review): presumably returns the known hostnames minus the given ones - confirm
+    // against TimelineMetricMetadataManager.
+    List<String> stillToAggregate = metricMetadataManager.getNotLikeHostnames(alreadyAggregated);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Already aggregated hostnames based on postedAggregatedMap : " + alreadyAggregated);
+      LOG.debug("Hostnames that will be aggregated : " + stillToAggregate);
+    }
+    List<byte[]> targetUuids =
+      metricMetadataManager.getUuids(new ArrayList<String>(), stillToAggregate, "", "");
+
+    Condition query = new DefaultCondition(targetUuids, null, null, null, null,
+      startTime, endTime, null, null, true);
+    query.setNoLimit();
+    query.setFetchSize(resultsetFetchSize);
+    query.setStatement(String.format(PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL, tableName));
+    // Retaining order of the row-key avoids client side merge sort.
+    query.addOrderByColumn("UUID");
+    query.addOrderByColumn("SERVER_TIME");
+    return query;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java
new file mode 100644
index 0000000..c25d6ce
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.ambari.metrics.core.timeline.query.Condition;
+import org.apache.ambari.metrics.core.timeline.query.DefaultCondition;
+import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+
+/**
+ * Aggregator that reads rows from {@code tableName} for a time window, folds consecutive rows
+ * belonging to the same metric into one {@link MetricHostAggregate} and saves the result to
+ * {@code outputTableName}.
+ */
+public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
+  private static final Log LOG = LogFactory.getLog(TimelineMetricHostAggregator.class);
+  TimelineMetricReadHelper readHelper;
+
+  /**
+   * Forwards the aggregator settings to the parent constructor and creates a read helper backed
+   * by {@code metricMetadataManager} with instance ids retained ({@code ignoreInstance = false}).
+   */
+  public TimelineMetricHostAggregator(AGGREGATOR_NAME aggregatorName,
+                                      TimelineMetricMetadataManager metricMetadataManager,
+                                      PhoenixHBaseAccessor hBaseAccessor,
+                                      Configuration metricsConf,
+                                      String checkpointLocation,
+                                      Long sleepIntervalMillis,
+                                      Integer checkpointCutOffMultiplier,
+                                      String hostAggregatorDisabledParam,
+                                      String tableName,
+                                      String outputTableName,
+                                      Long nativeTimeRangeDelay,
+                                      MetricCollectorHAController haController) {
+    super(aggregatorName,
+      hBaseAccessor,
+      metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      hostAggregatorDisabledParam,
+      tableName,
+      outputTableName,
+      nativeTimeRangeDelay,
+      haController);
+    readHelper = new TimelineMetricReadHelper(metricMetadataManager, false);
+  }
+
+  /**
+   * Aggregates the rows of {@code rs} and saves the per-metric aggregates to the output table.
+   * {@code startTime} is not used here; every aggregated metric is stamped with {@code endTime}.
+   */
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
+    Map<TimelineMetric, MetricHostAggregate> aggregatesByMetric =
+      aggregateMetricsFromResultSet(rs, endTime);
+
+    LOG.info("Saving " + aggregatesByMetric.size() + " metric aggregates.");
+    hBaseAccessor.saveHostAggregateRecords(aggregatesByMetric, outputTableName);
+  }
+
+  /**
+   * Builds an unlimited read condition for the given window over {@code tableName}, ordered by
+   * UUID and then SERVER_TIME.
+   */
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+    Condition query = new DefaultCondition(null, null, null, null, startTime,
+      endTime, null, null, true);
+    query.setNoLimit();
+    query.setFetchSize(resultsetFetchSize);
+    query.setStatement(String.format(PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL, tableName));
+    // Retaining order of the row-key avoids client side merge sort.
+    query.addOrderByColumn("UUID");
+    query.addOrderByColumn("SERVER_TIME");
+    return query;
+  }
+
+  /**
+   * Walks the result set once, treating each run of consecutive rows whose metric is
+   * {@code equalsExceptTime} to the run's first row as one group. The first row's metric of each
+   * run becomes the map key (with its start time set to {@code endTime}) and all rows of the run
+   * are merged into that key's aggregate.
+   */
+  private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs, long endTime)
+      throws IOException, SQLException {
+    Map<TimelineMetric, MetricHostAggregate> aggregatesByMetric = new HashMap<TimelineMetric, MetricHostAggregate>();
+    TimelineMetric runMetric = null;
+    MetricHostAggregate runAggregate = null;
+
+    while (rs.next()) {
+      TimelineMetric rowMetric = readHelper.getTimelineMetricKeyFromResultSet(rs);
+      MetricHostAggregate rowAggregate = readHelper.getMetricHostAggregateFromResultSet(rs);
+
+      if (runMetric == null) {
+        // First row
+        runMetric = rowMetric;
+        runAggregate = openAggregate(aggregatesByMetric, rowMetric, endTime);
+      }
+
+      if (!runMetric.equalsExceptTime(rowMetric)) {
+        // Switched over to a new metric - start a fresh aggregate for it
+        runAggregate = openAggregate(aggregatesByMetric, rowMetric, endTime);
+        runMetric = rowMetric;
+      }
+
+      // Recalculate totals with current row
+      runAggregate.updateAggregates(rowAggregate);
+    }
+    return aggregatesByMetric;
+  }
+
+  /** Stamps {@code metric} with {@code endTime} and registers a new, empty aggregate under it. */
+  private MetricHostAggregate openAggregate(Map<TimelineMetric, MetricHostAggregate> aggregatesByMetric,
+                                            TimelineMetric metric, long endTime) {
+    metric.setStartTime(endTime);
+    MetricHostAggregate fresh = new MetricHostAggregate();
+    aggregatesByMetric.put(metric, fresh);
+    return fresh;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricReadHelper.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricReadHelper.java
new file mode 100644
index 0000000..a56c7aa
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricReadHelper.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.TreeMap;
+
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+
+/**
+ * Converts the current row of a {@link ResultSet} into metric and aggregate objects. Metric
+ * identity is resolved from the row's {@code UUID} column through the metadata manager, so every
+ * method that reads {@code UUID} needs a helper constructed with a non-null manager.
+ */
+public class TimelineMetricReadHelper {
+
+  private boolean ignoreInstance;
+  private TimelineMetricMetadataManager metadataManagerInstance;
+
+  /** Creates a helper without a metadata manager that keeps instance ids. */
+  public TimelineMetricReadHelper() {
+    this(null, false);
+  }
+
+  /** Creates a helper without a metadata manager. */
+  public TimelineMetricReadHelper(boolean ignoreInstance) {
+    this(null, ignoreInstance);
+  }
+
+  /** Creates a helper that keeps instance ids. */
+  public TimelineMetricReadHelper(TimelineMetricMetadataManager timelineMetricMetadataManager) {
+    this(timelineMetricMetadataManager, false);
+  }
+
+  /**
+   * @param timelineMetricMetadataManager used to resolve a metric from its UUID
+   * @param ignoreInstance when {@code true}, instance ids are replaced with {@code null} by
+   *                       {@link #getTimelineMetricCommonsFromResultSet} and {@link #fromResultSet}
+   */
+  public TimelineMetricReadHelper(TimelineMetricMetadataManager timelineMetricMetadataManager, boolean ignoreInstance) {
+    this.ignoreInstance = ignoreInstance;
+    this.metadataManagerInstance = timelineMetricMetadataManager;
+  }
+
+  /**
+   * Reads the common metric fields plus the values parsed from the JSON in the {@code METRICS}
+   * column.
+   */
+  public TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
+      throws SQLException, IOException {
+    TimelineMetric result = getTimelineMetricCommonsFromResultSet(rs);
+    TreeMap<Long, Double> values = PhoenixHBaseAccessor.readMetricFromJSON(rs.getString("METRICS"));
+    result.setMetricValues(values);
+    return result;
+  }
+
+  /**
+   * Builds a single-valued metric for the current row. The metric name gets the function's suffix
+   * appended and the value is stored under the row's {@code SERVER_TIME}.
+   *
+   * @param f function selecting the value; {@code null} means {@code Function.DEFAULT_VALUE_FUNCTION}
+   */
+  public SingleValuedTimelineMetric getAggregatedTimelineMetricFromResultSet(ResultSet rs,
+      Function f) throws SQLException, IOException {
+
+    byte[] uuid = rs.getBytes("UUID");
+    TimelineMetric resolved = metadataManagerInstance.getMetricFromUuid(uuid);
+
+    Function function;
+    if (f == null) {
+      function = Function.DEFAULT_VALUE_FUNCTION;
+    } else {
+      function = f;
+    }
+
+    String suffixedName = resolved.getMetricName() + function.getSuffix();
+    SingleValuedTimelineMetric result = new SingleValuedTimelineMetric(
+      suffixedName,
+      resolved.getAppId(),
+      resolved.getInstanceId(),
+      resolved.getHostName(),
+      rs.getLong("SERVER_TIME")
+    );
+
+    double value;
+    switch (function.getReadFunction()) {
+      case MIN:
+        value = rs.getDouble("METRIC_MIN");
+        break;
+      case MAX:
+        value = rs.getDouble("METRIC_MAX");
+        break;
+      case AVG:
+      case SUM:
+      default:
+        // NOTE(review): SUM is served as sum / count, exactly like AVG - confirm this is intended.
+        value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
+        break;
+    }
+
+    result.setSingleTimeseriesValue(rs.getLong("SERVER_TIME"), value);
+
+    return result;
+  }
+
+  /**
+   * Returns common part of timeline metrics record without the values: the metric resolved from
+   * the row's UUID with its start time set to the row's {@code SERVER_TIME} and, when
+   * {@code ignoreInstance} is set, its instance id cleared.
+   */
+  public TimelineMetric getTimelineMetricCommonsFromResultSet(ResultSet rs)
+      throws SQLException {
+
+    TimelineMetric result = metadataManagerInstance.getMetricFromUuid(rs.getBytes("UUID"));
+    if (ignoreInstance) {
+      result.setInstanceId(null);
+    }
+    long serverTime = rs.getLong("SERVER_TIME");
+    result.setStartTime(serverTime);
+    return result;
+  }
+
+  /** Reads a cluster aggregate whose host count comes from the {@code HOSTS_COUNT} column. */
+  public MetricClusterAggregate getMetricClusterAggregateFromResultSet(ResultSet rs)
+      throws SQLException {
+    return readClusterAggregate(rs, "HOSTS_COUNT");
+  }
+
+  /** Reads a cluster aggregate whose host count comes from the {@code METRIC_COUNT} column. */
+  public MetricClusterAggregate getMetricClusterTimeAggregateFromResultSet(ResultSet rs)
+      throws SQLException {
+    return readClusterAggregate(rs, "METRIC_COUNT");
+  }
+
+  /**
+   * Fills sum, max and min from the METRIC_* columns, the number of hosts from
+   * {@code hostCountColumn}, and sets the deviation to zero.
+   */
+  private MetricClusterAggregate readClusterAggregate(ResultSet rs, String hostCountColumn)
+      throws SQLException {
+    MetricClusterAggregate aggregate = new MetricClusterAggregate();
+    aggregate.setSum(rs.getDouble("METRIC_SUM"));
+    aggregate.setMax(rs.getDouble("METRIC_MAX"));
+    aggregate.setMin(rs.getDouble("METRIC_MIN"));
+    aggregate.setNumberOfHosts(rs.getInt(hostCountColumn));
+
+    aggregate.setDeviation(0.0);
+
+    return aggregate;
+  }
+
+  /**
+   * Builds the cluster metric key for the current row from the metric resolved by UUID and the
+   * row's {@code SERVER_TIME}; the instance id is {@code null} when {@code ignoreInstance} is set.
+   */
+  public TimelineClusterMetric fromResultSet(ResultSet rs) throws SQLException {
+
+    TimelineMetric resolved = metadataManagerInstance.getMetricFromUuid(rs.getBytes("UUID"));
+
+    String metricName = resolved.getMetricName();
+    String appId = resolved.getAppId();
+    String instanceId;
+    if (ignoreInstance) {
+      instanceId = null;
+    } else {
+      instanceId = resolved.getInstanceId();
+    }
+
+    return new TimelineClusterMetric(metricName, appId, instanceId, rs.getLong("SERVER_TIME"));
+  }
+
+  /**
+   * Reads a host aggregate: sum, max and min from the METRIC_* columns, the number of samples
+   * from {@code METRIC_COUNT}, deviation set to zero.
+   */
+  public MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs)
+      throws SQLException {
+    MetricHostAggregate aggregate = new MetricHostAggregate();
+    aggregate.setSum(rs.getDouble("METRIC_SUM"));
+    aggregate.setMax(rs.getDouble("METRIC_MAX"));
+    aggregate.setMin(rs.getDouble("METRIC_MIN"));
+    aggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT"));
+
+    aggregate.setDeviation(0.0);
+    return aggregate;
+  }
+
+  /** Returns whatever the metadata manager resolves for the row's UUID, without modifying it. */
+  public TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
+      throws SQLException, IOException {
+    return metadataManagerInstance.getMetricFromUuid(rs.getBytes("UUID"));
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TopNDownSampler.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TopNDownSampler.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TopNDownSampler.java
new file mode 100644
index 0000000..e28d465
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TopNDownSampler.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.TOPN_DOWNSAMPLER_CLUSTER_METRIC_SELECT_SQL;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.TOPN_DOWNSAMPLER_HOST_METRIC_SELECT_SQL;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.metrics.core.timeline.query.TopNCondition;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
+
+/**
+ * {@link CustomDownSampler} that produces one top-N SELECT statement per configured metric
+ * pattern, using the function and N taken from a {@link TopNConfig}.
+ */
+public class TopNDownSampler implements CustomDownSampler {
+
+  private static final Log LOG = LogFactory.getLog(TopNDownSampler.class);
+  private TopNConfig topNConfig;
+  protected String metricPatterns;
+
+  /**
+   * Creates a sampler from configuration entries under
+   * {@code DownSamplerUtils.downSamplerConfigPrefix}: the metric patterns, {@code topn.value}
+   * (10 when absent) and {@code topn.function}.
+   *
+   * @throws NumberFormatException if {@code topn.value} is present but is not an integer
+   */
+  public static TopNDownSampler fromConfig(Map<String, String> conf) {
+    final String prefix = DownSamplerUtils.downSamplerConfigPrefix;
+
+    String patterns = conf.get(prefix + "topn." +
+      DownSamplerUtils.downSamplerMetricPatternsConfig);
+
+    String configuredTopN = conf.get(prefix + "topn.value");
+    Integer topN = 10;
+    if (configuredTopN != null) {
+      topN = Integer.valueOf(configuredTopN);
+    }
+    String functionName = conf.get(prefix + "topn.function");
+
+    TopNConfig config = new TopNConfig(topN, functionName, false);
+    return new TopNDownSampler(config, patterns);
+  }
+
+  /**
+   * @param topNConfig N and function to use; may be {@code null}, in which case
+   *                   {@link #validateConfigs()} reports the sampler as invalid
+   * @param metricPatterns comma-separated metric patterns
+   */
+  public TopNDownSampler(TopNConfig topNConfig, String metricPatterns) {
+    this.metricPatterns = metricPatterns;
+    this.topNConfig = topNConfig;
+  }
+
+  /**
+   * @return {@code true} only when a config is present, its N is positive and the metric
+   *         patterns string is not empty
+   */
+  @Override
+  public boolean validateConfigs() {
+    return topNConfig != null
+      && topNConfig.getTopN() > 0
+      && !StringUtils.isEmpty(metricPatterns);
+  }
+
+  /**
+   * Prepare downsampling SELECT statement(s) used to determine the data to be written into the Aggregate table.
+   * One statement is produced per comma-separated entry of the metric patterns; the host-metric
+   * template is used when {@code tableName} contains "RECORD", the cluster-metric template otherwise.
+   *
+   * @param startTime time value substituted into the statement template
+   * @param endTime time value substituted into the statement template (used twice)
+   * @param tableName table the statement selects from
+   * @return the generated statements, in pattern order
+   */
+  @Override
+  public List<String> prepareDownSamplingStatement(Long startTime, Long endTime, String tableName) {
+    List<String> statements = new ArrayList<>();
+
+    Function.ReadFunction topNReadFunction = Function.ReadFunction.getFunction(topNConfig.getTopNFunction());
+    String columnSelect = TopNCondition.getColumnSelect(new Function(topNReadFunction, null));
+
+    for (String pattern : metricPatterns.split(",")) {
+      String quotedPattern = "'" + pattern + "'";
+      //TODO : Need a better way to find out what kind of aggregation the current one is.
+      String template;
+      if (tableName.contains("RECORD")) {
+        template = TOPN_DOWNSAMPLER_HOST_METRIC_SELECT_SQL;
+      } else {
+        template = TOPN_DOWNSAMPLER_CLUSTER_METRIC_SELECT_SQL;
+      }
+      statements.add(String.format(template,
+        endTime, columnSelect, columnSelect, columnSelect, tableName, quotedPattern,
+        startTime, endTime, columnSelect, topNConfig.getTopN()));
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("DownSampling Stmt: " + statements.toString());
+    }
+
+    return statements;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
new file mode 100644
index 0000000..9c255e7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators.v2;
+
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_APP_METRIC_GROUPBY_SQL;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Date;
+
+import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
+import org.apache.ambari.metrics.core.timeline.query.EmptyCondition;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.ambari.metrics.core.timeline.aggregators.AbstractTimelineAggregator;
+import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.ambari.metrics.core.timeline.query.Condition;
+
+public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
+  private final String aggregateColumnName;
+
+  public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName,
+                                         PhoenixHBaseAccessor hBaseAccessor,
+                                         Configuration metricsConf,
+                                         String checkpointLocation,
+                                         Long sleepIntervalMillis,
+                                         Integer checkpointCutOffMultiplier,
+                                         String hostAggregatorDisabledParam,
+                                         String inputTableName,
+                                         String outputTableName,
+                                         Long nativeTimeRangeDelay,
+                                         MetricCollectorHAController haController) {
+    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
+      sleepIntervalMillis, checkpointCutOffMultiplier,
+      hostAggregatorDisabledParam, inputTableName, outputTableName,
+      nativeTimeRangeDelay, haController);
+
+    if (inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME)) {
+      aggregateColumnName = "HOSTS_COUNT";
+    } else {
+      aggregateColumnName = "METRIC_COUNT";
+    }
+  }
+
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+    EmptyCondition condition = new EmptyCondition();
+    condition.setDoUpdate(true);
+
+    /*
+    UPSERT INTO METRIC_AGGREGATE_HOURLY (METRIC_NAME, APP_ID, INSTANCE_ID,
+    SERVER_TIME, UNITS, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN)
+    SELECT METRIC_NAME, APP_ID, INSTANCE_ID, MAX(SERVER_TIME), UNITS,
+    SUM(METRIC_SUM), SUM(HOSTS_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN)
+    FROM METRIC_AGGREGATE WHERE SERVER_TIME >= 1441155600000 AND
+    SERVER_TIME < 1441159200000 GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS;
+     */
+
+    condition.setStatement(String.format(GET_AGGREGATED_APP_METRIC_GROUPBY_SQL,
+      outputTableName, endTime, aggregateColumnName, tableName,
+      getDownsampledMetricSkipClause(), startTime, endTime));
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Condition: " + condition.toString());
+    }
+
+    return condition;
+  }
+
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
+    LOG.info("Aggregated cluster metrics for " + outputTableName +
+      ", with startTime = " + new Date(startTime) +
+      ", endTime = " + new Date(endTime));
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java
new file mode 100644
index 0000000..1026cbe
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators.v2;
+
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
+import org.apache.ambari.metrics.core.timeline.query.EmptyCondition;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.ambari.metrics.core.timeline.query.Condition;
+
+/**
+ * Host aggregator that limits the aggregation statement to UUIDs resolved for hosts
+ * which are not recorded in {@code postedAggregatedMap} for the current window.
+ */
+public class TimelineMetricFilteringHostAggregator extends TimelineMetricHostAggregator {
+  // Clause used when there is nothing to include: a constant-false predicate keeps the
+  // statement well-formed while selecting no rows.
+  private static final String MATCH_NOTHING_CLAUSE = "(1 = 0) AND ";
+
+  private final TimelineMetricMetadataManager metricMetadataManager;
+  // hostname -> value compared against the window bounds; presumably a post timestamp (confirm).
+  private final ConcurrentHashMap<String, Long> postedAggregatedMap;
+
+  public TimelineMetricFilteringHostAggregator(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName,
+                                               TimelineMetricMetadataManager metricMetadataManager,
+                                               PhoenixHBaseAccessor hBaseAccessor,
+                                               Configuration metricsConf,
+                                               String checkpointLocation,
+                                               Long sleepIntervalMillis,
+                                               Integer checkpointCutOffMultiplier,
+                                               String hostAggregatorDisabledParam,
+                                               String tableName,
+                                               String outputTableName,
+                                               Long nativeTimeRangeDelay,
+                                               MetricCollectorHAController haController,
+                                               ConcurrentHashMap<String, Long> postedAggregatedMap) {
+    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
+      sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam,
+      tableName, outputTableName, nativeTimeRangeDelay, haController);
+    this.metricMetadataManager = metricMetadataManager;
+    this.postedAggregatedMap = postedAggregatedMap;
+  }
+
+  /**
+   * Builds the update condition for the window; hosts whose recorded value lies in
+   * {@code (startTime, endTime]} are treated as already aggregated and left out of the filter.
+   */
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+    List<String> aggregatedHostnames = new ArrayList<>();
+    for (Map.Entry<String, Long> entry : postedAggregatedMap.entrySet()) {
+      long postedAt = entry.getValue();
+      if (postedAt > startTime && postedAt <= endTime) {
+        aggregatedHostnames.add(entry.getKey());
+      }
+    }
+    List<String> notAggregatedHostnames = metricMetadataManager.getNotLikeHostnames(aggregatedHostnames);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Already aggregated hostnames based on postedAggregatedMap : " + aggregatedHostnames);
+      LOG.debug("Hostnames that will be aggregated : " + notAggregatedHostnames);
+    }
+    List<byte[]> uuids = metricMetadataManager.getUuids(new ArrayList<String>(), notAggregatedHostnames, "", "");
+
+    EmptyCondition condition = new EmptyCondition();
+    condition.setDoUpdate(true);
+    condition.setStatement(String.format(GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
+      outputTableName, endTime, tableName,
+      getDownsampledMetricSkipClause() + getIncludedUuidsClause(uuids), startTime, endTime));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Condition: " + condition.toString());
+    }
+    return condition;
+  }
+
+  /**
+   * Renders {@code (UUID LIKE '%u1' OR UUID LIKE '%u2') AND } for the given UUIDs.
+   * A null or empty list yields a constant-false clause instead of an unterminated "(",
+   * which would make the whole statement malformed.
+   */
+  private String getIncludedUuidsClause(List<byte[]> uuids) {
+    if (CollectionUtils.isEmpty(uuids)) {
+      return MATCH_NOTHING_CLAUSE;
+    }
+    StringBuilder sb = new StringBuilder("(");
+    for (int i = 0; i < uuids.size(); i++) {
+      if (i > 0) {
+        sb.append(" OR ");
+      }
+      // NOTE(review): decodes with the platform default charset, as before - confirm intended.
+      sb.append("UUID LIKE '%").append(new String(uuids.get(i))).append("'");
+    }
+    return sb.append(") AND ").toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java
new file mode 100644
index 0000000..9e8df6d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators.v2;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Date;
+
+import org.apache.ambari.metrics.core.timeline.aggregators.AbstractTimelineAggregator;
+import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner;
+import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
+import org.apache.ambari.metrics.core.timeline.query.Condition;
+import org.apache.ambari.metrics.core.timeline.query.EmptyCondition;
+import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+
+/**
+ * Host-level aggregator: formats {@code GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL} for a time
+ * window and returns it as an update condition; {@code aggregate} only logs the window.
+ */
+public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
+
+  public TimelineMetricHostAggregator(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName,
+      PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+      String checkpointLocation, Long sleepIntervalMillis,
+      Integer checkpointCutOffMultiplier, String hostAggregatorDisabledParam,
+      String tableName, String outputTableName, Long nativeTimeRangeDelay,
+      MetricCollectorHAController haController) {
+    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
+      sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam,
+      tableName, outputTableName, nativeTimeRangeDelay, haController);
+  }
+
+  /** Logs the aggregated window; the result set is not read here. */
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
+    LOG.info("Aggregated host metrics for " + outputTableName + ", with startTime = "
+      + new Date(startTime) + ", endTime = " + new Date(endTime));
+  }
+
+  /**
+   * Builds the update condition whose statement is the host group-by SQL formatted with
+   * the output/input tables, the skip clause and the window bounds.
+   */
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+    EmptyCondition updateCondition = new EmptyCondition();
+    updateCondition.setDoUpdate(true);
+    String statement = String.format(PhoenixTransactSQL.GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
+      outputTableName, endTime, tableName,
+      getDownsampledMetricSkipClause(), startTime, endTime);
+    updateCondition.setStatement(statement);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Condition: " + updateCondition.toString());
+    }
+    return updateCondition;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/AggregationTaskRunner.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/AggregationTaskRunner.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/AggregationTaskRunner.java
new file mode 100644
index 0000000..9a27d55
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/AggregationTaskRunner.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.availability;
+
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY;
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.ambari.metrics.core.timeline.aggregators.TimelineMetricAggregator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.participant.StateMachineEngine;
+
+/**
+ * Helix participant for metric aggregation: connects this instance to the cluster and records,
+ * in two flags flipped by the set/unset methods, whether it performs host/cluster aggregation.
+ */
+public class AggregationTaskRunner {
+  private static final Log LOG = LogFactory.getLog(AggregationTaskRunner.class);
+  private final String instanceName;
+  private final String zkAddress;
+  private final String clusterName;
+  // Both are created by initialize() and remain null until it has run.
+  private HelixManager manager;
+  private CheckpointManager checkpointManager;
+  // Map partition name to an aggregator dimension
+  static final Map<String, TimelineMetricAggregator.AGGREGATOR_TYPE> PARTITION_AGGREGATION_TYPES = new HashMap<>();
+  // Ownership flags to be set by the State transitions
+  private final AtomicBoolean performsClusterAggregation = new AtomicBoolean(false);
+  private final AtomicBoolean performsHostAggregation = new AtomicBoolean(false);
+
+  public enum AGGREGATOR_NAME {
+    METRIC_RECORD_MINUTE, METRIC_RECORD_HOURLY, METRIC_RECORD_DAILY,
+    METRIC_AGGREGATE_SECOND, METRIC_AGGREGATE_MINUTE, METRIC_AGGREGATE_HOURLY, METRIC_AGGREGATE_DAILY,
+  }
+
+  // Name string per aggregator; CheckpointManager appends it to the checkpoint path.
+  public static final Map<AGGREGATOR_NAME, String> ACTUAL_AGGREGATOR_NAMES = new HashMap<>();
+
+  static {
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_MINUTE, "TimelineMetricHostAggregatorMinute");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_HOURLY, "TimelineMetricHostAggregatorHourly");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_DAILY, "TimelineMetricHostAggregatorDaily");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_SECOND, "TimelineClusterAggregatorSecond");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_MINUTE, "TimelineClusterAggregatorMinute");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_HOURLY, "TimelineClusterAggregatorHourly");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_DAILY, "TimelineClusterAggregatorDaily");
+
+    // Partition name to task assignment
+    PARTITION_AGGREGATION_TYPES.put(MetricCollectorHAController.METRIC_AGGREGATORS + "_0", TimelineMetricAggregator.AGGREGATOR_TYPE.CLUSTER);
+    PARTITION_AGGREGATION_TYPES.put(MetricCollectorHAController.METRIC_AGGREGATORS + "_1", TimelineMetricAggregator.AGGREGATOR_TYPE.HOST);
+  }
+
+  public AggregationTaskRunner(String instanceName, String zkAddress, String clusterName) {
+    this.instanceName = instanceName;
+    this.zkAddress = zkAddress;
+    this.clusterName = clusterName;
+  }
+
+  /**
+   * Creates the participant manager, registers the online/offline state model factory,
+   * connects, and then creates the checkpoint manager from the manager's property store.
+   */
+  public void initialize() throws Exception {
+    manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
+      InstanceType.PARTICIPANT, zkAddress);
+    OnlineOfflineStateModelFactory stateModelFactory =
+      new OnlineOfflineStateModelFactory(instanceName, this);
+    StateMachineEngine stateMach = manager.getStateMachineEngine();
+    stateMach.registerStateModelFactory(MetricCollectorHAController.DEFAULT_STATE_MODEL, stateModelFactory);
+    manager.connect();
+
+    checkpointManager = new CheckpointManager(manager.getHelixPropertyStore());
+  }
+
+  public boolean performsClusterAggregation() {
+    return performsClusterAggregation.get();
+  }
+
+  public boolean performsHostAggregation() {
+    return performsHostAggregation.get();
+  }
+
+  public CheckpointManager getCheckpointManager() {
+    return checkpointManager;
+  }
+
+  public void setPartitionAggregationFunction(TimelineMetricAggregator.AGGREGATOR_TYPE type) {
+    updateOwnership(type, true, "Set");
+  }
+
+  public void unsetPartitionAggregationFunction(TimelineMetricAggregator.AGGREGATOR_TYPE type) {
+    updateOwnership(type, false, "Unset");
+  }
+
+  // Stores the ownership flag for HOST or CLUSTER and logs it; any other type is ignored.
+  private void updateOwnership(TimelineMetricAggregator.AGGREGATOR_TYPE type, boolean owned, String verb) {
+    switch (type) {
+      case HOST:
+        performsHostAggregation.set(owned);
+        LOG.info(verb + " host aggregator function for : " + instanceName);
+        break;
+      case CLUSTER:
+        performsClusterAggregation.set(owned);
+        LOG.info(verb + " cluster aggregator function for : " + instanceName);
+    }
+  }
+
+  /** Disconnect participant before controller shutdown; does nothing if the manager was never created. */
+  void stop() {
+    if (manager != null) {
+      manager.disconnect();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/CheckpointManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/CheckpointManager.java
new file mode 100644
index 0000000..868fb93
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/CheckpointManager.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.availability;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Reads and writes per-aggregator checkpoint values as the {@code checkpoint} long field
+ * of a ZNRecord stored under {@code /CHECKPOINTS/<aggregator name>} in the property store.
+ */
+public class CheckpointManager {
+  private final ZkHelixPropertyStore<ZNRecord> propertyStore;
+  private static final Log LOG = LogFactory.getLog(CheckpointManager.class);
+  static final String ZNODE_FIELD = "checkpoint";
+  static final String CHECKPOINT_PATH_PREFIX = "CHECKPOINTS";
+
+  public CheckpointManager(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    this.propertyStore = propertyStore;
+  }
+
+  /**
+   * Read aggregator checkpoint from zookeeper
+   * @return timestamp, or -1 when no record is stored (-1 is also the default passed for the field)
+   */
+  public long readCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName) {
+    String path = getCheckpointZKPath(aggregatorName);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Reading checkpoint at " + path);
+    }
+    Stat stat = new Stat();
+    ZNRecord znRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Stat => " + stat);
+    }
+    long checkpoint = znRecord != null ? znRecord.getLongField(ZNODE_FIELD, -1) : -1;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Checkpoint value = " + checkpoint);
+    }
+    return checkpoint;
+  }
+
+  /**
+   * Write aggregator checkpoint in zookeeper
+   * @param value timestamp
+   * @return success
+   */
+  public boolean writeCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName, long value) {
+    String path = getCheckpointZKPath(aggregatorName);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("Saving checkpoint at %s with value %s", path, value));
+    }
+    return propertyStore.update(path, new CheckpointDataUpdater(path, value), AccessOption.PERSISTENT);
+  }
+
+  /** Sets the checkpoint field on the current record, creating the record if none exists. */
+  static class CheckpointDataUpdater implements DataUpdater<ZNRecord> {
+    final String path;
+    final Long value;
+    public CheckpointDataUpdater(String path, Long value) {
+      this.path = path;
+      this.value = value;
+    }
+    @Override
+    public ZNRecord update(ZNRecord currentData) {
+      ZNRecord record = currentData != null ? currentData : new ZNRecord(path);
+      record.setLongField(ZNODE_FIELD, value);
+      return record;
+    }
+  }
+
+  String getCheckpointZKPath(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName) {
+    return "/" + CHECKPOINT_PATH_PREFIX + "/" + AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get(aggregatorName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java
new file mode 100644
index 0000000..ee28d87
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.availability;
+
+import static org.apache.helix.model.IdealState.RebalanceMode.FULL_AUTO;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ambari.metrics.core.timeline.MetricsSystemInitializationException;
+import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.OnlineOfflineSMD;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.StateModelConfigGenerator;
+
+import com.google.common.base.Joiner;
+
+;
+
+/**
+ * Joins this collector to the Helix cluster used for collector HA: creates the
+ * cluster, the participant instance, the state model and the METRIC_AGGREGATORS
+ * resource when they are missing, then starts the aggregation task runner and a
+ * Helix controller. Also caches the names of the live instances reported by Helix.
+ */
+public class MetricCollectorHAController {
+  private static final Log LOG = LogFactory.getLog(MetricCollectorHAController.class);
+
+  static final String CLUSTER_NAME = "ambari-metrics-cluster";
+  static final String METRIC_AGGREGATORS = "METRIC_AGGREGATORS";
+  static final String DEFAULT_STATE_MODEL = OnlineOfflineSMD.name;
+  static final String INSTANCE_NAME_DELIMITER = "_";
+
+  final String zkConnectUrl;
+  final String instanceHostname;
+  final InstanceConfig instanceConfig;
+  final AggregationTaskRunner aggregationTaskRunner;
+  final TimelineMetricConfiguration configuration;
+
+  // Cache list of known live instances. It is refilled from the Helix callback and
+  // read by getLiveInstanceHostNames(), so both synchronize on the list itself.
+  final List<String> liveInstanceNames = new ArrayList<>();
+
+  // Helix Admin
+  HelixAdmin admin;
+  // Helix Manager
+  HelixManager manager;
+
+  private volatile boolean isInitialized = false;
+
+  /**
+   * Reads the instance host name and port and the zookeeper quorum from the
+   * configuration, then prepares the Helix instance config and the task runner.
+   *
+   * @param configuration source of the instance and zookeeper settings
+   * @throws MetricsSystemInitializationException if reading any of those settings
+   *         fails, or the zookeeper client port or quorum is empty
+   */
+  public MetricCollectorHAController(TimelineMetricConfiguration configuration) {
+    this.configuration = configuration;
+    String instancePort;
+    try {
+      instanceHostname = configuration.getInstanceHostnameFromEnv();
+      instancePort = configuration.getInstancePort();
+
+    } catch (Exception e) {
+      LOG.error("Error reading configs from classpath, will resort to defaults.", e);
+      throw new MetricsSystemInitializationException(e.getMessage());
+    }
+
+    try {
+      String zkClientPort = configuration.getClusterZKClientPort();
+      String zkQuorum = configuration.getClusterZKQuorum();
+
+      if (StringUtils.isEmpty(zkClientPort) || StringUtils.isEmpty(zkQuorum)) {
+        throw new Exception("Unable to parse zookeeper quorum. clientPort = "
+          + zkClientPort +", quorum = " + zkQuorum);
+      }
+
+      zkConnectUrl = configuration.getZkConnectionUrl(zkClientPort, zkQuorum);
+
+    } catch (Exception e) {
+      LOG.error("Unable to load hbase-site from classpath.", e);
+      throw new MetricsSystemInitializationException(e.getMessage());
+    }
+
+    // The instance name is "<hostname><delimiter><port>"; hostNameOf() reverses this.
+    instanceConfig = new InstanceConfig(instanceHostname + INSTANCE_NAME_DELIMITER + instancePort);
+    instanceConfig.setHostName(instanceHostname);
+    instanceConfig.setPort(instancePort);
+    instanceConfig.setInstanceEnabled(true);
+    aggregationTaskRunner = new AggregationTaskRunner(
+      instanceConfig.getInstanceName(), zkConnectUrl, getClusterName());
+  }
+
+  /**
+   * Name of Helix znode
+   */
+  public String getClusterName() {
+    return CLUSTER_NAME;
+  }
+
+  /**
+   * Initialize the instance with zookeeper via Helix: create the cluster, register
+   * this instance (retrying while the cluster is not fully set up), make sure the
+   * state model and the aggregator resource exist, rebalance, and start the
+   * aggregators and the controller.
+   *
+   * @throws Exception if a Helix or zookeeper call fails, or the wait between
+   *         retries is interrupted
+   */
+  public void initializeHAController() throws Exception {
+    String clusterName = getClusterName();
+    admin = new ZKHelixAdmin(zkConnectUrl);
+    // create cluster
+    LOG.info("Creating zookeeper cluster node: " + clusterName);
+    boolean clusterAdded = admin.addCluster(clusterName, false);
+    LOG.info("Was cluster added successfully? " + clusterAdded);
+
+    // Adding host to the cluster
+    boolean success = false;
+    int tries = 5;
+    int sleepTimeInSeconds = 5;
+
+    for (int i = 0; i < tries && !success; i++) {
+      try {
+        addInstanceIfMissing(clusterName);
+        success = true;
+      } catch (HelixException | ZkNoNodeException ex) {
+        LOG.warn("Helix Cluster not yet setup fully.");
+        if (i < tries - 1) {
+          LOG.info("Waiting for " + sleepTimeInSeconds + " seconds and retrying.");
+          TimeUnit.SECONDS.sleep(sleepTimeInSeconds);
+        } else {
+          // Pass the exception as the throwable argument so its stack trace is logged.
+          LOG.error("Giving up waiting for Helix cluster " + clusterName + " to be set up.", ex);
+        }
+      }
+    }
+
+    if (!success) {
+      LOG.info("Trying to create " + clusterName + " again since waiting for the creation did not help.");
+      admin.addCluster(clusterName, true);
+      addInstanceIfMissing(clusterName);
+    }
+
+    // Add a state model
+    if (admin.getStateModelDef(clusterName, DEFAULT_STATE_MODEL) == null) {
+      LOG.info("Adding ONLINE-OFFLINE state model to the cluster");
+      admin.addStateModelDef(clusterName, DEFAULT_STATE_MODEL, new StateModelDefinition(
+        StateModelConfigGenerator.generateConfigForOnlineOffline()));
+    }
+
+    // Add resources with 1 cluster-wide replica
+    // Since our aggregators are unbalanced in terms of work distribution we
+    // only need to distribute writes to METRIC_AGGREGATE and
+    // METRIC_RECORD_MINUTE
+    List<String> resources = admin.getResourcesInCluster(clusterName);
+    if (!resources.contains(METRIC_AGGREGATORS)) {
+      LOG.info("Adding resource " + METRIC_AGGREGATORS + " with 2 partitions and 1 replicas");
+      admin.addResource(clusterName, METRIC_AGGREGATORS, 2, DEFAULT_STATE_MODEL, FULL_AUTO.toString());
+    }
+    // this will set up the ideal state, it calculates the preference list for
+    // each partition similar to consistent hashing
+    admin.rebalance(clusterName, METRIC_AGGREGATORS, 1);
+
+    // Start participant
+    startAggregators();
+
+    // Start controller
+    startController();
+
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        try {
+          aggregationTaskRunner.stop();
+        } finally {
+          // Disconnect from Helix even when stopping the aggregators fails.
+          manager.disconnect();
+        }
+      }
+    });
+
+    isInitialized = true;
+  }
+
+  /**
+   * Adds this collector's instance config to the cluster unless an instance with
+   * the same name is already listed there.
+   */
+  private void addInstanceIfMissing(String clusterName) {
+    List<String> nodes = admin.getInstancesInCluster(clusterName);
+    if (CollectionUtils.isEmpty(nodes) || !nodes.contains(instanceConfig.getInstanceName())) {
+      LOG.info("Adding participant instance " + instanceConfig);
+      admin.addInstance(clusterName, instanceConfig);
+    }
+  }
+
+  /**
+   * Return true once {@link #initializeHAController()} has run to completion.
+   */
+  public boolean isInitialized() {
+    return isInitialized;
+  }
+
+  /**
+   * Initializes the aggregation task runner.
+   *
+   * @throws MetricsSystemInitializationException if the initialization fails
+   */
+  private void startAggregators() {
+    try {
+      aggregationTaskRunner.initialize();
+
+    } catch (Exception e) {
+      LOG.error("Unable to start aggregators.", e);
+      throw new MetricsSystemInitializationException(e.getMessage());
+    }
+  }
+
+  /**
+   * Connects a Helix manager of type CONTROLLER and registers a
+   * {@link HelixController} as its live-instance change listener.
+   */
+  private void startController() throws Exception {
+    manager = HelixManagerFactory.getZKHelixManager(
+      getClusterName(),
+      instanceHostname,
+      InstanceType.CONTROLLER,
+      zkConnectUrl
+    );
+
+    manager.connect();
+    HelixController controller = new HelixController();
+    manager.addLiveInstanceChangeListener(controller);
+  }
+
+  public AggregationTaskRunner getAggregationTaskRunner() {
+    return aggregationTaskRunner;
+  }
+
+  /**
+   * @return a new list with the host name of every cached live instance, i.e. the
+   *         instance name without its trailing delimiter-and-port suffix
+   */
+  public List<String> getLiveInstanceHostNames() {
+    List<String> liveInstanceHostNames = new ArrayList<>();
+
+    // Same lock as the Helix callback, so the cache cannot change mid-iteration.
+    synchronized (liveInstanceNames) {
+      for (String instance : liveInstanceNames) {
+        liveInstanceHostNames.add(hostNameOf(instance));
+      }
+    }
+
+    return liveInstanceHostNames;
+  }
+
+  /**
+   * Strips the port suffix from an instance name. The last delimiter is used, so a
+   * host name that itself contains the delimiter is returned whole; a name without
+   * any delimiter is returned unchanged.
+   */
+  static String hostNameOf(String instanceName) {
+    int delimiterAt = instanceName.lastIndexOf(INSTANCE_NAME_DELIMITER);
+    return delimiterAt < 0 ? instanceName : instanceName.substring(0, delimiterAt);
+  }
+
+  /**
+   * Controller that refreshes the live-instance cache on every liveness change and
+   * schedules a delayed log of the cluster state.
+   */
+  public class HelixController extends GenericHelixController {
+    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+    Joiner joiner = Joiner.on(", ").skipNulls();
+
+    @Override
+    public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) {
+      super.onLiveInstanceChange(liveInstances, changeContext);
+
+      String liveNames;
+      // Refill under the lock so readers never observe a half-built list.
+      synchronized (liveInstanceNames) {
+        liveInstanceNames.clear();
+        for (LiveInstance instance : liveInstances) {
+          liveInstanceNames.add(instance.getInstanceName());
+        }
+        liveNames = joiner.join(liveInstanceNames);
+      }
+
+      LOG.info("Detected change in liveliness of Collector instances. " +
+        "LiveInstances = " + liveNames);
+      // Print HA state - after some delay
+      executorService.schedule(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            printClusterState();
+          } catch (RuntimeException e) {
+            // The returned future is discarded, so log here or the failure is lost.
+            LOG.warn("Unable to print cluster HA state.", e);
+          }
+        }
+      }, 30, TimeUnit.SECONDS);
+    }
+  }
+
+  /**
+   * Logs the external view of the METRIC_AGGREGATORS resource; only the banner
+   * lines are logged when Helix returns no external view.
+   */
+  public void printClusterState() {
+    StringBuilder sb = new StringBuilder("\n######################### Cluster HA state ########################");
+
+    ExternalView resourceExternalView = admin.getResourceExternalView(getClusterName(), METRIC_AGGREGATORS);
+    if (resourceExternalView != null) {
+      getPrintableResourceState(resourceExternalView, METRIC_AGGREGATORS, sb);
+    }
+    sb.append("\n##################################################");
+    LOG.info(sb.toString());
+  }
+
+  /**
+   * Appends the cluster name, the resource name and, for each partition in sorted
+   * order, every key/value entry of that partition's state map.
+   */
+  private void getPrintableResourceState(ExternalView resourceExternalView,
+                                         String resourceName,
+                                         StringBuilder sb) {
+    // TreeSet gives a stable, sorted partition order in the log output.
+    TreeSet<String> sortedSet = new TreeSet<>(resourceExternalView.getPartitionSet());
+    sb.append("\nCLUSTER: ");
+    sb.append(getClusterName());
+    sb.append("\nRESOURCE: ");
+    sb.append(resourceName);
+    for (String partitionName : sortedSet) {
+      sb.append("\nPARTITION: ");
+      sb.append(partitionName).append("\t");
+      Map<String, String> states = resourceExternalView.getStateMap(partitionName);
+      for (Map.Entry<String, String> stateEntry : states.entrySet()) {
+        sb.append("\t");
+        sb.append(stateEntry.getKey());
+        sb.append("\t");
+        sb.append(stateEntry.getValue());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/OnlineOfflineStateModelFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/OnlineOfflineStateModelFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/OnlineOfflineStateModelFactory.java
new file mode 100644
index 0000000..78a3199
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/OnlineOfflineStateModelFactory.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.availability;
+
+import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.PARTITION_AGGREGATION_TYPES;
+
+import org.apache.ambari.metrics.core.timeline.aggregators.TimelineMetricAggregator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+/**
+ * State model factory whose state models set or unset aggregation functions on
+ * the task runner as partitions move between Online, Offline and Dropped.
+ */
+public class OnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
+  private static final Log LOG = LogFactory.getLog(OnlineOfflineStateModelFactory.class);
+  private final String instanceName;
+  private final AggregationTaskRunner taskRunner;
+
+  /**
+   * @param instanceName name reported in the log line of createNewStateModel
+   * @param taskRunner   runner whose partition aggregation functions are switched
+   */
+  public OnlineOfflineStateModelFactory(String instanceName, AggregationTaskRunner taskRunner) {
+    this.instanceName = instanceName;
+    this.taskRunner = taskRunner;
+  }
+
+  /** Logs the request and returns a new state model; a new one is built on every call. */
+  @Override
+  public StateModel createNewStateModel(String resourceName, String partition) {
+    String request = "Received request to process partition = " + partition
+      + ", for resource = " + resourceName + ", at " + instanceName;
+    LOG.info(request);
+    return new OnlineOfflineStateModel();
+  }
+
+  /**
+   * State model with one handler per transition.
+   * NOTE(review): presumably Helix locates these handlers by method name - confirm.
+   */
+  public class OnlineOfflineStateModel extends StateModel {
+    /** Enables the aggregation function mapped to the message's partition. */
+    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+      final String partition = message.getPartitionName();
+      LOG.info("Received transition to Online from Offline for partition: " + partition);
+      taskRunner.setPartitionAggregationFunction(aggregationTypeOf(partition));
+    }
+
+    /** Disables the aggregation function mapped to the message's partition. */
+    public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+      final String partition = message.getPartitionName();
+      LOG.info("Received transition to Offline from Online for partition: " + partition);
+      taskRunner.unsetPartitionAggregationFunction(aggregationTypeOf(partition));
+    }
+
+    /** Disables the aggregation function mapped to the message's partition. */
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+      final String partition = message.getPartitionName();
+      LOG.info("Received transition to Dropped from Offline for partition: " + partition);
+      taskRunner.unsetPartitionAggregationFunction(aggregationTypeOf(partition));
+    }
+
+    /** Looks up the aggregation type registered for a partition name. */
+    private TimelineMetricAggregator.AGGREGATOR_TYPE aggregationTypeOf(String partitionName) {
+      return PARTITION_AGGREGATION_TYPES.get(partitionName);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricHostMetadata.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricHostMetadata.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricHostMetadata.java
new file mode 100644
index 0000000..7645bd0
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricHostMetadata.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline.discovery;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Metadata kept per host: the ids of the apps hosted on it and the host's uuid bytes.
+ */
+public class TimelineMetricHostMetadata {
+  // A concurrent structure is needed and only the keys carry information; the
+  // Set-based constructor stores every app id as both key and value.
+  private ConcurrentHashMap<String, String> hostedApps = new ConcurrentHashMap<>();
+  private byte[] uuid;
+
+  /** Creates metadata with an empty hosted-apps map and no uuid. */
+  public TimelineMetricHostMetadata() {
+  }
+
+  /** Uses the given map as the hosted-apps map; the reference is kept, not copied. */
+  public TimelineMetricHostMetadata(ConcurrentHashMap<String, String> hostedApps) {
+    this.hostedApps = hostedApps;
+  }
+
+  /** Fills the hosted-apps map from the given app ids, mapping each id to itself. */
+  public TimelineMetricHostMetadata(Set<String> hostedApps) {
+    // The field initializer has already created an empty map for this instance.
+    for (String appId : hostedApps) {
+      this.hostedApps.put(appId, appId);
+    }
+  }
+
+  /** @return the hosted-apps map itself, not a copy */
+  public ConcurrentHashMap<String, String> getHostedApps() {
+    return this.hostedApps;
+  }
+
+  /** Replaces the hosted-apps map with the given one. */
+  public void setHostedApps(ConcurrentHashMap<String, String> hostedApps) {
+    this.hostedApps = hostedApps;
+  }
+
+  /** @return the stored uuid array itself, or null when none was set */
+  public byte[] getUuid() {
+    return this.uuid;
+  }
+
+  /** Stores the given uuid array; the reference is kept, not copied. */
+  public void setUuid(byte[] uuid) {
+    this.uuid = uuid;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataKey.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataKey.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataKey.java
new file mode 100644
index 0000000..d308ce1
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataKey.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.discovery;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Key made of metric name, app id and instance id. For equality and hashing a
+ * null instance id and an empty instance id are the same value.
+ */
+@XmlRootElement
+public class TimelineMetricMetadataKey {
+  String metricName;
+  String appId;
+  String instanceId;
+
+  public TimelineMetricMetadataKey(String metricName, String appId, String instanceId) {
+    this.metricName = metricName;
+    this.appId = appId;
+    this.instanceId = instanceId;
+  }
+
+  public String getMetricName() {
+    return metricName;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public void setAppId(String appId) {
+    this.appId = appId;
+  }
+
+  /**
+   * Two keys are equal when their metric names and app ids are equal (null only
+   * equals null) and their instance ids are equal or both null/empty.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TimelineMetricMetadataKey that = (TimelineMetricMetadataKey) o;
+
+    // Null-safe comparisons: hashCode() tolerates a null appId, so equals() must too.
+    if (!java.util.Objects.equals(metricName, that.metricName)) return false;
+    if (!java.util.Objects.equals(appId, that.appId)) return false;
+    return (StringUtils.isNotEmpty(instanceId) ? instanceId.equals(that.instanceId) : StringUtils.isEmpty(that.instanceId));
+  }
+
+  /**
+   * Hash consistent with {@link #equals(Object)}: null fields contribute 0, and an
+   * empty instance id also contributes 0 because the empty string hashes to 0.
+   */
+  @Override
+  public int hashCode() {
+    int result = metricName != null ? metricName.hashCode() : 0;
+    result = 31 * result + (appId != null ? appId.hashCode() : 0);
+    result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "TimelineMetricMetadataKey{" +
+      "metricName='" + metricName + '\'' +
+      ", appId='" + appId + '\'' +
+      ", instanceId='" + instanceId + '\'' +
+      '}';
+  }
+}