You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/12/01 21:03:34 UTC

[06/22] ambari git commit: AMBARI-5707. Renaming a module. (swagle)

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
new file mode 100644
index 0000000..0d53f5f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
@@ -0,0 +1,528 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Encapsulate all metrics related SQL queries.
+ */
+public class PhoenixTransactSQL {
+
+  static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class);
+  // TODO: Configurable TTL values
+  /**
+   * Create table to store individual metric records.
+   */
+  public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " +
+    "EXISTS METRIC_RECORD (METRIC_NAME VARCHAR, " +
+    "HOSTNAME VARCHAR, " +
+    "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+    "APP_ID VARCHAR, " +
+    "INSTANCE_ID VARCHAR, " +
+    "START_TIME UNSIGNED_LONG, " +
+    "UNITS CHAR(20), " +
+    "METRIC_SUM DOUBLE, " +
+    "METRIC_COUNT UNSIGNED_INT, " +
+    "METRIC_MAX DOUBLE, " +
+    "METRIC_MIN DOUBLE, " +
+    "METRICS VARCHAR CONSTRAINT pk " +
+    "PRIMARY KEY (METRIC_NAME, HOSTNAME, SERVER_TIME, APP_ID, " +
+    "INSTANCE_ID)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+    "TTL=%s, COMPRESSION='%s'";
+
+  public static final String CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS METRIC_RECORD_HOURLY " +
+      "(METRIC_NAME VARCHAR, " +
+      "HOSTNAME VARCHAR, " +
+      "APP_ID VARCHAR, " +
+      "INSTANCE_ID VARCHAR, " +
+      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+      "UNITS CHAR(20), " +
+      "METRIC_SUM DOUBLE," +
+      "METRIC_COUNT UNSIGNED_INT, " +
+      "METRIC_MAX DOUBLE," +
+      "METRIC_MIN DOUBLE CONSTRAINT pk " +
+      "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
+      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+      "TTL=%s, COMPRESSION='%s'";
+
+  public static final String CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS METRIC_RECORD_MINUTE " +
+      "(METRIC_NAME VARCHAR, " +
+      "HOSTNAME VARCHAR, " +
+      "APP_ID VARCHAR, " +
+      "INSTANCE_ID VARCHAR, " +
+      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+      "UNITS CHAR(20), " +
+      "METRIC_SUM DOUBLE," +
+      "METRIC_COUNT UNSIGNED_INT, " +
+      "METRIC_MAX DOUBLE," +
+      "METRIC_MIN DOUBLE CONSTRAINT pk " +
+      "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
+      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," +
+      " COMPRESSION='%s'";
+
+  public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE " +
+      "(METRIC_NAME VARCHAR, " +
+      "APP_ID VARCHAR, " +
+      "INSTANCE_ID VARCHAR, " +
+      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+      "UNITS CHAR(20), " +
+      "METRIC_SUM DOUBLE, " +
+      "HOSTS_COUNT UNSIGNED_INT, " +
+      "METRIC_MAX DOUBLE, " +
+      "METRIC_MIN DOUBLE " +
+      "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
+      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+      "TTL=%s, COMPRESSION='%s'";
+
+  public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE_HOURLY " +
+      "(METRIC_NAME VARCHAR, " +
+      "APP_ID VARCHAR, " +
+      "INSTANCE_ID VARCHAR, " +
+      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+      "UNITS CHAR(20), " +
+      "METRIC_SUM DOUBLE, " +
+      "METRIC_COUNT UNSIGNED_INT, " +
+      "METRIC_MAX DOUBLE, " +
+      "METRIC_MIN DOUBLE " +
+      "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
+      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+      "TTL=%s, COMPRESSION='%s'";
+
+  /**
+   * Insert into metric records table.
+   */
+  public static final String UPSERT_METRICS_SQL = "UPSERT INTO %s " +
+    "(METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, " +
+    "UNITS, " +
+    "METRIC_SUM, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN, " +
+    "METRIC_COUNT, " +
+    "METRICS) VALUES " +
+    "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " +
+    "METRIC_AGGREGATE (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
+    "UNITS, " +
+    "METRIC_SUM, " +
+    "HOSTS_COUNT, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN) " +
+    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  public static final String UPSERT_CLUSTER_AGGREGATE_TIME_SQL = "UPSERT INTO" +
+    " %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
+    "UNITS, " +
+    "METRIC_SUM, " +
+    "METRIC_COUNT, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN) " +
+    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+
+  public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " +
+    "%s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
+    "SERVER_TIME, " +
+    "UNITS, " +
+    "METRIC_SUM, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN," +
+    "METRIC_COUNT) " +
+    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  /**
+   * Retrieve a set of rows from metrics records table.
+   */
+  public static final String GET_METRIC_SQL = "SELECT %s METRIC_NAME, " +
+    "HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, UNITS, " +
+    "METRIC_SUM, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN, " +
+    "METRIC_COUNT, " +
+    "METRICS " +
+    "FROM %s";
+
+  public static final String GET_METRIC_AGGREGATE_ONLY_SQL = "SELECT %s " +
+    "METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
+    "UNITS, " +
+    "METRIC_SUM, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN, " +
+    "METRIC_COUNT " +
+    "FROM %s";
+
+  public static final String GET_CLUSTER_AGGREGATE_SQL = "SELECT %s " +
+    "METRIC_NAME, APP_ID, " +
+    "INSTANCE_ID, SERVER_TIME, " +
+    "UNITS, " +
+    "METRIC_SUM, " +
+    "HOSTS_COUNT, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN " +
+    "FROM METRIC_AGGREGATE";
+
+  public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD";
+  public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME =
+    "METRIC_RECORD_MINUTE";
+  public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME =
+    "METRIC_RECORD_HOURLY";
+  public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME =
+    "METRIC_AGGREGATE";
+  public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME =
+    "METRIC_AGGREGATE_HOURLY";
+  public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY";
+  public static final String DEFAULT_ENCODING = "FAST_DIFF";
+  public static final long NATIVE_TIME_RANGE_DELTA = 120000; // 2 minutes
+
+  /** Filter to optimize HBase scan by using file timestamps. This prevents
+   * a full table scan of metric records.
+   * @return Phoenix Hint String
+   */
+  public static String getNaiveTimeRangeHint(Long startTime, Long delta) {
+    return String.format("/*+ NATIVE_TIME_RANGE(%s) */", (startTime - delta));
+  }
+
+  public static PreparedStatement prepareGetMetricsSqlStmt(
+    Connection connection, Condition condition) throws SQLException {
+
+    if (condition.isEmpty()) {
+      throw new IllegalArgumentException("Condition is empty.");
+    }
+    String stmtStr;
+    if (condition.getStatement() != null) {
+      stmtStr = condition.getStatement();
+    } else {
+      stmtStr = String.format(GET_METRIC_SQL,
+        getNaiveTimeRangeHint(condition.getStartTime(), NATIVE_TIME_RANGE_DELTA),
+        METRICS_RECORD_TABLE_NAME);
+    }
+
+    StringBuilder sb = new StringBuilder(stmtStr);
+    sb.append(" WHERE ");
+    sb.append(condition.getConditionClause());
+    String orderByClause = condition.getOrderByClause();
+
+    if (orderByClause != null) {
+      sb.append(orderByClause);
+    } else {
+      sb.append(" ORDER BY METRIC_NAME, SERVER_TIME ");
+    }
+    if (condition.getLimit() != null) {
+      sb.append(" LIMIT ").append(condition.getLimit());
+    }
+
+    LOG.debug("SQL: " + sb.toString() + ", condition: " + condition);
+    PreparedStatement stmt = connection.prepareStatement(sb.toString());
+    int pos = 1;
+    if (condition.getMetricNames() != null) {
+      for (; pos <= condition.getMetricNames().size(); pos++) {
+        LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1));
+        stmt.setString(pos, condition.getMetricNames().get(pos - 1));
+      }
+    }
+    if (condition.getHostname() != null) {
+      LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname());
+      stmt.setString(pos++, condition.getHostname());
+    }
+    // TODO: Upper case all strings on POST
+    if (condition.getAppId() != null) {
+      // TODO: fix case of appId coming from host metrics
+      String appId = condition.getAppId();
+      if (!condition.getAppId().equals("HOST")) {
+        appId = appId.toLowerCase();
+      }
+      LOG.debug("Setting pos: " + pos + ", value: " + appId);
+      stmt.setString(pos++, appId);
+    }
+    if (condition.getInstanceId() != null) {
+      LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId());
+      stmt.setString(pos++, condition.getInstanceId());
+    }
+    if (condition.getStartTime() != null) {
+      LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime());
+      stmt.setLong(pos++, condition.getStartTime());
+    }
+    if (condition.getEndTime() != null) {
+      LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime());
+      stmt.setLong(pos, condition.getEndTime());
+    }
+    if (condition.getFetchSize() != null) {
+      stmt.setFetchSize(condition.getFetchSize());
+    }
+
+    return stmt;
+  }
+
+
+  public static PreparedStatement prepareGetAggregateSqlStmt(
+    Connection connection, Condition condition) throws SQLException {
+
+    if (condition.isEmpty()) {
+      throw new IllegalArgumentException("Condition is empty.");
+    }
+
+    StringBuilder sb = new StringBuilder(GET_CLUSTER_AGGREGATE_SQL);
+    sb.append(" WHERE ");
+    sb.append(condition.getConditionClause());
+    sb.append(" ORDER BY METRIC_NAME, SERVER_TIME");
+    if (condition.getLimit() != null) {
+      sb.append(" LIMIT ").append(condition.getLimit());
+    }
+
+    LOG.debug("SQL => " + sb.toString() + ", condition => " + condition);
+    PreparedStatement stmt = connection.prepareStatement(sb.toString());
+    int pos = 1;
+    if (condition.getMetricNames() != null) {
+      for (; pos <= condition.getMetricNames().size(); pos++) {
+        stmt.setString(pos, condition.getMetricNames().get(pos - 1));
+      }
+    }
+    // TODO: Upper case all strings on POST
+    if (condition.getAppId() != null) {
+      stmt.setString(pos++, condition.getAppId().toLowerCase());
+    }
+    if (condition.getInstanceId() != null) {
+      stmt.setString(pos++, condition.getInstanceId());
+    }
+    if (condition.getStartTime() != null) {
+      stmt.setLong(pos++, condition.getStartTime());
+    }
+    if (condition.getEndTime() != null) {
+      stmt.setLong(pos, condition.getEndTime());
+    }
+
+    return stmt;
+  }
+
+  static class Condition {
+    List<String> metricNames;
+    String hostname;
+    String appId;
+    String instanceId;
+    Long startTime;
+    Long endTime;
+    Integer limit;
+    boolean grouped;
+    boolean noLimit = false;
+    Integer fetchSize;
+    String statement;
+    Set<String> orderByColumns = new LinkedHashSet<String>();
+
+    Condition(List<String> metricNames, String hostname, String appId,
+              String instanceId, Long startTime, Long endTime, Integer limit,
+              boolean grouped) {
+      this.metricNames = metricNames;
+      this.hostname = hostname;
+      this.appId = appId;
+      this.instanceId = instanceId;
+      this.startTime = startTime;
+      this.endTime = endTime;
+      this.limit = limit;
+      this.grouped = grouped;
+    }
+
+    String getStatement() {
+      return statement;
+    }
+
+    void setStatement(String statement) {
+      this.statement = statement;
+    }
+
+    List<String> getMetricNames() {
+      return metricNames == null || metricNames.isEmpty() ? null : metricNames;
+    }
+
+    String getMetricsClause() {
+      StringBuilder sb = new StringBuilder("(");
+      if (metricNames != null) {
+        for (String name : metricNames) {
+          if (sb.length() != 1) {
+            sb.append(", ");
+          }
+          sb.append("?");
+        }
+        sb.append(")");
+        return sb.toString();
+      } else {
+        return null;
+      }
+    }
+
+    String getConditionClause() {
+      StringBuilder sb = new StringBuilder();
+      boolean appendConjunction = false;
+
+      if (getMetricNames() != null) {
+        sb.append("METRIC_NAME IN ");
+        sb.append(getMetricsClause());
+        appendConjunction = true;
+      }
+      if (appendConjunction) {
+        sb.append(" AND");
+      }
+      appendConjunction = false;
+      if (getHostname() != null) {
+        sb.append(" HOSTNAME = ?");
+        appendConjunction = true;
+      }
+      if (appendConjunction) {
+        sb.append(" AND");
+      }
+      appendConjunction = false;
+      if (getAppId() != null) {
+        sb.append(" APP_ID = ?");
+        appendConjunction = true;
+      }
+      if (appendConjunction) {
+        sb.append(" AND");
+      }
+      appendConjunction = false;
+      if (getInstanceId() != null) {
+        sb.append(" INSTANCE_ID = ?");
+        appendConjunction = true;
+      }
+      if (appendConjunction) {
+        sb.append(" AND");
+      }
+      appendConjunction = false;
+      if (getStartTime() != null) {
+        sb.append(" SERVER_TIME >= ?");
+        appendConjunction = true;
+      }
+      if (appendConjunction) {
+        sb.append(" AND");
+      }
+      if (getEndTime() != null) {
+        sb.append(" SERVER_TIME < ?");
+      }
+      return sb.toString();
+    }
+
+    String getHostname() {
+      return hostname == null || hostname.isEmpty() ? null : hostname;
+    }
+
+    String getAppId() {
+      return appId == null || appId.isEmpty() ? null : appId;
+    }
+
+    String getInstanceId() {
+      return instanceId == null || instanceId.isEmpty() ? null : instanceId;
+    }
+
+    /**
+     * Convert to millis.
+     */
+    Long getStartTime() {
+      if (startTime < 9999999999l) {
+        return startTime * 1000;
+      } else {
+        return startTime;
+      }
+    }
+
+    Long getEndTime() {
+      if (endTime < 9999999999l) {
+        return endTime * 1000;
+      } else {
+        return endTime;
+      }
+    }
+
+    void setNoLimit() {
+      this.noLimit = true;
+    }
+
+    Integer getLimit() {
+      if (noLimit) {
+        return null;
+      }
+      return limit == null ? PhoenixHBaseAccessor.RESULTSET_LIMIT : limit;
+    }
+
+    boolean isGrouped() {
+      return grouped;
+    }
+
+    boolean isEmpty() {
+      return (metricNames == null || metricNames.isEmpty())
+        && (hostname == null || hostname.isEmpty())
+        && (appId == null || appId.isEmpty())
+        && (instanceId == null || instanceId.isEmpty())
+        && startTime == null
+        && endTime == null;
+    }
+
+    Integer getFetchSize() {
+      return fetchSize;
+    }
+
+    void setFetchSize(Integer fetchSize) {
+      this.fetchSize = fetchSize;
+    }
+
+    void addOrderByColumn(String column) {
+      orderByColumns.add(column);
+    }
+
+    String getOrderByClause() {
+      String orderByStr = " ORDER BY ";
+      if (!orderByColumns.isEmpty()) {
+        StringBuilder sb = new StringBuilder(orderByStr);
+        for (String orderByColumn : orderByColumns) {
+          if (sb.length() != orderByStr.length()) {
+            sb.append(", ");
+          }
+          sb.append(orderByColumn);
+        }
+        sb.append(" ");
+        return sb.toString();
+      }
+      return null;
+    }
+
+    @Override
+    public String toString() {
+      return "Condition{" +
+        "metricNames=" + metricNames +
+        ", hostname='" + hostname + '\'' +
+        ", appId='" + appId + '\'' +
+        ", instanceId='" + instanceId + '\'' +
+        ", startTime=" + startTime +
+        ", endTime=" + endTime +
+        ", limit=" + limit +
+        ", grouped=" + grouped +
+        ", orderBy=" + orderByColumns +
+        ", noLimit=" + noLimit +
+        '}';
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
new file mode 100644
index 0000000..d227993
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+public class TimelineClusterMetric {
+  private String metricName;
+  private String appId;
+  private String instanceId;
+  private long timestamp;
+  private String type;
+
+  TimelineClusterMetric(String metricName, String appId, String instanceId,
+                        long timestamp, String type) {
+    this.metricName = metricName;
+    this.appId = appId;
+    this.instanceId = instanceId;
+    this.timestamp = timestamp;
+    this.type = type;
+  }
+
+  String getMetricName() {
+    return metricName;
+  }
+
+  String getAppId() {
+    return appId;
+  }
+
+  String getInstanceId() {
+    return instanceId;
+  }
+
+  long getTimestamp() {
+    return timestamp;
+  }
+
+  String getType() { return type; }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TimelineClusterMetric that = (TimelineClusterMetric) o;
+
+    if (timestamp != that.timestamp) return false;
+    if (appId != null ? !appId.equals(that.appId) : that.appId != null)
+      return false;
+    if (instanceId != null ? !instanceId.equals(that.instanceId) : that.instanceId != null)
+      return false;
+    if (!metricName.equals(that.metricName)) return false;
+
+    return true;
+  }
+
+  public boolean equalsExceptTime(TimelineClusterMetric metric) {
+    if (!metricName.equals(metric.metricName)) return false;
+    if (!appId.equals(metric.appId)) return false;
+    if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
+      return false;
+
+    return true;
+  }
+  @Override
+  public int hashCode() {
+    int result = metricName.hashCode();
+    result = 31 * result + (appId != null ? appId.hashCode() : 0);
+    result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
+    result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "TimelineClusterMetric{" +
+      "metricName='" + metricName + '\'' +
+      ", appId='" + appId + '\'' +
+      ", instanceId='" + instanceId + '\'' +
+      ", timestamp=" + timestamp +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
new file mode 100644
index 0000000..cab154b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
+
+public class TimelineMetricAggregator extends AbstractTimelineAggregator {
+  private static final Log LOG = LogFactory.getLog
+    (TimelineMetricAggregator.class);
+
+  private final String checkpointLocation;
+  private final Long sleepIntervalMillis;
+  private final Integer checkpointCutOffMultiplier;
+  private final String hostAggregatorDisabledParam;
+  private final String tableName;
+  private final String outputTableName;
+  private final Long nativeTimeRangeDelay;
+
+  public TimelineMetricAggregator(PhoenixHBaseAccessor hBaseAccessor,
+                                  Configuration metricsConf,
+                                  String checkpointLocation,
+                                  Long sleepIntervalMillis,
+                                  Integer checkpointCutOffMultiplier,
+                                  String hostAggregatorDisabledParam,
+                                  String tableName,
+                                  String outputTableName,
+                                  Long nativeTimeRangeDelay) {
+    super(hBaseAccessor, metricsConf);
+    this.checkpointLocation = checkpointLocation;
+    this.sleepIntervalMillis = sleepIntervalMillis;
+    this.checkpointCutOffMultiplier = checkpointCutOffMultiplier;
+    this.hostAggregatorDisabledParam = hostAggregatorDisabledParam;
+    this.tableName = tableName;
+    this.outputTableName = outputTableName;
+    this.nativeTimeRangeDelay =  nativeTimeRangeDelay;
+  }
+
+  @Override
+  protected String getCheckpointLocation() {
+    return checkpointLocation;
+  }
+
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime)
+    throws IOException, SQLException {
+    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
+      aggregateMetricsFromResultSet(rs);
+
+    LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
+    hBaseAccessor.saveHostAggregateRecords(hostAggregateMap,
+      outputTableName);
+  }
+
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+    Condition condition = new Condition(null, null, null, null, startTime,
+      endTime, null, true);
+    condition.setNoLimit();
+    condition.setFetchSize(resultsetFetchSize);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay),
+      tableName));
+    condition.addOrderByColumn("METRIC_NAME");
+    condition.addOrderByColumn("HOSTNAME");
+    condition.addOrderByColumn("APP_ID");
+    condition.addOrderByColumn("INSTANCE_ID");
+    condition.addOrderByColumn("SERVER_TIME");
+    return condition;
+  }
+
+  private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet
+      (ResultSet rs) throws IOException, SQLException {
+    TimelineMetric existingMetric = null;
+    MetricHostAggregate hostAggregate = null;
+    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
+      new HashMap<TimelineMetric, MetricHostAggregate>();
+
+    while (rs.next()) {
+      TimelineMetric currentMetric =
+        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+      MetricHostAggregate currentHostAggregate =
+        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+      if (existingMetric == null) {
+        // First row
+        existingMetric = currentMetric;
+        hostAggregate = new MetricHostAggregate();
+        hostAggregateMap.put(currentMetric, hostAggregate);
+      }
+
+      if (existingMetric.equalsExceptTime(currentMetric)) {
+        // Recalculate totals with current metric
+        hostAggregate.updateAggregates(currentHostAggregate);
+      } else {
+        // Switched over to a new metric - save existing - create new aggregate
+        hostAggregate = new MetricHostAggregate();
+        hostAggregate.updateAggregates(currentHostAggregate);
+        hostAggregateMap.put(currentMetric, hostAggregate);
+        existingMetric = currentMetric;
+      }
+    }
+    return hostAggregateMap;
+  }
+
+  @Override
+  protected Long getSleepIntervalMillis() {
+    return sleepIntervalMillis;
+  }
+
+  @Override
+  protected Integer getCheckpointCutOffMultiplier() {
+    return checkpointCutOffMultiplier;
+  }
+
+  @Override
+  protected boolean isDisabled() {
+    return metricsConf.getBoolean(hostAggregatorDisabledParam, false);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
new file mode 100644
index 0000000..8b10079
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+
+/**
+ *
+ */
+public class TimelineMetricAggregatorFactory {
+  private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE =
+    "timeline-metrics-host-aggregator-checkpoint";
+  private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE =
+    "timeline-metrics-host-aggregator-hourly-checkpoint";
+
+  public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
+    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+    String checkpointLocation = FilenameUtils.concat(checkpointDir,
+      MINUTE_AGGREGATE_CHECKPOINT_FILE);
+    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+      (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l));  // 5 mins
+
+    int checkpointCutOffMultiplier = metricsConf.getInt
+      (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3);
+    String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED;
+
+    String inputTableName = METRICS_RECORD_TABLE_NAME;
+    String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+
+    return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      hostAggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      120000l);
+  }
+
+  public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
+    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+    String checkpointLocation = FilenameUtils.concat(checkpointDir,
+      MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE);
+    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+      (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
+
+    int checkpointCutOffMultiplier = metricsConf.getInt
+      (HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+    String hostAggregatorDisabledParam = HOST_AGGREGATOR_HOUR_DISABLED;
+
+    String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+    String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+
+    return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      hostAggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      3600000l);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
new file mode 100644
index 0000000..654c188
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+
+/**
+ * Aggregates a metric across all hosts in the cluster. Reads metrics from
+ * the precision table and saves into the aggregate.
+ */
+public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
+  private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregator.class);
+  private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE =
+    "timeline-metrics-cluster-aggregator-checkpoint";
+  private final String checkpointLocation;
+  private final Long sleepIntervalMillis;
+  public final int timeSliceIntervalMillis;
+  private final Integer checkpointCutOffMultiplier;
+
+  public TimelineMetricClusterAggregator(PhoenixHBaseAccessor hBaseAccessor,
+                                         Configuration metricsConf) {
+    super(hBaseAccessor, metricsConf);
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+
+    checkpointLocation = FilenameUtils.concat(checkpointDir,
+      CLUSTER_AGGREGATOR_CHECKPOINT_FILE);
+
+    sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+      (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 120l));
+    timeSliceIntervalMillis = (int)SECONDS.toMillis(metricsConf.getInt
+      (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 15));
+    checkpointCutOffMultiplier =
+      metricsConf.getInt(CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+  }
+
+  @Override
+  protected String getCheckpointLocation() {
+    return checkpointLocation;
+  }
+
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime)
+    throws SQLException, IOException {
+    List<Long[]> timeSlices = getTimeSlices(startTime, endTime);
+    Map<TimelineClusterMetric, MetricClusterAggregate>
+      aggregateClusterMetrics = aggregateMetricsFromResultSet(rs, timeSlices);
+
+    LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
+    hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
+  }
+
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+    Condition condition = new Condition(null, null, null, null, startTime,
+      endTime, null, true);
+    condition.setNoLimit();
+    condition.setFetchSize(resultsetFetchSize);
+    condition.setStatement(String.format(GET_METRIC_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+      METRICS_RECORD_TABLE_NAME));
+    condition.addOrderByColumn("METRIC_NAME");
+    condition.addOrderByColumn("APP_ID");
+    condition.addOrderByColumn("INSTANCE_ID");
+    condition.addOrderByColumn("SERVER_TIME");
+    return condition;
+  }
+
+  private List<Long[]> getTimeSlices(long startTime, long endTime) {
+    List<Long[]> timeSlices = new ArrayList<Long[]>();
+    long sliceStartTime = startTime;
+    while (sliceStartTime < endTime) {
+      timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis});
+      sliceStartTime += timeSliceIntervalMillis;
+    }
+    return timeSlices;
+  }
+
+  private Map<TimelineClusterMetric, MetricClusterAggregate>
+  aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
+    throws SQLException, IOException {
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
+      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+    // Create time slices
+
+    while (rs.next()) {
+      TimelineMetric metric =
+        PhoenixHBaseAccessor.getTimelineMetricFromResultSet(rs);
+
+      Map<TimelineClusterMetric, Double> clusterMetrics =
+        sliceFromTimelineMetric(metric, timeSlices);
+
+      if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
+        for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
+            clusterMetrics.entrySet()) {
+          TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
+          MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
+          Double avgValue = clusterMetricEntry.getValue();
+
+          if (aggregate == null) {
+            aggregate = new MetricClusterAggregate(avgValue, 1, null,
+              avgValue, avgValue);
+            aggregateClusterMetrics.put(clusterMetric, aggregate);
+          } else {
+            aggregate.updateSum(avgValue);
+            aggregate.updateNumberOfHosts(1);
+            aggregate.updateMax(avgValue);
+            aggregate.updateMin(avgValue);
+          }
+        }
+      }
+    }
+    return aggregateClusterMetrics;
+  }
+
+  @Override
+  protected Long getSleepIntervalMillis() {
+    return sleepIntervalMillis;
+  }
+
+  @Override
+  protected Integer getCheckpointCutOffMultiplier() {
+    return checkpointCutOffMultiplier;
+  }
+
+  @Override
+  protected boolean isDisabled() {
+    return metricsConf.getBoolean(CLUSTER_AGGREGATOR_MINUTE_DISABLED, false);
+  }
+
+  private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
+        TimelineMetric timelineMetric, List<Long[]> timeSlices) {
+
+    if (timelineMetric.getMetricValues().isEmpty()) {
+      return null;
+    }
+
+    Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
+      new HashMap<TimelineClusterMetric, Double>();
+
+    for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) {
+      // TODO: investigate null values - pre filter
+      if (metric.getValue() == null) {
+        continue;
+      }
+      Long timestamp = getSliceTimeForMetric(timeSlices,
+                       Long.parseLong(metric.getKey().toString()));
+      if (timestamp != -1) {
+        // Metric is within desired time range
+        TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
+          timelineMetric.getMetricName(),
+          timelineMetric.getAppId(),
+          timelineMetric.getInstanceId(),
+          timestamp,
+          timelineMetric.getType());
+        if (!timelineClusterMetricMap.containsKey(clusterMetric)) {
+          timelineClusterMetricMap.put(clusterMetric, metric.getValue());
+        } else {
+          Double oldValue = timelineClusterMetricMap.get(clusterMetric);
+          Double newValue = (oldValue + metric.getValue()) / 2;
+          timelineClusterMetricMap.put(clusterMetric, newValue);
+        }
+      }
+    }
+
+    return timelineClusterMetricMap;
+  }
+
+  /**
+   * Return beginning of the time slice into which the metric fits.
+   */
+  private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) {
+    for (Long[] timeSlice : timeSlices) {
+      if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) {
+        return timeSlice[0];
+      }
+    }
+    return -1l;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
new file mode 100644
index 0000000..7764ea3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+
+public class TimelineMetricClusterAggregatorHourly extends
+  AbstractTimelineAggregator {
+  private static final Log LOG = LogFactory.getLog
+    (TimelineMetricClusterAggregatorHourly.class);
+  private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE =
+    "timeline-metrics-cluster-aggregator-hourly-checkpoint";
+  private final String checkpointLocation;
+  private final long sleepIntervalMillis;
+  private final Integer checkpointCutOffMultiplier;
+  private long checkpointCutOffIntervalMillis;
+  private static final Long NATIVE_TIME_RANGE_DELTA = 3600000l; // 1 hour
+
+  public TimelineMetricClusterAggregatorHourly(
+    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+    super(hBaseAccessor, metricsConf);
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+
+    checkpointLocation = FilenameUtils.concat(checkpointDir,
+      CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE);
+
+    sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+      (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
+    checkpointCutOffIntervalMillis =  SECONDS.toMillis(metricsConf.getLong
+      (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL, 7200l));
+    checkpointCutOffMultiplier = metricsConf.getInt
+      (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+  }
+
+  @Override
+  protected String getCheckpointLocation() {
+    return checkpointLocation;
+  }
+
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime)
+    throws SQLException, IOException {
+      Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
+        aggregateMetricsFromResultSet(rs);
+
+    LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
+    hBaseAccessor.saveClusterAggregateHourlyRecords(hostAggregateMap,
+      METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
+  }
+
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime,
+                                                  long endTime) {
+    Condition condition = new Condition(null, null, null, null, startTime,
+      endTime, null, true);
+    condition.setNoLimit();
+    condition.setFetchSize(resultsetFetchSize);
+    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA)));
+    condition.addOrderByColumn("METRIC_NAME");
+    condition.addOrderByColumn("APP_ID");
+    condition.addOrderByColumn("INSTANCE_ID");
+    condition.addOrderByColumn("SERVER_TIME");
+    return condition;
+  }
+
+  private Map<TimelineClusterMetric, MetricHostAggregate>
+  aggregateMetricsFromResultSet(ResultSet rs) throws IOException, SQLException {
+
+    TimelineClusterMetric existingMetric = null;
+    MetricHostAggregate hostAggregate = null;
+    Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
+      new HashMap<TimelineClusterMetric, MetricHostAggregate>();
+
+    while (rs.next()) {
+      TimelineClusterMetric currentMetric =
+        getTimelineMetricClusterKeyFromResultSet(rs);
+      MetricClusterAggregate currentHostAggregate =
+        getMetricClusterAggregateFromResultSet(rs);
+
+      if (existingMetric == null) {
+        // First row
+        existingMetric = currentMetric;
+        hostAggregate = new MetricHostAggregate();
+        hostAggregateMap.put(currentMetric, hostAggregate);
+      }
+
+      if (existingMetric.equalsExceptTime(currentMetric)) {
+        // Recalculate totals with current metric
+        updateAggregatesFromHost(hostAggregate, currentHostAggregate);
+
+      } else {
+        // Switched over to a new metric - save existing
+        hostAggregate = new MetricHostAggregate();
+        updateAggregatesFromHost(hostAggregate, currentHostAggregate);
+        hostAggregateMap.put(currentMetric, hostAggregate);
+        existingMetric = currentMetric;
+      }
+
+    }
+
+    return hostAggregateMap;
+  }
+
+  private void updateAggregatesFromHost(
+    MetricHostAggregate agg,
+    MetricClusterAggregate currentClusterAggregate) {
+    agg.updateMax(currentClusterAggregate.getMax());
+    agg.updateMin(currentClusterAggregate.getMin());
+    agg.updateSum(currentClusterAggregate.getSum());
+    agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts());
+  }
+
+  @Override
+  protected Long getSleepIntervalMillis() {
+    return sleepIntervalMillis;
+  }
+
+  @Override
+  protected Integer getCheckpointCutOffMultiplier() {
+    return checkpointCutOffMultiplier;
+  }
+
+  @Override
+  protected Long getCheckpointCutOffIntervalMillis() {
+    return checkpointCutOffIntervalMillis;
+  }
+
+  @Override
+  protected boolean isDisabled() {
+    return metricsConf.getBoolean(CLUSTER_AGGREGATOR_HOUR_DISABLED, false);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
new file mode 100644
index 0000000..60833d0
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Configuration class that reads properties from ams-site.xml. All values
+ * for time or intervals are given in seconds.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface TimelineMetricConfiguration {
+  public static final String HBASE_SITE_CONFIGURATION_FILE = "hbase-site.xml";
+  public static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
+
+  public static final String TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR =
+    "timeline.metrics.aggregator.checkpoint.dir";
+
+  public static final String DEFAULT_CHECKPOINT_LOCATION =
+    System.getProperty("java.io.tmpdir");
+
+  public static final String HBASE_ENCODING_SCHEME =
+    "timeline.metrics.hbase.data.block.encoding";
+
+  public static final String HBASE_COMPRESSION_SCHEME =
+    "timeline.metrics.hbase.compression.scheme";
+
+  public static final String PRECISION_TABLE_TTL =
+    "timeline.metrics.host.aggregator.ttl";
+  public static final String HOST_MINUTE_TABLE_TTL =
+    "timeline.metrics.host.aggregator.minute.ttl";
+  public static final String HOST_HOUR_TABLE_TTL =
+    "timeline.metrics.host.aggregator.hourly.ttl";
+  public static final String CLUSTER_MINUTE_TABLE_TTL =
+    "timeline.metrics.cluster.aggregator.minute.ttl";
+  public static final String CLUSTER_HOUR_TABLE_TTL =
+    "timeline.metrics.cluster.aggregator.hourly.ttl";
+
+  public static final String CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL =
+    "timeline.metrics.cluster.aggregator.minute.timeslice.interval";
+
+  public static final String AGGREGATOR_CHECKPOINT_DELAY =
+    "timeline.metrics.service.checkpointDelay";
+
+  public static final String RESULTSET_FETCH_SIZE =
+    "timeline.metrics.service.resultset.fetchSize";
+
+  public static final String HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL =
+    "timeline.metrics.host.aggregator.minute.interval";
+
+  public static final String HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL =
+    "timeline.metrics.host.aggregator.hourly.interval";
+
+  public static final String CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL =
+    "timeline.metrics.cluster.aggregator.minute.interval";
+
+  public static final String CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL =
+    "timeline.metrics.cluster.aggregator.hourly.interval";
+
+  public static final String HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER =
+    "timeline.metrics.host.aggregator.minute.checkpointCutOffMultiplier";
+
+  public static final String HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER =
+    "timeline.metrics.host.aggregator.hourly.checkpointCutOffMultiplier";
+
+  public static final String CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER =
+    "timeline.metrics.cluster.aggregator.minute.checkpointCutOffMultiplier";
+
+  public static final String CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER =
+    "timeline.metrics.cluster.aggregator.hourly.checkpointCutOffMultiplier";
+
+  public static final String CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL =
+    "timeline.metrics.cluster.aggregator.hourly.checkpointCutOffInterval";
+
+  public static final String GLOBAL_RESULT_LIMIT =
+    "timeline.metrics.service.default.result.limit";
+
+  public static final String GLOBAL_MAX_RETRIES =
+    "timeline.metrics.service.default.max_retries";
+
+  public static final String GLOBAL_RETRY_INTERVAL =
+    "timeline.metrics.service.default.retryInterval";
+
+  public static final String HOST_AGGREGATOR_MINUTE_DISABLED =
+    "timeline.metrics.host.aggregator.minute.disabled";
+
+  public static final String HOST_AGGREGATOR_HOUR_DISABLED =
+    "timeline.metrics.host.aggregator.hourly.disabled";
+
+  public static final String CLUSTER_AGGREGATOR_MINUTE_DISABLED =
+    "timeline.metrics.cluster.aggregator.minute.disabled";
+
+  public static final String CLUSTER_AGGREGATOR_HOUR_DISABLED =
+    "timeline.metrics.cluster.aggregator.hourly.disabled";
+
+  public static final String DISABLE_APPLICATION_TIMELINE_STORE =
+    "timeline.service.disable.application.timeline.store";
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
new file mode 100644
index 0000000..5224450
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+public interface TimelineMetricStore {
+  /**
+   * This method retrieves metrics stored byu the Timeline store.
+   *
+   * @param metricNames Names of the metric, e.g.: cpu_user
+   * @param hostname Name of the host where the metric originated from
+   * @param applicationId Id of the application to which this metric belongs
+   * @param instanceId Application instance id.
+   * @param startTime Start timestamp
+   * @param endTime End timestamp
+   * @param limit Override default result limit
+   * @param groupedByHosts Group {@link TimelineMetric} by metric name, hostname,
+   *                app id and instance id
+   *
+   * @return {@link TimelineMetric}
+   * @throws java.sql.SQLException
+   */
+  TimelineMetrics getTimelineMetrics(List<String> metricNames, String hostname,
+      String applicationId, String instanceId, Long startTime,
+      Long endTime, Integer limit, boolean groupedByHosts)
+    throws SQLException, IOException;
+
+
+  /**
+   * Return all records for a single metric satisfying the filter criteria.
+   * @return {@link TimelineMetric}
+   */
+  TimelineMetric getTimelineMetric(String metricName, String hostname,
+      String applicationId, String instanceId, Long startTime,
+      Long endTime, Integer limit)
+      throws SQLException, IOException;
+
+
+  /**
+   * Stores metric information to the timeline store. Any errors occurring for
+   * individual put request objects will be reported in the response.
+   *
+   * @param metrics An {@link TimelineMetrics}.
+   * @return An {@link org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse}.
+   * @throws SQLException, IOException
+   */
+  TimelinePutResponse putMetrics(TimelineMetrics metrics)
+    throws SQLException, IOException;
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptFinishData.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptFinishData.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptFinishData.java
new file mode 100644
index 0000000..7ba51af
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptFinishData.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * The class contains the fields that can be determined when
+ * <code>RMAppAttempt</code> finishes, and that need to be stored persistently.
+ */
+@Public
+@Unstable
+public abstract class ApplicationAttemptFinishData {
+
+  @Public
+  @Unstable
+  public static ApplicationAttemptFinishData newInstance(
+      ApplicationAttemptId appAttemptId, String diagnosticsInfo,
+      String trackingURL, FinalApplicationStatus finalApplicationStatus,
+      YarnApplicationAttemptState yarnApplicationAttemptState) {
+    ApplicationAttemptFinishData appAttemptFD =
+        Records.newRecord(ApplicationAttemptFinishData.class);
+    appAttemptFD.setApplicationAttemptId(appAttemptId);
+    appAttemptFD.setDiagnosticsInfo(diagnosticsInfo);
+    appAttemptFD.setTrackingURL(trackingURL);
+    appAttemptFD.setFinalApplicationStatus(finalApplicationStatus);
+    appAttemptFD.setYarnApplicationAttemptState(yarnApplicationAttemptState);
+    return appAttemptFD;
+  }
+
+  @Public
+  @Unstable
+  public abstract ApplicationAttemptId getApplicationAttemptId();
+
+  @Public
+  @Unstable
+  public abstract void setApplicationAttemptId(
+      ApplicationAttemptId applicationAttemptId);
+
+  @Public
+  @Unstable
+  public abstract String getTrackingURL();
+
+  @Public
+  @Unstable
+  public abstract void setTrackingURL(String trackingURL);
+
+  @Public
+  @Unstable
+  public abstract String getDiagnosticsInfo();
+
+  @Public
+  @Unstable
+  public abstract void setDiagnosticsInfo(String diagnosticsInfo);
+
+  @Public
+  @Unstable
+  public abstract FinalApplicationStatus getFinalApplicationStatus();
+
+  @Public
+  @Unstable
+  public abstract void setFinalApplicationStatus(
+      FinalApplicationStatus finalApplicationStatus);
+
+  @Public
+  @Unstable
+  public abstract YarnApplicationAttemptState getYarnApplicationAttemptState();
+
+  @Public
+  @Unstable
+  public abstract void setYarnApplicationAttemptState(
+      YarnApplicationAttemptState yarnApplicationAttemptState);
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptHistoryData.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptHistoryData.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptHistoryData.java
new file mode 100644
index 0000000..b759ab1
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptHistoryData.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+
+/**
+ * The class contains all the fields that are stored persistently for
+ * <code>RMAppAttempt</code>.
+ */
+@Public
+@Unstable
+public class ApplicationAttemptHistoryData {
+
+  private ApplicationAttemptId applicationAttemptId;
+
+  private String host;
+
+  private int rpcPort;
+
+  private String trackingURL;
+
+  private String diagnosticsInfo;
+
+  private FinalApplicationStatus finalApplicationStatus;
+
+  private ContainerId masterContainerId;
+
+  private YarnApplicationAttemptState yarnApplicationAttemptState;
+
+  @Public
+  @Unstable
+  public static ApplicationAttemptHistoryData newInstance(
+      ApplicationAttemptId appAttemptId, String host, int rpcPort,
+      ContainerId masterContainerId, String diagnosticsInfo,
+      String trackingURL, FinalApplicationStatus finalApplicationStatus,
+      YarnApplicationAttemptState yarnApplicationAttemptState) {
+    ApplicationAttemptHistoryData appAttemptHD =
+        new ApplicationAttemptHistoryData();
+    appAttemptHD.setApplicationAttemptId(appAttemptId);
+    appAttemptHD.setHost(host);
+    appAttemptHD.setRPCPort(rpcPort);
+    appAttemptHD.setMasterContainerId(masterContainerId);
+    appAttemptHD.setDiagnosticsInfo(diagnosticsInfo);
+    appAttemptHD.setTrackingURL(trackingURL);
+    appAttemptHD.setFinalApplicationStatus(finalApplicationStatus);
+    appAttemptHD.setYarnApplicationAttemptState(yarnApplicationAttemptState);
+    return appAttemptHD;
+  }
+
+  @Public
+  @Unstable
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
+
+  @Public
+  @Unstable
+  public void
+      setApplicationAttemptId(ApplicationAttemptId applicationAttemptId) {
+    this.applicationAttemptId = applicationAttemptId;
+  }
+
+  @Public
+  @Unstable
+  public String getHost() {
+    return host;
+  }
+
+  @Public
+  @Unstable
+  public void setHost(String host) {
+    this.host = host;
+  }
+
+  @Public
+  @Unstable
+  public int getRPCPort() {
+    return rpcPort;
+  }
+
+  @Public
+  @Unstable
+  public void setRPCPort(int rpcPort) {
+    this.rpcPort = rpcPort;
+  }
+
+  @Public
+  @Unstable
+  public String getTrackingURL() {
+    return trackingURL;
+  }
+
+  @Public
+  @Unstable
+  public void setTrackingURL(String trackingURL) {
+    this.trackingURL = trackingURL;
+  }
+
+  @Public
+  @Unstable
+  public String getDiagnosticsInfo() {
+    return diagnosticsInfo;
+  }
+
+  @Public
+  @Unstable
+  public void setDiagnosticsInfo(String diagnosticsInfo) {
+    this.diagnosticsInfo = diagnosticsInfo;
+  }
+
+  @Public
+  @Unstable
+  public FinalApplicationStatus getFinalApplicationStatus() {
+    return finalApplicationStatus;
+  }
+
+  @Public
+  @Unstable
+  public void setFinalApplicationStatus(
+      FinalApplicationStatus finalApplicationStatus) {
+    this.finalApplicationStatus = finalApplicationStatus;
+  }
+
+  @Public
+  @Unstable
+  public ContainerId getMasterContainerId() {
+    return masterContainerId;
+  }
+
+  @Public
+  @Unstable
+  public void setMasterContainerId(ContainerId masterContainerId) {
+    this.masterContainerId = masterContainerId;
+  }
+
+  @Public
+  @Unstable
+  public YarnApplicationAttemptState getYarnApplicationAttemptState() {
+    return yarnApplicationAttemptState;
+  }
+
+  @Public
+  @Unstable
+  public void setYarnApplicationAttemptState(
+      YarnApplicationAttemptState yarnApplicationAttemptState) {
+    this.yarnApplicationAttemptState = yarnApplicationAttemptState;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptStartData.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptStartData.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptStartData.java
new file mode 100644
index 0000000..7ca43fa
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptStartData.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * The class contains the fields that can be determined when
+ * <code>RMAppAttempt</code> starts, and that need to be stored persistently.
+ */
+@Public
+@Unstable
+public abstract class ApplicationAttemptStartData {
+
+  @Public
+  @Unstable
+  public static ApplicationAttemptStartData newInstance(
+      ApplicationAttemptId appAttemptId, String host, int rpcPort,
+      ContainerId masterContainerId) {
+    ApplicationAttemptStartData appAttemptSD =
+        Records.newRecord(ApplicationAttemptStartData.class);
+    appAttemptSD.setApplicationAttemptId(appAttemptId);
+    appAttemptSD.setHost(host);
+    appAttemptSD.setRPCPort(rpcPort);
+    appAttemptSD.setMasterContainerId(masterContainerId);
+    return appAttemptSD;
+  }
+
+  @Public
+  @Unstable
+  public abstract ApplicationAttemptId getApplicationAttemptId();
+
+  @Public
+  @Unstable
+  public abstract void setApplicationAttemptId(
+      ApplicationAttemptId applicationAttemptId);
+
+  @Public
+  @Unstable
+  public abstract String getHost();
+
+  @Public
+  @Unstable
+  public abstract void setHost(String host);
+
+  @Public
+  @Unstable
+  public abstract int getRPCPort();
+
+  @Public
+  @Unstable
+  public abstract void setRPCPort(int rpcPort);
+
+  @Public
+  @Unstable
+  public abstract ContainerId getMasterContainerId();
+
+  @Public
+  @Unstable
+  public abstract void setMasterContainerId(ContainerId masterContainerId);
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationFinishData.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationFinishData.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationFinishData.java
new file mode 100644
index 0000000..997fa6c
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationFinishData.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * The class contains the fields that can be determined when <code>RMApp</code>
+ * finishes, and that need to be stored persistently.
+ */
+@Public
+@Unstable
+public abstract class ApplicationFinishData {
+
+  @Public
+  @Unstable
+  public static ApplicationFinishData newInstance(ApplicationId applicationId,
+      long finishTime, String diagnosticsInfo,
+      FinalApplicationStatus finalApplicationStatus,
+      YarnApplicationState yarnApplicationState) {
+    ApplicationFinishData appFD =
+        Records.newRecord(ApplicationFinishData.class);
+    appFD.setApplicationId(applicationId);
+    appFD.setFinishTime(finishTime);
+    appFD.setDiagnosticsInfo(diagnosticsInfo);
+    appFD.setFinalApplicationStatus(finalApplicationStatus);
+    appFD.setYarnApplicationState(yarnApplicationState);
+    return appFD;
+  }
+
+  @Public
+  @Unstable
+  public abstract ApplicationId getApplicationId();
+
+  @Public
+  @Unstable
+  public abstract void setApplicationId(ApplicationId applicationId);
+
+  @Public
+  @Unstable
+  public abstract long getFinishTime();
+
+  @Public
+  @Unstable
+  public abstract void setFinishTime(long finishTime);
+
+  @Public
+  @Unstable
+  public abstract String getDiagnosticsInfo();
+
+  @Public
+  @Unstable
+  public abstract void setDiagnosticsInfo(String diagnosticsInfo);
+
+  @Public
+  @Unstable
+  public abstract FinalApplicationStatus getFinalApplicationStatus();
+
+  @Public
+  @Unstable
+  public abstract void setFinalApplicationStatus(
+      FinalApplicationStatus finalApplicationStatus);
+
+  @Public
+  @Unstable
+  public abstract YarnApplicationState getYarnApplicationState();
+
+  @Public
+  @Unstable
+  public abstract void setYarnApplicationState(
+      YarnApplicationState yarnApplicationState);
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationHistoryData.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationHistoryData.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationHistoryData.java
new file mode 100644
index 0000000..b7d16f3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationHistoryData.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+
+/**
+ * The class contains all the fields that are stored persistently for
+ * <code>RMApp</code>.
+ */
+@Public
+@Unstable
+public class ApplicationHistoryData {
+
+  private ApplicationId applicationId;
+
+  private String applicationName;
+
+  private String applicationType;
+
+  private String user;
+
+  private String queue;
+
+  private long submitTime;
+
+  private long startTime;
+
+  private long finishTime;
+
+  private String diagnosticsInfo;
+
+  private FinalApplicationStatus finalApplicationStatus;
+
+  private YarnApplicationState yarnApplicationState;
+
+  @Public
+  @Unstable
+  public static ApplicationHistoryData newInstance(ApplicationId applicationId,
+      String applicationName, String applicationType, String queue,
+      String user, long submitTime, long startTime, long finishTime,
+      String diagnosticsInfo, FinalApplicationStatus finalApplicationStatus,
+      YarnApplicationState yarnApplicationState) {
+    ApplicationHistoryData appHD = new ApplicationHistoryData();
+    appHD.setApplicationId(applicationId);
+    appHD.setApplicationName(applicationName);
+    appHD.setApplicationType(applicationType);
+    appHD.setQueue(queue);
+    appHD.setUser(user);
+    appHD.setSubmitTime(submitTime);
+    appHD.setStartTime(startTime);
+    appHD.setFinishTime(finishTime);
+    appHD.setDiagnosticsInfo(diagnosticsInfo);
+    appHD.setFinalApplicationStatus(finalApplicationStatus);
+    appHD.setYarnApplicationState(yarnApplicationState);
+    return appHD;
+  }
+
+  @Public
+  @Unstable
+  public ApplicationId getApplicationId() {
+    return applicationId;
+  }
+
+  @Public
+  @Unstable
+  public void setApplicationId(ApplicationId applicationId) {
+    this.applicationId = applicationId;
+  }
+
+  @Public
+  @Unstable
+  public String getApplicationName() {
+    return applicationName;
+  }
+
+  @Public
+  @Unstable
+  public void setApplicationName(String applicationName) {
+    this.applicationName = applicationName;
+  }
+
+  @Public
+  @Unstable
+  public String getApplicationType() {
+    return applicationType;
+  }
+
+  @Public
+  @Unstable
+  public void setApplicationType(String applicationType) {
+    this.applicationType = applicationType;
+  }
+
+  @Public
+  @Unstable
+  public String getUser() {
+    return user;
+  }
+
+  @Public
+  @Unstable
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  @Public
+  @Unstable
+  public String getQueue() {
+    return queue;
+  }
+
+  @Public
+  @Unstable
+  public void setQueue(String queue) {
+    this.queue = queue;
+  }
+
+  @Public
+  @Unstable
+  public long getSubmitTime() {
+    return submitTime;
+  }
+
+  @Public
+  @Unstable
+  public void setSubmitTime(long submitTime) {
+    this.submitTime = submitTime;
+  }
+
+  @Public
+  @Unstable
+  public long getStartTime() {
+    return startTime;
+  }
+
+  @Public
+  @Unstable
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  @Public
+  @Unstable
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  @Public
+  @Unstable
+  public void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+  @Public
+  @Unstable
+  public String getDiagnosticsInfo() {
+    return diagnosticsInfo;
+  }
+
+  @Public
+  @Unstable
+  public void setDiagnosticsInfo(String diagnosticsInfo) {
+    this.diagnosticsInfo = diagnosticsInfo;
+  }
+
+  @Public
+  @Unstable
+  public FinalApplicationStatus getFinalApplicationStatus() {
+    return finalApplicationStatus;
+  }
+
+  @Public
+  @Unstable
+  public void setFinalApplicationStatus(
+      FinalApplicationStatus finalApplicationStatus) {
+    this.finalApplicationStatus = finalApplicationStatus;
+  }
+
+  @Public
+  @Unstable
+  public YarnApplicationState getYarnApplicationState() {
+    return this.yarnApplicationState;
+  }
+
+  @Public
+  @Unstable
+  public void
+      setYarnApplicationState(YarnApplicationState yarnApplicationState) {
+    this.yarnApplicationState = yarnApplicationState;
+  }
+
+}