You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2015/04/03 00:00:50 UTC

[5/7] ambari git commit: AMBARI-10290. Expose available host metrics across hostcomponents. (swagle)

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
new file mode 100644
index 0000000..a0e4e32
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+
+/**
+ * Factory for the host-level {@link TimelineMetricAggregator} instances.
+ * Each factory method reads its tunables from the supplied configuration and
+ * wires the aggregator to a checkpoint file, an input table and an output
+ * table.
+ */
+public class TimelineMetricAggregatorFactory {
+  private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE =
+    "timeline-metrics-host-aggregator-checkpoint";
+  private static final String HOURLY_AGGREGATE_CHECKPOINT_FILE =
+    "timeline-metrics-host-aggregator-hourly-checkpoint";
+
+  /**
+   * Creates the aggregator that reads from the metrics record table and
+   * writes to the minute aggregate table.
+   * <p>
+   * The sleep interval is configured in seconds and defaults to 300 (5 mins);
+   * the checkpoint cut-off multiplier defaults to 3.
+   *
+   * @param hBaseAccessor accessor handed to the aggregator as is
+   * @param metricsConf configuration the aggregator settings are read from
+   * @return a newly constructed aggregator
+   */
+  public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
+    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+    String checkpointLocation = resolveCheckpointLocation(metricsConf,
+      MINUTE_AGGREGATE_CHECKPOINT_FILE);
+    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+      (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300L));  // 5 mins
+    int checkpointCutOffMultiplier = metricsConf.getInt
+      (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3);
+
+    return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      HOST_AGGREGATOR_MINUTE_DISABLED,
+      METRICS_RECORD_TABLE_NAME,
+      METRICS_AGGREGATE_MINUTE_TABLE_NAME,
+      // 2 mins in ms. NOTE(review): presumably the native time range delay
+      // of the aggregator - confirm against the TimelineMetricAggregator
+      // constructor.
+      120000L);
+  }
+
+  /**
+   * Creates the aggregator that reads from the minute aggregate table and
+   * writes to the hourly aggregate table.
+   * <p>
+   * The sleep interval is configured in seconds and defaults to 3600 (1 hour);
+   * the checkpoint cut-off multiplier defaults to 2.
+   *
+   * @param hBaseAccessor accessor handed to the aggregator as is
+   * @param metricsConf configuration the aggregator settings are read from
+   * @return a newly constructed aggregator
+   */
+  public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
+    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+    String checkpointLocation = resolveCheckpointLocation(metricsConf,
+      HOURLY_AGGREGATE_CHECKPOINT_FILE);
+    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+      (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600L));  // 1 hour
+    int checkpointCutOffMultiplier = metricsConf.getInt
+      (HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+
+    return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      HOST_AGGREGATOR_HOUR_DISABLED,
+      METRICS_AGGREGATE_MINUTE_TABLE_NAME,
+      METRICS_AGGREGATE_HOURLY_TABLE_NAME,
+      // 1 hour in ms. NOTE(review): presumably the native time range delay
+      // of the aggregator - confirm against the TimelineMetricAggregator
+      // constructor.
+      3600000L);
+  }
+
+  /**
+   * Joins the configured checkpoint directory (or the default checkpoint
+   * location when none is configured) with the given checkpoint file name.
+   */
+  private static String resolveCheckpointLocation(Configuration metricsConf,
+                                                  String checkpointFileName) {
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+    return FilenameUtils.concat(checkpointDir, checkpointFileName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
new file mode 100644
index 0000000..0c8ded2
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_APP_ID;
+
+/**
+ * Aggregator responsible for providing app level host aggregates. This task
+ * is accomplished without doing a round trip to storage, rather
+ * TimelineMetricClusterAggregators are responsible for lifecycle of
+ * @TimelineMetricAppAggregator and provide the raw data to aggregate.
+ */
+public class TimelineMetricAppAggregator {
+  private static final Log LOG = LogFactory.getLog(TimelineMetricAppAggregator.class);
+  // Lookup to check candidacy of an app
+  private final List<String> appIdsToAggregate;
+  // Map to lookup apps on a host
+  private Map<String, List<String>> hostedAppsMap = new HashMap<String, List<String>>();
+
+  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics;
+
+  public TimelineMetricAppAggregator(Configuration metricsConf) {
+    appIdsToAggregate = getAppIdsForHostAggregation(metricsConf);
+    LOG.info("AppIds configured for aggregation: " + appIdsToAggregate);
+  }
+
+  /**
+   * Lifecycle method to initialize aggregation cycle.
+   */
+  public void init() {
+    LOG.debug("Initializing aggregation cycle.");
+    aggregateClusterMetrics = new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+  }
+
+  /**
+   * Lifecycle method to indicate end of aggregation cycle.
+   */
+  public void cleanup() {
+    LOG.debug("Cleanup aggregated data.");
+    aggregateClusterMetrics = null;
+  }
+
+  /**
+   * Useful for resetting apps that no-longer need aggregation without restart.
+   */
+  public void destroy() {
+    LOG.debug("Cleanup aggregated data as well as in-memory state.");
+    aggregateClusterMetrics = null;
+    hostedAppsMap = new HashMap<String, List<String>>();
+  }
+
+  /**
+   * Calculate aggregates if the clusterMetric is a Host metric for recorded
+   * apps that are housed by this host.
+   *
+   * @param clusterMetric @TimelineClusterMetric Host / App metric
+   * @param hostname This is the hostname from which this clusterMetric originated.
+   * @param metricValue The metric value for this metric.
+   */
+  public void processTimelineClusterMetric(TimelineClusterMetric clusterMetric,
+                                           String hostname, Double metricValue) {
+
+    String appId = clusterMetric.getAppId();
+    if (appId == null) {
+      return; // No real use case except tests
+    }
+
+    // If metric is a host metric and host has apps on it
+    if (appId.equalsIgnoreCase(HOST_APP_ID)) {
+      // Candidate metric, update app aggregates
+      if (hostedAppsMap.containsKey(hostname)) {
+        updateAppAggregatesFromHostMetric(clusterMetric, hostname, metricValue);
+      }
+    } else {
+      // Build the hostedapps map if not a host metric
+      // Check app candidacy for host aggregation
+      if (appIdsToAggregate.contains(appId)) {
+        List<String> appIds = hostedAppsMap.get(hostname);
+        if (appIds == null) {
+          appIds = new ArrayList<String>();
+          hostedAppsMap.put(hostname, appIds);
+        }
+        if (!appIds.contains(appId)) {
+          appIds.add(appId);
+          LOG.info("Adding appId to hosted apps: appId = " +
+            clusterMetric.getAppId() + ", hostname = " + hostname);
+        }
+      }
+    }
+  }
+
+  /**
+   * Build a cluster app metric from a host metric
+   */
+  private void updateAppAggregatesFromHostMetric(TimelineClusterMetric clusterMetric,
+                                                 String hostname, Double metricValue) {
+
+    if (aggregateClusterMetrics == null) {
+      LOG.error("Aggregation requested without init call.");
+      return;
+    }
+
+    List<String> apps = hostedAppsMap.get(hostname);
+    for (String appId : apps) {
+      // Add a new cluster aggregate metric if none exists
+      TimelineClusterMetric appTimelineClusterMetric =
+        new TimelineClusterMetric(clusterMetric.getMetricName(),
+          appId,
+          clusterMetric.getInstanceId(),
+          clusterMetric.getTimestamp(),
+          clusterMetric.getType()
+        );
+
+      MetricClusterAggregate clusterAggregate = aggregateClusterMetrics.get(appTimelineClusterMetric);
+
+      if (clusterAggregate == null) {
+        clusterAggregate = new MetricClusterAggregate(metricValue, 1, null, metricValue, metricValue);
+        aggregateClusterMetrics.put(appTimelineClusterMetric, clusterAggregate);
+      } else {
+        clusterAggregate.updateSum(metricValue);
+        clusterAggregate.updateNumberOfHosts(1);
+        clusterAggregate.updateMax(metricValue);
+        clusterAggregate.updateMin(metricValue);
+      }
+
+    }
+  }
+
+  /**
+   * Return current copy of aggregated data.
+   */
+  public Map<TimelineClusterMetric, MetricClusterAggregate> getAggregateClusterMetrics() {
+    return aggregateClusterMetrics;
+  }
+
+  private List<String> getAppIdsForHostAggregation(Configuration metricsConf) {
+    String appIds = metricsConf.get(CLUSTER_AGGREGATOR_APP_IDS);
+    if (!StringUtils.isEmpty(appIds)) {
+      return Arrays.asList(StringUtils.stripAll(appIds.split(",")));
+    }
+    return Collections.emptyList();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
new file mode 100644
index 0000000..68b2ba9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
+
+/**
+ * Aggregates a metric across all hosts in the cluster. Reads metrics from
+ * the precision table and saves into the aggregate.
+ */
+public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
+  private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregator.class);
+  private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE =
+    "timeline-metrics-cluster-aggregator-checkpoint";
+  private final String checkpointLocation;
+  private final Long sleepIntervalMillis;
+  public final int timeSliceIntervalMillis;
+  private final Integer checkpointCutOffMultiplier;
+  private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true);
+  // Aggregator to perform app-level aggregates for host metrics
+  private final TimelineMetricAppAggregator appAggregator;
+
+  public TimelineMetricClusterAggregator(PhoenixHBaseAccessor hBaseAccessor,
+                                         Configuration metricsConf) {
+    super(hBaseAccessor, metricsConf);
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+
+    checkpointLocation = FilenameUtils.concat(checkpointDir,
+      CLUSTER_AGGREGATOR_CHECKPOINT_FILE);
+
+    sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+      (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 120l));
+    timeSliceIntervalMillis = (int)SECONDS.toMillis(metricsConf.getInt
+      (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 15));
+    checkpointCutOffMultiplier =
+      metricsConf.getInt(CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+
+    appAggregator = new TimelineMetricAppAggregator(metricsConf);
+  }
+
+  @Override
+  protected String getCheckpointLocation() {
+    return checkpointLocation;
+  }
+
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime)
+    throws SQLException, IOException {
+    List<Long[]> timeSlices = getTimeSlices(startTime, endTime);
+    // Initialize app aggregates for host metrics
+    appAggregator.init();
+    Map<TimelineClusterMetric, MetricClusterAggregate>
+      aggregateClusterMetrics = aggregateMetricsFromResultSet(rs, timeSlices);
+
+    LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
+    hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
+    appAggregator.cleanup();
+  }
+
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+      endTime, null, null, true);
+    condition.setNoLimit();
+    condition.setFetchSize(resultsetFetchSize);
+    condition.setStatement(String.format(GET_METRIC_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+      METRICS_RECORD_TABLE_NAME));
+    condition.addOrderByColumn("METRIC_NAME");
+    condition.addOrderByColumn("APP_ID");
+    condition.addOrderByColumn("INSTANCE_ID");
+    condition.addOrderByColumn("SERVER_TIME");
+    return condition;
+  }
+
+  private List<Long[]> getTimeSlices(long startTime, long endTime) {
+    List<Long[]> timeSlices = new ArrayList<Long[]>();
+    long sliceStartTime = startTime;
+    while (sliceStartTime < endTime) {
+      timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis});
+      sliceStartTime += timeSliceIntervalMillis;
+    }
+    return timeSlices;
+  }
+
+  private Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
+      throws SQLException, IOException {
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
+      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+    // Create time slices
+
+    while (rs.next()) {
+      TimelineMetric metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
+
+      Map<TimelineClusterMetric, Double> clusterMetrics =
+        sliceFromTimelineMetric(metric, timeSlices);
+
+      if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
+        for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
+            clusterMetrics.entrySet()) {
+
+          TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
+          Double avgValue = clusterMetricEntry.getValue();
+
+          MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
+
+          if (aggregate == null) {
+            aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue);
+            aggregateClusterMetrics.put(clusterMetric, aggregate);
+          } else {
+            aggregate.updateSum(avgValue);
+            aggregate.updateNumberOfHosts(1);
+            aggregate.updateMax(avgValue);
+            aggregate.updateMin(avgValue);
+          }
+          // Update app level aggregates
+          appAggregator.processTimelineClusterMetric(clusterMetric,
+            metric.getHostName(), avgValue);
+        }
+      }
+    }
+    // Add app level aggregates to save
+    aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());
+    return aggregateClusterMetrics;
+  }
+
+  @Override
+  protected Long getSleepIntervalMillis() {
+    return sleepIntervalMillis;
+  }
+
+  @Override
+  protected Integer getCheckpointCutOffMultiplier() {
+    return checkpointCutOffMultiplier;
+  }
+
+  @Override
+  public boolean isDisabled() {
+    return metricsConf.getBoolean(CLUSTER_AGGREGATOR_MINUTE_DISABLED, false);
+  }
+
+  private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
+        TimelineMetric timelineMetric, List<Long[]> timeSlices) {
+
+    if (timelineMetric.getMetricValues().isEmpty()) {
+      return null;
+    }
+
+    Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
+      new HashMap<TimelineClusterMetric, Double>();
+
+    for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) {
+      // TODO: investigate null values - pre filter
+      if (metric.getValue() == null) {
+        continue;
+      }
+      Long timestamp = getSliceTimeForMetric(timeSlices,
+                       Long.parseLong(metric.getKey().toString()));
+      if (timestamp != -1) {
+        // Metric is within desired time range
+        TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
+          timelineMetric.getMetricName(),
+          timelineMetric.getAppId(),
+          timelineMetric.getInstanceId(),
+          timestamp,
+          timelineMetric.getType());
+        if (!timelineClusterMetricMap.containsKey(clusterMetric)) {
+          timelineClusterMetricMap.put(clusterMetric, metric.getValue());
+        } else {
+          Double oldValue = timelineClusterMetricMap.get(clusterMetric);
+          Double newValue = (oldValue + metric.getValue()) / 2;
+          timelineClusterMetricMap.put(clusterMetric, newValue);
+        }
+      }
+    }
+
+    return timelineClusterMetricMap;
+  }
+
+  /**
+   * Return beginning of the time slice into which the metric fits.
+   */
+  private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) {
+    for (Long[] timeSlice : timeSlices) {
+      if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) {
+        return timeSlice[0];
+      }
+    }
+    return -1l;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java
new file mode 100644
index 0000000..264e4e6
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+
+public class TimelineMetricClusterAggregatorHourly extends AbstractTimelineAggregator {
+  private static final Log LOG = LogFactory.getLog
+    (TimelineMetricClusterAggregatorHourly.class);
+  private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE =
+    "timeline-metrics-cluster-aggregator-hourly-checkpoint";
+  private final String checkpointLocation;
+  private final long sleepIntervalMillis;
+  private final Integer checkpointCutOffMultiplier;
+  private long checkpointCutOffIntervalMillis;
+  private static final Long NATIVE_TIME_RANGE_DELTA = 3600000l; // 1 hour
+  private final TimelineClusterMetricReader timelineClusterMetricReader
+     = new TimelineClusterMetricReader(true);
+
+  public TimelineMetricClusterAggregatorHourly(
+    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+    super(hBaseAccessor, metricsConf);
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+
+    checkpointLocation = FilenameUtils.concat(checkpointDir,
+      CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE);
+
+    sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+      (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
+    checkpointCutOffIntervalMillis =  SECONDS.toMillis(metricsConf.getLong
+      (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL, 7200l));
+    checkpointCutOffMultiplier = metricsConf.getInt
+      (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+  }
+
+  @Override
+  protected String getCheckpointLocation() {
+    return checkpointLocation;
+  }
+
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime)
+    throws SQLException, IOException {
+      Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
+        aggregateMetricsFromResultSet(rs);
+
+    LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
+    hBaseAccessor.saveClusterAggregateHourlyRecords(hostAggregateMap,
+      METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
+  }
+
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime,
+                                                  long endTime) {
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+      endTime, null, null, true);
+    condition.setNoLimit();
+    condition.setFetchSize(resultsetFetchSize);
+    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+        METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
+    condition.addOrderByColumn("METRIC_NAME");
+    condition.addOrderByColumn("APP_ID");
+    condition.addOrderByColumn("INSTANCE_ID");
+    condition.addOrderByColumn("SERVER_TIME");
+    return condition;
+  }
+
+  private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs)
+      throws IOException, SQLException {
+
+    TimelineClusterMetric existingMetric = null;
+    MetricHostAggregate hostAggregate = null;
+    Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
+      new HashMap<TimelineClusterMetric, MetricHostAggregate>();
+
+    while (rs.next()) {
+      TimelineClusterMetric currentMetric =
+        timelineClusterMetricReader.fromResultSet(rs);
+      MetricClusterAggregate currentHostAggregate =
+        getMetricClusterAggregateFromResultSet(rs);
+
+      if (existingMetric == null) {
+        // First row
+        existingMetric = currentMetric;
+        hostAggregate = new MetricHostAggregate();
+        hostAggregateMap.put(currentMetric, hostAggregate);
+      }
+
+      if (existingMetric.equalsExceptTime(currentMetric)) {
+        // Recalculate totals with current metric
+        updateAggregatesFromHost(hostAggregate, currentHostAggregate);
+
+      } else {
+        // Switched over to a new metric - save existing
+        hostAggregate = new MetricHostAggregate();
+        updateAggregatesFromHost(hostAggregate, currentHostAggregate);
+        hostAggregateMap.put(currentMetric, hostAggregate);
+        existingMetric = currentMetric;
+      }
+
+    }
+
+    return hostAggregateMap;
+  }
+
+  private void updateAggregatesFromHost(MetricHostAggregate agg, MetricClusterAggregate currentClusterAggregate) {
+    agg.updateMax(currentClusterAggregate.getMax());
+    agg.updateMin(currentClusterAggregate.getMin());
+    agg.updateSum(currentClusterAggregate.getSum());
+    agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts());
+  }
+
+  @Override
+  protected Long getSleepIntervalMillis() {
+    return sleepIntervalMillis;
+  }
+
+  @Override
+  protected Integer getCheckpointCutOffMultiplier() {
+    return checkpointCutOffMultiplier;
+  }
+
+  @Override
+  protected Long getCheckpointCutOffIntervalMillis() {
+    return checkpointCutOffIntervalMillis;
+  }
+
+  @Override
+  public boolean isDisabled() {
+    return metricsConf.getBoolean(CLUSTER_AGGREGATOR_HOUR_DISABLED, false);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
new file mode 100644
index 0000000..40a9648
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Builds {@link TimelineMetric} objects from the current row of a
+ * {@link ResultSet}.
+ */
+public class TimelineMetricReadHelper {
+
+  // When true, the INSTANCE_ID column is not copied onto the metric.
+  private final boolean ignoreInstance;
+
+  /** Creates a helper that copies the instance id. */
+  public TimelineMetricReadHelper() {
+    this(false);
+  }
+
+  /**
+   * @param ignoreInstance true to leave the instance id of created metrics
+   * unset
+   */
+  public TimelineMetricReadHelper(boolean ignoreInstance) {
+    this.ignoreInstance = ignoreInstance;
+  }
+
+  /**
+   * Reads the common fields of the current row and the metric values stored
+   * as JSON in the METRICS column. The values are set on the metric sorted
+   * by their key.
+   */
+  public TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
+    throws SQLException, IOException {
+    TimelineMetric timelineMetric = getTimelineMetricCommonsFromResultSet(rs);
+    String metricsJson = rs.getString("METRICS");
+    Map<Long, Double> valuesByTime = new TreeMap<Long, Double>(
+        PhoenixHBaseAccessor.readMetricFromJSON(metricsJson));
+    timelineMetric.setMetricValues(valuesByTime);
+    return timelineMetric;
+  }
+
+  /**
+   * Reads every field of the current row except the metric values.
+   */
+  public TimelineMetric getTimelineMetricCommonsFromResultSet(ResultSet rs)
+    throws SQLException {
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setMetricName(rs.getString("METRIC_NAME"));
+    timelineMetric.setAppId(rs.getString("APP_ID"));
+    if (!ignoreInstance) {
+      timelineMetric.setInstanceId(rs.getString("INSTANCE_ID"));
+    }
+    timelineMetric.setHostName(rs.getString("HOSTNAME"));
+    timelineMetric.setTimestamp(rs.getLong("SERVER_TIME"));
+    timelineMetric.setStartTime(rs.getLong("START_TIME"));
+    timelineMetric.setType(rs.getString("UNITS"));
+    return timelineMetric;
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java
new file mode 100644
index 0000000..b52748f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java
@@ -0,0 +1,46 @@
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Precision;
+
+import java.util.List;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Describes a metrics query: the values to filter on plus statement,
+ * ordering, limit and fetch-size settings. DefaultCondition in this package
+ * implements it.
+ *
+ * NOTE(review): the per-method notes below describe what DefaultCondition
+ * does; confirm before relying on them for other implementations.
+ */
+public interface Condition {
+  // DefaultCondition: true when no metric names, hostname, app id,
+  // instance id, start time or end time have been set.
+  boolean isEmpty();
+
+  // DefaultCondition: null when no names are set; names containing '%' are
+  // rendered as LIKE patterns in the condition clause.
+  List<String> getMetricNames();
+  // DefaultCondition: true when neither start nor end time is set.
+  boolean isPointInTime();
+  boolean isGrouped();
+  // Stores the statement string later returned by getStatement().
+  void setStatement(String statement);
+  String getHostname();
+  Precision getPrecision();
+  void setPrecision(Precision precision);
+  String getAppId();
+  String getInstanceId();
+  // DefaultCondition: the filter conditions joined with AND, using '?'
+  // placeholders in place of the values.
+  StringBuilder getConditionClause();
+  // DefaultCondition: " ORDER BY ..." over the added columns, each followed
+  // by DESC when asc is false; null when no column was added.
+  String getOrderByClause(boolean asc);
+  String getStatement();
+  // DefaultCondition: start and end time are returned in milliseconds.
+  Long getStartTime();
+  Long getEndTime();
+  // DefaultCondition: null after setNoLimit() has been called.
+  Integer getLimit();
+  Integer getFetchSize();
+  void setFetchSize(Integer fetchSize);
+  void addOrderByColumn(String column);
+  void setNoLimit();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java
new file mode 100644
index 0000000..24239a0
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/**
+ * Supplies JDBC connections. DefaultPhoenixDataSource in this package is an
+ * implementation.
+ */
+public interface ConnectionProvider {
+  /**
+   * @return a JDBC connection
+   * @throws SQLException if the connection cannot be obtained
+   */
+  Connection getConnection() throws SQLException;
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java
new file mode 100644
index 0000000..9d6b7df
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Precision;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+
+/**
+ * Default {@link Condition} implementation. Holds the filter values of a
+ * metrics query and renders them as a condition clause with '?'
+ * placeholders, plus an optional ORDER BY clause.
+ */
+public class DefaultCondition implements Condition {
+  /**
+   * Time values below this are treated as epoch seconds and converted to
+   * milliseconds; values at or above it are returned unchanged.
+   */
+  private static final long SECONDS_THRESHOLD = 9999999999L;
+
+  List<String> metricNames;
+  String hostname;
+  String appId;
+  String instanceId;
+  Long startTime;
+  Long endTime;
+  Precision precision;
+  Integer limit;
+  boolean grouped;
+  boolean noLimit = false;
+  Integer fetchSize;
+  String statement;
+  // LinkedHashSet keeps the insertion order and drops duplicate columns.
+  Set<String> orderByColumns = new LinkedHashSet<String>();
+
+  /**
+   * Creates a condition. Any argument may be null, which leaves the
+   * corresponding filter out of the condition clause.
+   *
+   * @param metricNames metric names; names containing '%' become LIKE
+   * patterns, all others go into an IN list
+   * @param hostname host to filter on
+   * @param appId application id to filter on
+   * @param instanceId instance id to filter on
+   * @param startTime inclusive start time, in seconds or milliseconds
+   * @param endTime exclusive end time, in seconds or milliseconds
+   * @param precision precision returned by {@link #getPrecision()}
+   * @param limit row limit; null selects the default limit
+   * @param grouped value returned by {@link #isGrouped()}
+   */
+  public DefaultCondition(List<String> metricNames, String hostname, String appId,
+                   String instanceId, Long startTime, Long endTime, Precision precision,
+                   Integer limit, boolean grouped) {
+    this.metricNames = metricNames;
+    this.hostname = hostname;
+    this.appId = appId;
+    this.instanceId = instanceId;
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.precision = precision;
+    this.limit = limit;
+    this.grouped = grouped;
+  }
+
+  @Override
+  public String getStatement() {
+    return statement;
+  }
+
+  @Override
+  public void setStatement(String statement) {
+    this.statement = statement;
+  }
+
+  /**
+   * @return the metric names, or null when none were supplied
+   */
+  @Override
+  public List<String> getMetricNames() {
+    return metricNames == null || metricNames.isEmpty() ? null : metricNames;
+  }
+
+  /**
+   * Renders the filter values as a condition clause. Placeholders appear in
+   * this order: metric names without '%' (IN list), metric names with '%'
+   * (LIKE), hostname, app id, instance id, start time, end time. Names
+   * without '%' always come first, whatever their position in the list.
+   *
+   * @return the clause; empty when no filter value is set
+   */
+  @Override
+  public StringBuilder getConditionClause() {
+    StringBuilder sb = new StringBuilder();
+    boolean appendConjunction = false;
+    StringBuilder metricsLike = new StringBuilder();
+    StringBuilder metricsIn = new StringBuilder();
+
+    if (getMetricNames() != null) {
+      for (String name : getMetricNames()) {
+        if (name.contains("%")) {
+          if (metricsLike.length() > 0) {
+            metricsLike.append(" OR ");
+          }
+          metricsLike.append("METRIC_NAME LIKE ?");
+        } else {
+          if (metricsIn.length() > 0) {
+            metricsIn.append(", ");
+          }
+          metricsIn.append("?");
+        }
+      }
+
+      if (metricsIn.length() > 0) {
+        sb.append("(METRIC_NAME IN (");
+        sb.append(metricsIn);
+        sb.append(")");
+        appendConjunction = true;
+      }
+
+      if (metricsLike.length() > 0) {
+        if (appendConjunction) {
+          sb.append(" OR ");
+        } else {
+          sb.append("(");
+        }
+        sb.append(metricsLike);
+        appendConjunction = true;
+      }
+
+      // Close the parenthesis opened by the IN list or the LIKE group.
+      if (appendConjunction) {
+        sb.append(")");
+      }
+    }
+
+    appendConjunction = append(sb, appendConjunction, getHostname(), " HOSTNAME = ?");
+    appendConjunction = append(sb, appendConjunction, getAppId(), " APP_ID = ?");
+    appendConjunction = append(sb, appendConjunction, getInstanceId(), " INSTANCE_ID = ?");
+    appendConjunction = append(sb, appendConjunction, getStartTime(), " SERVER_TIME >= ?");
+    append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?");
+
+    return sb;
+  }
+
+  /**
+   * Appends {@code str} to {@code sb} when {@code value} is not null,
+   * preceded by " AND" when a condition has already been written.
+   *
+   * @return true when a condition has been written so far
+   */
+  protected static boolean append(StringBuilder sb,
+                                  boolean appendConjunction,
+                                  Object value, String str) {
+    if (value != null) {
+      if (appendConjunction) {
+        sb.append(" AND");
+      }
+
+      sb.append(str);
+      appendConjunction = true;
+    }
+    return appendConjunction;
+  }
+
+  /**
+   * @return the hostname, or null when it is unset or empty
+   */
+  @Override
+  public String getHostname() {
+    return hostname == null || hostname.isEmpty() ? null : hostname;
+  }
+
+  @Override
+  public Precision getPrecision() {
+    return precision;
+  }
+
+  @Override
+  public void setPrecision(Precision precision) {
+    this.precision = precision;
+  }
+
+  /**
+   * Returns the app id. "HOST" and "FLUME_HANDLER" are returned unchanged;
+   * every other app id is lower-cased with {@link Locale#ROOT} so that the
+   * result does not depend on the JVM default locale.
+   *
+   * @return the app id, or null when it is unset or empty
+   */
+  @Override
+  public String getAppId() {
+    if (appId == null || appId.isEmpty()) {
+      return null;
+    }
+    if (appId.equals("HOST") || appId.equals("FLUME_HANDLER")) {
+      return appId;
+    }
+    return appId.toLowerCase(Locale.ROOT);
+  }
+
+  /**
+   * @return the instance id, or null when it is unset or empty
+   */
+  @Override
+  public String getInstanceId() {
+    return instanceId == null || instanceId.isEmpty() ? null : instanceId;
+  }
+
+  /**
+   * @return the start time in milliseconds, or null when unset
+   */
+  @Override
+  public Long getStartTime() {
+    return toMillis(startTime);
+  }
+
+  /**
+   * @return the end time in milliseconds, or null when unset
+   */
+  @Override
+  public Long getEndTime() {
+    return toMillis(endTime);
+  }
+
+  /**
+   * Converts a time given in seconds to milliseconds. Values that are
+   * already at or above the seconds threshold are returned unchanged.
+   */
+  private static Long toMillis(Long time) {
+    if (time == null) {
+      return null;
+    }
+    if (time < SECONDS_THRESHOLD) {
+      return time * 1000;
+    }
+    return time;
+  }
+
+  @Override
+  public void setNoLimit() {
+    this.noLimit = true;
+  }
+
+  /**
+   * @return null after {@link #setNoLimit()} was called; otherwise the
+   * configured limit, or the default result set limit when none was given
+   */
+  @Override
+  public Integer getLimit() {
+    if (noLimit) {
+      return null;
+    }
+    return limit == null ? PhoenixHBaseAccessor.RESULTSET_LIMIT : limit;
+  }
+
+  @Override
+  public boolean isGrouped() {
+    return grouped;
+  }
+
+  /**
+   * @return true when neither a start time nor an end time is set
+   */
+  @Override
+  public boolean isPointInTime() {
+    return getStartTime() == null && getEndTime() == null;
+  }
+
+  /**
+   * @return true when no filter value has been set at all
+   */
+  @Override
+  public boolean isEmpty() {
+    return (metricNames == null || metricNames.isEmpty())
+      && (hostname == null || hostname.isEmpty())
+      && (appId == null || appId.isEmpty())
+      && (instanceId == null || instanceId.isEmpty())
+      && startTime == null
+      && endTime == null;
+  }
+
+  @Override
+  public Integer getFetchSize() {
+    return fetchSize;
+  }
+
+  @Override
+  public void setFetchSize(Integer fetchSize) {
+    this.fetchSize = fetchSize;
+  }
+
+  @Override
+  public void addOrderByColumn(String column) {
+    orderByColumns.add(column);
+  }
+
+  /**
+   * Renders the ORDER BY clause over the added columns, in insertion order.
+   *
+   * @param asc false to append DESC to every column
+   * @return the clause with a leading and a trailing space, or null when no
+   * column was added
+   */
+  @Override
+  public String getOrderByClause(boolean asc) {
+    String orderByStr = " ORDER BY ";
+    if (!orderByColumns.isEmpty()) {
+      StringBuilder sb = new StringBuilder(orderByStr);
+      for (String orderByColumn : orderByColumns) {
+        // Anything beyond the prefix means a column was already written.
+        if (sb.length() != orderByStr.length()) {
+          sb.append(", ");
+        }
+        sb.append(orderByColumn);
+        if (!asc) {
+          sb.append(" DESC");
+        }
+      }
+      sb.append(" ");
+      return sb.toString();
+    }
+    return null;
+  }
+
+  @Override
+  public String toString() {
+    return "Condition{" +
+      "metricNames=" + metricNames +
+      ", hostname='" + hostname + '\'' +
+      ", appId='" + appId + '\'' +
+      ", instanceId='" + instanceId + '\'' +
+      ", startTime=" + startTime +
+      ", endTime=" + endTime +
+      ", limit=" + limit +
+      ", grouped=" + grouped +
+      ", orderBy=" + orderByColumns +
+      ", noLimit=" + noLimit +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
new file mode 100644
index 0000000..562049b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+public class DefaultPhoenixDataSource implements ConnectionProvider {
+
+  static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class);
+  private static final String ZOOKEEPER_CLIENT_PORT =
+    "hbase.zookeeper.property.clientPort";
+  private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
+  private static final String ZNODE_PARENT = "zookeeper.znode.parent";
+
+  private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s";
+  private final String url;
+
+  public DefaultPhoenixDataSource(Configuration hbaseConf) {
+    String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT,
+      "2181");
+    String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM);
+    String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/hbase");
+    if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) {
+      throw new IllegalStateException("Unable to find Zookeeper quorum to " +
+        "access HBase store using Phoenix.");
+    }
+
+    url = String.format(connectionUrl,
+      zookeeperQuorum,
+      zookeeperClientPort,
+      znodeParent);
+  }
+
+  /**
+   * Get JDBC connection to HBase store. Assumption is that the hbase
+   * configuration is present on the classpath and loaded by the caller into
+   * the Configuration object.
+   * Phoenix already caches the HConnection between the client and HBase
+   * cluster.
+   *
+   * @return @java.sql.Connection
+   */
+  public Connection getConnection() throws SQLException {
+
+    LOG.debug("Metric store connection url: " + url);
+    try {
+      return DriverManager.getConnection(url);
+    } catch (SQLException e) {
+      LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+
+      throw e;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
new file mode 100644
index 0000000..636999f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
@@ -0,0 +1,573 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Precision;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Encapsulate all metrics related SQL queries.
+ */
+public class PhoenixTransactSQL {
+
+  public static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class);
+
+  /**
+   * Create table to store individual metric records.
+   */
+  public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " +
+    "EXISTS METRIC_RECORD (METRIC_NAME VARCHAR, " +
+    "HOSTNAME VARCHAR, " +
+    "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+    "APP_ID VARCHAR, " +
+    "INSTANCE_ID VARCHAR, " +
+    "START_TIME UNSIGNED_LONG, " +
+    "UNITS CHAR(20), " +
+    "METRIC_SUM DOUBLE, " +
+    "METRIC_COUNT UNSIGNED_INT, " +
+    "METRIC_MAX DOUBLE, " +
+    "METRIC_MIN DOUBLE, " +
+    "METRICS VARCHAR CONSTRAINT pk " +
+    "PRIMARY KEY (METRIC_NAME, HOSTNAME, SERVER_TIME, APP_ID, " +
+    "INSTANCE_ID)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+    "TTL=%s, COMPRESSION='%s'";
+
+  public static final String CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS METRIC_RECORD_HOURLY " +
+      "(METRIC_NAME VARCHAR, " +
+      "HOSTNAME VARCHAR, " +
+      "APP_ID VARCHAR, " +
+      "INSTANCE_ID VARCHAR, " +
+      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+      "UNITS CHAR(20), " +
+      "METRIC_SUM DOUBLE," +
+      "METRIC_COUNT UNSIGNED_INT, " +
+      "METRIC_MAX DOUBLE," +
+      "METRIC_MIN DOUBLE CONSTRAINT pk " +
+      "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
+      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+      "TTL=%s, COMPRESSION='%s'";
+
+  public static final String CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS METRIC_RECORD_MINUTE " +
+      "(METRIC_NAME VARCHAR, " +
+      "HOSTNAME VARCHAR, " +
+      "APP_ID VARCHAR, " +
+      "INSTANCE_ID VARCHAR, " +
+      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+      "UNITS CHAR(20), " +
+      "METRIC_SUM DOUBLE," +
+      "METRIC_COUNT UNSIGNED_INT, " +
+      "METRIC_MAX DOUBLE," +
+      "METRIC_MIN DOUBLE CONSTRAINT pk " +
+      "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
+      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," +
+      " COMPRESSION='%s'";
+
+  public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE " +
+      "(METRIC_NAME VARCHAR, " +
+      "APP_ID VARCHAR, " +
+      "INSTANCE_ID VARCHAR, " +
+      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+      "UNITS CHAR(20), " +
+      "METRIC_SUM DOUBLE, " +
+      "HOSTS_COUNT UNSIGNED_INT, " +
+      "METRIC_MAX DOUBLE, " +
+      "METRIC_MIN DOUBLE " +
+      "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
+      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+      "TTL=%s, COMPRESSION='%s'";
+
+  public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE_HOURLY " +
+      "(METRIC_NAME VARCHAR, " +
+      "APP_ID VARCHAR, " +
+      "INSTANCE_ID VARCHAR, " +
+      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+      "UNITS CHAR(20), " +
+      "METRIC_SUM DOUBLE, " +
+      "METRIC_COUNT UNSIGNED_INT, " +
+      "METRIC_MAX DOUBLE, " +
+      "METRIC_MIN DOUBLE " +
+      "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
+      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+      "TTL=%s, COMPRESSION='%s'";
+
+  /**
+   * ALTER table to set new options
+   */
+  public static final String ALTER_SQL = "ALTER TABLE %s SET TTL=%s";
+
+  /**
+   * Insert into metric records table.
+   */
+  public static final String UPSERT_METRICS_SQL = "UPSERT INTO %s " +
+    "(METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, " +
+    "UNITS, " +
+    "METRIC_SUM, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN, " +
+    "METRIC_COUNT, " +
+    "METRICS) VALUES " +
+    "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " +
+    "METRIC_AGGREGATE (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
+    "UNITS, " +
+    "METRIC_SUM, " +
+    "HOSTS_COUNT, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN) " +
+    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  public static final String UPSERT_CLUSTER_AGGREGATE_TIME_SQL = "UPSERT INTO" +
+    " %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
+    "UNITS, " +
+    "METRIC_SUM, " +
+    "METRIC_COUNT, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN) " +
+    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+
+  public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " +
+    "%s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
+    "SERVER_TIME, " +
+    "UNITS, " +
+    "METRIC_SUM, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN," +
+    "METRIC_COUNT) " +
+    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  /**
+   * Retrieve a set of rows from metrics records table.
+   */
+  public static final String GET_METRIC_SQL = "SELECT %s METRIC_NAME, " +
+    "HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, UNITS, " +
+    "METRIC_SUM, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN, " +
+    "METRIC_COUNT, " +
+    "METRICS " +
+    "FROM %s";
+
+  public static final String GET_METRIC_AGGREGATE_ONLY_SQL = "SELECT %s " +
+    "METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
+    "UNITS, " +
+    "METRIC_SUM, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN, " +
+    "METRIC_COUNT " +
+    "FROM %s";
+
+  public static final String GET_CLUSTER_AGGREGATE_SQL = "SELECT %s " +
+    "METRIC_NAME, APP_ID, " +
+    "INSTANCE_ID, SERVER_TIME, " +
+    "UNITS, " +
+    "METRIC_SUM, " +
+    "HOSTS_COUNT, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN " +
+    "FROM %s";
+
+  public static final String GET_CLUSTER_AGGREGATE_HOURLY_SQL = "SELECT %s " +
+      "METRIC_NAME, APP_ID, " +
+      "INSTANCE_ID, SERVER_TIME, " +
+      "UNITS, " +
+      "METRIC_SUM, " +
+      "METRIC_COUNT, " +
+      "METRIC_MAX, " +
+      "METRIC_MIN " +
+      "FROM %s";
+
+  // Names of the Phoenix tables targeted by the CREATE / UPSERT / SELECT
+  // templates in this class.
+  public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD";
+  public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME =
+    "METRIC_RECORD_MINUTE";
+  public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME =
+    "METRIC_RECORD_HOURLY";
+  public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME =
+    "METRIC_AGGREGATE";
+  public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME =
+    "METRIC_AGGREGATE_HOURLY";
+  // NOTE(review): presumably the defaults substituted for the COMPRESSION and
+  // DATA_BLOCK_ENCODING placeholders of the CREATE TABLE templates - confirm
+  // against the caller that formats them.
+  public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY";
+  public static final String DEFAULT_ENCODING = "FAST_DIFF";
+  // Amount subtracted from the query start time when the NATIVE_TIME_RANGE
+  // hint is built.
+  public static final long NATIVE_TIME_RANGE_DELTA = 120000; // 2 minutes
+  // Thresholds used when a query precision is derived from the requested
+  // time range.
+  public static final long HOUR = 3600000; // 1 hour
+  public static final long DAY = 86400000; // 1 day
+
+  /**
+   * Build the Phoenix NATIVE_TIME_RANGE hint that the query templates place
+   * right after SELECT. The hint lets the HBase scan use file timestamps,
+   * which prevents a full table scan of metric records.
+   *
+   * @param startTime start of the queried range; when null there is no lower
+   * bound to hint at and an empty hint is returned
+   * @param delta amount subtracted from startTime to widen the range; null is
+   * treated as 0
+   * @return Phoenix Hint String, or an empty string if startTime is null
+   */
+  public static String getNaiveTimeRangeHint(Long startTime, Long delta) {
+    if (startTime == null) {
+      // The statement builders accept a condition without a start time;
+      // unboxing it here would throw a NullPointerException. An empty hint is
+      // what the point in time queries already pass to the templates.
+      return "";
+    }
+    long slack = delta == null ? 0L : delta;
+    return String.format("/*+ NATIVE_TIME_RANGE(%s) */", (startTime - slack));
+  }
+
+  /**
+   * Build the statement that reads host level metrics for a time range.
+   * <p>
+   * Unless the condition carries its own statement, the table and the select
+   * list are chosen from the condition's precision. When no precision is set
+   * it is derived from the requested range (a missing end time counts as
+   * "now", a missing start time as 0): more than 5 days reads the hourly
+   * table, more than 10 hours the minute table, anything else the raw
+   * records. The derived precision is stored back on the condition.
+   * <p>
+   * Parameters are bound in the order metric names, hostname, app id,
+   * instance id, start time, end time; null values are skipped.
+   *
+   * @param connection connection used to prepare the statement
+   * @param condition filter, ordering and limit of the query
+   * @return statement with its parameters bound
+   * @throws SQLException if the statement cannot be prepared or bound
+   * @throws IllegalArgumentException if the condition is empty or fails the
+   * row count validation
+   */
+  public static PreparedStatement prepareGetMetricsSqlStmt(
+    Connection connection, Condition condition) throws SQLException {
+
+    validateConditionIsNotEmpty(condition);
+    validateRowCountLimit(condition);
+
+    String stmtStr;
+    if (condition.getStatement() != null) {
+      stmtStr = condition.getStatement();
+    } else {
+
+      String metricsTable;
+      String query;
+      if (condition.getPrecision() == null) {
+        long endTime = condition.getEndTime() == null ? System.currentTimeMillis() : condition.getEndTime();
+        long startTime = condition.getStartTime() == null ? 0 : condition.getStartTime();
+        long timeRange = endTime - startTime;
+        if (timeRange > 5 * DAY) {
+          metricsTable = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+          query = GET_METRIC_AGGREGATE_ONLY_SQL;
+          condition.setPrecision(Precision.HOURS);
+        } else if (timeRange > 10 * HOUR) {
+          metricsTable = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+          query = GET_METRIC_AGGREGATE_ONLY_SQL;
+          condition.setPrecision(Precision.MINUTES);
+        } else {
+          metricsTable = METRICS_RECORD_TABLE_NAME;
+          query = GET_METRIC_SQL;
+          condition.setPrecision(Precision.SECONDS);
+        }
+      } else {
+        switch (condition.getPrecision()) {
+          case HOURS:
+            metricsTable = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+            query = GET_METRIC_AGGREGATE_ONLY_SQL;
+            break;
+          case MINUTES:
+            metricsTable = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+            query = GET_METRIC_AGGREGATE_ONLY_SQL;
+            break;
+          default:
+            metricsTable = METRICS_RECORD_TABLE_NAME;
+            query = GET_METRIC_SQL;
+        }
+      }
+
+      // A missing start time is tolerated above, so it must not be unboxed
+      // for the hint either; without a lower bound the hint is left out.
+      Long hintStartTime = condition.getStartTime();
+      String timeRangeHint = hintStartTime == null ? ""
+        : getNaiveTimeRangeHint(hintStartTime, NATIVE_TIME_RANGE_DELTA);
+
+      stmtStr = String.format(query, timeRangeHint, metricsTable);
+    }
+
+    StringBuilder sb = new StringBuilder(stmtStr);
+    sb.append(" WHERE ");
+    sb.append(condition.getConditionClause());
+    String orderByClause = condition.getOrderByClause(true);
+
+    if (orderByClause != null) {
+      sb.append(orderByClause);
+    } else {
+      sb.append(" ORDER BY METRIC_NAME, SERVER_TIME ");
+    }
+    if (condition.getLimit() != null) {
+      sb.append(" LIMIT ").append(condition.getLimit());
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SQL: " + sb.toString() + ", condition: " + condition);
+    }
+    PreparedStatement stmt = connection.prepareStatement(sb.toString());
+
+    // JDBC parameter positions are 1-based; pos always points at the next
+    // placeholder that has not been bound yet.
+    int pos = 1;
+    if (condition.getMetricNames() != null) {
+      for (; pos <= condition.getMetricNames().size(); pos++) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1));
+        }
+        stmt.setString(pos, condition.getMetricNames().get(pos - 1));
+      }
+    }
+    if (condition.getHostname() != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname());
+      }
+      stmt.setString(pos++, condition.getHostname());
+    }
+    if (condition.getAppId() != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
+      }
+      stmt.setString(pos++, condition.getAppId());
+    }
+    if (condition.getInstanceId() != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId());
+      }
+      stmt.setString(pos++, condition.getInstanceId());
+    }
+    if (condition.getStartTime() != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime());
+      }
+      stmt.setLong(pos++, condition.getStartTime());
+    }
+    if (condition.getEndTime() != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime());
+      }
+      stmt.setLong(pos, condition.getEndTime());
+    }
+    if (condition.getFetchSize() != null) {
+      stmt.setFetchSize(condition.getFetchSize());
+    }
+
+    return stmt;
+  }
+
+  /**
+   * Guard used by the statement builders: refuses a condition that reports
+   * itself as empty.
+   *
+   * @param condition query condition to check
+   * @throws IllegalArgumentException if condition.isEmpty() is true
+   */
+  private static void validateConditionIsNotEmpty(Condition condition) {
+    boolean hasContent = !condition.isEmpty();
+    if (hasContent) {
+      return;
+    }
+    throw new IllegalArgumentException("Condition is empty.");
+  }
+
+  /**
+   * Reject queries for named metrics whose estimated row count is above
+   * PhoenixHBaseAccessor.RESULTSET_LIMIT. The estimate is the number of whole
+   * hours in the requested range plus one, multiplied by the number of
+   * metric names. Conditions without metric names are not checked.
+   *
+   * @param condition query condition to check
+   * @throws IllegalArgumentException if the estimate exceeds the limit
+   */
+  private static void validateRowCountLimit(Condition condition) {
+    List<String> metricNames = condition.getMetricNames();
+    if (metricNames == null || metricNames.isEmpty()) {
+      //aggregator can use empty metrics query
+      return;
+    }
+
+    // The statement builder treats a missing end time as "now" and a missing
+    // start time as 0; use the same defaults here instead of failing with a
+    // NullPointerException while unboxing.
+    long endTime = condition.getEndTime() == null
+      ? System.currentTimeMillis() : condition.getEndTime();
+    long startTime = condition.getStartTime() == null
+      ? 0 : condition.getStartTime();
+    long range = endTime - startTime;
+
+    // NOTE(review): this block used to recompute the very same hour based
+    // value when the precision was HOURS, next to a comment about a minute
+    // based estimate for finer precisions. The redundant branch is dropped
+    // and the effective behaviour kept - confirm whether a minute based
+    // estimate was intended before tightening the limit.
+    long rowsPerMetric = TimeUnit.MILLISECONDS.toHours(range) + 1;
+
+    long totalRowsRequested = rowsPerMetric * metricNames.size();
+    if (totalRowsRequested > PhoenixHBaseAccessor.RESULTSET_LIMIT) {
+      throw new IllegalArgumentException("The time range query for " +
+        "precision table exceeds row count limit, please query aggregate " +
+        "table instead.");
+    }
+  }
+
+  /**
+   * Build the statement for a point in time query on host level metrics.
+   * <p>
+   * The condition's own statement is used when present, otherwise the raw
+   * metric record table is selected without a time range hint. Rows are
+   * ordered by the condition's order by clause (descending by metric name,
+   * hostname and server time when it has none) and the result is limited to
+   * the number of requested metric names; the condition's limit is ignored.
+   * Only metric names, hostname, app id and instance id are bound.
+   * NOTE(review): start and end time are never bound here, so the condition
+   * is presumably expected to carry no time range - confirm with callers.
+   *
+   * @param connection connection used to prepare the statement
+   * @param condition filter of the query; must name at least one metric
+   * @return statement with its parameters bound
+   * @throws SQLException if the statement cannot be prepared or bound
+   * @throws IllegalArgumentException if the condition is empty or has no
+   * metric names
+   */
+  public static PreparedStatement prepareGetLatestMetricSqlStmt(
+    Connection connection, Condition condition) throws SQLException {
+
+    validateConditionIsNotEmpty(condition);
+
+    List<String> metricNames = condition.getMetricNames();
+    if (metricNames == null || metricNames.isEmpty()) {
+      throw new IllegalArgumentException("Point in time query without " +
+        "metric names not supported ");
+    }
+
+    String customStatement = condition.getStatement();
+    String selectPart = customStatement != null
+      ? customStatement
+      : String.format(GET_METRIC_SQL, "", METRICS_RECORD_TABLE_NAME);
+
+    StringBuilder sql = new StringBuilder(selectPart);
+    sql.append(" WHERE ");
+    sql.append(condition.getConditionClause());
+
+    String orderBy = condition.getOrderByClause(false);
+    if (orderBy == null) {
+      orderBy = " ORDER BY METRIC_NAME DESC, HOSTNAME DESC, SERVER_TIME DESC ";
+    }
+    sql.append(orderBy);
+
+    //IGNORE condition limit, set one based on number of metric names
+    sql.append(" LIMIT ").append(metricNames.size());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SQL: " + sql.toString() + ", condition: " + condition);
+    }
+    PreparedStatement stmt = connection.prepareStatement(sql.toString());
+
+    // Positions are 1-based and follow the placeholder order of the clause.
+    int pos = 1;
+    for (String metricName : metricNames) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting pos: " + pos + ", value = " + metricName);
+      }
+      stmt.setString(pos, metricName);
+      pos++;
+    }
+
+    String hostname = condition.getHostname();
+    if (hostname != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting pos: " + pos + ", value: " + hostname);
+      }
+      stmt.setString(pos, hostname);
+      pos++;
+    }
+
+    String appId = condition.getAppId();
+    if (appId != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting pos: " + pos + ", value: " + appId);
+      }
+      stmt.setString(pos, appId);
+      pos++;
+    }
+
+    String instanceId = condition.getInstanceId();
+    if (instanceId != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting pos: " + pos + ", value: " + instanceId);
+      }
+      stmt.setString(pos, instanceId);
+    }
+
+    Integer fetchSize = condition.getFetchSize();
+    if (fetchSize != null) {
+      stmt.setFetchSize(fetchSize);
+    }
+
+    return stmt;
+  }
+
+  /**
+   * Build the statement that reads cluster level aggregates for a time range.
+   * <p>
+   * The table is chosen from the condition's precision: HOURS reads the
+   * hourly aggregate table, everything else the cluster aggregate table.
+   * When no precision is set it is derived from the requested range (a
+   * missing end time counts as "now", a missing start time as 0): more than
+   * 5 days selects HOURS, anything else SECONDS, and the result is stored
+   * back on the condition.
+   * <p>
+   * Parameters are bound in the order metric names, app id, instance id,
+   * start time, end time; null values are skipped.
+   *
+   * @param connection connection used to prepare the statement
+   * @param condition filter and limit of the query
+   * @return statement with its parameters bound
+   * @throws SQLException if the statement cannot be prepared or bound
+   * @throws IllegalArgumentException if the condition is empty
+   */
+  public static PreparedStatement prepareGetAggregateSqlStmt(
+    Connection connection, Condition condition) throws SQLException {
+
+    validateConditionIsNotEmpty(condition);
+
+    String metricsAggregateTable;
+    String queryStmt;
+    if (condition.getPrecision() == null) {
+      long endTime = condition.getEndTime() == null ? System.currentTimeMillis() : condition.getEndTime();
+      long startTime = condition.getStartTime() == null ? 0 : condition.getStartTime();
+      long timeRange = endTime - startTime;
+      if (timeRange > 5 * DAY) {
+        metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+        queryStmt = GET_CLUSTER_AGGREGATE_HOURLY_SQL;
+        condition.setPrecision(Precision.HOURS);
+      } else {
+        metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+        queryStmt = GET_CLUSTER_AGGREGATE_SQL;
+        condition.setPrecision(Precision.SECONDS);
+      }
+    } else {
+      switch (condition.getPrecision()) {
+        case HOURS:
+          metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+          queryStmt = GET_CLUSTER_AGGREGATE_HOURLY_SQL;
+          break;
+        default:
+          metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+          queryStmt = GET_CLUSTER_AGGREGATE_SQL;
+      }
+    }
+
+    // A missing start time is tolerated above, so it must not be unboxed
+    // for the hint either; without a lower bound the hint is left out.
+    Long hintStartTime = condition.getStartTime();
+    String timeRangeHint = hintStartTime == null ? ""
+      : getNaiveTimeRangeHint(hintStartTime, NATIVE_TIME_RANGE_DELTA);
+
+    // Format the template before the condition clause is appended: only the
+    // template carries format specifiers, and a '%' coming from the clause
+    // must not be interpreted by String.format.
+    StringBuilder sb = new StringBuilder(
+      String.format(queryStmt, timeRangeHint, metricsAggregateTable));
+    sb.append(" WHERE ");
+    sb.append(condition.getConditionClause());
+    sb.append(" ORDER BY METRIC_NAME, SERVER_TIME");
+    if (condition.getLimit() != null) {
+      sb.append(" LIMIT ").append(condition.getLimit());
+    }
+
+    String query = sb.toString();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SQL => " + query + ", condition => " + condition);
+    }
+    PreparedStatement stmt = connection.prepareStatement(query);
+
+    // JDBC parameter positions are 1-based; pos always points at the next
+    // placeholder that has not been bound yet.
+    int pos = 1;
+    if (condition.getMetricNames() != null) {
+      for (; pos <= condition.getMetricNames().size(); pos++) {
+        stmt.setString(pos, condition.getMetricNames().get(pos - 1));
+      }
+    }
+    // TODO: Upper case all strings on POST
+    if (condition.getAppId() != null) {
+      stmt.setString(pos++, condition.getAppId());
+    }
+    if (condition.getInstanceId() != null) {
+      stmt.setString(pos++, condition.getInstanceId());
+    }
+    if (condition.getStartTime() != null) {
+      stmt.setLong(pos++, condition.getStartTime());
+    }
+    if (condition.getEndTime() != null) {
+      stmt.setLong(pos, condition.getEndTime());
+    }
+
+    return stmt;
+  }
+
+  /**
+   * Build the statement for a point in time query on cluster aggregates.
+   * <p>
+   * The condition's own statement is used when present, otherwise the
+   * cluster aggregate table is selected without a time range hint. Rows are
+   * ordered by the condition's order by clause (descending by metric name
+   * and server time when it has none) and the result is limited to the
+   * number of requested metric names. Only metric names, app id and instance
+   * id are bound.
+   *
+   * @param connection connection used to prepare the statement
+   * @param condition filter of the query; its metric names must not be null
+   * @return statement with its parameters bound
+   * @throws SQLException if the statement cannot be prepared or bound
+   * @throws IllegalArgumentException if the condition is empty or its metric
+   * names are null
+   */
+  public static PreparedStatement prepareGetLatestAggregateMetricSqlStmt(
+    Connection connection, Condition condition) throws SQLException {
+
+    validateConditionIsNotEmpty(condition);
+
+    // The LIMIT below is derived from the metric names, so they are required.
+    // Fail with a clear message up front instead of a NullPointerException
+    // in the middle of building the query.
+    List<String> metricNames = condition.getMetricNames();
+    if (metricNames == null) {
+      throw new IllegalArgumentException("Point in time query without " +
+        "metric names not supported ");
+    }
+
+    String stmtStr;
+    if (condition.getStatement() != null) {
+      stmtStr = condition.getStatement();
+    } else {
+      stmtStr = String.format(GET_CLUSTER_AGGREGATE_SQL, "",
+          METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
+    }
+
+    StringBuilder sb = new StringBuilder(stmtStr);
+    sb.append(" WHERE ");
+    sb.append(condition.getConditionClause());
+    String orderByClause = condition.getOrderByClause(false);
+    if (orderByClause != null) {
+      sb.append(orderByClause);
+    } else {
+      sb.append(" ORDER BY METRIC_NAME DESC, SERVER_TIME DESC  ");
+    }
+
+    sb.append(" LIMIT ").append(metricNames.size());
+
+    String query = sb.toString();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SQL: " + query + ", condition: " + condition);
+    }
+
+    PreparedStatement stmt = connection.prepareStatement(query);
+
+    // JDBC parameter positions are 1-based; pos always points at the next
+    // placeholder that has not been bound yet.
+    int pos = 1;
+    for (String metricName : metricNames) {
+      stmt.setString(pos++, metricName);
+    }
+    if (condition.getAppId() != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
+      }
+      stmt.setString(pos++, condition.getAppId());
+    }
+    if (condition.getInstanceId() != null) {
+      stmt.setString(pos, condition.getInstanceId());
+    }
+
+    return stmt;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java
new file mode 100644
index 0000000..00d6a82
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Precision;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Condition wrapper that narrows another Condition to a single metric name.
+ * <p>
+ * {@link #getMetricNames()} reports only the metric set through
+ * {@link #setCurrentMetric(String)}, and {@link #getConditionClause()} is
+ * built from that single name; every other call is forwarded to the wrapped
+ * condition. The wrapped condition's full list of names stays available via
+ * {@link #getOriginalMetricNames()}.
+ */
+public class SplitByMetricNamesCondition implements Condition {
+  private final Condition adaptee;
+  private String currentMetric;
+
+  /**
+   * @param condition condition to wrap and forward to
+   */
+  public SplitByMetricNamesCondition(Condition condition){
+    this.adaptee = condition;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return adaptee.isEmpty();
+  }
+
+  /**
+   * @return one element list holding the current metric (which is null until
+   * {@link #setCurrentMetric(String)} has been called)
+   */
+  @Override
+  public List<String> getMetricNames() {
+    return Collections.singletonList(currentMetric);
+  }
+
+  @Override
+  public boolean isPointInTime() {
+    return adaptee.isPointInTime();
+  }
+
+  @Override
+  public boolean isGrouped() {
+    return adaptee.isGrouped();
+  }
+
+  @Override
+  public void setStatement(String statement) {
+    adaptee.setStatement(statement);
+  }
+
+  @Override
+  public String getHostname() {
+    return adaptee.getHostname();
+  }
+
+  @Override
+  public Precision getPrecision() {
+    return adaptee.getPrecision();
+  }
+
+  @Override
+  public void setPrecision(Precision precision) {
+    adaptee.setPrecision(precision);
+  }
+
+  @Override
+  public String getAppId() {
+    return adaptee.getAppId();
+  }
+
+  @Override
+  public String getInstanceId() {
+    return adaptee.getInstanceId();
+  }
+
+  /**
+   * Assemble the WHERE clause body: one "METRIC_NAME = ?" term per reported
+   * metric name (joined with OR), followed by the hostname, app id, instance
+   * id, start time and end time terms contributed through
+   * DefaultCondition.append.
+   *
+   * @return the clause with '?' placeholders, without the WHERE keyword
+   */
+  @Override
+  public StringBuilder getConditionClause() {
+    StringBuilder clause = new StringBuilder();
+    List<String> names = getMetricNames();
+    boolean needsConjunction = names != null;
+
+    if (needsConjunction) {
+      for (int i = 0; i < names.size(); i++) {
+        if (clause.length() > 1) {
+          clause.append(" OR ");
+        }
+        clause.append("METRIC_NAME = ?");
+      }
+    }
+
+    needsConjunction = DefaultCondition.append(clause, needsConjunction,
+      getHostname(), " HOSTNAME = ?");
+    needsConjunction = DefaultCondition.append(clause, needsConjunction,
+      getAppId(), " APP_ID = ?");
+    needsConjunction = DefaultCondition.append(clause, needsConjunction,
+      getInstanceId(), " INSTANCE_ID = ?");
+    needsConjunction = DefaultCondition.append(clause, needsConjunction,
+      getStartTime(), " SERVER_TIME >= ?");
+    DefaultCondition.append(clause, needsConjunction, getEndTime(),
+      " SERVER_TIME < ?");
+
+    return clause;
+  }
+
+  @Override
+  public String getOrderByClause(boolean asc) {
+    return adaptee.getOrderByClause(asc);
+  }
+
+  @Override
+  public String getStatement() {
+    return adaptee.getStatement();
+  }
+
+  @Override
+  public Long getStartTime() {
+    return adaptee.getStartTime();
+  }
+
+  @Override
+  public Long getEndTime() {
+    return adaptee.getEndTime();
+  }
+
+  @Override
+  public Integer getLimit() {
+    return adaptee.getLimit();
+  }
+
+  @Override
+  public Integer getFetchSize() {
+    return adaptee.getFetchSize();
+  }
+
+  @Override
+  public void setFetchSize(Integer fetchSize) {
+    adaptee.setFetchSize(fetchSize);
+  }
+
+  @Override
+  public void addOrderByColumn(String column) {
+    adaptee.addOrderByColumn(column);
+  }
+
+  @Override
+  public void setNoLimit() {
+    adaptee.setNoLimit();
+  }
+
+  /**
+   * @return the metric names of the wrapped condition
+   */
+  public List<String> getOriginalMetricNames() {
+    return adaptee.getMetricNames();
+  }
+
+  /**
+   * @param currentMetric metric name reported by {@link #getMetricNames()}
+   * from now on
+   */
+  public void setCurrentMetric(String currentMetric) {
+    this.currentMetric = currentMetric;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
index 3720852..e1d256d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
@@ -24,8 +24,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.DefaultPhoenixDataSource;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
   .timeline.PhoenixHBaseAccessor;
 import org.apache.zookeeper.ClientCnxn;

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
index b11a977..90c03e4 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider;
 import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
@@ -37,7 +38,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.LOG;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.LOG;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.assertj.core.api.Assertions.assertThat;
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java
index 969192d..c22e734 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java
@@ -18,6 +18,9 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
 import org.apache.hadoop.yarn.util.Clock;
 import org.junit.Before;
 import org.junit.Test;
@@ -59,7 +62,7 @@ public class AbstractTimelineAggregatorTest {
     agg = new AbstractTimelineAggregator(
       null, metricsConf, clock) {
       @Override
-      protected boolean doWork(long startTime, long endTime) {
+      public boolean doWork(long startTime, long endTime) {
         startTimeInDoWork.set(startTime);
         endTimeInDoWork.set(endTime);
         actualRuns++;
@@ -68,7 +71,7 @@ public class AbstractTimelineAggregatorTest {
       }
 
       @Override
-      protected PhoenixTransactSQL.Condition
+      protected Condition
       prepareMetricQueryCondition(long startTime, long endTime) {
         return null;
       }
@@ -89,7 +92,7 @@ public class AbstractTimelineAggregatorTest {
       }
 
       @Override
-      protected boolean isDisabled() {
+      public boolean isDisabled() {
         return false;
       }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java
index 2a389ac..af9c6bb 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java
@@ -17,11 +17,12 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
 import org.junit.Test;
 
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Function.fromMetricName;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Function.ReadFunction.AVG;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Function.PostProcessingFunction.RATE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.fromMetricName;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.AVG;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.PostProcessingFunction.RATE;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class FunctionTest {