You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/12/01 21:03:34 UTC
[06/22] ambari git commit: AMBARI-5707. Renaming a module. (swagle)
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
new file mode 100644
index 0000000..0d53f5f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
@@ -0,0 +1,528 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Encapsulate all metrics related SQL queries.
+ */
+public class PhoenixTransactSQL {
+
+ static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class);
+ // TODO: Configurable TTL values
+ /**
+ * Create table to store individual metric records.
+ */
+ public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " +
+ "EXISTS METRIC_RECORD (METRIC_NAME VARCHAR, " +
+ "HOSTNAME VARCHAR, " +
+ "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+ "APP_ID VARCHAR, " +
+ "INSTANCE_ID VARCHAR, " +
+ "START_TIME UNSIGNED_LONG, " +
+ "UNITS CHAR(20), " +
+ "METRIC_SUM DOUBLE, " +
+ "METRIC_COUNT UNSIGNED_INT, " +
+ "METRIC_MAX DOUBLE, " +
+ "METRIC_MIN DOUBLE, " +
+ "METRICS VARCHAR CONSTRAINT pk " +
+ "PRIMARY KEY (METRIC_NAME, HOSTNAME, SERVER_TIME, APP_ID, " +
+ "INSTANCE_ID)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+ "TTL=%s, COMPRESSION='%s'";
+
+ public static final String CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL =
+ "CREATE TABLE IF NOT EXISTS METRIC_RECORD_HOURLY " +
+ "(METRIC_NAME VARCHAR, " +
+ "HOSTNAME VARCHAR, " +
+ "APP_ID VARCHAR, " +
+ "INSTANCE_ID VARCHAR, " +
+ "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+ "UNITS CHAR(20), " +
+ "METRIC_SUM DOUBLE," +
+ "METRIC_COUNT UNSIGNED_INT, " +
+ "METRIC_MAX DOUBLE," +
+ "METRIC_MIN DOUBLE CONSTRAINT pk " +
+ "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
+ "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+ "TTL=%s, COMPRESSION='%s'";
+
+ public static final String CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL =
+ "CREATE TABLE IF NOT EXISTS METRIC_RECORD_MINUTE " +
+ "(METRIC_NAME VARCHAR, " +
+ "HOSTNAME VARCHAR, " +
+ "APP_ID VARCHAR, " +
+ "INSTANCE_ID VARCHAR, " +
+ "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+ "UNITS CHAR(20), " +
+ "METRIC_SUM DOUBLE," +
+ "METRIC_COUNT UNSIGNED_INT, " +
+ "METRIC_MAX DOUBLE," +
+ "METRIC_MIN DOUBLE CONSTRAINT pk " +
+ "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
+ "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," +
+ " COMPRESSION='%s'";
+
+ public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL =
+ "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE " +
+ "(METRIC_NAME VARCHAR, " +
+ "APP_ID VARCHAR, " +
+ "INSTANCE_ID VARCHAR, " +
+ "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+ "UNITS CHAR(20), " +
+ "METRIC_SUM DOUBLE, " +
+ "HOSTS_COUNT UNSIGNED_INT, " +
+ "METRIC_MAX DOUBLE, " +
+ "METRIC_MIN DOUBLE " +
+ "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
+ "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+ "TTL=%s, COMPRESSION='%s'";
+
+ public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL =
+ "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE_HOURLY " +
+ "(METRIC_NAME VARCHAR, " +
+ "APP_ID VARCHAR, " +
+ "INSTANCE_ID VARCHAR, " +
+ "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+ "UNITS CHAR(20), " +
+ "METRIC_SUM DOUBLE, " +
+ "METRIC_COUNT UNSIGNED_INT, " +
+ "METRIC_MAX DOUBLE, " +
+ "METRIC_MIN DOUBLE " +
+ "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
+ "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+ "TTL=%s, COMPRESSION='%s'";
+
+ /**
+ * Insert into metric records table.
+ */
+ public static final String UPSERT_METRICS_SQL = "UPSERT INTO %s " +
+ "(METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, " +
+ "UNITS, " +
+ "METRIC_SUM, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN, " +
+ "METRIC_COUNT, " +
+ "METRICS) VALUES " +
+ "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+ public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " +
+ "METRIC_AGGREGATE (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
+ "UNITS, " +
+ "METRIC_SUM, " +
+ "HOSTS_COUNT, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN) " +
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+ public static final String UPSERT_CLUSTER_AGGREGATE_TIME_SQL = "UPSERT INTO" +
+ " %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
+ "UNITS, " +
+ "METRIC_SUM, " +
+ "METRIC_COUNT, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN) " +
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+
+ public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " +
+ "%s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
+ "SERVER_TIME, " +
+ "UNITS, " +
+ "METRIC_SUM, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN," +
+ "METRIC_COUNT) " +
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+ /**
+ * Retrieve a set of rows from metrics records table.
+ */
+ public static final String GET_METRIC_SQL = "SELECT %s METRIC_NAME, " +
+ "HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, UNITS, " +
+ "METRIC_SUM, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN, " +
+ "METRIC_COUNT, " +
+ "METRICS " +
+ "FROM %s";
+
+ public static final String GET_METRIC_AGGREGATE_ONLY_SQL = "SELECT %s " +
+ "METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
+ "UNITS, " +
+ "METRIC_SUM, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN, " +
+ "METRIC_COUNT " +
+ "FROM %s";
+
+ public static final String GET_CLUSTER_AGGREGATE_SQL = "SELECT %s " +
+ "METRIC_NAME, APP_ID, " +
+ "INSTANCE_ID, SERVER_TIME, " +
+ "UNITS, " +
+ "METRIC_SUM, " +
+ "HOSTS_COUNT, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN " +
+ "FROM METRIC_AGGREGATE";
+
+ public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD";
+ public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME =
+ "METRIC_RECORD_MINUTE";
+ public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME =
+ "METRIC_RECORD_HOURLY";
+ public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME =
+ "METRIC_AGGREGATE";
+ public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME =
+ "METRIC_AGGREGATE_HOURLY";
+ public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY";
+ public static final String DEFAULT_ENCODING = "FAST_DIFF";
+ public static final long NATIVE_TIME_RANGE_DELTA = 120000; // 2 minutes
+
+ /** Filter to optimize HBase scan by using file timestamps. This prevents
+ * a full table scan of metric records.
+ * @return Phoenix Hint String
+ */
+ public static String getNaiveTimeRangeHint(Long startTime, Long delta) {
+ return String.format("/*+ NATIVE_TIME_RANGE(%s) */", (startTime - delta));
+ }
+
+ public static PreparedStatement prepareGetMetricsSqlStmt(
+ Connection connection, Condition condition) throws SQLException {
+
+ if (condition.isEmpty()) {
+ throw new IllegalArgumentException("Condition is empty.");
+ }
+ String stmtStr;
+ if (condition.getStatement() != null) {
+ stmtStr = condition.getStatement();
+ } else {
+ stmtStr = String.format(GET_METRIC_SQL,
+ getNaiveTimeRangeHint(condition.getStartTime(), NATIVE_TIME_RANGE_DELTA),
+ METRICS_RECORD_TABLE_NAME);
+ }
+
+ StringBuilder sb = new StringBuilder(stmtStr);
+ sb.append(" WHERE ");
+ sb.append(condition.getConditionClause());
+ String orderByClause = condition.getOrderByClause();
+
+ if (orderByClause != null) {
+ sb.append(orderByClause);
+ } else {
+ sb.append(" ORDER BY METRIC_NAME, SERVER_TIME ");
+ }
+ if (condition.getLimit() != null) {
+ sb.append(" LIMIT ").append(condition.getLimit());
+ }
+
+ LOG.debug("SQL: " + sb.toString() + ", condition: " + condition);
+ PreparedStatement stmt = connection.prepareStatement(sb.toString());
+ int pos = 1;
+ if (condition.getMetricNames() != null) {
+ for (; pos <= condition.getMetricNames().size(); pos++) {
+ LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1));
+ stmt.setString(pos, condition.getMetricNames().get(pos - 1));
+ }
+ }
+ if (condition.getHostname() != null) {
+ LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname());
+ stmt.setString(pos++, condition.getHostname());
+ }
+ // TODO: Upper case all strings on POST
+ if (condition.getAppId() != null) {
+ // TODO: fix case of appId coming from host metrics
+ String appId = condition.getAppId();
+ if (!condition.getAppId().equals("HOST")) {
+ appId = appId.toLowerCase();
+ }
+ LOG.debug("Setting pos: " + pos + ", value: " + appId);
+ stmt.setString(pos++, appId);
+ }
+ if (condition.getInstanceId() != null) {
+ LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId());
+ stmt.setString(pos++, condition.getInstanceId());
+ }
+ if (condition.getStartTime() != null) {
+ LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime());
+ stmt.setLong(pos++, condition.getStartTime());
+ }
+ if (condition.getEndTime() != null) {
+ LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime());
+ stmt.setLong(pos, condition.getEndTime());
+ }
+ if (condition.getFetchSize() != null) {
+ stmt.setFetchSize(condition.getFetchSize());
+ }
+
+ return stmt;
+ }
+
+
+ public static PreparedStatement prepareGetAggregateSqlStmt(
+ Connection connection, Condition condition) throws SQLException {
+
+ if (condition.isEmpty()) {
+ throw new IllegalArgumentException("Condition is empty.");
+ }
+
+ StringBuilder sb = new StringBuilder(GET_CLUSTER_AGGREGATE_SQL);
+ sb.append(" WHERE ");
+ sb.append(condition.getConditionClause());
+ sb.append(" ORDER BY METRIC_NAME, SERVER_TIME");
+ if (condition.getLimit() != null) {
+ sb.append(" LIMIT ").append(condition.getLimit());
+ }
+
+ LOG.debug("SQL => " + sb.toString() + ", condition => " + condition);
+ PreparedStatement stmt = connection.prepareStatement(sb.toString());
+ int pos = 1;
+ if (condition.getMetricNames() != null) {
+ for (; pos <= condition.getMetricNames().size(); pos++) {
+ stmt.setString(pos, condition.getMetricNames().get(pos - 1));
+ }
+ }
+ // TODO: Upper case all strings on POST
+ if (condition.getAppId() != null) {
+ stmt.setString(pos++, condition.getAppId().toLowerCase());
+ }
+ if (condition.getInstanceId() != null) {
+ stmt.setString(pos++, condition.getInstanceId());
+ }
+ if (condition.getStartTime() != null) {
+ stmt.setLong(pos++, condition.getStartTime());
+ }
+ if (condition.getEndTime() != null) {
+ stmt.setLong(pos, condition.getEndTime());
+ }
+
+ return stmt;
+ }
+
+ static class Condition {
+ List<String> metricNames;
+ String hostname;
+ String appId;
+ String instanceId;
+ Long startTime;
+ Long endTime;
+ Integer limit;
+ boolean grouped;
+ boolean noLimit = false;
+ Integer fetchSize;
+ String statement;
+ Set<String> orderByColumns = new LinkedHashSet<String>();
+
+ Condition(List<String> metricNames, String hostname, String appId,
+ String instanceId, Long startTime, Long endTime, Integer limit,
+ boolean grouped) {
+ this.metricNames = metricNames;
+ this.hostname = hostname;
+ this.appId = appId;
+ this.instanceId = instanceId;
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.limit = limit;
+ this.grouped = grouped;
+ }
+
+ String getStatement() {
+ return statement;
+ }
+
+ void setStatement(String statement) {
+ this.statement = statement;
+ }
+
+ List<String> getMetricNames() {
+ return metricNames == null || metricNames.isEmpty() ? null : metricNames;
+ }
+
+ String getMetricsClause() {
+ StringBuilder sb = new StringBuilder("(");
+ if (metricNames != null) {
+ for (String name : metricNames) {
+ if (sb.length() != 1) {
+ sb.append(", ");
+ }
+ sb.append("?");
+ }
+ sb.append(")");
+ return sb.toString();
+ } else {
+ return null;
+ }
+ }
+
+ String getConditionClause() {
+ StringBuilder sb = new StringBuilder();
+ boolean appendConjunction = false;
+
+ if (getMetricNames() != null) {
+ sb.append("METRIC_NAME IN ");
+ sb.append(getMetricsClause());
+ appendConjunction = true;
+ }
+ if (appendConjunction) {
+ sb.append(" AND");
+ }
+ appendConjunction = false;
+ if (getHostname() != null) {
+ sb.append(" HOSTNAME = ?");
+ appendConjunction = true;
+ }
+ if (appendConjunction) {
+ sb.append(" AND");
+ }
+ appendConjunction = false;
+ if (getAppId() != null) {
+ sb.append(" APP_ID = ?");
+ appendConjunction = true;
+ }
+ if (appendConjunction) {
+ sb.append(" AND");
+ }
+ appendConjunction = false;
+ if (getInstanceId() != null) {
+ sb.append(" INSTANCE_ID = ?");
+ appendConjunction = true;
+ }
+ if (appendConjunction) {
+ sb.append(" AND");
+ }
+ appendConjunction = false;
+ if (getStartTime() != null) {
+ sb.append(" SERVER_TIME >= ?");
+ appendConjunction = true;
+ }
+ if (appendConjunction) {
+ sb.append(" AND");
+ }
+ if (getEndTime() != null) {
+ sb.append(" SERVER_TIME < ?");
+ }
+ return sb.toString();
+ }
+
+ String getHostname() {
+ return hostname == null || hostname.isEmpty() ? null : hostname;
+ }
+
+ String getAppId() {
+ return appId == null || appId.isEmpty() ? null : appId;
+ }
+
+ String getInstanceId() {
+ return instanceId == null || instanceId.isEmpty() ? null : instanceId;
+ }
+
+ /**
+ * Convert to millis.
+ */
+ Long getStartTime() {
+ if (startTime < 9999999999l) {
+ return startTime * 1000;
+ } else {
+ return startTime;
+ }
+ }
+
+ Long getEndTime() {
+ if (endTime < 9999999999l) {
+ return endTime * 1000;
+ } else {
+ return endTime;
+ }
+ }
+
+ void setNoLimit() {
+ this.noLimit = true;
+ }
+
+ Integer getLimit() {
+ if (noLimit) {
+ return null;
+ }
+ return limit == null ? PhoenixHBaseAccessor.RESULTSET_LIMIT : limit;
+ }
+
+ boolean isGrouped() {
+ return grouped;
+ }
+
+ boolean isEmpty() {
+ return (metricNames == null || metricNames.isEmpty())
+ && (hostname == null || hostname.isEmpty())
+ && (appId == null || appId.isEmpty())
+ && (instanceId == null || instanceId.isEmpty())
+ && startTime == null
+ && endTime == null;
+ }
+
+ Integer getFetchSize() {
+ return fetchSize;
+ }
+
+ void setFetchSize(Integer fetchSize) {
+ this.fetchSize = fetchSize;
+ }
+
+ void addOrderByColumn(String column) {
+ orderByColumns.add(column);
+ }
+
+ String getOrderByClause() {
+ String orderByStr = " ORDER BY ";
+ if (!orderByColumns.isEmpty()) {
+ StringBuilder sb = new StringBuilder(orderByStr);
+ for (String orderByColumn : orderByColumns) {
+ if (sb.length() != orderByStr.length()) {
+ sb.append(", ");
+ }
+ sb.append(orderByColumn);
+ }
+ sb.append(" ");
+ return sb.toString();
+ }
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return "Condition{" +
+ "metricNames=" + metricNames +
+ ", hostname='" + hostname + '\'' +
+ ", appId='" + appId + '\'' +
+ ", instanceId='" + instanceId + '\'' +
+ ", startTime=" + startTime +
+ ", endTime=" + endTime +
+ ", limit=" + limit +
+ ", grouped=" + grouped +
+ ", orderBy=" + orderByColumns +
+ ", noLimit=" + noLimit +
+ '}';
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
new file mode 100644
index 0000000..d227993
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+public class TimelineClusterMetric {
+ private String metricName;
+ private String appId;
+ private String instanceId;
+ private long timestamp;
+ private String type;
+
+ TimelineClusterMetric(String metricName, String appId, String instanceId,
+ long timestamp, String type) {
+ this.metricName = metricName;
+ this.appId = appId;
+ this.instanceId = instanceId;
+ this.timestamp = timestamp;
+ this.type = type;
+ }
+
+ String getMetricName() {
+ return metricName;
+ }
+
+ String getAppId() {
+ return appId;
+ }
+
+ String getInstanceId() {
+ return instanceId;
+ }
+
+ long getTimestamp() {
+ return timestamp;
+ }
+
+ String getType() { return type; }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TimelineClusterMetric that = (TimelineClusterMetric) o;
+
+ if (timestamp != that.timestamp) return false;
+ if (appId != null ? !appId.equals(that.appId) : that.appId != null)
+ return false;
+ if (instanceId != null ? !instanceId.equals(that.instanceId) : that.instanceId != null)
+ return false;
+ if (!metricName.equals(that.metricName)) return false;
+
+ return true;
+ }
+
+ public boolean equalsExceptTime(TimelineClusterMetric metric) {
+ if (!metricName.equals(metric.metricName)) return false;
+ if (!appId.equals(metric.appId)) return false;
+ if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
+ return false;
+
+ return true;
+ }
+ @Override
+ public int hashCode() {
+ int result = metricName.hashCode();
+ result = 31 * result + (appId != null ? appId.hashCode() : 0);
+ result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
+ result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "TimelineClusterMetric{" +
+ "metricName='" + metricName + '\'' +
+ ", appId='" + appId + '\'' +
+ ", instanceId='" + instanceId + '\'' +
+ ", timestamp=" + timestamp +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
new file mode 100644
index 0000000..cab154b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
+
+public class TimelineMetricAggregator extends AbstractTimelineAggregator {
+ private static final Log LOG = LogFactory.getLog
+ (TimelineMetricAggregator.class);
+
+ private final String checkpointLocation;
+ private final Long sleepIntervalMillis;
+ private final Integer checkpointCutOffMultiplier;
+ private final String hostAggregatorDisabledParam;
+ private final String tableName;
+ private final String outputTableName;
+ private final Long nativeTimeRangeDelay;
+
+ public TimelineMetricAggregator(PhoenixHBaseAccessor hBaseAccessor,
+ Configuration metricsConf,
+ String checkpointLocation,
+ Long sleepIntervalMillis,
+ Integer checkpointCutOffMultiplier,
+ String hostAggregatorDisabledParam,
+ String tableName,
+ String outputTableName,
+ Long nativeTimeRangeDelay) {
+ super(hBaseAccessor, metricsConf);
+ this.checkpointLocation = checkpointLocation;
+ this.sleepIntervalMillis = sleepIntervalMillis;
+ this.checkpointCutOffMultiplier = checkpointCutOffMultiplier;
+ this.hostAggregatorDisabledParam = hostAggregatorDisabledParam;
+ this.tableName = tableName;
+ this.outputTableName = outputTableName;
+ this.nativeTimeRangeDelay = nativeTimeRangeDelay;
+ }
+
+ @Override
+ protected String getCheckpointLocation() {
+ return checkpointLocation;
+ }
+
+ @Override
+ protected void aggregate(ResultSet rs, long startTime, long endTime)
+ throws IOException, SQLException {
+ Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
+ aggregateMetricsFromResultSet(rs);
+
+ LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
+ hBaseAccessor.saveHostAggregateRecords(hostAggregateMap,
+ outputTableName);
+ }
+
+ @Override
+ protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+ Condition condition = new Condition(null, null, null, null, startTime,
+ endTime, null, true);
+ condition.setNoLimit();
+ condition.setFetchSize(resultsetFetchSize);
+ condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+ PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay),
+ tableName));
+ condition.addOrderByColumn("METRIC_NAME");
+ condition.addOrderByColumn("HOSTNAME");
+ condition.addOrderByColumn("APP_ID");
+ condition.addOrderByColumn("INSTANCE_ID");
+ condition.addOrderByColumn("SERVER_TIME");
+ return condition;
+ }
+
+ private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet
+ (ResultSet rs) throws IOException, SQLException {
+ TimelineMetric existingMetric = null;
+ MetricHostAggregate hostAggregate = null;
+ Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
+ new HashMap<TimelineMetric, MetricHostAggregate>();
+
+ while (rs.next()) {
+ TimelineMetric currentMetric =
+ PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+ MetricHostAggregate currentHostAggregate =
+ PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+ if (existingMetric == null) {
+ // First row
+ existingMetric = currentMetric;
+ hostAggregate = new MetricHostAggregate();
+ hostAggregateMap.put(currentMetric, hostAggregate);
+ }
+
+ if (existingMetric.equalsExceptTime(currentMetric)) {
+ // Recalculate totals with current metric
+ hostAggregate.updateAggregates(currentHostAggregate);
+ } else {
+ // Switched over to a new metric - save existing - create new aggregate
+ hostAggregate = new MetricHostAggregate();
+ hostAggregate.updateAggregates(currentHostAggregate);
+ hostAggregateMap.put(currentMetric, hostAggregate);
+ existingMetric = currentMetric;
+ }
+ }
+ return hostAggregateMap;
+ }
+
+ @Override
+ protected Long getSleepIntervalMillis() {
+ return sleepIntervalMillis;
+ }
+
+ @Override
+ protected Integer getCheckpointCutOffMultiplier() {
+ return checkpointCutOffMultiplier;
+ }
+
+ @Override
+ protected boolean isDisabled() {
+ return metricsConf.getBoolean(hostAggregatorDisabledParam, false);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
new file mode 100644
index 0000000..8b10079
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+
+/**
+ *
+ */
+public class TimelineMetricAggregatorFactory {
+ private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE =
+ "timeline-metrics-host-aggregator-checkpoint";
+ private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE =
+ "timeline-metrics-host-aggregator-hourly-checkpoint";
+
+ public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
+ (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+ String checkpointDir = metricsConf.get(
+ TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+ String checkpointLocation = FilenameUtils.concat(checkpointDir,
+ MINUTE_AGGREGATE_CHECKPOINT_FILE);
+ long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+ (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l)); // 5 mins
+
+ int checkpointCutOffMultiplier = metricsConf.getInt
+ (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3);
+ String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED;
+
+ String inputTableName = METRICS_RECORD_TABLE_NAME;
+ String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+
+ return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
+ checkpointLocation,
+ sleepIntervalMillis,
+ checkpointCutOffMultiplier,
+ hostAggregatorDisabledParam,
+ inputTableName,
+ outputTableName,
+ 120000l);
+ }
+
+ public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
+ (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+ String checkpointDir = metricsConf.get(
+ TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+ String checkpointLocation = FilenameUtils.concat(checkpointDir,
+ MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE);
+ long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+ (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
+
+ int checkpointCutOffMultiplier = metricsConf.getInt
+ (HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+ String hostAggregatorDisabledParam = HOST_AGGREGATOR_HOUR_DISABLED;
+
+ String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+ String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+
+ return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
+ checkpointLocation,
+ sleepIntervalMillis,
+ checkpointCutOffMultiplier,
+ hostAggregatorDisabledParam,
+ inputTableName,
+ outputTableName,
+ 3600000l);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
new file mode 100644
index 0000000..654c188
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+
+/**
+ * Aggregates a metric across all hosts in the cluster. Reads metrics from
+ * the precision table and saves into the aggregate.
+ */
+public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
+ private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregator.class);
+ private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE =
+ "timeline-metrics-cluster-aggregator-checkpoint";
+ private final String checkpointLocation;
+ private final Long sleepIntervalMillis;
+ public final int timeSliceIntervalMillis;
+ private final Integer checkpointCutOffMultiplier;
+
+ public TimelineMetricClusterAggregator(PhoenixHBaseAccessor hBaseAccessor,
+ Configuration metricsConf) {
+ super(hBaseAccessor, metricsConf);
+
+ String checkpointDir = metricsConf.get(
+ TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+
+ checkpointLocation = FilenameUtils.concat(checkpointDir,
+ CLUSTER_AGGREGATOR_CHECKPOINT_FILE);
+
+ sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+ (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 120l));
+ timeSliceIntervalMillis = (int)SECONDS.toMillis(metricsConf.getInt
+ (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 15));
+ checkpointCutOffMultiplier =
+ metricsConf.getInt(CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+ }
+
+ @Override
+ protected String getCheckpointLocation() {
+ return checkpointLocation;
+ }
+
+ @Override
+ protected void aggregate(ResultSet rs, long startTime, long endTime)
+ throws SQLException, IOException {
+ List<Long[]> timeSlices = getTimeSlices(startTime, endTime);
+ Map<TimelineClusterMetric, MetricClusterAggregate>
+ aggregateClusterMetrics = aggregateMetricsFromResultSet(rs, timeSlices);
+
+ LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
+ hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
+ }
+
+ @Override
+ protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+ Condition condition = new Condition(null, null, null, null, startTime,
+ endTime, null, true);
+ condition.setNoLimit();
+ condition.setFetchSize(resultsetFetchSize);
+ condition.setStatement(String.format(GET_METRIC_SQL,
+ PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+ METRICS_RECORD_TABLE_NAME));
+ condition.addOrderByColumn("METRIC_NAME");
+ condition.addOrderByColumn("APP_ID");
+ condition.addOrderByColumn("INSTANCE_ID");
+ condition.addOrderByColumn("SERVER_TIME");
+ return condition;
+ }
+
+ private List<Long[]> getTimeSlices(long startTime, long endTime) {
+ List<Long[]> timeSlices = new ArrayList<Long[]>();
+ long sliceStartTime = startTime;
+ while (sliceStartTime < endTime) {
+ timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis});
+ sliceStartTime += timeSliceIntervalMillis;
+ }
+ return timeSlices;
+ }
+
+ private Map<TimelineClusterMetric, MetricClusterAggregate>
+ aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
+ throws SQLException, IOException {
+ Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
+ new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+ // Create time slices
+
+ while (rs.next()) {
+ TimelineMetric metric =
+ PhoenixHBaseAccessor.getTimelineMetricFromResultSet(rs);
+
+ Map<TimelineClusterMetric, Double> clusterMetrics =
+ sliceFromTimelineMetric(metric, timeSlices);
+
+ if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
+ for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
+ clusterMetrics.entrySet()) {
+ TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
+ MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
+ Double avgValue = clusterMetricEntry.getValue();
+
+ if (aggregate == null) {
+ aggregate = new MetricClusterAggregate(avgValue, 1, null,
+ avgValue, avgValue);
+ aggregateClusterMetrics.put(clusterMetric, aggregate);
+ } else {
+ aggregate.updateSum(avgValue);
+ aggregate.updateNumberOfHosts(1);
+ aggregate.updateMax(avgValue);
+ aggregate.updateMin(avgValue);
+ }
+ }
+ }
+ }
+ return aggregateClusterMetrics;
+ }
+
+ @Override
+ protected Long getSleepIntervalMillis() {
+ return sleepIntervalMillis;
+ }
+
+ @Override
+ protected Integer getCheckpointCutOffMultiplier() {
+ return checkpointCutOffMultiplier;
+ }
+
+ @Override
+ protected boolean isDisabled() {
+ return metricsConf.getBoolean(CLUSTER_AGGREGATOR_MINUTE_DISABLED, false);
+ }
+
+ private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
+ TimelineMetric timelineMetric, List<Long[]> timeSlices) {
+
+ if (timelineMetric.getMetricValues().isEmpty()) {
+ return null;
+ }
+
+ Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
+ new HashMap<TimelineClusterMetric, Double>();
+
+ for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) {
+ // TODO: investigate null values - pre filter
+ if (metric.getValue() == null) {
+ continue;
+ }
+ Long timestamp = getSliceTimeForMetric(timeSlices,
+ Long.parseLong(metric.getKey().toString()));
+ if (timestamp != -1) {
+ // Metric is within desired time range
+ TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
+ timelineMetric.getMetricName(),
+ timelineMetric.getAppId(),
+ timelineMetric.getInstanceId(),
+ timestamp,
+ timelineMetric.getType());
+ if (!timelineClusterMetricMap.containsKey(clusterMetric)) {
+ timelineClusterMetricMap.put(clusterMetric, metric.getValue());
+ } else {
+ Double oldValue = timelineClusterMetricMap.get(clusterMetric);
+ Double newValue = (oldValue + metric.getValue()) / 2;
+ timelineClusterMetricMap.put(clusterMetric, newValue);
+ }
+ }
+ }
+
+ return timelineClusterMetricMap;
+ }
+
+ /**
+ * Return beginning of the time slice into which the metric fits.
+ */
+ private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) {
+ for (Long[] timeSlice : timeSlices) {
+ if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) {
+ return timeSlice[0];
+ }
+ }
+ return -1l;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
new file mode 100644
index 0000000..7764ea3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+
+public class TimelineMetricClusterAggregatorHourly extends
+ AbstractTimelineAggregator {
+ private static final Log LOG = LogFactory.getLog
+ (TimelineMetricClusterAggregatorHourly.class);
+ private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE =
+ "timeline-metrics-cluster-aggregator-hourly-checkpoint";
+ private final String checkpointLocation;
+ private final long sleepIntervalMillis;
+ private final Integer checkpointCutOffMultiplier;
+ private long checkpointCutOffIntervalMillis;
+ private static final Long NATIVE_TIME_RANGE_DELTA = 3600000l; // 1 hour
+
+ public TimelineMetricClusterAggregatorHourly(
+ PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+ super(hBaseAccessor, metricsConf);
+
+ String checkpointDir = metricsConf.get(
+ TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+
+ checkpointLocation = FilenameUtils.concat(checkpointDir,
+ CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE);
+
+ sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+ (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
+ checkpointCutOffIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+ (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL, 7200l));
+ checkpointCutOffMultiplier = metricsConf.getInt
+ (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+ }
+
+ @Override
+ protected String getCheckpointLocation() {
+ return checkpointLocation;
+ }
+
+ @Override
+ protected void aggregate(ResultSet rs, long startTime, long endTime)
+ throws SQLException, IOException {
+ Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
+ aggregateMetricsFromResultSet(rs);
+
+ LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
+ hBaseAccessor.saveClusterAggregateHourlyRecords(hostAggregateMap,
+ METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
+ }
+
+ @Override
+ protected Condition prepareMetricQueryCondition(long startTime,
+ long endTime) {
+ Condition condition = new Condition(null, null, null, null, startTime,
+ endTime, null, true);
+ condition.setNoLimit();
+ condition.setFetchSize(resultsetFetchSize);
+ condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
+ PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA)));
+ condition.addOrderByColumn("METRIC_NAME");
+ condition.addOrderByColumn("APP_ID");
+ condition.addOrderByColumn("INSTANCE_ID");
+ condition.addOrderByColumn("SERVER_TIME");
+ return condition;
+ }
+
+ private Map<TimelineClusterMetric, MetricHostAggregate>
+ aggregateMetricsFromResultSet(ResultSet rs) throws IOException, SQLException {
+
+ TimelineClusterMetric existingMetric = null;
+ MetricHostAggregate hostAggregate = null;
+ Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
+ new HashMap<TimelineClusterMetric, MetricHostAggregate>();
+
+ while (rs.next()) {
+ TimelineClusterMetric currentMetric =
+ getTimelineMetricClusterKeyFromResultSet(rs);
+ MetricClusterAggregate currentHostAggregate =
+ getMetricClusterAggregateFromResultSet(rs);
+
+ if (existingMetric == null) {
+ // First row
+ existingMetric = currentMetric;
+ hostAggregate = new MetricHostAggregate();
+ hostAggregateMap.put(currentMetric, hostAggregate);
+ }
+
+ if (existingMetric.equalsExceptTime(currentMetric)) {
+ // Recalculate totals with current metric
+ updateAggregatesFromHost(hostAggregate, currentHostAggregate);
+
+ } else {
+ // Switched over to a new metric - save existing
+ hostAggregate = new MetricHostAggregate();
+ updateAggregatesFromHost(hostAggregate, currentHostAggregate);
+ hostAggregateMap.put(currentMetric, hostAggregate);
+ existingMetric = currentMetric;
+ }
+
+ }
+
+ return hostAggregateMap;
+ }
+
+ private void updateAggregatesFromHost(
+ MetricHostAggregate agg,
+ MetricClusterAggregate currentClusterAggregate) {
+ agg.updateMax(currentClusterAggregate.getMax());
+ agg.updateMin(currentClusterAggregate.getMin());
+ agg.updateSum(currentClusterAggregate.getSum());
+ agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts());
+ }
+
+ @Override
+ protected Long getSleepIntervalMillis() {
+ return sleepIntervalMillis;
+ }
+
+ @Override
+ protected Integer getCheckpointCutOffMultiplier() {
+ return checkpointCutOffMultiplier;
+ }
+
+ @Override
+ protected Long getCheckpointCutOffIntervalMillis() {
+ return checkpointCutOffIntervalMillis;
+ }
+
+ @Override
+ protected boolean isDisabled() {
+ return metricsConf.getBoolean(CLUSTER_AGGREGATOR_HOUR_DISABLED, false);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
new file mode 100644
index 0000000..60833d0
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Configuration class that reads properties from ams-site.xml. All values
+ * for time or intervals are given in seconds.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface TimelineMetricConfiguration {
+ public static final String HBASE_SITE_CONFIGURATION_FILE = "hbase-site.xml";
+ public static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
+
+ public static final String TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR =
+ "timeline.metrics.aggregator.checkpoint.dir";
+
+ public static final String DEFAULT_CHECKPOINT_LOCATION =
+ System.getProperty("java.io.tmpdir");
+
+ public static final String HBASE_ENCODING_SCHEME =
+ "timeline.metrics.hbase.data.block.encoding";
+
+ public static final String HBASE_COMPRESSION_SCHEME =
+ "timeline.metrics.hbase.compression.scheme";
+
+ public static final String PRECISION_TABLE_TTL =
+ "timeline.metrics.host.aggregator.ttl";
+ public static final String HOST_MINUTE_TABLE_TTL =
+ "timeline.metrics.host.aggregator.minute.ttl";
+ public static final String HOST_HOUR_TABLE_TTL =
+ "timeline.metrics.host.aggregator.hourly.ttl";
+ public static final String CLUSTER_MINUTE_TABLE_TTL =
+ "timeline.metrics.cluster.aggregator.minute.ttl";
+ public static final String CLUSTER_HOUR_TABLE_TTL =
+ "timeline.metrics.cluster.aggregator.hourly.ttl";
+
+ public static final String CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL =
+ "timeline.metrics.cluster.aggregator.minute.timeslice.interval";
+
+ public static final String AGGREGATOR_CHECKPOINT_DELAY =
+ "timeline.metrics.service.checkpointDelay";
+
+ public static final String RESULTSET_FETCH_SIZE =
+ "timeline.metrics.service.resultset.fetchSize";
+
+ public static final String HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL =
+ "timeline.metrics.host.aggregator.minute.interval";
+
+ public static final String HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL =
+ "timeline.metrics.host.aggregator.hourly.interval";
+
+ public static final String CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL =
+ "timeline.metrics.cluster.aggregator.minute.interval";
+
+ public static final String CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL =
+ "timeline.metrics.cluster.aggregator.hourly.interval";
+
+ public static final String HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER =
+ "timeline.metrics.host.aggregator.minute.checkpointCutOffMultiplier";
+
+ public static final String HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER =
+ "timeline.metrics.host.aggregator.hourly.checkpointCutOffMultiplier";
+
+ public static final String CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER =
+ "timeline.metrics.cluster.aggregator.minute.checkpointCutOffMultiplier";
+
+ public static final String CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER =
+ "timeline.metrics.cluster.aggregator.hourly.checkpointCutOffMultiplier";
+
+ public static final String CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL =
+ "timeline.metrics.cluster.aggregator.hourly.checkpointCutOffInterval";
+
+ public static final String GLOBAL_RESULT_LIMIT =
+ "timeline.metrics.service.default.result.limit";
+
+ public static final String GLOBAL_MAX_RETRIES =
+ "timeline.metrics.service.default.max_retries";
+
+ public static final String GLOBAL_RETRY_INTERVAL =
+ "timeline.metrics.service.default.retryInterval";
+
+ public static final String HOST_AGGREGATOR_MINUTE_DISABLED =
+ "timeline.metrics.host.aggregator.minute.disabled";
+
+ public static final String HOST_AGGREGATOR_HOUR_DISABLED =
+ "timeline.metrics.host.aggregator.hourly.disabled";
+
+ public static final String CLUSTER_AGGREGATOR_MINUTE_DISABLED =
+ "timeline.metrics.cluster.aggregator.minute.disabled";
+
+ public static final String CLUSTER_AGGREGATOR_HOUR_DISABLED =
+ "timeline.metrics.cluster.aggregator.hourly.disabled";
+
+ public static final String DISABLE_APPLICATION_TIMELINE_STORE =
+ "timeline.service.disable.application.timeline.store";
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
new file mode 100644
index 0000000..5224450
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+public interface TimelineMetricStore {
+ /**
+ * This method retrieves metrics stored byu the Timeline store.
+ *
+ * @param metricNames Names of the metric, e.g.: cpu_user
+ * @param hostname Name of the host where the metric originated from
+ * @param applicationId Id of the application to which this metric belongs
+ * @param instanceId Application instance id.
+ * @param startTime Start timestamp
+ * @param endTime End timestamp
+ * @param limit Override default result limit
+ * @param groupedByHosts Group {@link TimelineMetric} by metric name, hostname,
+ * app id and instance id
+ *
+ * @return {@link TimelineMetric}
+ * @throws java.sql.SQLException
+ */
+ TimelineMetrics getTimelineMetrics(List<String> metricNames, String hostname,
+ String applicationId, String instanceId, Long startTime,
+ Long endTime, Integer limit, boolean groupedByHosts)
+ throws SQLException, IOException;
+
+
+ /**
+ * Return all records for a single metric satisfying the filter criteria.
+ * @return {@link TimelineMetric}
+ */
+ TimelineMetric getTimelineMetric(String metricName, String hostname,
+ String applicationId, String instanceId, Long startTime,
+ Long endTime, Integer limit)
+ throws SQLException, IOException;
+
+
+ /**
+ * Stores metric information to the timeline store. Any errors occurring for
+ * individual put request objects will be reported in the response.
+ *
+ * @param metrics An {@link TimelineMetrics}.
+ * @return An {@link org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse}.
+ * @throws SQLException, IOException
+ */
+ TimelinePutResponse putMetrics(TimelineMetrics metrics)
+ throws SQLException, IOException;
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptFinishData.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptFinishData.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptFinishData.java
new file mode 100644
index 0000000..7ba51af
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptFinishData.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * The class contains the fields that can be determined when
+ * <code>RMAppAttempt</code> finishes, and that need to be stored persistently.
+ */
+@Public
+@Unstable
+public abstract class ApplicationAttemptFinishData {
+
+ @Public
+ @Unstable
+ public static ApplicationAttemptFinishData newInstance(
+ ApplicationAttemptId appAttemptId, String diagnosticsInfo,
+ String trackingURL, FinalApplicationStatus finalApplicationStatus,
+ YarnApplicationAttemptState yarnApplicationAttemptState) {
+ ApplicationAttemptFinishData appAttemptFD =
+ Records.newRecord(ApplicationAttemptFinishData.class);
+ appAttemptFD.setApplicationAttemptId(appAttemptId);
+ appAttemptFD.setDiagnosticsInfo(diagnosticsInfo);
+ appAttemptFD.setTrackingURL(trackingURL);
+ appAttemptFD.setFinalApplicationStatus(finalApplicationStatus);
+ appAttemptFD.setYarnApplicationAttemptState(yarnApplicationAttemptState);
+ return appAttemptFD;
+ }
+
+ @Public
+ @Unstable
+ public abstract ApplicationAttemptId getApplicationAttemptId();
+
+ @Public
+ @Unstable
+ public abstract void setApplicationAttemptId(
+ ApplicationAttemptId applicationAttemptId);
+
+ @Public
+ @Unstable
+ public abstract String getTrackingURL();
+
+ @Public
+ @Unstable
+ public abstract void setTrackingURL(String trackingURL);
+
+ @Public
+ @Unstable
+ public abstract String getDiagnosticsInfo();
+
+ @Public
+ @Unstable
+ public abstract void setDiagnosticsInfo(String diagnosticsInfo);
+
+ @Public
+ @Unstable
+ public abstract FinalApplicationStatus getFinalApplicationStatus();
+
+ @Public
+ @Unstable
+ public abstract void setFinalApplicationStatus(
+ FinalApplicationStatus finalApplicationStatus);
+
+ @Public
+ @Unstable
+ public abstract YarnApplicationAttemptState getYarnApplicationAttemptState();
+
+ @Public
+ @Unstable
+ public abstract void setYarnApplicationAttemptState(
+ YarnApplicationAttemptState yarnApplicationAttemptState);
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptHistoryData.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptHistoryData.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptHistoryData.java
new file mode 100644
index 0000000..b759ab1
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptHistoryData.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+
+/**
+ * The class contains all the fields that are stored persistently for
+ * <code>RMAppAttempt</code>.
+ */
+@Public
+@Unstable
+public class ApplicationAttemptHistoryData {
+
+ private ApplicationAttemptId applicationAttemptId;
+
+ private String host;
+
+ private int rpcPort;
+
+ private String trackingURL;
+
+ private String diagnosticsInfo;
+
+ private FinalApplicationStatus finalApplicationStatus;
+
+ private ContainerId masterContainerId;
+
+ private YarnApplicationAttemptState yarnApplicationAttemptState;
+
+ @Public
+ @Unstable
+ public static ApplicationAttemptHistoryData newInstance(
+ ApplicationAttemptId appAttemptId, String host, int rpcPort,
+ ContainerId masterContainerId, String diagnosticsInfo,
+ String trackingURL, FinalApplicationStatus finalApplicationStatus,
+ YarnApplicationAttemptState yarnApplicationAttemptState) {
+ ApplicationAttemptHistoryData appAttemptHD =
+ new ApplicationAttemptHistoryData();
+ appAttemptHD.setApplicationAttemptId(appAttemptId);
+ appAttemptHD.setHost(host);
+ appAttemptHD.setRPCPort(rpcPort);
+ appAttemptHD.setMasterContainerId(masterContainerId);
+ appAttemptHD.setDiagnosticsInfo(diagnosticsInfo);
+ appAttemptHD.setTrackingURL(trackingURL);
+ appAttemptHD.setFinalApplicationStatus(finalApplicationStatus);
+ appAttemptHD.setYarnApplicationAttemptState(yarnApplicationAttemptState);
+ return appAttemptHD;
+ }
+
+ @Public
+ @Unstable
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return applicationAttemptId;
+ }
+
+ @Public
+ @Unstable
+ public void
+ setApplicationAttemptId(ApplicationAttemptId applicationAttemptId) {
+ this.applicationAttemptId = applicationAttemptId;
+ }
+
+ @Public
+ @Unstable
+ public String getHost() {
+ return host;
+ }
+
+ @Public
+ @Unstable
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ @Public
+ @Unstable
+ public int getRPCPort() {
+ return rpcPort;
+ }
+
+ @Public
+ @Unstable
+ public void setRPCPort(int rpcPort) {
+ this.rpcPort = rpcPort;
+ }
+
+ @Public
+ @Unstable
+ public String getTrackingURL() {
+ return trackingURL;
+ }
+
+ @Public
+ @Unstable
+ public void setTrackingURL(String trackingURL) {
+ this.trackingURL = trackingURL;
+ }
+
+ @Public
+ @Unstable
+ public String getDiagnosticsInfo() {
+ return diagnosticsInfo;
+ }
+
+ @Public
+ @Unstable
+ public void setDiagnosticsInfo(String diagnosticsInfo) {
+ this.diagnosticsInfo = diagnosticsInfo;
+ }
+
+ @Public
+ @Unstable
+ public FinalApplicationStatus getFinalApplicationStatus() {
+ return finalApplicationStatus;
+ }
+
+ @Public
+ @Unstable
+ public void setFinalApplicationStatus(
+ FinalApplicationStatus finalApplicationStatus) {
+ this.finalApplicationStatus = finalApplicationStatus;
+ }
+
+ @Public
+ @Unstable
+ public ContainerId getMasterContainerId() {
+ return masterContainerId;
+ }
+
+ @Public
+ @Unstable
+ public void setMasterContainerId(ContainerId masterContainerId) {
+ this.masterContainerId = masterContainerId;
+ }
+
+ @Public
+ @Unstable
+ public YarnApplicationAttemptState getYarnApplicationAttemptState() {
+ return yarnApplicationAttemptState;
+ }
+
+ @Public
+ @Unstable
+ public void setYarnApplicationAttemptState(
+ YarnApplicationAttemptState yarnApplicationAttemptState) {
+ this.yarnApplicationAttemptState = yarnApplicationAttemptState;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptStartData.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptStartData.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptStartData.java
new file mode 100644
index 0000000..7ca43fa
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptStartData.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * The class contains the fields that can be determined when
+ * <code>RMAppAttempt</code> starts, and that need to be stored persistently.
+ */
+@Public
+@Unstable
+public abstract class ApplicationAttemptStartData {
+
+ @Public
+ @Unstable
+ public static ApplicationAttemptStartData newInstance(
+ ApplicationAttemptId appAttemptId, String host, int rpcPort,
+ ContainerId masterContainerId) {
+ ApplicationAttemptStartData appAttemptSD =
+ Records.newRecord(ApplicationAttemptStartData.class);
+ appAttemptSD.setApplicationAttemptId(appAttemptId);
+ appAttemptSD.setHost(host);
+ appAttemptSD.setRPCPort(rpcPort);
+ appAttemptSD.setMasterContainerId(masterContainerId);
+ return appAttemptSD;
+ }
+
+ @Public
+ @Unstable
+ public abstract ApplicationAttemptId getApplicationAttemptId();
+
+ @Public
+ @Unstable
+ public abstract void setApplicationAttemptId(
+ ApplicationAttemptId applicationAttemptId);
+
+ @Public
+ @Unstable
+ public abstract String getHost();
+
+ @Public
+ @Unstable
+ public abstract void setHost(String host);
+
+ @Public
+ @Unstable
+ public abstract int getRPCPort();
+
+ @Public
+ @Unstable
+ public abstract void setRPCPort(int rpcPort);
+
+ @Public
+ @Unstable
+ public abstract ContainerId getMasterContainerId();
+
+ @Public
+ @Unstable
+ public abstract void setMasterContainerId(ContainerId masterContainerId);
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationFinishData.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationFinishData.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationFinishData.java
new file mode 100644
index 0000000..997fa6c
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationFinishData.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * The class contains the fields that can be determined when <code>RMApp</code>
+ * finishes, and that need to be stored persistently.
+ */
+@Public
+@Unstable
+public abstract class ApplicationFinishData {
+
+ @Public
+ @Unstable
+ public static ApplicationFinishData newInstance(ApplicationId applicationId,
+ long finishTime, String diagnosticsInfo,
+ FinalApplicationStatus finalApplicationStatus,
+ YarnApplicationState yarnApplicationState) {
+ ApplicationFinishData appFD =
+ Records.newRecord(ApplicationFinishData.class);
+ appFD.setApplicationId(applicationId);
+ appFD.setFinishTime(finishTime);
+ appFD.setDiagnosticsInfo(diagnosticsInfo);
+ appFD.setFinalApplicationStatus(finalApplicationStatus);
+ appFD.setYarnApplicationState(yarnApplicationState);
+ return appFD;
+ }
+
+ @Public
+ @Unstable
+ public abstract ApplicationId getApplicationId();
+
+ @Public
+ @Unstable
+ public abstract void setApplicationId(ApplicationId applicationId);
+
+ @Public
+ @Unstable
+ public abstract long getFinishTime();
+
+ @Public
+ @Unstable
+ public abstract void setFinishTime(long finishTime);
+
+ @Public
+ @Unstable
+ public abstract String getDiagnosticsInfo();
+
+ @Public
+ @Unstable
+ public abstract void setDiagnosticsInfo(String diagnosticsInfo);
+
+ @Public
+ @Unstable
+ public abstract FinalApplicationStatus getFinalApplicationStatus();
+
+ @Public
+ @Unstable
+ public abstract void setFinalApplicationStatus(
+ FinalApplicationStatus finalApplicationStatus);
+
+ @Public
+ @Unstable
+ public abstract YarnApplicationState getYarnApplicationState();
+
+ @Public
+ @Unstable
+ public abstract void setYarnApplicationState(
+ YarnApplicationState yarnApplicationState);
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationHistoryData.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationHistoryData.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationHistoryData.java
new file mode 100644
index 0000000..b7d16f3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationHistoryData.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+
+/**
+ * The class contains all the fields that are stored persistently for
+ * <code>RMApp</code>.
+ */
+@Public
+@Unstable
+public class ApplicationHistoryData {
+
+ private ApplicationId applicationId;
+
+ private String applicationName;
+
+ private String applicationType;
+
+ private String user;
+
+ private String queue;
+
+ private long submitTime;
+
+ private long startTime;
+
+ private long finishTime;
+
+ private String diagnosticsInfo;
+
+ private FinalApplicationStatus finalApplicationStatus;
+
+ private YarnApplicationState yarnApplicationState;
+
+ @Public
+ @Unstable
+ public static ApplicationHistoryData newInstance(ApplicationId applicationId,
+ String applicationName, String applicationType, String queue,
+ String user, long submitTime, long startTime, long finishTime,
+ String diagnosticsInfo, FinalApplicationStatus finalApplicationStatus,
+ YarnApplicationState yarnApplicationState) {
+ ApplicationHistoryData appHD = new ApplicationHistoryData();
+ appHD.setApplicationId(applicationId);
+ appHD.setApplicationName(applicationName);
+ appHD.setApplicationType(applicationType);
+ appHD.setQueue(queue);
+ appHD.setUser(user);
+ appHD.setSubmitTime(submitTime);
+ appHD.setStartTime(startTime);
+ appHD.setFinishTime(finishTime);
+ appHD.setDiagnosticsInfo(diagnosticsInfo);
+ appHD.setFinalApplicationStatus(finalApplicationStatus);
+ appHD.setYarnApplicationState(yarnApplicationState);
+ return appHD;
+ }
+
+ @Public
+ @Unstable
+ public ApplicationId getApplicationId() {
+ return applicationId;
+ }
+
+ @Public
+ @Unstable
+ public void setApplicationId(ApplicationId applicationId) {
+ this.applicationId = applicationId;
+ }
+
+ @Public
+ @Unstable
+ public String getApplicationName() {
+ return applicationName;
+ }
+
+ @Public
+ @Unstable
+ public void setApplicationName(String applicationName) {
+ this.applicationName = applicationName;
+ }
+
+ @Public
+ @Unstable
+ public String getApplicationType() {
+ return applicationType;
+ }
+
+ @Public
+ @Unstable
+ public void setApplicationType(String applicationType) {
+ this.applicationType = applicationType;
+ }
+
+ @Public
+ @Unstable
+ public String getUser() {
+ return user;
+ }
+
+ @Public
+ @Unstable
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ @Public
+ @Unstable
+ public String getQueue() {
+ return queue;
+ }
+
+ @Public
+ @Unstable
+ public void setQueue(String queue) {
+ this.queue = queue;
+ }
+
+ @Public
+ @Unstable
+ public long getSubmitTime() {
+ return submitTime;
+ }
+
+ @Public
+ @Unstable
+ public void setSubmitTime(long submitTime) {
+ this.submitTime = submitTime;
+ }
+
+ @Public
+ @Unstable
+ public long getStartTime() {
+ return startTime;
+ }
+
+ @Public
+ @Unstable
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ @Public
+ @Unstable
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ @Public
+ @Unstable
+ public void setFinishTime(long finishTime) {
+ this.finishTime = finishTime;
+ }
+
+ @Public
+ @Unstable
+ public String getDiagnosticsInfo() {
+ return diagnosticsInfo;
+ }
+
+ @Public
+ @Unstable
+ public void setDiagnosticsInfo(String diagnosticsInfo) {
+ this.diagnosticsInfo = diagnosticsInfo;
+ }
+
+ @Public
+ @Unstable
+ public FinalApplicationStatus getFinalApplicationStatus() {
+ return finalApplicationStatus;
+ }
+
+ @Public
+ @Unstable
+ public void setFinalApplicationStatus(
+ FinalApplicationStatus finalApplicationStatus) {
+ this.finalApplicationStatus = finalApplicationStatus;
+ }
+
+ @Public
+ @Unstable
+ public YarnApplicationState getYarnApplicationState() {
+ return this.yarnApplicationState;
+ }
+
+ @Public
+ @Unstable
+ public void
+ setYarnApplicationState(YarnApplicationState yarnApplicationState) {
+ this.yarnApplicationState = yarnApplicationState;
+ }
+
+}