You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2015/04/21 18:52:14 UTC
[2/2] ambari git commit: AMBARI-10622. Add daily aggregation to AMS
(useful for reporting over months of data). (swagle)
AMBARI-10622. Add daily aggregation to AMS (useful for reporting over months of data). (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/67c425ac
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/67c425ac
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/67c425ac
Branch: refs/heads/trunk
Commit: 67c425acfd22dcd701d05eab16f22d423db045dd
Parents: 2557d9a
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue Apr 21 09:51:58 2015 -0700
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Tue Apr 21 09:51:58 2015 -0700
----------------------------------------------------------------------
.../ambari-metrics-timelineservice/pom.xml | 1 +
.../timeline/HBaseTimelineMetricStore.java | 40 ++--
.../metrics/timeline/PhoenixHBaseAccessor.java | 144 ++++++------
.../metrics/timeline/Precision.java | 6 +-
.../timeline/TimelineMetricConfiguration.java | 29 ++-
.../aggregators/AbstractTimelineAggregator.java | 58 ++++-
.../TimelineClusterMetricReader.java | 42 ----
.../aggregators/TimelineMetricAggregator.java | 151 ++----------
.../TimelineMetricAggregatorFactory.java | 188 ++++++++++++++-
.../TimelineMetricClusterAggregator.java | 235 ++++++-------------
.../TimelineMetricClusterAggregatorHourly.java | 175 --------------
.../TimelineMetricClusterAggregatorMinute.java | 201 ++++++++++++++++
.../TimelineMetricHostAggregator.java | 113 +++++++++
.../aggregators/TimelineMetricReadHelper.java | 36 +++
.../timeline/query/PhoenixTransactSQL.java | 82 ++++---
.../timeline/AbstractMiniHBaseClusterTest.java | 5 +-
.../metrics/timeline/ITClusterAggregator.java | 99 ++++++--
.../metrics/timeline/ITMetricAggregator.java | 82 ++++++-
.../timeline/ITPhoenixHBaseAccessor.java | 34 ++-
.../metrics/timeline/MetricTestHelper.java | 7 +-
.../timeline/TestMetricHostAggregate.java | 4 +-
.../0.1.0/configuration/ams-site.xml | 70 +++++-
22 files changed, 1065 insertions(+), 737 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
index 4ec730e..2485661 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/pom.xml
+++ b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
@@ -249,6 +249,7 @@
<configuration>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<forkMode>always</forkMode>
+ <argLine>-XX:-UseSplitVerifier</argLine>
</configuration>
</plugin>
</plugins>
http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index 1fac404..447f6f9 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -28,8 +28,6 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregator;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregatorHourly;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
@@ -69,39 +67,53 @@ public class HBaseTimelineMetricStore extends AbstractService
hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf);
hBaseAccessor.initMetricSchema();
- // Start the cluster aggregator
- TimelineMetricClusterAggregator minuteClusterAggregator =
- new TimelineMetricClusterAggregator(hBaseAccessor, metricsConf);
+ // Start the minute cluster aggregator
+ TimelineMetricAggregator minuteClusterAggregator =
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf);
if (!minuteClusterAggregator.isDisabled()) {
Thread aggregatorThread = new Thread(minuteClusterAggregator);
aggregatorThread.start();
}
- // Start the cluster aggregator hourly
- TimelineMetricClusterAggregatorHourly hourlyClusterAggregator =
- new TimelineMetricClusterAggregatorHourly(hBaseAccessor, metricsConf);
+ // Start the hourly cluster aggregator
+ TimelineMetricAggregator hourlyClusterAggregator =
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hBaseAccessor, metricsConf);
if (!hourlyClusterAggregator.isDisabled()) {
Thread aggregatorThread = new Thread(hourlyClusterAggregator);
aggregatorThread.start();
}
- // Start the 5 minute aggregator
+ // Start the daily cluster aggregator
+ TimelineMetricAggregator dailyClusterAggregator =
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hBaseAccessor, metricsConf);
+ if (!dailyClusterAggregator.isDisabled()) {
+ Thread aggregatorThread = new Thread(dailyClusterAggregator);
+ aggregatorThread.start();
+ }
+
+ // Start the minute host aggregator
TimelineMetricAggregator minuteHostAggregator =
- TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute
- (hBaseAccessor, metricsConf);
+ TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hBaseAccessor, metricsConf);
if (!minuteHostAggregator.isDisabled()) {
Thread minuteAggregatorThread = new Thread(minuteHostAggregator);
minuteAggregatorThread.start();
}
- // Start hourly host aggregator
+ // Start the hourly host aggregator
TimelineMetricAggregator hourlyHostAggregator =
- TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly
- (hBaseAccessor, metricsConf);
+ TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hBaseAccessor, metricsConf);
if (!hourlyHostAggregator.isDisabled()) {
Thread aggregatorHourlyThread = new Thread(hourlyHostAggregator);
aggregatorHourlyThread.start();
}
+
+ // Start the daily host aggregator
+ TimelineMetricAggregator dailyHostAggregator =
+ TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hBaseAccessor, metricsConf);
+ if (!dailyHostAggregator.isDisabled()) {
+ Thread aggregatorDailyThread = new Thread(dailyHostAggregator);
+ aggregatorDailyThread.start();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 2e78912..e27d9a9 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
+
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -51,7 +52,9 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
+
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES;
@@ -59,19 +62,21 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_DAILY_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
@@ -85,13 +90,9 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
*/
public class PhoenixHBaseAccessor {
+ static final int PHOENIX_MAX_MUTATION_STATE_SIZE = 50000;
private static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class);
private static final TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper();
- private final Configuration hbaseConf;
- private final Configuration metricsConf;
- private final RetryCounterFactory retryCounterFactory;
-
- static final int PHOENIX_MAX_MUTATION_STATE_SIZE = 50000;
/**
* 4 metrics/min * 60 * 24: Retrieve data for 1 day.
*/
@@ -99,9 +100,11 @@ public class PhoenixHBaseAccessor {
public static int RESULTSET_LIMIT = (int)TimeUnit.DAYS.toMinutes(1) *
METRICS_PER_MINUTE;
private static ObjectMapper mapper = new ObjectMapper();
-
private static TypeReference<Map<Long, Double>> metricValuesTypeRef =
new TypeReference<Map<Long, Double>>() {};
+ private final Configuration hbaseConf;
+ private final Configuration metricsConf;
+ private final RetryCounterFactory retryCounterFactory;
private final ConnectionProvider dataSource;
public PhoenixHBaseAccessor(Configuration hbaseConf,
@@ -127,37 +130,6 @@ public class PhoenixHBaseAccessor {
(int) SECONDS.toMillis(metricsConf.getInt(GLOBAL_RETRY_INTERVAL, 5)));
}
-
- private Connection getConnectionRetryingOnException()
- throws SQLException, InterruptedException {
- RetryCounter retryCounter = retryCounterFactory.create();
- while (true) {
- try{
- return getConnection();
- } catch (SQLException e) {
- if(!retryCounter.shouldRetry()){
- LOG.error("HBaseAccessor getConnection failed after "
- + retryCounter.getMaxAttempts() + " attempts");
- throw e;
- }
- }
- retryCounter.sleepUntilNextRetry();
- }
- }
-
- /**
- * Get JDBC connection to HBase store. Assumption is that the hbase
- * configuration is present on the classpath and loaded by the caller into
- * the Configuration object.
- * Phoenix already caches the HConnection between the client and HBase
- * cluster.
- *
- * @return @java.sql.Connection
- */
- public Connection getConnection() throws SQLException {
- return dataSource.getConnection();
- }
-
private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs)
throws SQLException, IOException {
TimelineMetric metric = TIMELINE_METRIC_READ_HELPER
@@ -218,8 +190,7 @@ public class PhoenixHBaseAccessor {
}
@SuppressWarnings("unchecked")
- public static Map<Long, Double> readMetricFromJSON(String json)
- throws IOException {
+ public static Map<Long, Double> readMetricFromJSON(String json) throws IOException {
return (Map<Long, Double>) mapper.readValue(json, metricValuesTypeRef);
}
@@ -247,17 +218,34 @@ public class PhoenixHBaseAccessor {
return metricHostAggregate;
}
- public static MetricClusterAggregate getMetricClusterAggregateFromResultSet(ResultSet rs)
- throws SQLException {
- MetricClusterAggregate agg = new MetricClusterAggregate();
- agg.setSum(rs.getDouble("METRIC_SUM"));
- agg.setMax(rs.getDouble("METRIC_MAX"));
- agg.setMin(rs.getDouble("METRIC_MIN"));
- agg.setNumberOfHosts(rs.getInt("HOSTS_COUNT"));
-
- agg.setDeviation(0.0);
+ private Connection getConnectionRetryingOnException()
+ throws SQLException, InterruptedException {
+ RetryCounter retryCounter = retryCounterFactory.create();
+ while (true) {
+ try{
+ return getConnection();
+ } catch (SQLException e) {
+ if(!retryCounter.shouldRetry()){
+ LOG.error("HBaseAccessor getConnection failed after "
+ + retryCounter.getMaxAttempts() + " attempts");
+ throw e;
+ }
+ }
+ retryCounter.sleepUntilNextRetry();
+ }
+ }
- return agg;
+ /**
+ * Get JDBC connection to HBase store. Assumption is that the hbase
+ * configuration is present on the classpath and loaded by the caller into
+ * the Configuration object.
+ * Phoenix already caches the HConnection between the client and HBase
+ * cluster.
+ *
+ * @return a {@link java.sql.Connection}
+ */
+ public Connection getConnection() throws SQLException {
+ return dataSource.getConnection();
}
protected void initMetricSchema() {
@@ -269,24 +257,33 @@ public class PhoenixHBaseAccessor {
String precisionTtl = metricsConf.get(PRECISION_TABLE_TTL, "86400");
String hostMinTtl = metricsConf.get(HOST_MINUTE_TABLE_TTL, "604800");
String hostHourTtl = metricsConf.get(HOST_HOUR_TABLE_TTL, "2592000");
+ String hostDailyTtl = metricsConf.get(HOST_DAILY_TABLE_TTL, "31536000");
String clusterMinTtl = metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "2592000");
String clusterHourTtl = metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "31536000");
+ String clusterDailyTtl = metricsConf.get(CLUSTER_DAILY_TABLE_TTL, "31536000");
try {
LOG.info("Initializing metrics schema...");
conn = getConnectionRetryingOnException();
stmt = conn.createStatement();
+ // Host level
stmt.executeUpdate(String.format(CREATE_METRICS_TABLE_SQL,
encoding, precisionTtl, compression));
- stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL,
- encoding, hostHourTtl, compression));
- stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL,
- encoding, hostMinTtl, compression));
+ stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
+ METRICS_AGGREGATE_MINUTE_TABLE_NAME, encoding, hostMinTtl, compression));
+ stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
+ METRICS_AGGREGATE_HOURLY_TABLE_NAME, encoding, hostHourTtl, compression));
+ stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
+ METRICS_AGGREGATE_DAILY_TABLE_NAME, encoding, hostDailyTtl, compression));
+
+ // Cluster level
stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL,
- encoding, clusterMinTtl, compression));
+ METRICS_CLUSTER_AGGREGATE_TABLE_NAME, encoding, clusterMinTtl, compression));
+ stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL,
+ METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, encoding, clusterHourTtl, compression));
stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL,
- encoding, clusterHourTtl, compression));
+ METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, encoding, clusterDailyTtl, compression));
//alter TTL options to update tables
stmt.executeUpdate(String.format(ALTER_SQL,
@@ -299,11 +296,17 @@ public class PhoenixHBaseAccessor {
METRICS_AGGREGATE_HOURLY_TABLE_NAME,
hostHourTtl));
stmt.executeUpdate(String.format(ALTER_SQL,
+ METRICS_AGGREGATE_DAILY_TABLE_NAME,
+ hostDailyTtl));
+ stmt.executeUpdate(String.format(ALTER_SQL,
METRICS_CLUSTER_AGGREGATE_TABLE_NAME,
clusterMinTtl));
stmt.executeUpdate(String.format(ALTER_SQL,
METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME,
clusterHourTtl));
+ stmt.executeUpdate(String.format(ALTER_SQL,
+ METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME,
+ clusterDailyTtl));
conn.commit();
} catch (SQLException sql) {
@@ -726,9 +729,8 @@ public class PhoenixHBaseAccessor {
}
}
- public void saveHostAggregateRecords(Map<TimelineMetric,
- MetricHostAggregate> hostAggregateMap, String phoenixTableName)
- throws SQLException {
+ public void saveHostAggregateRecords(Map<TimelineMetric, MetricHostAggregate> hostAggregateMap,
+ String phoenixTableName) throws SQLException {
if (hostAggregateMap == null || hostAggregateMap.isEmpty()) {
LOG.debug("Empty aggregate records.");
@@ -809,9 +811,8 @@ public class PhoenixHBaseAccessor {
*
* @throws SQLException
*/
- public void saveClusterAggregateRecords(
- Map<TimelineClusterMetric, MetricClusterAggregate> records)
- throws SQLException {
+ public void saveClusterAggregateRecords(Map<TimelineClusterMetric, MetricClusterAggregate> records)
+ throws SQLException {
if (records == null || records.isEmpty()) {
LOG.debug("Empty aggregate records.");
@@ -819,11 +820,11 @@ public class PhoenixHBaseAccessor {
}
long start = System.currentTimeMillis();
-
+ String sqlStr = String.format(UPSERT_CLUSTER_AGGREGATE_SQL, METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
Connection conn = getConnection();
PreparedStatement stmt = null;
try {
- stmt = conn.prepareStatement(UPSERT_CLUSTER_AGGREGATE_SQL);
+ stmt = conn.prepareStatement(sqlStr);
int rowCount = 0;
for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate>
@@ -892,10 +893,8 @@ public class PhoenixHBaseAccessor {
*
* @throws SQLException
*/
- public void saveClusterAggregateHourlyRecords(
- Map<TimelineClusterMetric, MetricHostAggregate> records,
- String tableName)
- throws SQLException {
+ public void saveClusterTimeAggregateRecords(Map<TimelineClusterMetric, MetricHostAggregate> records,
+ String tableName) throws SQLException {
if (records == null || records.isEmpty()) {
LOG.debug("Empty aggregate records.");
return;
@@ -906,12 +905,10 @@ public class PhoenixHBaseAccessor {
Connection conn = getConnection();
PreparedStatement stmt = null;
try {
- stmt = conn.prepareStatement(String.format
- (UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName));
+ stmt = conn.prepareStatement(String.format(UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName));
int rowCount = 0;
- for (Map.Entry<TimelineClusterMetric, MetricHostAggregate>
- aggregateEntry : records.entrySet()) {
+ for (Map.Entry<TimelineClusterMetric, MetricHostAggregate> aggregateEntry : records.entrySet()) {
TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
MetricHostAggregate aggregate = aggregateEntry.getValue();
@@ -928,7 +925,6 @@ public class PhoenixHBaseAccessor {
stmt.setLong(4, clusterMetric.getTimestamp());
stmt.setString(5, clusterMetric.getType());
stmt.setDouble(6, aggregate.getSum());
-// stmt.setInt(7, aggregate.getNumberOfHosts());
stmt.setLong(7, aggregate.getNumberOfSamples());
stmt.setDouble(8, aggregate.getMax());
stmt.setDouble(9, aggregate.getMin());
http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Precision.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Precision.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Precision.java
index c0e1ebc..ee0e87c 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Precision.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Precision.java
@@ -26,7 +26,8 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
public enum Precision {
SECONDS,
MINUTES,
- HOURS;
+ HOURS,
+ DAYS;
public static class PrecisionFormatException extends IllegalArgumentException {
public PrecisionFormatException(String message, Throwable cause) {
@@ -41,7 +42,8 @@ public enum Precision {
try {
return Precision.valueOf(precision.toUpperCase());
} catch (IllegalArgumentException e) {
- throw new PrecisionFormatException("precision should be seconds, minutes or hours", e);
+ throw new PrecisionFormatException("precision should be seconds, " +
+ "minutes, hours or days", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index b72aa64..0595c20 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -54,15 +54,25 @@ public class TimelineMetricConfiguration {
public static final String PRECISION_TABLE_TTL =
"timeline.metrics.host.aggregator.ttl";
+
public static final String HOST_MINUTE_TABLE_TTL =
"timeline.metrics.host.aggregator.minute.ttl";
+
+ public static final String HOST_DAILY_TABLE_TTL =
+ "timeline.metrics.host.aggregator.daily.ttl";
+
public static final String HOST_HOUR_TABLE_TTL =
"timeline.metrics.host.aggregator.hourly.ttl";
+
public static final String CLUSTER_MINUTE_TABLE_TTL =
"timeline.metrics.cluster.aggregator.minute.ttl";
+
public static final String CLUSTER_HOUR_TABLE_TTL =
"timeline.metrics.cluster.aggregator.hourly.ttl";
+ public static final String CLUSTER_DAILY_TABLE_TTL =
+ "timeline.metrics.cluster.aggregator.daily.ttl";
+
public static final String CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL =
"timeline.metrics.cluster.aggregator.minute.timeslice.interval";
@@ -78,26 +88,35 @@ public class TimelineMetricConfiguration {
public static final String HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL =
"timeline.metrics.host.aggregator.hourly.interval";
+ public static final String HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL =
+ "timeline.metrics.host.aggregator.daily.interval";
+
public static final String CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL =
"timeline.metrics.cluster.aggregator.minute.interval";
public static final String CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL =
"timeline.metrics.cluster.aggregator.hourly.interval";
+ public static final String CLUSTER_AGGREGATOR_DAILY_SLEEP_INTERVAL =
+ "timeline.metrics.cluster.aggregator.daily.interval";
+
public static final String HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER =
"timeline.metrics.host.aggregator.minute.checkpointCutOffMultiplier";
public static final String HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER =
"timeline.metrics.host.aggregator.hourly.checkpointCutOffMultiplier";
+ public static final String HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER =
+ "timeline.metrics.host.aggregator.daily.checkpointCutOffMultiplier";
+
public static final String CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER =
"timeline.metrics.cluster.aggregator.minute.checkpointCutOffMultiplier";
public static final String CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER =
"timeline.metrics.cluster.aggregator.hourly.checkpointCutOffMultiplier";
- public static final String CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL =
- "timeline.metrics.cluster.aggregator.hourly.checkpointCutOffInterval";
+ public static final String CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER =
+ "timeline.metrics.cluster.aggregator.daily.checkpointCutOffMultiplier";
public static final String GLOBAL_RESULT_LIMIT =
"timeline.metrics.service.default.result.limit";
@@ -114,12 +133,18 @@ public class TimelineMetricConfiguration {
public static final String HOST_AGGREGATOR_HOUR_DISABLED =
"timeline.metrics.host.aggregator.hourly.disabled";
+ public static final String HOST_AGGREGATOR_DAILY_DISABLED =
+ "timeline.metrics.host.aggregator.daily.disabled";
+
public static final String CLUSTER_AGGREGATOR_MINUTE_DISABLED =
"timeline.metrics.cluster.aggregator.minute.disabled";
public static final String CLUSTER_AGGREGATOR_HOUR_DISABLED =
"timeline.metrics.cluster.aggregator.hourly.disabled";
+ public static final String CLUSTER_AGGREGATOR_DAILY_DISABLED =
+ "timeline.metrics.cluster.aggregator.daily.disabled";
+
public static final String DISABLE_APPLICATION_TIMELINE_STORE =
"timeline.service.disable.application.timeline.store";
http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
index 07717a8..415471d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
@@ -41,19 +41,21 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
* Base class for all runnable aggregators. Provides common functions like
* check pointing and scheduling.
*/
-public abstract class AbstractTimelineAggregator implements Runnable {
+public abstract class AbstractTimelineAggregator implements TimelineMetricAggregator {
protected final PhoenixHBaseAccessor hBaseAccessor;
private final Log LOG;
-
private Clock clock;
protected final long checkpointDelayMillis;
protected final Integer resultsetFetchSize;
protected Configuration metricsConf;
- public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
- Configuration metricsConf) {
- this(hBaseAccessor, metricsConf, new SystemClock());
- }
+ private String checkpointLocation;
+ private Long sleepIntervalMillis;
+ private Integer checkpointCutOffMultiplier;
+ private String aggregatorDisableParam;
+ protected String tableName;
+ protected String outputTableName;
+ protected Long nativeTimeRangeDelay;
public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
Configuration metricsConf, Clock clk) {
@@ -66,6 +68,30 @@ public abstract class AbstractTimelineAggregator implements Runnable {
this.clock = clk;
}
+ public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
+ Configuration metricsConf) {
+ this(hBaseAccessor, metricsConf, new SystemClock());
+ }
+
+ public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
+ Configuration metricsConf,
+ String checkpointLocation,
+ Long sleepIntervalMillis,
+ Integer checkpointCutOffMultiplier,
+ String aggregatorDisableParam,
+ String tableName,
+ String outputTableName,
+ Long nativeTimeRangeDelay) {
+ this(hBaseAccessor, metricsConf);
+ this.checkpointLocation = checkpointLocation;
+ this.sleepIntervalMillis = sleepIntervalMillis;
+ this.checkpointCutOffMultiplier = checkpointCutOffMultiplier;
+ this.aggregatorDisableParam = aggregatorDisableParam;
+ this.tableName = tableName;
+ this.outputTableName = outputTableName;
+ this.nativeTimeRangeDelay = nativeTimeRangeDelay;
+ }
+
@Override
public void run() {
LOG.info("Started Timeline aggregator thread @ " + new Date());
@@ -198,6 +224,7 @@ public abstract class AbstractTimelineAggregator implements Runnable {
* @param startTime Sample start time
* @param endTime Sample end time
*/
+ @Override
public boolean doWork(long startTime, long endTime) {
LOG.info("Start aggregation cycle @ " + new Date() + ", " +
"startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
@@ -257,18 +284,25 @@ public abstract class AbstractTimelineAggregator implements Runnable {
protected abstract Condition prepareMetricQueryCondition(long startTime, long endTime);
- protected abstract void aggregate(ResultSet rs, long startTime, long endTime)
- throws IOException, SQLException;
+ protected abstract void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException;
- protected abstract Long getSleepIntervalMillis();
+ protected Long getSleepIntervalMillis() {
+ return sleepIntervalMillis;
+ }
- protected abstract Integer getCheckpointCutOffMultiplier();
+ protected Integer getCheckpointCutOffMultiplier() {
+ return checkpointCutOffMultiplier;
+ }
protected Long getCheckpointCutOffIntervalMillis() {
return getCheckpointCutOffMultiplier() * getSleepIntervalMillis();
}
- public abstract boolean isDisabled();
+ public boolean isDisabled() {
+ return metricsConf.getBoolean(aggregatorDisableParam, false);
+ }
- protected abstract String getCheckpointLocation();
+ protected String getCheckpointLocation() {
+ return checkpointLocation;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetricReader.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetricReader.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetricReader.java
deleted file mode 100644
index 3df88d2..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetricReader.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-public class TimelineClusterMetricReader {
-
- private boolean ignoreInstance;
-
- public TimelineClusterMetricReader(boolean ignoreInstance) {
- this.ignoreInstance = ignoreInstance;
- }
-
- public TimelineClusterMetric fromResultSet(ResultSet rs)
- throws SQLException {
-
- return new TimelineClusterMetric(
- rs.getString("METRIC_NAME"),
- rs.getString("APP_ID"),
- ignoreInstance ? null : rs.getString("INSTANCE_ID"),
- rs.getLong("SERVER_TIME"),
- rs.getString("UNITS"));
- }
-}
-
http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
index a2887ea..96be48d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
@@ -1,3 +1,5 @@
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -6,142 +8,27 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.Map;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
-
-public class TimelineMetricAggregator extends AbstractTimelineAggregator {
- private static final Log LOG = LogFactory.getLog
- (TimelineMetricAggregator.class);
-
- private final String checkpointLocation;
- private final Long sleepIntervalMillis;
- private final Integer checkpointCutOffMultiplier;
- private final String hostAggregatorDisabledParam;
- private final String tableName;
- private final String outputTableName;
- private final Long nativeTimeRangeDelay;
-
- public TimelineMetricAggregator(PhoenixHBaseAccessor hBaseAccessor,
- Configuration metricsConf,
- String checkpointLocation,
- Long sleepIntervalMillis,
- Integer checkpointCutOffMultiplier,
- String hostAggregatorDisabledParam,
- String tableName,
- String outputTableName,
- Long nativeTimeRangeDelay) {
- super(hBaseAccessor, metricsConf);
- this.checkpointLocation = checkpointLocation;
- this.sleepIntervalMillis = sleepIntervalMillis;
- this.checkpointCutOffMultiplier = checkpointCutOffMultiplier;
- this.hostAggregatorDisabledParam = hostAggregatorDisabledParam;
- this.tableName = tableName;
- this.outputTableName = outputTableName;
- this.nativeTimeRangeDelay = nativeTimeRangeDelay;
- }
-
- @Override
- protected String getCheckpointLocation() {
- return checkpointLocation;
- }
-
- @Override
- protected void aggregate(ResultSet rs, long startTime, long endTime)
- throws IOException, SQLException {
- Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
- aggregateMetricsFromResultSet(rs);
-
- LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
- hBaseAccessor.saveHostAggregateRecords(hostAggregateMap,
- outputTableName);
- }
-
- @Override
- protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
- Condition condition = new DefaultCondition(null, null, null, null, startTime,
- endTime, null, null, true);
- condition.setNoLimit();
- condition.setFetchSize(resultsetFetchSize);
- condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
- PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay),
- tableName));
- condition.addOrderByColumn("METRIC_NAME");
- condition.addOrderByColumn("HOSTNAME");
- condition.addOrderByColumn("APP_ID");
- condition.addOrderByColumn("INSTANCE_ID");
- condition.addOrderByColumn("SERVER_TIME");
- return condition;
- }
-
- private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet
- (ResultSet rs) throws IOException, SQLException {
- TimelineMetric existingMetric = null;
- MetricHostAggregate hostAggregate = null;
- Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
- new HashMap<TimelineMetric, MetricHostAggregate>();
-
- while (rs.next()) {
- TimelineMetric currentMetric =
- PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
- MetricHostAggregate currentHostAggregate =
- PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
-
- if (existingMetric == null) {
- // First row
- existingMetric = currentMetric;
- hostAggregate = new MetricHostAggregate();
- hostAggregateMap.put(currentMetric, hostAggregate);
- }
-
- if (existingMetric.equalsExceptTime(currentMetric)) {
- // Recalculate totals with current metric
- hostAggregate.updateAggregates(currentHostAggregate);
- } else {
- // Switched over to a new metric - save existing - create new aggregate
- hostAggregate = new MetricHostAggregate();
- hostAggregate.updateAggregates(currentHostAggregate);
- hostAggregateMap.put(currentMetric, hostAggregate);
- existingMetric = currentMetric;
- }
- }
- return hostAggregateMap;
- }
-
- @Override
- protected Long getSleepIntervalMillis() {
- return sleepIntervalMillis;
- }
-
- @Override
- protected Integer getCheckpointCutOffMultiplier() {
- return checkpointCutOffMultiplier;
- }
-
- @Override
- public boolean isDisabled() {
- return metricsConf.getBoolean(hostAggregatorDisabledParam, false);
- }
+public interface TimelineMetricAggregator extends Runnable {
+ /**
+ * Aggregate metric data within the time bounds.
+ * @param startTime start time millis
+ * @param endTime end time millis
+ * @return success
+ */
+ public boolean doWork(long startTime, long endTime);
+
+ /**
+ * Whether the aggregator is disabled by configuration.
+ * @return true/false
+ */
+ public boolean isDisabled();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
index a0e4e32..642fcfe 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
@@ -22,10 +22,20 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_DISABLED;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL;
@@ -33,20 +43,42 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_DISABLED;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+/**
+ * Factory class that knows how to create an aggregator instance using
+ * {@code TimelineMetricConfiguration}.
+ */
public class TimelineMetricAggregatorFactory {
- private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE =
+ private static final String HOST_AGGREGATE_MINUTE_CHECKPOINT_FILE =
"timeline-metrics-host-aggregator-checkpoint";
- private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE =
+ private static final String HOST_AGGREGATE_HOURLY_CHECKPOINT_FILE =
"timeline-metrics-host-aggregator-hourly-checkpoint";
+ private static final String HOST_AGGREGATE_DAILY_CHECKPOINT_FILE =
+ "timeline-metrics-host-aggregator-daily-checkpoint";
+ private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE =
+ "timeline-metrics-cluster-aggregator-checkpoint";
+ private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE =
+ "timeline-metrics-cluster-aggregator-hourly-checkpoint";
+ private static final String CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_FILE =
+ "timeline-metrics-cluster-aggregator-daily-checkpoint";
+ /**
+ * Minute based aggregation for hosts.
+ */
public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
(PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
String checkpointDir = metricsConf.get(
TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
String checkpointLocation = FilenameUtils.concat(checkpointDir,
- MINUTE_AGGREGATE_CHECKPOINT_FILE);
+ HOST_AGGREGATE_MINUTE_CHECKPOINT_FILE);
long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
(HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l)); // 5 mins
@@ -57,7 +89,7 @@ public class TimelineMetricAggregatorFactory {
String inputTableName = METRICS_RECORD_TABLE_NAME;
String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
- return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
+ return new TimelineMetricHostAggregator(hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
checkpointCutOffMultiplier,
@@ -67,13 +99,16 @@ public class TimelineMetricAggregatorFactory {
120000l);
}
+ /**
+ * Hourly aggregation for hosts.
+ */
public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
(PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
String checkpointDir = metricsConf.get(
TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
String checkpointLocation = FilenameUtils.concat(checkpointDir,
- MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE);
+ HOST_AGGREGATE_HOURLY_CHECKPOINT_FILE);
long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
(HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
@@ -84,7 +119,37 @@ public class TimelineMetricAggregatorFactory {
String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
- return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
+ return new TimelineMetricHostAggregator(hBaseAccessor, metricsConf,
+ checkpointLocation,
+ sleepIntervalMillis,
+ checkpointCutOffMultiplier,
+ hostAggregatorDisabledParam,
+ inputTableName,
+ outputTableName,
+ 3600000l);
+ }
+
+ /**
+ * Daily aggregation for hosts.
+ */
+ public static TimelineMetricAggregator createTimelineMetricAggregatorDaily
+ (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+ String checkpointDir = metricsConf.get(
+ TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+ String checkpointLocation = FilenameUtils.concat(checkpointDir,
+ HOST_AGGREGATE_DAILY_CHECKPOINT_FILE);
+ long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+ (HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL, 86400l));
+
+ int checkpointCutOffMultiplier = metricsConf.getInt
+ (HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER, 1);
+ String hostAggregatorDisabledParam = HOST_AGGREGATOR_DAILY_DISABLED;
+
+ String inputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+ String outputTableName = METRICS_AGGREGATE_DAILY_TABLE_NAME;
+
+ return new TimelineMetricHostAggregator(hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
checkpointCutOffMultiplier,
@@ -94,5 +159,110 @@ public class TimelineMetricAggregatorFactory {
3600000l);
}
+ /**
+ * Minute based aggregation for cluster.
+ */
+ public static TimelineMetricAggregator createTimelineClusterAggregatorMinute(
+ PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+ String checkpointDir = metricsConf.get(
+ TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+
+ String checkpointLocation = FilenameUtils.concat(checkpointDir,
+ CLUSTER_AGGREGATOR_CHECKPOINT_FILE);
+
+ long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+ (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 120l));
+
+ long timeSliceIntervalMillis = SECONDS.toMillis(metricsConf.getInt
+ (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 15));
+
+ int checkpointCutOffMultiplier =
+ metricsConf.getInt(CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+
+ String inputTableName = METRICS_RECORD_TABLE_NAME;
+ String outputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+ String aggregatorDisabledParam = CLUSTER_AGGREGATOR_MINUTE_DISABLED;
+
+ // Minute based aggregation has the added responsibility of time slicing
+ return new TimelineMetricClusterAggregatorMinute(
+ hBaseAccessor, metricsConf,
+ checkpointLocation,
+ sleepIntervalMillis,
+ checkpointCutOffMultiplier,
+ aggregatorDisabledParam,
+ inputTableName,
+ outputTableName,
+ 120000l,
+ timeSliceIntervalMillis
+ );
+ }
+
+ /**
+ * Hourly aggregation for cluster.
+ */
+ public static TimelineMetricAggregator createTimelineClusterAggregatorHourly(
+ PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+ String checkpointDir = metricsConf.get(
+ TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+
+ String checkpointLocation = FilenameUtils.concat(checkpointDir,
+ CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE);
+
+ long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+ (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
+
+ int checkpointCutOffMultiplier = metricsConf.getInt
+ (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+
+ String inputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+ String outputTableName = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+ String aggregatorDisabledParam = CLUSTER_AGGREGATOR_HOUR_DISABLED;
+
+ return new TimelineMetricClusterAggregator(
+ hBaseAccessor, metricsConf,
+ checkpointLocation,
+ sleepIntervalMillis,
+ checkpointCutOffMultiplier,
+ aggregatorDisabledParam,
+ inputTableName,
+ outputTableName,
+ 120000l
+ );
+ }
+
+ /**
+ * Daily aggregation for cluster.
+ */
+ public static TimelineMetricAggregator createTimelineClusterAggregatorDaily(
+ PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+ String checkpointDir = metricsConf.get(
+ TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+
+ String checkpointLocation = FilenameUtils.concat(checkpointDir,
+ CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_FILE);
+
+ long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+ (CLUSTER_AGGREGATOR_DAILY_SLEEP_INTERVAL, 86400l));
+
+ int checkpointCutOffMultiplier = metricsConf.getInt
+ (CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER, 1);
+ String inputTableName = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+ String outputTableName = METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
+ String aggregatorDisabledParam = CLUSTER_AGGREGATOR_DAILY_DISABLED;
+
+ return new TimelineMetricClusterAggregator(
+ hBaseAccessor, metricsConf,
+ checkpointLocation,
+ sleepIntervalMillis,
+ checkpointCutOffMultiplier,
+ aggregatorDisabledParam,
+ inputTableName,
+ outputTableName,
+ 120000l
+ );
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
index 68b2ba9..9ed11e1 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -17,12 +17,9 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-import org.apache.commons.io.FilenameUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
@@ -30,74 +27,32 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_TIME_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
-/**
- * Aggregates a metric across all hosts in the cluster. Reads metrics from
- * the precision table and saves into the aggregate.
- */
public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
+ private final TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(true);
private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregator.class);
- private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE =
- "timeline-metrics-cluster-aggregator-checkpoint";
- private final String checkpointLocation;
- private final Long sleepIntervalMillis;
- public final int timeSliceIntervalMillis;
- private final Integer checkpointCutOffMultiplier;
- private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true);
- // Aggregator to perform app-level aggregates for host metrics
- private final TimelineMetricAppAggregator appAggregator;
+ private final boolean isClusterPrecisionInputTable;
public TimelineMetricClusterAggregator(PhoenixHBaseAccessor hBaseAccessor,
- Configuration metricsConf) {
- super(hBaseAccessor, metricsConf);
-
- String checkpointDir = metricsConf.get(
- TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-
- checkpointLocation = FilenameUtils.concat(checkpointDir,
- CLUSTER_AGGREGATOR_CHECKPOINT_FILE);
-
- sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
- (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 120l));
- timeSliceIntervalMillis = (int)SECONDS.toMillis(metricsConf.getInt
- (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 15));
- checkpointCutOffMultiplier =
- metricsConf.getInt(CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
-
- appAggregator = new TimelineMetricAppAggregator(metricsConf);
- }
-
- @Override
- protected String getCheckpointLocation() {
- return checkpointLocation;
- }
-
- @Override
- protected void aggregate(ResultSet rs, long startTime, long endTime)
- throws SQLException, IOException {
- List<Long[]> timeSlices = getTimeSlices(startTime, endTime);
- // Initialize app aggregates for host metrics
- appAggregator.init();
- Map<TimelineClusterMetric, MetricClusterAggregate>
- aggregateClusterMetrics = aggregateMetricsFromResultSet(rs, timeSlices);
-
- LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
- hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
- appAggregator.cleanup();
+ Configuration metricsConf,
+ String checkpointLocation,
+ Long sleepIntervalMillis,
+ Integer checkpointCutOffMultiplier,
+ String hostAggregatorDisabledParam,
+ String inputTableName,
+ String outputTableName,
+ Long nativeTimeRangeDelay) {
+ super(hBaseAccessor, metricsConf, checkpointLocation,
+ sleepIntervalMillis, checkpointCutOffMultiplier,
+ hostAggregatorDisabledParam, inputTableName, outputTableName,
+ nativeTimeRangeDelay);
+ isClusterPrecisionInputTable = inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
}
@Override
@@ -106,9 +61,17 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
endTime, null, null, true);
condition.setNoLimit();
condition.setFetchSize(resultsetFetchSize);
- condition.setStatement(String.format(GET_METRIC_SQL,
+ String sqlStr = String.format(GET_CLUSTER_AGGREGATE_TIME_SQL,
PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
- METRICS_RECORD_TABLE_NAME));
+ tableName);
+ // HOST_COUNT vs METRIC_COUNT
+ if (isClusterPrecisionInputTable) {
+ sqlStr = String.format(GET_CLUSTER_AGGREGATE_SQL,
+ PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+ tableName);
+ }
+
+ condition.setStatement(sqlStr);
condition.addOrderByColumn("METRIC_NAME");
condition.addOrderByColumn("APP_ID");
condition.addOrderByColumn("INSTANCE_ID");
@@ -116,120 +79,58 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
return condition;
}
- private List<Long[]> getTimeSlices(long startTime, long endTime) {
- List<Long[]> timeSlices = new ArrayList<Long[]>();
- long sliceStartTime = startTime;
- while (sliceStartTime < endTime) {
- timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis});
- sliceStartTime += timeSliceIntervalMillis;
- }
- return timeSlices;
- }
-
- private Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
- throws SQLException, IOException {
- Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
- new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
- // Create time slices
-
- while (rs.next()) {
- TimelineMetric metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
-
- Map<TimelineClusterMetric, Double> clusterMetrics =
- sliceFromTimelineMetric(metric, timeSlices);
-
- if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
- for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
- clusterMetrics.entrySet()) {
-
- TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
- Double avgValue = clusterMetricEntry.getValue();
-
- MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
-
- if (aggregate == null) {
- aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue);
- aggregateClusterMetrics.put(clusterMetric, aggregate);
- } else {
- aggregate.updateSum(avgValue);
- aggregate.updateNumberOfHosts(1);
- aggregate.updateMax(avgValue);
- aggregate.updateMin(avgValue);
- }
- // Update app level aggregates
- appAggregator.processTimelineClusterMetric(clusterMetric,
- metric.getHostName(), avgValue);
- }
- }
- }
- // Add app level aggregates to save
- aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());
- return aggregateClusterMetrics;
- }
-
@Override
- protected Long getSleepIntervalMillis() {
- return sleepIntervalMillis;
- }
+ protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
+ Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs);
- @Override
- protected Integer getCheckpointCutOffMultiplier() {
- return checkpointCutOffMultiplier;
+ LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
+ hBaseAccessor.saveClusterTimeAggregateRecords(hostAggregateMap, outputTableName);
}
- @Override
- public boolean isDisabled() {
- return metricsConf.getBoolean(CLUSTER_AGGREGATOR_MINUTE_DISABLED, false);
- }
+ private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs)
+ throws IOException, SQLException {
- private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
- TimelineMetric timelineMetric, List<Long[]> timeSlices) {
+ TimelineClusterMetric existingMetric = null;
+ MetricHostAggregate hostAggregate = null;
+ Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
+ new HashMap<TimelineClusterMetric, MetricHostAggregate>();
- if (timelineMetric.getMetricValues().isEmpty()) {
- return null;
- }
+ while (rs.next()) {
+ TimelineClusterMetric currentMetric = readHelper.fromResultSet(rs);
+
+ MetricClusterAggregate currentHostAggregate =
+ isClusterPrecisionInputTable ?
+ readHelper.getMetricClusterAggregateFromResultSet(rs) :
+ readHelper.getMetricClusterTimeAggregateFromResultSet(rs);
+
+ if (existingMetric == null) {
+ // First row
+ existingMetric = currentMetric;
+ hostAggregate = new MetricHostAggregate();
+ hostAggregateMap.put(currentMetric, hostAggregate);
+ }
- Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
- new HashMap<TimelineClusterMetric, Double>();
+ if (existingMetric.equalsExceptTime(currentMetric)) {
+ // Recalculate totals with current metric
+ updateAggregatesFromHost(hostAggregate, currentHostAggregate);
- for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) {
- // TODO: investigate null values - pre filter
- if (metric.getValue() == null) {
- continue;
- }
- Long timestamp = getSliceTimeForMetric(timeSlices,
- Long.parseLong(metric.getKey().toString()));
- if (timestamp != -1) {
- // Metric is within desired time range
- TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
- timelineMetric.getMetricName(),
- timelineMetric.getAppId(),
- timelineMetric.getInstanceId(),
- timestamp,
- timelineMetric.getType());
- if (!timelineClusterMetricMap.containsKey(clusterMetric)) {
- timelineClusterMetricMap.put(clusterMetric, metric.getValue());
- } else {
- Double oldValue = timelineClusterMetricMap.get(clusterMetric);
- Double newValue = (oldValue + metric.getValue()) / 2;
- timelineClusterMetricMap.put(clusterMetric, newValue);
- }
+ } else {
+ // Switched over to a new metric - save existing
+ hostAggregate = new MetricHostAggregate();
+ updateAggregatesFromHost(hostAggregate, currentHostAggregate);
+ hostAggregateMap.put(currentMetric, hostAggregate);
+ existingMetric = currentMetric;
}
+
}
- return timelineClusterMetricMap;
+ return hostAggregateMap;
}
- /**
- * Return beginning of the time slice into which the metric fits.
- */
- private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) {
- for (Long[] timeSlice : timeSlices) {
- if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) {
- return timeSlice[0];
- }
- }
- return -1l;
+ private void updateAggregatesFromHost(MetricHostAggregate agg, MetricClusterAggregate currentClusterAggregate) {
+ agg.updateMax(currentClusterAggregate.getMax());
+ agg.updateMin(currentClusterAggregate.getMin());
+ agg.updateSum(currentClusterAggregate.getSum());
+ agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts());
}
-
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java
deleted file mode 100644
index 264e4e6..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.Map;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
-
-public class TimelineMetricClusterAggregatorHourly extends AbstractTimelineAggregator {
- private static final Log LOG = LogFactory.getLog
- (TimelineMetricClusterAggregatorHourly.class);
- private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE =
- "timeline-metrics-cluster-aggregator-hourly-checkpoint";
- private final String checkpointLocation;
- private final long sleepIntervalMillis;
- private final Integer checkpointCutOffMultiplier;
- private long checkpointCutOffIntervalMillis;
- private static final Long NATIVE_TIME_RANGE_DELTA = 3600000l; // 1 hour
- private final TimelineClusterMetricReader timelineClusterMetricReader
- = new TimelineClusterMetricReader(true);
-
- public TimelineMetricClusterAggregatorHourly(
- PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
- super(hBaseAccessor, metricsConf);
-
- String checkpointDir = metricsConf.get(
- TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-
- checkpointLocation = FilenameUtils.concat(checkpointDir,
- CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE);
-
- sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
- (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
- checkpointCutOffIntervalMillis = SECONDS.toMillis(metricsConf.getLong
- (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL, 7200l));
- checkpointCutOffMultiplier = metricsConf.getInt
- (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
- }
-
- @Override
- protected String getCheckpointLocation() {
- return checkpointLocation;
- }
-
- @Override
- protected void aggregate(ResultSet rs, long startTime, long endTime)
- throws SQLException, IOException {
- Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
- aggregateMetricsFromResultSet(rs);
-
- LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
- hBaseAccessor.saveClusterAggregateHourlyRecords(hostAggregateMap,
- METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
- }
-
- @Override
- protected Condition prepareMetricQueryCondition(long startTime,
- long endTime) {
- Condition condition = new DefaultCondition(null, null, null, null, startTime,
- endTime, null, null, true);
- condition.setNoLimit();
- condition.setFetchSize(resultsetFetchSize);
- condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
- PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
- METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
- condition.addOrderByColumn("METRIC_NAME");
- condition.addOrderByColumn("APP_ID");
- condition.addOrderByColumn("INSTANCE_ID");
- condition.addOrderByColumn("SERVER_TIME");
- return condition;
- }
-
- private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs)
- throws IOException, SQLException {
-
- TimelineClusterMetric existingMetric = null;
- MetricHostAggregate hostAggregate = null;
- Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
- new HashMap<TimelineClusterMetric, MetricHostAggregate>();
-
- while (rs.next()) {
- TimelineClusterMetric currentMetric =
- timelineClusterMetricReader.fromResultSet(rs);
- MetricClusterAggregate currentHostAggregate =
- getMetricClusterAggregateFromResultSet(rs);
-
- if (existingMetric == null) {
- // First row
- existingMetric = currentMetric;
- hostAggregate = new MetricHostAggregate();
- hostAggregateMap.put(currentMetric, hostAggregate);
- }
-
- if (existingMetric.equalsExceptTime(currentMetric)) {
- // Recalculate totals with current metric
- updateAggregatesFromHost(hostAggregate, currentHostAggregate);
-
- } else {
- // Switched over to a new metric - save existing
- hostAggregate = new MetricHostAggregate();
- updateAggregatesFromHost(hostAggregate, currentHostAggregate);
- hostAggregateMap.put(currentMetric, hostAggregate);
- existingMetric = currentMetric;
- }
-
- }
-
- return hostAggregateMap;
- }
-
- private void updateAggregatesFromHost(MetricHostAggregate agg, MetricClusterAggregate currentClusterAggregate) {
- agg.updateMax(currentClusterAggregate.getMax());
- agg.updateMin(currentClusterAggregate.getMin());
- agg.updateSum(currentClusterAggregate.getSum());
- agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts());
- }
-
- @Override
- protected Long getSleepIntervalMillis() {
- return sleepIntervalMillis;
- }
-
- @Override
- protected Integer getCheckpointCutOffMultiplier() {
- return checkpointCutOffMultiplier;
- }
-
- @Override
- protected Long getCheckpointCutOffIntervalMillis() {
- return checkpointCutOffIntervalMillis;
- }
-
- @Override
- public boolean isDisabled() {
- return metricsConf.getBoolean(CLUSTER_AGGREGATOR_HOUR_DISABLED, false);
- }
-
-
-}