You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/09/22 20:02:30 UTC
[20/22] AMBARI-5707. Metrics system prototype implementation. (swagle)
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
new file mode 100644
index 0000000..3f174ef
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.conf.YarnConfig;
+import java.io.IOException;
+import java.net.URL;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+
+public class HBaseTimelineMetricStore extends AbstractService
+ implements TimelineMetricStore {
+
+ // Shared logger for this store.
+ static final Log LOG = LogFactory.getLog(HBaseTimelineMetricStore.class);
+ // Classpath resource name looked up in serviceInit to locate the HBase
+ // site configuration.
+ static final String HBASE_CONF = "hbase-site.xml";
+ // Fallback checkpoint directory used when
+ // YarnConfig.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR is not configured.
+ static final String DEFAULT_CHECKPOINT_LOCATION = "/tmp";
+ // Checkpoint file name handed to TimelineMetricClusterAggregator.
+ static final String AGGREGATOR_CHECKPOINT_FILE =
+ "timeline-metrics-aggregator-checkpoint";
+ // Checkpoint file name handed to TimelineMetricAggregatorMinute.
+ static final String MINUTE_AGGREGATE_ROLLUP_CHECKPOINT_FILE =
+ "timeline-metrics-minute-aggregator-checkpoint";
+ // Checkpoint file name handed to TimelineMetricClusterAggregatorHourly.
+ static final String HOURLY_AGGREGATE_ROLLUP_CHECKPOINT_FILE =
+ "timeline-metrics-hourly-aggregator-checkpoint";
+ // Checkpoint file name handed to TimelineMetricAggregatorHourly (the hourly
+ // host aggregator).
+ static final String HOURLY_ROLLUP_CHECKPOINT_FILE =
+ "timeline-metrics-hourly-checkpoint";
+ // Phoenix-backed accessor; assigned in serviceInit and unset before that.
+ private PhoenixHBaseAccessor hBaseAccessor;
+
+ /**
+ * Construct the service, passing this class's fully-qualified name to the
+ * AbstractService constructor as the service name. No HBase/Phoenix
+ * resources are touched here; that happens in serviceInit.
+ */
+ public HBaseTimelineMetricStore() {
+ super(HBaseTimelineMetricStore.class.getName());
+ }
+
+ /**
+  * Initializes the metric store: locates hbase-site.xml on the classpath,
+  * creates the Phoenix accessor, makes sure the metric tables exist and
+  * starts the four aggregator threads (cluster, hourly cluster, minute and
+  * hourly host), in that order.
+  *
+  * @param conf service configuration; only the aggregator checkpoint
+  *             directory is read from it here
+  * @throws IllegalStateException if hbase-site.xml is not on the classpath
+  * @throws Exception propagated from accessor / aggregator construction
+  */
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+   URL resUrl = getClass().getClassLoader().getResource(HBASE_CONF);
+   // Fail before logging so the log never claims a "found" null resource.
+   if (resUrl == null) {
+     throw new IllegalStateException("Unable to initialize the metrics " +
+       "subsystem. No hbase-site present in the classpath.");
+   }
+   LOG.info("Found hbase site configuration: " + resUrl);
+
+   Configuration hbaseConf = new Configuration(true);
+   hbaseConf.addResource(resUrl);
+   hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf);
+   hBaseAccessor.initMetricSchema();
+
+   // All aggregators keep their checkpoint file in the same directory.
+   String checkpointDir = conf.get(
+     YarnConfig.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR,
+     DEFAULT_CHECKPOINT_LOCATION);
+
+   // Start the cluster aggregator
+   startAggregator(
+     new TimelineMetricClusterAggregator(hBaseAccessor,
+       FilenameUtils.concat(checkpointDir, AGGREGATOR_CHECKPOINT_FILE)),
+     "TimelineMetricClusterAggregator");
+
+   // Start the hourly cluster aggregator
+   startAggregator(
+     new TimelineMetricClusterAggregatorHourly(hBaseAccessor,
+       FilenameUtils.concat(checkpointDir,
+         HOURLY_AGGREGATE_ROLLUP_CHECKPOINT_FILE)),
+     "TimelineMetricClusterAggregatorHourly");
+
+   // Start the 5 minute aggregator
+   startAggregator(
+     new TimelineMetricAggregatorMinute(hBaseAccessor,
+       FilenameUtils.concat(checkpointDir,
+         MINUTE_AGGREGATE_ROLLUP_CHECKPOINT_FILE)),
+     "TimelineMetricAggregatorMinute");
+
+   // Start hourly host aggregator
+   startAggregator(
+     new TimelineMetricAggregatorHourly(hBaseAccessor,
+       FilenameUtils.concat(checkpointDir, HOURLY_ROLLUP_CHECKPOINT_FILE)),
+     "TimelineMetricAggregatorHourly");
+ }
+
+ /**
+  * Runs the given aggregator on its own named thread.
+  * NOTE(review): the thread is neither a daemon nor retained anywhere, so
+  * nothing in this class can stop it later - confirm this is intended.
+  */
+ private static void startAggregator(Runnable aggregator, String threadName) {
+   Thread aggregatorThread = new Thread(aggregator, threadName);
+   aggregatorThread.start();
+ }
+
+ /**
+ * Stops the service by delegating to the superclass only.
+ * NOTE(review): the aggregator threads started in serviceInit are held in
+ * local variables there, so they are not stopped or interrupted from here.
+ */
+ @Override
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
+ }
+
+ /**
+  * Looks up metrics matching the given filter. When no hostname is given
+  * the cluster aggregate records are queried, otherwise the per-host metric
+  * records are.
+  *
+  * @return the matching metrics as produced by the accessor
+  * @throws SQLException on query failure
+  * @throws IOException on failure to read stored metric values
+  */
+ @Override
+ public TimelineMetrics getTimelineMetrics(List<String> metricNames,
+     String hostname, String applicationId, String instanceId,
+     Long startTime, Long endTime, Integer limit,
+     boolean groupedByHosts) throws SQLException, IOException {
+
+   final Condition query = new Condition(metricNames, hostname,
+     applicationId, instanceId, startTime, endTime, limit, groupedByHosts);
+
+   // A missing host filter selects the aggregate path.
+   return (hostname != null)
+     ? hBaseAccessor.getMetricRecords(query)
+     : hBaseAccessor.getAggregateMetricRecords(query);
+ }
+
+ /**
+  * Fetches the records for a single metric name and folds them into one
+  * TimelineMetric. Identity fields and the start time are copied from the
+  * first returned record; the values of every record are merged into one
+  * map. An empty TimelineMetric is returned when nothing matches.
+  *
+  * @throws SQLException on query failure
+  * @throws IOException on failure to read stored metric values
+  */
+ @Override
+ public TimelineMetric getTimelineMetric(String metricName, String hostname,
+     String applicationId, String instanceId, Long startTime,
+     Long endTime, Integer limit)
+     throws SQLException, IOException {
+
+   Condition singleMetric = new Condition(
+     Collections.singletonList(metricName), hostname, applicationId,
+     instanceId, startTime, endTime, limit, true);
+   TimelineMetrics fetched = hBaseAccessor.getMetricRecords(singleMetric);
+
+   TimelineMetric merged = new TimelineMetric();
+   List<TimelineMetric> rows = fetched.getMetrics();
+   if (rows == null || rows.isEmpty()) {
+     return merged;
+   }
+
+   TimelineMetric head = rows.get(0);
+   merged.setMetricName(head.getMetricName());
+   merged.setAppId(head.getAppId());
+   merged.setInstanceId(head.getInstanceId());
+   merged.setHostName(head.getHostName());
+   // Assumption that metrics are ordered by start time
+   merged.setStartTime(head.getStartTime());
+
+   Map<Long, Double> allValues = new HashMap<Long, Double>();
+   for (TimelineMetric row : rows) {
+     allValues.putAll(row.getMetricValues());
+   }
+   merged.setMetricValues(allValues);
+
+   return merged;
+ }
+
+
+ /**
+  * Stores the given metrics through the accessor. Failure is signalled by
+  * the thrown exception; the returned response is always a fresh, empty
+  * TimelinePutResponse.
+  *
+  * @throws SQLException if the insert fails
+  * @throws IOException if the metric values cannot be serialized
+  */
+ @Override
+ public TimelinePutResponse putMetrics(TimelineMetrics metrics)
+     throws SQLException, IOException {
+
+   hBaseAccessor.insertMetricRecords(metrics);
+
+   // Error indicated by the Sql exception
+   return new TimelinePutResponse();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
new file mode 100644
index 0000000..d39b1cb
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -0,0 +1,522 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtilsExt;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractTimelineAggregator.MetricClusterAggregate;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractTimelineAggregator.MetricHostAggregate;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_METRICS_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricClusterAggregator.TimelineClusterMetric;
+
+/**
+ * Provides a facade over the Phoenix API to access HBase schema
+ */
+public class PhoenixHBaseAccessor {
+
+ // Configuration supplied by the caller; read in getConnection for the
+ // ZooKeeper settings below.
+ private final Configuration conf;
+ static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class);
+ // JDBC url template, filled in getConnection with
+ // <zookeeper quorum>:<zookeeper client port>:<znode parent>.
+ private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s";
+
+ // Configuration keys read in getConnection.
+ private static final String ZOOKEEPER_CLIENT_PORT =
+ "hbase.zookeeper.property.clientPort";
+ private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
+ private static final String ZNODE_PARENT = "zookeeper.znode.parent";
+ // Row threshold used by the save*AggregateRecords methods: they commit
+ // once the number of rows since the last commit reaches this value minus 1.
+ static final int PHOENIX_MAX_MUTATION_STATE_SIZE = 50000;
+
+ /**
+  * Creates an accessor bound to the given configuration and attempts to
+  * load the Phoenix JDBC driver class. A missing driver is logged, not
+  * thrown, so construction itself never fails on it.
+  *
+  * @param conf configuration later read by getConnection
+  */
+ public PhoenixHBaseAccessor(Configuration conf) {
+   this.conf = conf;
+   try {
+     Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
+   } catch (ClassNotFoundException e) {
+     // Route the stack trace through the logger instead of stderr.
+     LOG.error("Phoenix client jar not found in the classpath.", e);
+   }
+ }
+
+ /**
+ * Get JDBC connection to HBase store. Assumption is that the hbase
+ * configuration is present on the classpath and loaded by the caller into
+ * the Configuration object.
+ * Phoenix already caches the HConnection between the client and HBase
+ * cluster.
+ * @return @java.sql.Connection
+ */
+ protected Connection getConnection() {
+ Connection connection = null;
+ String zookeeperClientPort = conf.getTrimmed(ZOOKEEPER_CLIENT_PORT, "2181");
+ String zookeeperQuorum = conf.getTrimmed(ZOOKEEPER_QUORUM);
+ String znodeParent = conf.getTrimmed(ZNODE_PARENT, "/hbase");
+
+ if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) {
+ throw new IllegalStateException("Unable to find Zookeeper quorum to " +
+ "access HBase store using Phoenix.");
+ }
+
+ String url = String.format(connectionUrl, zookeeperQuorum,
+ zookeeperClientPort, znodeParent);
+
+ LOG.debug("Metric store connection url: " + url);
+
+ try {
+ connection = DriverManager.getConnection(url);
+ } catch (SQLException e) {
+ LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+ }
+
+ return connection;
+ }
+
+ @SuppressWarnings("unchecked")
+ static TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
+ throws SQLException, IOException {
+ TimelineMetric metric = new TimelineMetric();
+ metric.setMetricName(rs.getString("METRIC_NAME"));
+ metric.setAppId(rs.getString("APP_ID"));
+ metric.setInstanceId(rs.getString("INSTANCE_ID"));
+ metric.setHostName(rs.getString("HOSTNAME"));
+ metric.setTimestamp(rs.getLong("TIMESTAMP"));
+ metric.setStartTime(rs.getLong("START_TIME"));
+ metric.setType(rs.getString("UNITS"));
+ metric.setMetricValues(
+ (Map<Long, Double>) TimelineUtilsExt.readMetricFromJSON(
+ rs.getString("METRICS")));
+ return metric;
+ }
+
+ static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
+ throws SQLException, IOException {
+ TimelineMetric metric = new TimelineMetric();
+ metric.setMetricName(rs.getString("METRIC_NAME"));
+ metric.setAppId(rs.getString("APP_ID"));
+ metric.setInstanceId(rs.getString("INSTANCE_ID"));
+ metric.setHostName(rs.getString("HOSTNAME"));
+ metric.setTimestamp(rs.getLong("TIMESTAMP"));
+ metric.setType(rs.getString("UNITS"));
+ return metric;
+ }
+
+ static MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs)
+ throws SQLException {
+ MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
+ metricHostAggregate.setSum(rs.getDouble("METRIC_AVG"));
+ metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
+ metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
+ metricHostAggregate.setDeviation(0.0);
+ return metricHostAggregate;
+ }
+
+
+ protected void initMetricSchema() {
+ Connection conn = getConnection();
+ Statement stmt = null;
+
+ try {
+ LOG.info("Initializing metrics schema...");
+ stmt = conn.createStatement();
+ stmt.executeUpdate(CREATE_METRICS_TABLE_SQL);
+ stmt.executeUpdate(CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL);
+ stmt.executeUpdate(CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL);
+ stmt.executeUpdate(CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL);
+ stmt.executeUpdate(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL);
+ conn.commit();
+ } catch (SQLException sql) {
+ LOG.warn("Error creating Metrics Schema in HBase using Phoenix.", sql);
+ } finally {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+ }
+ }
+
+ /**
+  * Upserts one row per TimelineMetric, storing the max/min/avg computed
+  * from its values alongside the values serialized as JSON. All rows share
+  * the same TIMESTAMP (the time this call started). A failure on a single
+  * row is logged and skipped; the remaining rows are still written and
+  * committed together at the end.
+  *
+  * @param metrics metrics to store; an empty or null list is a no-op
+  * @throws SQLException if no connection is available, or on prepare or
+  *         commit failure
+  * @throws IOException if the metric values cannot be serialized
+  */
+ public void insertMetricRecords(TimelineMetrics metrics)
+     throws SQLException, IOException {
+
+   List<TimelineMetric> timelineMetrics = metrics.getMetrics();
+   if (timelineMetrics == null || timelineMetrics.isEmpty()) {
+     LOG.debug("Empty metrics insert request.");
+     return;
+   }
+
+   Connection conn = getConnection();
+   // getConnection returns null when the driver fails to connect.
+   if (conn == null) {
+     throw new SQLException(
+       "Unable to connect to HBase store using Phoenix.");
+   }
+
+   PreparedStatement stmt = null;
+   long currentTime = System.currentTimeMillis();
+
+   try {
+     stmt = conn.prepareStatement(UPSERT_METRICS_SQL);
+
+     for (TimelineMetric metric : timelineMetrics) {
+       stmt.clearParameters();
+
+       // Avoid stringifying the whole value map unless trace is on.
+       if (LOG.isTraceEnabled()) {
+         LOG.trace("host: " + metric.getHostName() + ", " +
+           "values: " + metric.getMetricValues());
+       }
+       Double[] aggregates = calculateAggregates(metric.getMetricValues());
+
+       stmt.setString(1, metric.getMetricName());
+       stmt.setString(2, metric.getHostName());
+       stmt.setString(3, metric.getAppId());
+       stmt.setString(4, metric.getInstanceId());
+       stmt.setLong(5, currentTime);
+       stmt.setLong(6, metric.getStartTime());
+       stmt.setString(7, metric.getType());
+       stmt.setDouble(8, aggregates[0]);
+       stmt.setDouble(9, aggregates[1]);
+       stmt.setDouble(10, aggregates[2]);
+       stmt.setString(11,
+         TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues()));
+
+       try {
+         stmt.executeUpdate();
+       } catch (SQLException sql) {
+         // Best effort: a bad row must not abort the rest of the batch.
+         LOG.error(sql);
+       }
+     }
+
+     conn.commit();
+
+   } finally {
+     if (stmt != null) {
+       try {
+         stmt.close();
+       } catch (SQLException e) {
+         // Ignore
+       }
+     }
+     try {
+       conn.close();
+     } catch (SQLException sql) {
+       // Ignore
+     }
+   }
+ }
+
+ /**
+  * Computes the max, min and average of the non-null values in the map.
+  *
+  * @param metricValues timestamp to value map; may be null or empty and
+  *                     may contain null values, which are skipped
+  * @return a three element array {max, min, avg}; all 0.0 when there is no
+  *         non-null value
+  */
+ private Double[] calculateAggregates(Map<Long, Double> metricValues) {
+   // Infinite sentinels: Double.MIN_VALUE is the smallest POSITIVE double,
+   // so using it as the initial max hid the maximum of all-negative data.
+   double max = Double.NEGATIVE_INFINITY;
+   double min = Double.POSITIVE_INFINITY;
+   double sum = 0.0;
+   int count = 0;
+
+   if (metricValues != null) {
+     for (Double value : metricValues.values()) {
+       // TODO: Some nulls in data - need to investigate null values from host
+       if (value == null) {
+         continue;
+       }
+       if (value > max) {
+         max = value;
+       }
+       if (value < min) {
+         min = value;
+       }
+       sum += value;
+       count++;
+     }
+   }
+
+   Double[] values = new Double[3];
+   values[0] = (max == Double.NEGATIVE_INFINITY) ? 0.0 : max;
+   values[1] = (min == Double.POSITIVE_INFINITY) ? 0.0 : min;
+   // Average over the values actually summed, not over null entries too.
+   values[2] = (count > 0) ? sum / count : 0.0;
+   return values;
+ }
+
+ /**
+  * Queries the per-host metric records matching the condition. When the
+  * condition is grouped, rows are merged through addOrMergeTimelineMetric;
+  * otherwise each row is appended as its own metric.
+  *
+  * @param condition filter criteria; must not be empty
+  * @return the matching metrics, possibly with no entries
+  * @throws SQLException if the condition is empty, no connection is
+  *         available, or the query fails
+  * @throws IOException if stored metric values cannot be parsed
+  */
+ @SuppressWarnings("unchecked")
+ public TimelineMetrics getMetricRecords(final Condition condition)
+     throws SQLException, IOException {
+
+   if (condition.isEmpty()) {
+     throw new SQLException("No filter criteria specified.");
+   }
+
+   Connection conn = getConnection();
+   // getConnection returns null when the driver fails to connect.
+   if (conn == null) {
+     throw new SQLException(
+       "Unable to connect to HBase store using Phoenix.");
+   }
+
+   PreparedStatement stmt = null;
+   ResultSet rs = null;
+   TimelineMetrics metrics = new TimelineMetrics();
+
+   try {
+     stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+
+     rs = stmt.executeQuery();
+
+     while (rs.next()) {
+       TimelineMetric metric = getTimelineMetricFromResultSet(rs);
+
+       if (condition.isGrouped()) {
+         metrics.addOrMergeTimelineMetric(metric);
+       } else {
+         metrics.getMetrics().add(metric);
+       }
+     }
+
+   } finally {
+     if (rs != null) {
+       try {
+         rs.close();
+       } catch (SQLException e) {
+         // Ignore
+       }
+     }
+     if (stmt != null) {
+       try {
+         stmt.close();
+       } catch (SQLException e) {
+         // Ignore
+       }
+     }
+     try {
+       conn.close();
+     } catch (SQLException sql) {
+       // Ignore
+     }
+   }
+   return metrics;
+ }
+
+ public void saveHostAggregateRecords(Map<TimelineMetric,
+ MetricHostAggregate> hostAggregateMap, String phoenixTableName)
+ throws SQLException {
+
+ if (hostAggregateMap != null && !hostAggregateMap.isEmpty()) {
+ Connection conn = getConnection();
+ PreparedStatement stmt = null;
+
+ long start = System.currentTimeMillis();
+ int rowCount = 0;
+
+ try {
+ stmt = conn.prepareStatement(
+ String.format(UPSERT_AGGREGATE_RECORD_SQL, phoenixTableName));
+
+ for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate :
+ hostAggregateMap.entrySet()) {
+
+ TimelineMetric metric = metricAggregate.getKey();
+ MetricHostAggregate hostAggregate = metricAggregate.getValue();
+
+ rowCount++;
+ stmt.clearParameters();
+ stmt.setString(1, metric.getMetricName());
+ stmt.setString(2, metric.getHostName());
+ stmt.setString(3, metric.getAppId());
+ stmt.setString(4, metric.getInstanceId());
+ stmt.setLong(5, metric.getTimestamp());
+ stmt.setString(6, metric.getType());
+ stmt.setDouble(7, hostAggregate.getSum());
+ stmt.setDouble(8, hostAggregate.getMax());
+ stmt.setDouble(9, hostAggregate.getMin());
+
+ try {
+ stmt.executeUpdate();
+ } catch (SQLException sql) {
+ LOG.error(sql);
+ }
+
+ if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+ conn.commit();
+ rowCount = 0;
+ }
+
+ }
+
+ conn.commit();
+
+ } finally {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException sql) {
+ // Ignore
+ }
+ }
+ }
+
+ long end = System.currentTimeMillis();
+
+ if ((end - start) > 60000l) {
+ LOG.info("Time to save map: " + (end - start) + ", " +
+ "thread = " + Thread.currentThread().getClass());
+ }
+ }
+ }
+
+ /**
+  * Save Metric aggregate records. One row is upserted per cluster metric;
+  * a failure on a single row is logged and skipped. An intermediate commit
+  * is issued whenever the rows since the last commit reach
+  * PHOENIX_MAX_MUTATION_STATE_SIZE - 1. Saves taking longer than 60 seconds
+  * are reported at INFO.
+  *
+  * @param records cluster metric key to aggregate; null or empty is a no-op
+  * @throws SQLException if no connection is available, or on prepare or
+  *         commit failure
+  */
+ public void saveClusterAggregateRecords(Map<TimelineClusterMetric,
+     MetricClusterAggregate> records) throws SQLException {
+   if (records == null || records.isEmpty()) {
+     LOG.debug("Empty aggregate records.");
+     return;
+   }
+
+   long start = System.currentTimeMillis();
+
+   Connection conn = getConnection();
+   // getConnection returns null when the driver fails to connect.
+   if (conn == null) {
+     throw new SQLException(
+       "Unable to connect to HBase store using Phoenix.");
+   }
+
+   PreparedStatement stmt = null;
+   try {
+     stmt = conn.prepareStatement(UPSERT_CLUSTER_AGGREGATE_SQL);
+     int rowCount = 0;
+
+     for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate>
+         aggregateEntry : records.entrySet()) {
+       TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
+       MetricClusterAggregate aggregate = aggregateEntry.getValue();
+
+       // Avoid building the message for every row unless trace is on.
+       if (LOG.isTraceEnabled()) {
+         LOG.trace("clusterMetric = " + clusterMetric + ", " +
+           "aggregate = " + aggregate);
+       }
+
+       rowCount++;
+       stmt.clearParameters();
+       stmt.setString(1, clusterMetric.getMetricName());
+       stmt.setString(2, clusterMetric.getAppId());
+       stmt.setString(3, clusterMetric.getInstanceId());
+       stmt.setLong(4, clusterMetric.getTimestamp());
+       stmt.setString(5, clusterMetric.getType());
+       stmt.setDouble(6, aggregate.getSum());
+       stmt.setInt(7, aggregate.getNumberOfHosts());
+       stmt.setDouble(8, aggregate.getMax());
+       stmt.setDouble(9, aggregate.getMin());
+
+       try {
+         stmt.executeUpdate();
+       } catch (SQLException sql) {
+         // Best effort: a bad row must not abort the rest of the batch.
+         LOG.error(sql);
+       }
+
+       if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+         conn.commit();
+         rowCount = 0;
+       }
+     }
+
+     conn.commit();
+
+   } finally {
+     if (stmt != null) {
+       try {
+         stmt.close();
+       } catch (SQLException e) {
+         // Ignore
+       }
+     }
+     try {
+       conn.close();
+     } catch (SQLException sql) {
+       // Ignore
+     }
+   }
+   long end = System.currentTimeMillis();
+   if ((end - start) > 60000L) {
+     LOG.info("Time to save: " + (end - start) + ", " +
+       "thread = " + Thread.currentThread().getName());
+   }
+ }
+
+
+ public TimelineMetrics getAggregateMetricRecords(final Condition condition)
+ throws SQLException {
+
+ if (condition.isEmpty()) {
+ throw new SQLException("No filter criteria specified.");
+ }
+
+ Connection conn = getConnection();
+ PreparedStatement stmt = null;
+ TimelineMetrics metrics = new TimelineMetrics();
+
+ try {
+ stmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition);
+
+ ResultSet rs = stmt.executeQuery();
+
+ while (rs.next()) {
+ TimelineMetric metric = new TimelineMetric();
+ metric.setMetricName(rs.getString("METRIC_NAME"));
+ metric.setAppId(rs.getString("APP_ID"));
+ metric.setInstanceId(rs.getString("INSTANCE_ID"));
+ metric.setTimestamp(rs.getLong("TIMESTAMP"));
+ metric.setStartTime(rs.getLong("TIMESTAMP"));
+ Map<Long, Double> valueMap = new HashMap<Long, Double>();
+ valueMap.put(rs.getLong("TIMESTAMP"), rs.getDouble("METRIC_SUM") /
+ rs.getInt("HOSTS_COUNT"));
+ metric.setMetricValues(valueMap);
+
+ if (condition.isGrouped()) {
+ metrics.addOrMergeTimelineMetric(metric);
+ } else {
+ metrics.getMetrics().add(metric);
+ }
+ }
+
+ } finally {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException sql) {
+ // Ignore
+ }
+ }
+ }
+ LOG.info("Aggregate records size: " + metrics.getMetrics().size());
+ return metrics;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
new file mode 100644
index 0000000..60a5673
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
@@ -0,0 +1,398 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * Encapsulate all metrics related SQL queries.
+ */
+public class PhoenixTransactSQL {
+
+ static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class);
+ // TODO: Configurable TTL values
+ // The TTL literals used in the DDL below are 86400, 604800, 2592000 and
+ // 31536000, i.e. 1, 7, 30 and 365 days if the unit is seconds.
+ // NOTE(review): confirm the TTL unit against the Phoenix/HBase documentation.
+ /**
+ * Create table to store individual metric records.
+ * One row per (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, TIMESTAMP); the
+ * METRICS column is a VARCHAR (presumably the serialized raw values - verify
+ * against the writer).
+ */
+ public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " +
+ "EXISTS METRIC_RECORD (METRIC_NAME VARCHAR, HOSTNAME VARCHAR, " +
+ "APP_ID VARCHAR, INSTANCE_ID VARCHAR, TIMESTAMP UNSIGNED_LONG NOT NULL, " +
+ "START_TIME UNSIGNED_LONG, UNITS CHAR(20), " +
+ "METRIC_AVG DOUBLE, METRIC_MAX DOUBLE, METRIC_MIN DOUBLE, " +
+ "METRICS VARCHAR CONSTRAINT pk " +
+ "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, TIMESTAMP)) " +
+ "IMMUTABLE_ROWS=true, TTL=86400";
+
+ /**
+ * Create table for per-host aggregates written to METRIC_RECORD_HOURLY.
+ * Same key as METRIC_RECORD but only AVG/MAX/MIN columns are kept.
+ */
+ public static final String CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL =
+ "CREATE TABLE IF NOT EXISTS METRIC_RECORD_HOURLY " +
+ "(METRIC_NAME VARCHAR, HOSTNAME VARCHAR, " +
+ "APP_ID VARCHAR, INSTANCE_ID VARCHAR, TIMESTAMP UNSIGNED_LONG NOT NULL, " +
+ "UNITS CHAR(20), METRIC_AVG DOUBLE, METRIC_MAX DOUBLE," +
+ "METRIC_MIN DOUBLE CONSTRAINT pk " +
+ "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, TIMESTAMP)) " +
+ "IMMUTABLE_ROWS=true, TTL=2592000";
+
+ /**
+ * Create table for per-host aggregates written to METRIC_RECORD_MINUTE.
+ * Column layout is identical to METRIC_RECORD_HOURLY; only the TTL differs.
+ */
+ public static final String CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL =
+ "CREATE TABLE IF NOT EXISTS METRIC_RECORD_MINUTE " +
+ "(METRIC_NAME VARCHAR, HOSTNAME VARCHAR, " +
+ "APP_ID VARCHAR, INSTANCE_ID VARCHAR, TIMESTAMP UNSIGNED_LONG NOT NULL, " +
+ "UNITS CHAR(20), METRIC_AVG DOUBLE, METRIC_MAX DOUBLE," +
+ "METRIC_MIN DOUBLE CONSTRAINT pk " +
+ "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, TIMESTAMP)) " +
+ "IMMUTABLE_ROWS=true, TTL=604800";
+
+ /**
+ * Create table for cluster-wide aggregates (METRIC_AGGREGATE).
+ * There is no HOSTNAME column: each row stores METRIC_SUM together with
+ * HOSTS_COUNT, plus the MAX and MIN values.
+ */
+ public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL =
+ "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE " +
+ "(METRIC_NAME VARCHAR, APP_ID VARCHAR, INSTANCE_ID VARCHAR, " +
+ "TIMESTAMP UNSIGNED_LONG NOT NULL, UNITS CHAR(20), METRIC_SUM DOUBLE, " +
+ "HOSTS_COUNT UNSIGNED_INT, METRIC_MAX DOUBLE, METRIC_MIN DOUBLE " +
+ "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, TIMESTAMP)) " +
+ "IMMUTABLE_ROWS=true, TTL=2592000";
+
+ /**
+ * Create table for hourly cluster-wide aggregates (METRIC_AGGREGATE_HOURLY).
+ * Unlike METRIC_AGGREGATE it stores METRIC_AVG instead of a sum and host count.
+ */
+ public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL =
+ "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE_HOURLY " +
+ "(METRIC_NAME VARCHAR, APP_ID VARCHAR, INSTANCE_ID VARCHAR, " +
+ "TIMESTAMP UNSIGNED_LONG NOT NULL, UNITS CHAR(20), METRIC_AVG DOUBLE, " +
+ "METRIC_MAX DOUBLE, METRIC_MIN DOUBLE " +
+ "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, TIMESTAMP)) " +
+ "IMMUTABLE_ROWS=true, TTL=31536000";
+
+ /**
+ * Insert into metric records table (11 positional parameters, in column order).
+ */
+ public static final String UPSERT_METRICS_SQL = "UPSERT INTO METRIC_RECORD " +
+ "(METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, TIMESTAMP, START_TIME, " +
+ "UNITS, METRIC_AVG, METRIC_MAX, METRIC_MIN, METRICS) VALUES " +
+ "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+ /**
+ * Insert into the cluster aggregate table (9 positional parameters).
+ */
+ public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " +
+ "METRIC_AGGREGATE (METRIC_NAME, APP_ID, INSTANCE_ID, TIMESTAMP, " +
+ "UNITS, METRIC_SUM, HOSTS_COUNT, METRIC_MAX, METRIC_MIN) " +
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+ /**
+ * Insert into a per-host aggregate table. The leading %s is a format
+ * placeholder for the target table name and must be filled in before the
+ * statement is prepared.
+ */
+ public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " +
+ "%s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
+ "TIMESTAMP, UNITS, METRIC_AVG, METRIC_MAX, METRIC_MIN) " +
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+ /**
+ * Retrieve a set of rows from metrics records table.
+ * The statement has no WHERE clause; the prepare* methods append one.
+ */
+ public static final String GET_METRIC_SQL = "SELECT METRIC_NAME, " +
+ "HOSTNAME, APP_ID, INSTANCE_ID, TIMESTAMP, START_TIME, UNITS, METRIC_AVG, " +
+ "METRIC_MAX, METRIC_MIN, METRICS FROM METRIC_RECORD";
+
+ /**
+ * Retrieve only the aggregate columns (no START_TIME, no METRICS) from the
+ * table whose name replaces the trailing %s.
+ */
+ public static final String GET_METRIC_AGGREGATE_ONLY_SQL = "SELECT " +
+ "METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, TIMESTAMP, " +
+ "UNITS, METRIC_AVG, METRIC_MAX, METRIC_MIN FROM %s";
+
+ /**
+ * Retrieve rows from the cluster aggregate table (METRIC_AGGREGATE).
+ */
+ public static final String GET_CLUSTER_AGGREGATE_SQL =
+ "SELECT METRIC_NAME, APP_ID, " +
+ "INSTANCE_ID, TIMESTAMP, METRIC_SUM, HOSTS_COUNT, METRIC_MAX, " +
+ "METRIC_MIN FROM METRIC_AGGREGATE";
+
+ /**
+ * 4 metrics/min * 60 * 24: Retrieve data for 1 day.
+ * Used as the row limit when a Condition carries no explicit limit.
+ */
+ public static final Integer DEFAULT_RESULT_LIMIT = 5760;
+ // Table names matching the CREATE TABLE statements above.
+ public static final String METRICS_RECORD_TABLE_NAME =
+ "METRIC_RECORD";
+ public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME =
+ "METRIC_RECORD_MINUTE";
+ public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME =
+ "METRIC_RECORD_HOURLY";
+
+ /**
+  * Build and bind a SELECT over per-host metric rows.
+  *
+  * The SELECT list comes from the condition's custom statement when one is
+  * set, otherwise from GET_METRIC_SQL. A WHERE clause rendered by the
+  * condition, a fixed ORDER BY and an optional LIMIT are appended, then the
+  * positional parameters are bound in the order the clause emits them:
+  * metric names, hostname, app id (lower-cased), instance id, start time,
+  * end time.
+  *
+  * @param connection connection used to prepare the statement
+  * @param condition  filter criteria; must not be empty
+  * @return the prepared, fully bound statement (the caller closes it)
+  * @throws IllegalArgumentException if the condition has no criteria at all
+  * @throws SQLException if preparing or binding fails
+  */
+ public static PreparedStatement prepareGetMetricsSqlStmt(
+     Connection connection, Condition condition) throws SQLException {
+
+   if (condition.isEmpty()) {
+     throw new IllegalArgumentException("Condition is empty.");
+   }
+
+   // A caller-supplied SELECT takes precedence over the default one.
+   String customSelect = condition.getStatement();
+   String query = (customSelect != null ? customSelect : GET_METRIC_SQL)
+       + " WHERE " + condition.getConditionClause()
+       + " ORDER BY METRIC_NAME, TIMESTAMP";
+   Integer rowLimit = condition.getLimit();
+   if (rowLimit != null) {
+     query = query + " LIMIT " + rowLimit;
+   }
+
+   LOG.debug("SQL: " + query + ", condition: " + condition);
+   PreparedStatement stmt = connection.prepareStatement(query);
+
+   // JDBC parameters are 1-based; index always holds the last slot bound.
+   int index = 0;
+   List<String> names = condition.getMetricNames();
+   if (names != null) {
+     for (String name : names) {
+       stmt.setString(++index, name);
+     }
+   }
+   String host = condition.getHostname();
+   if (host != null) {
+     stmt.setString(++index, host);
+   }
+   // TODO: Upper case all strings on POST
+   String app = condition.getAppId();
+   if (app != null) {
+     stmt.setString(++index, app.toLowerCase());
+   }
+   String instance = condition.getInstanceId();
+   if (instance != null) {
+     stmt.setString(++index, instance);
+   }
+   Long from = condition.getStartTime();
+   if (from != null) {
+     stmt.setLong(++index, from);
+   }
+   Long until = condition.getEndTime();
+   if (until != null) {
+     stmt.setLong(++index, until);
+   }
+   Integer fetchSize = condition.getFetchSize();
+   if (fetchSize != null) {
+     stmt.setFetchSize(fetchSize);
+   }
+
+   return stmt;
+ }
+
+ /**
+  * Build and bind a SELECT over the cluster aggregate table.
+  *
+  * GET_CLUSTER_AGGREGATE_SQL is extended with the condition's WHERE clause,
+  * a fixed ORDER BY and an optional LIMIT. Parameters are bound in the
+  * order: metric names, app id (lower-cased), instance id, start time, end
+  * time.
+  *
+  * NOTE(review): the hostname is never bound here, yet the condition's
+  * clause contains "HOSTNAME = ?" whenever a hostname is set. Callers
+  * presumably always pass a null hostname for aggregate queries - confirm.
+  *
+  * @param connection connection used to prepare the statement
+  * @param condition  filter criteria; must not be empty
+  * @return the prepared, fully bound statement (the caller closes it)
+  * @throws IllegalArgumentException if the condition has no criteria at all
+  * @throws SQLException if preparing or binding fails
+  */
+ public static PreparedStatement prepareGetAggregateSqlStmt(
+     Connection connection, Condition condition) throws SQLException {
+
+   if (condition.isEmpty()) {
+     throw new IllegalArgumentException("Condition is empty.");
+   }
+
+   String query = GET_CLUSTER_AGGREGATE_SQL
+       + " WHERE " + condition.getConditionClause()
+       + " ORDER BY METRIC_NAME, TIMESTAMP";
+   Integer rowLimit = condition.getLimit();
+   if (rowLimit != null) {
+     query = query + " LIMIT " + rowLimit;
+   }
+
+   LOG.debug("SQL => " + query + ", condition => " + condition);
+   PreparedStatement stmt = connection.prepareStatement(query);
+
+   // JDBC parameters are 1-based; index always holds the last slot bound.
+   int index = 0;
+   List<String> names = condition.getMetricNames();
+   if (names != null) {
+     for (String name : names) {
+       stmt.setString(++index, name);
+     }
+   }
+   // TODO: Upper case all strings on POST
+   String app = condition.getAppId();
+   if (app != null) {
+     stmt.setString(++index, app.toLowerCase());
+   }
+   String instance = condition.getInstanceId();
+   if (instance != null) {
+     stmt.setString(++index, instance);
+   }
+   Long from = condition.getStartTime();
+   if (from != null) {
+     stmt.setLong(++index, from);
+   }
+   Long until = condition.getEndTime();
+   if (until != null) {
+     stmt.setLong(++index, until);
+   }
+
+   return stmt;
+ }
+
+ /**
+  * Filter criteria for a metrics query, together with the logic that
+  * renders them as a parameterized WHERE clause.
+  *
+  * Every criterion is optional: a null (or, for strings and the metric
+  * list, empty) value means "do not filter on this column". The getters
+  * normalize empty values to null so callers only need a null check.
+  *
+  * The order in which {@link #getConditionClause()} emits placeholders is
+  * metric names, hostname, app id, instance id, start time, end time; the
+  * code that binds parameters must use the same order.
+  */
+ static class Condition {
+   List<String> metricNames;
+   String hostname;
+   String appId;
+   String instanceId;
+   Long startTime;
+   Long endTime;
+   Integer limit;
+   boolean grouped;
+   boolean noLimit = false;
+   Integer fetchSize;
+   String statement;
+
+   Condition(List<String> metricNames, String hostname, String appId,
+       String instanceId, Long startTime, Long endTime, Integer limit,
+       boolean grouped) {
+     this.metricNames = metricNames;
+     this.hostname = hostname;
+     this.appId = appId;
+     this.instanceId = instanceId;
+     this.startTime = startTime;
+     this.endTime = endTime;
+     this.limit = limit;
+     this.grouped = grouped;
+   }
+
+   /** @return the custom SELECT statement, or null to use the default. */
+   String getStatement() {
+     return statement;
+   }
+
+   void setStatement(String statement) {
+     this.statement = statement;
+   }
+
+   /** @return the metric names, or null when none were supplied. */
+   List<String> getMetricNames() {
+     return metricNames == null || metricNames.isEmpty() ? null : metricNames;
+   }
+
+   /**
+    * @return a parenthesized, comma separated list with one "?" per metric
+    *         name, e.g. "(?, ?)", or null when metricNames is null
+    */
+   String getMetricsClause() {
+     if (metricNames == null) {
+       return null;
+     }
+     StringBuilder sb = new StringBuilder("(");
+     for (int i = 0; i < metricNames.size(); i++) {
+       if (i > 0) {
+         sb.append(", ");
+       }
+       sb.append("?");
+     }
+     sb.append(")");
+     return sb.toString();
+   }
+
+   /**
+    * Render the present criteria as predicates joined by " AND ".
+    *
+    * Predicates are joined only between two emitted predicates, so the
+    * result never ends in a dangling conjunction regardless of which
+    * criteria are missing.
+    *
+    * @return the WHERE clause body without the WHERE keyword; empty when no
+    *         criterion is set
+    */
+   String getConditionClause() {
+     StringBuilder sb = new StringBuilder();
+
+     if (getMetricNames() != null) {
+       appendPredicate(sb, "METRIC_NAME IN " + getMetricsClause());
+     }
+     if (getHostname() != null) {
+       appendPredicate(sb, "HOSTNAME = ?");
+     }
+     if (getAppId() != null) {
+       appendPredicate(sb, "APP_ID = ?");
+     }
+     if (getInstanceId() != null) {
+       appendPredicate(sb, "INSTANCE_ID = ?");
+     }
+     if (getStartTime() != null) {
+       appendPredicate(sb, "TIMESTAMP >= ?");
+     }
+     if (getEndTime() != null) {
+       appendPredicate(sb, "TIMESTAMP < ?");
+     }
+     return sb.toString();
+   }
+
+   /** Append a predicate, preceded by " AND " unless it is the first one. */
+   private static void appendPredicate(StringBuilder sb, String predicate) {
+     if (sb.length() > 0) {
+       sb.append(" AND ");
+     }
+     sb.append(predicate);
+   }
+
+   String getHostname() {
+     return hostname == null || hostname.isEmpty() ? null : hostname;
+   }
+
+   String getAppId() {
+     return appId == null || appId.isEmpty() ? null : appId;
+   }
+
+   String getInstanceId() {
+     return instanceId == null || instanceId.isEmpty() ? null : instanceId;
+   }
+
+   /**
+    * Convert to millis.
+    *
+    * @return the start time in milliseconds, or null when no lower bound
+    *         was supplied
+    */
+   Long getStartTime() {
+     return toMillis(startTime);
+   }
+
+   /**
+    * @return the end time in milliseconds, or null when no upper bound was
+    *         supplied
+    */
+   Long getEndTime() {
+     return toMillis(endTime);
+   }
+
+   /**
+    * Values below 9999999999 are taken to be seconds and are multiplied by
+    * 1000; anything larger is returned unchanged. Null stays null instead
+    * of failing on auto-unboxing.
+    */
+   private static Long toMillis(Long time) {
+     if (time == null) {
+       return null;
+     }
+     if (time < 9999999999L) {
+       return time * 1000;
+     }
+     return time;
+   }
+
+   void setNoLimit() {
+     this.noLimit = true;
+   }
+
+   /**
+    * @return null when the limit was disabled via {@link #setNoLimit()},
+    *         otherwise the explicit limit or DEFAULT_RESULT_LIMIT
+    */
+   Integer getLimit() {
+     if (noLimit) {
+       return null;
+     }
+     return limit == null ? DEFAULT_RESULT_LIMIT : limit;
+   }
+
+   boolean isGrouped() {
+     return grouped;
+   }
+
+   /** @return true when not a single filter criterion is set. */
+   boolean isEmpty() {
+     return (metricNames == null || metricNames.isEmpty())
+       && (hostname == null || hostname.isEmpty())
+       && (appId == null || appId.isEmpty())
+       && (instanceId == null || instanceId.isEmpty())
+       && startTime == null
+       && endTime == null;
+   }
+
+   Integer getFetchSize() {
+     return fetchSize;
+   }
+
+   void setFetchSize(Integer fetchSize) {
+     this.fetchSize = fetchSize;
+   }
+
+   @Override
+   public String toString() {
+     return "Condition{" +
+       "metricNames=" + metricNames +
+       ", hostname='" + hostname + '\'' +
+       ", appId='" + appId + '\'' +
+       ", instanceId='" + instanceId + '\'' +
+       ", startTime=" + startTime +
+       ", endTime=" + endTime +
+       ", limit=" + limit +
+       ", grouped=" + grouped +
+       ", noLimit=" + noLimit +
+       '}';
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java
new file mode 100644
index 0000000..50a4f63
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+
+/**
+ * Rolls per-host rows read from METRICS_AGGREGATE_MINUTE_TABLE_NAME up into
+ * METRICS_AGGREGATE_HOURLY_TABLE_NAME.
+ */
+public class TimelineMetricAggregatorHourly extends AbstractTimelineAggregator {
+  static final Long SLEEP_INTERVAL = 3600000l;
+  static final Long CHECKPOINT_CUT_OFF_INTERVAL = SLEEP_INTERVAL * 2;
+  static final Integer RESULTSET_FETCH_SIZE = 1000;
+  private static final Log LOG = LogFactory.getLog(TimelineMetricAggregatorHourly.class);
+
+  public TimelineMetricAggregatorHourly(PhoenixHBaseAccessor hBaseAccessor,
+      String checkpointLocation) {
+    super(hBaseAccessor, checkpointLocation);
+  }
+
+  /**
+   * Aggregate every row of the minute table whose timestamp falls in
+   * [startTime, endTime) and save one aggregate per metric key to the
+   * hourly table.
+   *
+   * @return true when the cycle finished without an SQL or IO error
+   */
+  @Override
+  protected boolean doWork(long startTime, long endTime) {
+    LOG.info("Start aggregation cycle @ " + new Date());
+
+    // Time-range-only query, unlimited, over the minute table.
+    Condition condition = new Condition(null, null, null, null, startTime,
+      endTime, null, true);
+    condition.setNoLimit();
+    condition.setFetchSize(RESULTSET_FETCH_SIZE);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+      METRICS_AGGREGATE_MINUTE_TABLE_NAME));
+
+    boolean succeeded;
+    Connection conn = null;
+    PreparedStatement stmt = null;
+
+    try {
+      conn = hBaseAccessor.getConnection();
+      stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+
+      Map<TimelineMetric, MetricHostAggregate> aggregates =
+        collectHostAggregates(stmt.executeQuery());
+
+      LOG.info("Saving " + aggregates.size() + " metric aggregates.");
+
+      hBaseAccessor.saveHostAggregateRecords(aggregates,
+        METRICS_AGGREGATE_HOURLY_TABLE_NAME);
+      succeeded = true;
+
+    } catch (SQLException e) {
+      LOG.error("Exception during aggregating metrics.", e);
+      succeeded = false;
+    } catch (IOException e) {
+      LOG.error("Exception during aggregating metrics.", e);
+      succeeded = false;
+    } finally {
+      releaseQuietly(stmt, conn);
+    }
+
+    LOG.info("End aggregation cycle @ " + new Date());
+    return succeeded;
+  }
+
+  /**
+   * Fold consecutive rows that are equal except for time into one running
+   * aggregate; a row that differs from its predecessor starts a new one.
+   */
+  private static Map<TimelineMetric, MetricHostAggregate> collectHostAggregates(
+      ResultSet rs) throws SQLException, IOException {
+    Map<TimelineMetric, MetricHostAggregate> aggregatesByMetric =
+      new HashMap<TimelineMetric, MetricHostAggregate>();
+    TimelineMetric previousMetric = null;
+    MetricHostAggregate runningAggregate = null;
+
+    while (rs.next()) {
+      TimelineMetric rowMetric =
+        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+      MetricHostAggregate rowAggregate =
+        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+      if (previousMetric == null) {
+        // First row
+        previousMetric = rowMetric;
+        runningAggregate = new MetricHostAggregate();
+        aggregatesByMetric.put(rowMetric, runningAggregate);
+      }
+
+      if (!previousMetric.equalsExceptTime(rowMetric)) {
+        // Switched over to a new metric - start a fresh aggregate for it
+        runningAggregate = new MetricHostAggregate();
+        aggregatesByMetric.put(rowMetric, runningAggregate);
+        previousMetric = rowMetric;
+      }
+
+      // Recalculate totals with current row
+      runningAggregate.updateAggregates(rowAggregate);
+    }
+    return aggregatesByMetric;
+  }
+
+  /** Close the statement, then the connection, ignoring close failures. */
+  private static void releaseQuietly(PreparedStatement stmt, Connection conn) {
+    if (stmt != null) {
+      try {
+        stmt.close();
+      } catch (SQLException e) {
+        // Ignore
+      }
+    }
+    if (conn != null) {
+      try {
+        conn.close();
+      } catch (SQLException sql) {
+        // Ignore
+      }
+    }
+  }
+
+  @Override
+  protected Long getSleepInterval() {
+    return SLEEP_INTERVAL;
+  }
+
+  @Override
+  protected Long getCheckpointCutOffInterval() {
+    return CHECKPOINT_CUT_OFF_INTERVAL;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java
new file mode 100644
index 0000000..a3909cf
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.prepareGetMetricsSqlStmt;
+
+/**
+ * Rolls per-host rows read from METRICS_RECORD_TABLE_NAME up into
+ * METRICS_AGGREGATE_MINUTE_TABLE_NAME.
+ */
+public class TimelineMetricAggregatorMinute extends AbstractTimelineAggregator {
+  static final Long SLEEP_INTERVAL = 300000l; // 5 mins
+  static final Long CHECKPOINT_CUT_OFF_INTERVAL = SLEEP_INTERVAL * 4;
+  private static final Log LOG = LogFactory.getLog(TimelineMetricAggregatorMinute.class);
+
+  public TimelineMetricAggregatorMinute(PhoenixHBaseAccessor hBaseAccessor,
+      String checkpointLocation) {
+    super(hBaseAccessor, checkpointLocation);
+  }
+
+  /**
+   * Aggregate every row of the metric record table whose timestamp falls in
+   * [startTime, endTime) and save one aggregate per metric key to the
+   * minute table.
+   *
+   * @return true when the cycle finished without an SQL or IO error
+   */
+  @Override
+  protected boolean doWork(long startTime, long endTime) {
+    LOG.info("Start aggregation cycle @ " + new Date());
+
+    // Time-range-only query, unlimited, over the raw record table.
+    Condition condition = new Condition(null, null, null, null, startTime,
+      endTime, null, true);
+    condition.setNoLimit();
+    condition.setFetchSize(RESULTSET_FETCH_SIZE);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+      METRICS_RECORD_TABLE_NAME));
+
+    boolean succeeded;
+    Connection conn = null;
+    PreparedStatement stmt = null;
+
+    try {
+      conn = hBaseAccessor.getConnection();
+      stmt = prepareGetMetricsSqlStmt(conn, condition);
+      LOG.info("Query issued @: " + new Date());
+      ResultSet rows = stmt.executeQuery();
+      LOG.info("Query returned @: " + new Date());
+
+      Map<TimelineMetric, MetricHostAggregate> aggregates =
+        collectHostAggregates(rows);
+
+      LOG.info("Saving " + aggregates.size() + " metric aggregates.");
+
+      hBaseAccessor.saveHostAggregateRecords(aggregates,
+        METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+      succeeded = true;
+
+    } catch (SQLException e) {
+      LOG.error("Exception during aggregating metrics.", e);
+      succeeded = false;
+    } catch (IOException e) {
+      LOG.error("Exception during aggregating metrics.", e);
+      succeeded = false;
+    } finally {
+      releaseQuietly(stmt, conn);
+    }
+
+    LOG.info("End aggregation cycle @ " + new Date());
+    return succeeded;
+  }
+
+  /**
+   * Fold consecutive rows that are equal except for time into one running
+   * aggregate; a row that differs from its predecessor starts a new one.
+   */
+  private static Map<TimelineMetric, MetricHostAggregate> collectHostAggregates(
+      ResultSet rs) throws SQLException, IOException {
+    Map<TimelineMetric, MetricHostAggregate> aggregatesByMetric =
+      new HashMap<TimelineMetric, MetricHostAggregate>();
+    TimelineMetric previousMetric = null;
+    MetricHostAggregate runningAggregate = null;
+
+    while (rs.next()) {
+      TimelineMetric rowMetric =
+        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+      MetricHostAggregate rowAggregate =
+        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+      if (previousMetric == null) {
+        // First row
+        previousMetric = rowMetric;
+        runningAggregate = new MetricHostAggregate();
+        aggregatesByMetric.put(rowMetric, runningAggregate);
+      }
+
+      if (!previousMetric.equalsExceptTime(rowMetric)) {
+        // Switched over to a new metric - start a fresh aggregate for it
+        runningAggregate = new MetricHostAggregate();
+        aggregatesByMetric.put(rowMetric, runningAggregate);
+        previousMetric = rowMetric;
+      }
+
+      // Recalculate totals with current row
+      runningAggregate.updateAggregates(rowAggregate);
+    }
+    return aggregatesByMetric;
+  }
+
+  /** Close the statement, then the connection, ignoring close failures. */
+  private static void releaseQuietly(PreparedStatement stmt, Connection conn) {
+    if (stmt != null) {
+      try {
+        stmt.close();
+      } catch (SQLException e) {
+        // Ignore
+      }
+    }
+    if (conn != null) {
+      try {
+        conn.close();
+      } catch (SQLException sql) {
+        // Ignore
+      }
+    }
+  }
+
+  @Override
+  protected Long getSleepInterval() {
+    return SLEEP_INTERVAL;
+  }
+
+  @Override
+  protected Long getCheckpointCutOffInterval() {
+    return CHECKPOINT_CUT_OFF_INTERVAL;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
new file mode 100644
index 0000000..10b2d70
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.prepareGetMetricsSqlStmt;
+
+/**
+ * Aggregates a metric across all hosts in the cluster.
+ */
+public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
+ // Value returned by getSleepInterval(); 120000 is two minutes if the unit is
+ // milliseconds (NOTE(review): unit is defined by the base class - confirm).
+ public static final long WAKE_UP_INTERVAL = 120000;
+ // Width of one aggregation time slice; added to timestamps in doWork(),
+ // so it is in the same unit as startTime/endTime.
+ public static final int TIME_SLICE_INTERVAL = 15000;
+ private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregator.class);
+
+ /**
+ * Pass the accessor and checkpoint location through to the base aggregator.
+ *
+ * @param hBaseAccessor accessor used to obtain connections and save records
+ * @param checkpointLocation checkpoint location handed to the base class
+ */
+ public TimelineMetricClusterAggregator(PhoenixHBaseAccessor hBaseAccessor,
+ String checkpointLocation) {
+ super(hBaseAccessor, checkpointLocation);
+ }
+
+ /**
+  * Read metrics written during the time interval and save the sum and total
+  * in the aggregate table.
+  *
+  * The interval is cut into slices of TIME_SLICE_INTERVAL. Each row's values
+  * are reduced to one value per slice, and those per-row values are combined
+  * across rows into a sum, a contributor count, a max and a min per
+  * (metric, slice).
+  *
+  * The statement and connection are always released, also when the query or
+  * the save fails.
+  *
+  * @param startTime Sample start time
+  * @param endTime Sample end time
+  * @return true when the cycle finished without an SQL or IO error
+  */
+ @Override
+ protected boolean doWork(long startTime, long endTime) {
+   LOG.info("Start aggregation cycle @ " + new Date());
+
+   boolean success = true;
+   Condition condition = new Condition(null, null, null, null, startTime,
+     endTime, null, true);
+   condition.setFetchSize(RESULTSET_FETCH_SIZE);
+   condition.setNoLimit();
+   condition.setStatement(GET_METRIC_SQL);
+
+   Connection conn = null;
+   PreparedStatement stmt = null;
+
+   try {
+     conn = hBaseAccessor.getConnection();
+     stmt = prepareGetMetricsSqlStmt(conn, condition);
+     LOG.info("Query issued @: " + new Date());
+     ResultSet rs = stmt.executeQuery();
+     LOG.info("Query returned @: " + new Date());
+     Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
+       new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+     List<Long[]> timeSlices = new ArrayList<Long[]>();
+     // Create time slices
+     long sliceStartTime = startTime;
+     while (sliceStartTime < endTime) {
+       timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + TIME_SLICE_INTERVAL });
+       sliceStartTime += TIME_SLICE_INTERVAL;
+     }
+
+     while (rs.next()) {
+       TimelineMetric metric =
+         PhoenixHBaseAccessor.getTimelineMetricFromResultSet(rs);
+
+       Map<TimelineClusterMetric, Double> clusterMetrics =
+         sliceFromTimelineMetric(metric, timeSlices);
+
+       if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
+         for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
+             clusterMetrics.entrySet()) {
+           TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
+           MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
+           Double avgValue = clusterMetricEntry.getValue();
+
+           if (aggregate == null) {
+             // First contribution for this (metric, slice)
+             aggregate = new MetricClusterAggregate(avgValue, 1, null,
+               avgValue, avgValue);
+             aggregateClusterMetrics.put(clusterMetric, aggregate);
+           } else {
+             aggregate.updateSum(avgValue);
+             aggregate.updateNumberOfHosts(1);
+             aggregate.updateMax(avgValue);
+             aggregate.updateMin(avgValue);
+           }
+         }
+       }
+     }
+     LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
+
+     hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
+
+   } catch (SQLException e) {
+     LOG.error("Exception during aggregating metrics.", e);
+     success = false;
+   } catch (IOException e) {
+     LOG.error("Exception during aggregating metrics.", e);
+     success = false;
+   } finally {
+     // Release JDBC resources on every path; close failures are not actionable.
+     if (stmt != null) {
+       try {
+         stmt.close();
+       } catch (SQLException e) {
+         // Ignore
+       }
+     }
+     if (conn != null) {
+       try {
+         conn.close();
+       } catch (SQLException sql) {
+         // Ignore
+       }
+     }
+   }
+
+   LOG.info("End aggregation cycle @ " + new Date());
+   return success;
+ }
+
+ /**
+ * @return WAKE_UP_INTERVAL, the pause between two aggregation cycles
+ */
+ @Override
+ protected Long getSleepInterval() {
+ return WAKE_UP_INTERVAL;
+ }
+
+ /**
+ * @return the checkpoint cut-off interval, fixed at 600000 (five times
+ * WAKE_UP_INTERVAL)
+ */
+ @Override
+ protected Long getCheckpointCutOffInterval() {
+ return 600000l;
+ }
+
+ /**
+  * Reduce one metric's raw values to a single mean value per time slice.
+  *
+  * Values that are null or whose timestamp lies outside every slice are
+  * dropped. The mean is computed from a running sum and sample count, so
+  * every sample in a slice carries the same weight no matter how many
+  * samples the slice holds.
+  *
+  * @param timelineMetric metric whose values are distributed over the slices
+  * @param timeSlices     [start, end) pairs describing the slices
+  * @return mean value keyed by (metric, slice start); null when the metric
+  *         has no values at all
+  */
+ private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
+     TimelineMetric timelineMetric, List<Long[]> timeSlices) {
+
+   if (timelineMetric.getMetricValues().isEmpty()) {
+     return null;
+   }
+
+   // Holds per-slice sums while iterating; turned into means before returning.
+   Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
+     new HashMap<TimelineClusterMetric, Double>();
+   Map<TimelineClusterMetric, Integer> sampleCounts =
+     new HashMap<TimelineClusterMetric, Integer>();
+
+   for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) {
+     // TODO: investigate null values - pre filter
+     if (metric.getValue() == null) {
+       continue;
+     }
+     // The key is re-parsed from its string form.
+     // NOTE(review): presumably because keys may not be real Long instances
+     // at runtime - confirm before simplifying.
+     Long timestamp = getSliceTimeForMetric(timeSlices,
+       Long.parseLong(metric.getKey().toString()));
+     if (timestamp != -1) {
+       // Metric is within desired time range
+       TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
+         timelineMetric.getMetricName(), timelineMetric.getAppId(),
+         timelineMetric.getInstanceId(), timestamp, timelineMetric.getType());
+
+       Double runningSum = timelineClusterMetricMap.get(clusterMetric);
+       if (runningSum == null) {
+         timelineClusterMetricMap.put(clusterMetric, metric.getValue());
+         sampleCounts.put(clusterMetric, 1);
+       } else {
+         timelineClusterMetricMap.put(clusterMetric, runningSum + metric.getValue());
+         sampleCounts.put(clusterMetric, sampleCounts.get(clusterMetric) + 1);
+       }
+     }
+   }
+
+   // Replace each sum by its mean; setValue is not a structural modification.
+   for (Map.Entry<TimelineClusterMetric, Double> slice :
+       timelineClusterMetricMap.entrySet()) {
+     slice.setValue(slice.getValue() / sampleCounts.get(slice.getKey()));
+   }
+
+   return timelineClusterMetricMap;
+ }
+
+ /**
+  * Find the slice whose half-open range [start, end) contains the
+  * timestamp.
+  *
+  * @param timeSlices [start, end) pairs, checked in list order
+  * @param timestamp  time to locate
+  * @return the start of the first matching slice, or -1 when no slice
+  *         contains the timestamp
+  */
+ private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) {
+   for (int i = 0; i < timeSlices.size(); i++) {
+     Long[] bounds = timeSlices.get(i);
+     if (timestamp < bounds[0] || timestamp >= bounds[1]) {
+       continue;
+     }
+     return bounds[0];
+   }
+   return -1L;
+ }
+
+ /**
+  * Key identifying one cluster-level metric value: metric name, app id,
+  * instance id and slice timestamp. The type is carried along but takes no
+  * part in equals, hashCode or toString.
+  */
+ public static class TimelineClusterMetric {
+   private String metricName;
+   private String appId;
+   private String instanceId;
+   private long timestamp;
+   private String type;
+
+   TimelineClusterMetric(String metricName, String appId, String instanceId,
+       long timestamp, String type) {
+     this.metricName = metricName;
+     this.appId = appId;
+     this.instanceId = instanceId;
+     this.timestamp = timestamp;
+     this.type = type;
+   }
+
+   String getMetricName() {
+     return metricName;
+   }
+
+   String getAppId() {
+     return appId;
+   }
+
+   String getInstanceId() {
+     return instanceId;
+   }
+
+   long getTimestamp() {
+     return timestamp;
+   }
+
+   String getType() {
+     return type;
+   }
+
+   /** Null-tolerant string equality used for the optional key parts. */
+   private static boolean sameText(String left, String right) {
+     return left == null ? right == null : left.equals(right);
+   }
+
+   /**
+    * Two keys are equal when timestamp, app id, instance id and metric
+    * name all match; app id and instance id may be null.
+    */
+   @Override
+   public boolean equals(Object o) {
+     if (this == o) {
+       return true;
+     }
+     if (o == null || getClass() != o.getClass()) {
+       return false;
+     }
+     TimelineClusterMetric other = (TimelineClusterMetric) o;
+     return timestamp == other.timestamp
+       && sameText(appId, other.appId)
+       && sameText(instanceId, other.instanceId)
+       && metricName.equals(other.metricName);
+   }
+
+   /** Hash over the same four fields that equals compares. */
+   @Override
+   public int hashCode() {
+     int appHash = appId == null ? 0 : appId.hashCode();
+     int instanceHash = instanceId == null ? 0 : instanceId.hashCode();
+     int timeHash = (int) (timestamp ^ (timestamp >>> 32));
+
+     int hash = metricName.hashCode();
+     hash = 31 * hash + appHash;
+     hash = 31 * hash + instanceHash;
+     hash = 31 * hash + timeHash;
+     return hash;
+   }
+
+   @Override
+   public String toString() {
+     StringBuilder text = new StringBuilder("TimelineClusterMetric{");
+     text.append("metricName='").append(metricName).append('\'');
+     text.append(", appId='").append(appId).append('\'');
+     text.append(", instanceId='").append(instanceId).append('\'');
+     text.append(", timestamp=").append(timestamp);
+     text.append('}');
+     return text.toString();
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
new file mode 100644
index 0000000..1caf809
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Aggregator variant that supplies hour-scale timing values to
+ * {@link AbstractTimelineAggregator}.
+ *
+ * <p>The aggregation step itself, {@link #doWork(long, long)}, is currently a
+ * stub that does nothing and returns {@code false}.
+ *
+ * <p>NOTE(review): the interval constants look like milliseconds (one hour and
+ * two hours respectively) - confirm against how
+ * {@link AbstractTimelineAggregator} consumes them.
+ */
+public class TimelineMetricClusterAggregatorHourly extends AbstractTimelineAggregator {
+  // Keyed on this class so that log output is attributed to the hourly
+  // aggregator rather than to TimelineMetricClusterAggregator.
+  private static final Log LOG =
+      LogFactory.getLog(TimelineMetricClusterAggregatorHourly.class);
+
+  /** Value returned by {@link #getSleepInterval()}. */
+  public static final long SLEEP_INTERVAL = 3600000;
+
+  /** Value returned by {@link #getCheckpointCutOffInterval()}. */
+  private static final long CHECKPOINT_CUT_OFF_INTERVAL = 7200000L;
+
+  /**
+   * Passes both arguments straight through to the superclass constructor.
+   *
+   * @param hBaseAccessor accessor handed to {@link AbstractTimelineAggregator}
+   * @param checkpointLocation checkpoint location handed to
+   *                           {@link AbstractTimelineAggregator}
+   */
+  public TimelineMetricClusterAggregatorHourly(PhoenixHBaseAccessor hBaseAccessor,
+                                               String checkpointLocation) {
+    super(hBaseAccessor, checkpointLocation);
+  }
+
+  /**
+   * Placeholder: no aggregation is performed yet.
+   *
+   * @param startTime start of the window; currently ignored
+   * @param endTime end of the window; currently ignored
+   * @return always {@code false}
+   */
+  @Override
+  protected boolean doWork(long startTime, long endTime) {
+    return false;
+  }
+
+  /**
+   * @return {@link #SLEEP_INTERVAL}
+   */
+  @Override
+  protected Long getSleepInterval() {
+    return SLEEP_INTERVAL;
+  }
+
+  /**
+   * @return the fixed checkpoint cut-off interval, {@code 7200000}
+   */
+  @Override
+  protected Long getCheckpointCutOffInterval() {
+    return CHECKPOINT_CUT_OFF_INTERVAL;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
new file mode 100644
index 0000000..5224450
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+public interface TimelineMetricStore {
+ /**
+ * This method retrieves metrics stored by the Timeline store.
+ *
+ * @param metricNames Names of the metric, e.g.: cpu_user
+ * @param hostname Name of the host where the metric originated from
+ * @param applicationId Id of the application to which this metric belongs
+ * @param instanceId Application instance id.
+ * @param startTime Start timestamp
+ * @param endTime End timestamp
+ * @param limit Override default result limit
+ * @param groupedByHosts Group {@link TimelineMetric} by metric name, hostname,
+ * app id and instance id
+ *
+ * @return {@link TimelineMetrics}
+ * @throws java.sql.SQLException declared by the signature; presumably raised
+ * when the backing store cannot be queried (confirm against implementations)
+ * @throws java.io.IOException declared by the signature; the triggering
+ * conditions depend on the implementation
+ */
+ TimelineMetrics getTimelineMetrics(List<String> metricNames, String hostname,
+ String applicationId, String instanceId, Long startTime,
+ Long endTime, Integer limit, boolean groupedByHosts)
+ throws SQLException, IOException;
+
+
+ /**
+ * Return all records for a single metric satisfying the filter criteria.
+ *
+ * @param metricName Name of the metric, e.g.: cpu_user
+ * @param hostname Name of the host where the metric originated from
+ * @param applicationId Id of the application to which this metric belongs
+ * @param instanceId Application instance id.
+ * @param startTime Start timestamp
+ * @param endTime End timestamp
+ * @param limit Override default result limit
+ *
+ * @return {@link TimelineMetric}
+ * @throws java.sql.SQLException declared by the signature; presumably raised
+ * when the backing store cannot be queried (confirm against implementations)
+ * @throws java.io.IOException declared by the signature; the triggering
+ * conditions depend on the implementation
+ */
+ TimelineMetric getTimelineMetric(String metricName, String hostname,
+ String applicationId, String instanceId, Long startTime,
+ Long endTime, Integer limit)
+ throws SQLException, IOException;
+
+
+ /**
+ * Stores metric information to the timeline store. Any errors occurring for
+ * individual put request objects will be reported in the response.
+ *
+ * @param metrics A {@link TimelineMetrics}.
+ * @return A {@link org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse}.
+ * @throws java.sql.SQLException declared by the signature; presumably raised
+ * when the backing store cannot be written (confirm against implementations)
+ * @throws java.io.IOException declared by the signature; the triggering
+ * conditions depend on the implementation
+ */
+ TimelinePutResponse putMetrics(TimelineMetrics metrics)
+ throws SQLException, IOException;
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptFinishData.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptFinishData.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptFinishData.java
new file mode 100644
index 0000000..7ba51af
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptFinishData.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Record of the <code>RMAppAttempt</code> attributes that are only known once
+ * the attempt has finished and that have to be stored persistently.
+ */
+@Public
+@Unstable
+public abstract class ApplicationAttemptFinishData {
+
+  /**
+   * Obtains a record from {@link Records#newRecord(Class)} and fills it in
+   * through the setters declared below.
+   *
+   * @param appAttemptId value passed to {@link #setApplicationAttemptId}
+   * @param diagnosticsInfo value passed to {@link #setDiagnosticsInfo}
+   * @param trackingURL value passed to {@link #setTrackingURL}
+   * @param finalApplicationStatus value passed to
+   *          {@link #setFinalApplicationStatus}
+   * @param yarnApplicationAttemptState value passed to
+   *          {@link #setYarnApplicationAttemptState}
+   * @return the populated record
+   */
+  @Public
+  @Unstable
+  public static ApplicationAttemptFinishData newInstance(
+      ApplicationAttemptId appAttemptId, String diagnosticsInfo,
+      String trackingURL, FinalApplicationStatus finalApplicationStatus,
+      YarnApplicationAttemptState yarnApplicationAttemptState) {
+    final ApplicationAttemptFinishData finishData =
+        Records.newRecord(ApplicationAttemptFinishData.class);
+
+    finishData.setApplicationAttemptId(appAttemptId);
+    finishData.setDiagnosticsInfo(diagnosticsInfo);
+    finishData.setTrackingURL(trackingURL);
+    finishData.setFinalApplicationStatus(finalApplicationStatus);
+    finishData.setYarnApplicationAttemptState(yarnApplicationAttemptState);
+
+    return finishData;
+  }
+
+  /** @return the application attempt id of this record */
+  @Public
+  @Unstable
+  public abstract ApplicationAttemptId getApplicationAttemptId();
+
+  /** @param applicationAttemptId the application attempt id to record */
+  @Public
+  @Unstable
+  public abstract void setApplicationAttemptId(
+      ApplicationAttemptId applicationAttemptId);
+
+  /** @return the tracking URL of this record */
+  @Public
+  @Unstable
+  public abstract String getTrackingURL();
+
+  /** @param trackingURL the tracking URL to record */
+  @Public
+  @Unstable
+  public abstract void setTrackingURL(String trackingURL);
+
+  /** @return the diagnostics information of this record */
+  @Public
+  @Unstable
+  public abstract String getDiagnosticsInfo();
+
+  /** @param diagnosticsInfo the diagnostics information to record */
+  @Public
+  @Unstable
+  public abstract void setDiagnosticsInfo(String diagnosticsInfo);
+
+  /** @return the final application status of this record */
+  @Public
+  @Unstable
+  public abstract FinalApplicationStatus getFinalApplicationStatus();
+
+  /** @param finalApplicationStatus the final application status to record */
+  @Public
+  @Unstable
+  public abstract void setFinalApplicationStatus(
+      FinalApplicationStatus finalApplicationStatus);
+
+  /** @return the YARN application attempt state of this record */
+  @Public
+  @Unstable
+  public abstract YarnApplicationAttemptState getYarnApplicationAttemptState();
+
+  /** @param yarnApplicationAttemptState the attempt state to record */
+  @Public
+  @Unstable
+  public abstract void setYarnApplicationAttemptState(
+      YarnApplicationAttemptState yarnApplicationAttemptState);
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptHistoryData.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptHistoryData.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptHistoryData.java
new file mode 100644
index 0000000..b759ab1
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptHistoryData.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+
+/**
+ * Plain mutable holder for every field that is stored persistently for an
+ * <code>RMAppAttempt</code>.
+ */
+@Public
+@Unstable
+public class ApplicationAttemptHistoryData {
+
+  private ApplicationAttemptId applicationAttemptId;
+  private String host;
+  private int rpcPort;
+  private ContainerId masterContainerId;
+  private String diagnosticsInfo;
+  private String trackingURL;
+  private FinalApplicationStatus finalApplicationStatus;
+  private YarnApplicationAttemptState yarnApplicationAttemptState;
+
+  /**
+   * Builds a fully populated instance.
+   *
+   * @param appAttemptId stored as the application attempt id
+   * @param host stored as the host
+   * @param rpcPort stored as the RPC port
+   * @param masterContainerId stored as the master container id
+   * @param diagnosticsInfo stored as the diagnostics information
+   * @param trackingURL stored as the tracking URL
+   * @param finalApplicationStatus stored as the final application status
+   * @param yarnApplicationAttemptState stored as the attempt state
+   * @return a new instance carrying the given values
+   */
+  @Public
+  @Unstable
+  public static ApplicationAttemptHistoryData newInstance(
+      ApplicationAttemptId appAttemptId, String host, int rpcPort,
+      ContainerId masterContainerId, String diagnosticsInfo,
+      String trackingURL, FinalApplicationStatus finalApplicationStatus,
+      YarnApplicationAttemptState yarnApplicationAttemptState) {
+    // The instance is always exactly this class, whose setters are plain
+    // field assignments, so the fields are written directly.
+    final ApplicationAttemptHistoryData historyData =
+        new ApplicationAttemptHistoryData();
+    historyData.applicationAttemptId = appAttemptId;
+    historyData.host = host;
+    historyData.rpcPort = rpcPort;
+    historyData.masterContainerId = masterContainerId;
+    historyData.diagnosticsInfo = diagnosticsInfo;
+    historyData.trackingURL = trackingURL;
+    historyData.finalApplicationStatus = finalApplicationStatus;
+    historyData.yarnApplicationAttemptState = yarnApplicationAttemptState;
+    return historyData;
+  }
+
+  /** @return the stored application attempt id */
+  @Public
+  @Unstable
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return this.applicationAttemptId;
+  }
+
+  /** @param applicationAttemptId the application attempt id to store */
+  @Public
+  @Unstable
+  public void setApplicationAttemptId(
+      ApplicationAttemptId applicationAttemptId) {
+    this.applicationAttemptId = applicationAttemptId;
+  }
+
+  /** @return the stored host */
+  @Public
+  @Unstable
+  public String getHost() {
+    return this.host;
+  }
+
+  /** @param host the host to store */
+  @Public
+  @Unstable
+  public void setHost(String host) {
+    this.host = host;
+  }
+
+  /** @return the stored RPC port */
+  @Public
+  @Unstable
+  public int getRPCPort() {
+    return this.rpcPort;
+  }
+
+  /** @param rpcPort the RPC port to store */
+  @Public
+  @Unstable
+  public void setRPCPort(int rpcPort) {
+    this.rpcPort = rpcPort;
+  }
+
+  /** @return the stored tracking URL */
+  @Public
+  @Unstable
+  public String getTrackingURL() {
+    return this.trackingURL;
+  }
+
+  /** @param trackingURL the tracking URL to store */
+  @Public
+  @Unstable
+  public void setTrackingURL(String trackingURL) {
+    this.trackingURL = trackingURL;
+  }
+
+  /** @return the stored diagnostics information */
+  @Public
+  @Unstable
+  public String getDiagnosticsInfo() {
+    return this.diagnosticsInfo;
+  }
+
+  /** @param diagnosticsInfo the diagnostics information to store */
+  @Public
+  @Unstable
+  public void setDiagnosticsInfo(String diagnosticsInfo) {
+    this.diagnosticsInfo = diagnosticsInfo;
+  }
+
+  /** @return the stored final application status */
+  @Public
+  @Unstable
+  public FinalApplicationStatus getFinalApplicationStatus() {
+    return this.finalApplicationStatus;
+  }
+
+  /** @param finalApplicationStatus the final application status to store */
+  @Public
+  @Unstable
+  public void setFinalApplicationStatus(
+      FinalApplicationStatus finalApplicationStatus) {
+    this.finalApplicationStatus = finalApplicationStatus;
+  }
+
+  /** @return the stored master container id */
+  @Public
+  @Unstable
+  public ContainerId getMasterContainerId() {
+    return this.masterContainerId;
+  }
+
+  /** @param masterContainerId the master container id to store */
+  @Public
+  @Unstable
+  public void setMasterContainerId(ContainerId masterContainerId) {
+    this.masterContainerId = masterContainerId;
+  }
+
+  /** @return the stored YARN application attempt state */
+  @Public
+  @Unstable
+  public YarnApplicationAttemptState getYarnApplicationAttemptState() {
+    return this.yarnApplicationAttemptState;
+  }
+
+  /** @param yarnApplicationAttemptState the attempt state to store */
+  @Public
+  @Unstable
+  public void setYarnApplicationAttemptState(
+      YarnApplicationAttemptState yarnApplicationAttemptState) {
+    this.yarnApplicationAttemptState = yarnApplicationAttemptState;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptStartData.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptStartData.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptStartData.java
new file mode 100644
index 0000000..7ca43fa
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptStartData.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Record of the <code>RMAppAttempt</code> attributes that are already known
+ * when the attempt starts and that have to be stored persistently.
+ */
+@Public
+@Unstable
+public abstract class ApplicationAttemptStartData {
+
+  /**
+   * Obtains a record from {@link Records#newRecord(Class)} and fills it in
+   * through the setters declared below.
+   *
+   * @param appAttemptId value passed to {@link #setApplicationAttemptId}
+   * @param host value passed to {@link #setHost}
+   * @param rpcPort value passed to {@link #setRPCPort}
+   * @param masterContainerId value passed to {@link #setMasterContainerId}
+   * @return the populated record
+   */
+  @Public
+  @Unstable
+  public static ApplicationAttemptStartData newInstance(
+      ApplicationAttemptId appAttemptId, String host, int rpcPort,
+      ContainerId masterContainerId) {
+    final ApplicationAttemptStartData startData =
+        Records.newRecord(ApplicationAttemptStartData.class);
+
+    startData.setApplicationAttemptId(appAttemptId);
+    startData.setHost(host);
+    startData.setRPCPort(rpcPort);
+    startData.setMasterContainerId(masterContainerId);
+
+    return startData;
+  }
+
+  /** @return the application attempt id of this record */
+  @Public
+  @Unstable
+  public abstract ApplicationAttemptId getApplicationAttemptId();
+
+  /** @param applicationAttemptId the application attempt id to record */
+  @Public
+  @Unstable
+  public abstract void setApplicationAttemptId(
+      ApplicationAttemptId applicationAttemptId);
+
+  /** @return the host of this record */
+  @Public
+  @Unstable
+  public abstract String getHost();
+
+  /** @param host the host to record */
+  @Public
+  @Unstable
+  public abstract void setHost(String host);
+
+  /** @return the RPC port of this record */
+  @Public
+  @Unstable
+  public abstract int getRPCPort();
+
+  /** @param rpcPort the RPC port to record */
+  @Public
+  @Unstable
+  public abstract void setRPCPort(int rpcPort);
+
+  /** @return the master container id of this record */
+  @Public
+  @Unstable
+  public abstract ContainerId getMasterContainerId();
+
+  /** @param masterContainerId the master container id to record */
+  @Public
+  @Unstable
+  public abstract void setMasterContainerId(ContainerId masterContainerId);
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationFinishData.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationFinishData.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationFinishData.java
new file mode 100644
index 0000000..997fa6c
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationFinishData.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Record of the <code>RMApp</code> attributes that are only known once the
+ * application has finished and that have to be stored persistently.
+ */
+@Public
+@Unstable
+public abstract class ApplicationFinishData {
+
+  /**
+   * Obtains a record from {@link Records#newRecord(Class)} and fills it in
+   * through the setters declared below.
+   *
+   * @param applicationId value passed to {@link #setApplicationId}
+   * @param finishTime value passed to {@link #setFinishTime}
+   * @param diagnosticsInfo value passed to {@link #setDiagnosticsInfo}
+   * @param finalApplicationStatus value passed to
+   *          {@link #setFinalApplicationStatus}
+   * @param yarnApplicationState value passed to
+   *          {@link #setYarnApplicationState}
+   * @return the populated record
+   */
+  @Public
+  @Unstable
+  public static ApplicationFinishData newInstance(ApplicationId applicationId,
+      long finishTime, String diagnosticsInfo,
+      FinalApplicationStatus finalApplicationStatus,
+      YarnApplicationState yarnApplicationState) {
+    final ApplicationFinishData finishData =
+        Records.newRecord(ApplicationFinishData.class);
+
+    finishData.setApplicationId(applicationId);
+    finishData.setFinishTime(finishTime);
+    finishData.setDiagnosticsInfo(diagnosticsInfo);
+    finishData.setFinalApplicationStatus(finalApplicationStatus);
+    finishData.setYarnApplicationState(yarnApplicationState);
+
+    return finishData;
+  }
+
+  /** @return the application id of this record */
+  @Public
+  @Unstable
+  public abstract ApplicationId getApplicationId();
+
+  /** @param applicationId the application id to record */
+  @Public
+  @Unstable
+  public abstract void setApplicationId(ApplicationId applicationId);
+
+  /** @return the finish time of this record */
+  @Public
+  @Unstable
+  public abstract long getFinishTime();
+
+  /** @param finishTime the finish time to record */
+  @Public
+  @Unstable
+  public abstract void setFinishTime(long finishTime);
+
+  /** @return the diagnostics information of this record */
+  @Public
+  @Unstable
+  public abstract String getDiagnosticsInfo();
+
+  /** @param diagnosticsInfo the diagnostics information to record */
+  @Public
+  @Unstable
+  public abstract void setDiagnosticsInfo(String diagnosticsInfo);
+
+  /** @return the final application status of this record */
+  @Public
+  @Unstable
+  public abstract FinalApplicationStatus getFinalApplicationStatus();
+
+  /** @param finalApplicationStatus the final application status to record */
+  @Public
+  @Unstable
+  public abstract void setFinalApplicationStatus(
+      FinalApplicationStatus finalApplicationStatus);
+
+  /** @return the YARN application state of this record */
+  @Public
+  @Unstable
+  public abstract YarnApplicationState getYarnApplicationState();
+
+  /** @param yarnApplicationState the application state to record */
+  @Public
+  @Unstable
+  public abstract void setYarnApplicationState(
+      YarnApplicationState yarnApplicationState);
+
+}