Posted to commits@ambari.apache.org by sw...@apache.org on 2015/11/04 06:31:20 UTC
ambari git commit: AMBARI-13701. Introduce cluster wide MINUTE aggregator in Ambari Metrics service. (Aravindan Vijayan via swagle)
Repository: ambari
Updated Branches:
refs/heads/branch-2.1 ee7c3bdc2 -> 2a89def1c
AMBARI-13701. Introduce cluster wide MINUTE aggregator in Ambari Metrics service. (Aravindan Vijayan via swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2a89def1
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2a89def1
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2a89def1
Branch: refs/heads/branch-2.1
Commit: 2a89def1c26e85f65ab441223f1a76c5b6d1c849
Parents: ee7c3bd
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue Nov 3 21:31:03 2015 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Tue Nov 3 21:31:03 2015 -0800
----------------------------------------------------------------------
.../timeline/HBaseTimelineMetricStore.java | 10 +-
.../metrics/timeline/PhoenixHBaseAccessor.java | 33 ++-
.../timeline/TimelineMetricConfiguration.java | 14 +-
.../TimelineMetricAggregatorFactory.java | 79 +++++-
.../TimelineMetricClusterAggregatorMinute.java | 248 -------------------
.../TimelineMetricClusterAggregatorSecond.java | 248 +++++++++++++++++++
.../timeline/query/PhoenixTransactSQL.java | 9 +-
.../metrics/timeline/ITClusterAggregator.java | 92 ++++++-
.../timeline/ITPhoenixHBaseAccessor.java | 7 +-
.../cache/TimelineMetricCacheEntryFactory.java | 19 +-
.../server/upgrade/UpgradeCatalog213.java | 24 +-
.../0.1.0/configuration/ams-site.xml | 54 +++-
.../0.1.0/package/scripts/split_points.py | 2 +-
.../server/upgrade/UpgradeCatalog213Test.java | 76 ++++++
14 files changed, 602 insertions(+), 313 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index 17df629..aed5fed 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -76,7 +76,15 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
LOG.info("Using group by aggregators for aggregating host and cluster metrics.");
}
- // Start the cluster aggregator minute
+ // Start the cluster aggregator second
+ TimelineMetricAggregator secondClusterAggregator =
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hBaseAccessor, metricsConf);
+ if (!secondClusterAggregator.isDisabled()) {
+ Thread aggregatorThread = new Thread(secondClusterAggregator);
+ aggregatorThread.start();
+ }
+
+ // Start the minute cluster aggregator
TimelineMetricAggregator minuteClusterAggregator =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf);
if (!minuteClusterAggregator.isDisabled()) {
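The second and minute blocks above share one start-if-enabled pattern. A minimal sketch of that pattern, assuming only the TimelineMetricAggregator type from this module; the startIfEnabled helper name is hypothetical and not part of this commit:

    // Hypothetical helper, a sketch of the start-if-enabled pattern above.
    private static void startIfEnabled(TimelineMetricAggregator aggregator) {
      if (!aggregator.isDisabled()) {
        new Thread(aggregator).start();
      }
    }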
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 3ce30fd..be06650 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -51,8 +51,6 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -63,6 +61,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_SECOND_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL;
@@ -76,7 +75,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING;
@@ -86,6 +85,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
@@ -242,13 +242,14 @@ public class PhoenixHBaseAccessor {
String encoding = metricsConf.get(HBASE_ENCODING_SCHEME, DEFAULT_ENCODING);
String compression = metricsConf.get(HBASE_COMPRESSION_SCHEME, DEFAULT_TABLE_COMPRESSION);
- String precisionTtl = metricsConf.get(PRECISION_TABLE_TTL, "86400");
- String hostMinTtl = metricsConf.get(HOST_MINUTE_TABLE_TTL, "604800");
- String hostHourTtl = metricsConf.get(HOST_HOUR_TABLE_TTL, "2592000");
- String hostDailyTtl = metricsConf.get(HOST_DAILY_TABLE_TTL, "31536000");
- String clusterMinTtl = metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "2592000");
- String clusterHourTtl = metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "31536000");
- String clusterDailyTtl = metricsConf.get(CLUSTER_DAILY_TABLE_TTL, "31536000");
+ String precisionTtl = metricsConf.get(PRECISION_TABLE_TTL, "86400"); //1 day
+ String hostMinTtl = metricsConf.get(HOST_MINUTE_TABLE_TTL, "604800"); //7 days
+ String hostHourTtl = metricsConf.get(HOST_HOUR_TABLE_TTL, "2592000"); //30 days
+ String hostDailyTtl = metricsConf.get(HOST_DAILY_TABLE_TTL, "31536000"); //1 year
+ String clusterSecTtl = metricsConf.get(CLUSTER_SECOND_TABLE_TTL, "2592000"); //30 days
+ String clusterMinTtl = metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "7776000"); //90 days
+ String clusterHourTtl = metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "31536000"); //1 year
+ String clusterDailyTtl = metricsConf.get(CLUSTER_DAILY_TABLE_TTL, "63072000"); //2 years
try {
LOG.info("Initializing metrics schema...");
@@ -278,9 +279,11 @@ public class PhoenixHBaseAccessor {
aggregateSql += getSplitPointsStr(splitPoints);
}
stmt.executeUpdate(aggregateSql);
- stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL,
+ stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL,
+ METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, encoding, clusterMinTtl, compression));
+ stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL,
METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, encoding, clusterHourTtl, compression));
- stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL,
+ stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL,
METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, encoding, clusterDailyTtl, compression));
//alter TTL options to update tables
@@ -298,6 +301,9 @@ public class PhoenixHBaseAccessor {
hostDailyTtl));
stmt.executeUpdate(String.format(ALTER_SQL,
METRICS_CLUSTER_AGGREGATE_TABLE_NAME,
+ clusterSecTtl));
+ stmt.executeUpdate(String.format(ALTER_SQL,
+ METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME,
clusterMinTtl));
stmt.executeUpdate(String.format(ALTER_SQL,
METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME,
@@ -664,7 +670,8 @@ public class PhoenixHBaseAccessor {
for (Function aggregateFunction : functions) {
SingleValuedTimelineMetric metric;
- if (condition.getPrecision() == Precision.HOURS
+ if (condition.getPrecision() == Precision.MINUTES
+ || condition.getPrecision() == Precision.HOURS
|| condition.getPrecision() == Precision.DAYS) {
metric = getAggregateTimelineMetricFromResultSet(rs, aggregateFunction, false);
} else {
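Every TTL above is a plain count of seconds handed to Phoenix, so the inline comments can be cross-checked by converting retention periods to seconds. A small self-contained sketch (not part of the commit) using java.util.concurrent.TimeUnit:

    import java.util.concurrent.TimeUnit;

    public class TtlDefaults {
      public static void main(String[] args) {
        System.out.println(TimeUnit.DAYS.toSeconds(1));   // 86400    precision table
        System.out.println(TimeUnit.DAYS.toSeconds(7));   // 604800   host minute
        System.out.println(TimeUnit.DAYS.toSeconds(30));  // 2592000  host hour, cluster second
        System.out.println(TimeUnit.DAYS.toSeconds(90));  // 7776000  cluster minute
        System.out.println(TimeUnit.DAYS.toSeconds(365)); // 31536000 host daily, cluster hour
        System.out.println(TimeUnit.DAYS.toSeconds(730)); // 63072000 cluster daily
      }
    }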
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 30e42f2..fd51f3d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -64,6 +64,9 @@ public class TimelineMetricConfiguration {
public static final String HOST_HOUR_TABLE_TTL =
"timeline.metrics.host.aggregator.hourly.ttl";
+ public static final String CLUSTER_SECOND_TABLE_TTL =
+ "timeline.metrics.cluster.aggregator.second.ttl";
+
public static final String CLUSTER_MINUTE_TABLE_TTL =
"timeline.metrics.cluster.aggregator.minute.ttl";
@@ -74,7 +77,7 @@ public class TimelineMetricConfiguration {
"timeline.metrics.cluster.aggregator.daily.ttl";
public static final String CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL =
- "timeline.metrics.cluster.aggregator.minute.timeslice.interval";
+ "timeline.metrics.cluster.aggregator.second.timeslice.interval";
public static final String AGGREGATOR_CHECKPOINT_DELAY =
"timeline.metrics.service.checkpointDelay";
@@ -91,6 +94,9 @@ public class TimelineMetricConfiguration {
public static final String HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL =
"timeline.metrics.host.aggregator.daily.interval";
+ public static final String CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL =
+ "timeline.metrics.cluster.aggregator.second.interval";
+
public static final String CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL =
"timeline.metrics.cluster.aggregator.minute.interval";
@@ -109,6 +115,9 @@ public class TimelineMetricConfiguration {
public static final String HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER =
"timeline.metrics.host.aggregator.daily.checkpointCutOffMultiplier";
+ public static final String CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER =
+ "timeline.metrics.cluster.aggregator.second.checkpointCutOffMultiplier";
+
public static final String CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER =
"timeline.metrics.cluster.aggregator.minute.checkpointCutOffMultiplier";
@@ -136,6 +145,9 @@ public class TimelineMetricConfiguration {
public static final String HOST_AGGREGATOR_DAILY_DISABLED =
"timeline.metrics.host.aggregator.hourly.disabled";
+ public static final String CLUSTER_AGGREGATOR_SECOND_DISABLED =
+ "timeline.metrics.cluster.aggregator.second.disabled";
+
public static final String CLUSTER_AGGREGATOR_MINUTE_DISABLED =
"timeline.metrics.cluster.aggregator.minute.disabled";
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
index f07918c..ba019fa 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
@@ -28,8 +28,11 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_DISABLED;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
@@ -49,6 +52,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
@@ -65,6 +69,8 @@ public class TimelineMetricAggregatorFactory {
"timeline-metrics-host-aggregator-daily-checkpoint";
private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE =
"timeline-metrics-cluster-aggregator-checkpoint";
+ private static final String CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_FILE =
+ "timeline-metrics-cluster-aggregator-minute-checkpoint";
private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE =
"timeline-metrics-cluster-aggregator-hourly-checkpoint";
private static final String CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_FILE =
@@ -76,6 +82,7 @@ public class TimelineMetricAggregatorFactory {
/**
* Minute based aggregation for hosts.
+ * Interval : 5 mins
*/
public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
(PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
@@ -119,6 +126,7 @@ public class TimelineMetricAggregatorFactory {
/**
* Hourly aggregation for hosts.
+ * Interval : 1 hour
*/
public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
(PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
@@ -162,6 +170,7 @@ public class TimelineMetricAggregatorFactory {
/**
* Daily aggregation for hosts.
+ * Interval : 1 day
*/
public static TimelineMetricAggregator createTimelineMetricAggregatorDaily
(PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
@@ -204,10 +213,12 @@ public class TimelineMetricAggregatorFactory {
}
/**
- * Minute based aggregation for cluster.
+ * Second aggregation for cluster.
+ * Interval : 2 mins
+ * Timeslice : 30 sec
*/
- public static TimelineMetricAggregator createTimelineClusterAggregatorMinute(
- PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+ public static TimelineMetricAggregator createTimelineClusterAggregatorSecond(
+ PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
String checkpointDir = metricsConf.get(
TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -216,20 +227,20 @@ public class TimelineMetricAggregatorFactory {
CLUSTER_AGGREGATOR_CHECKPOINT_FILE);
long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
- (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 120l));
+ (CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL, 120l));
long timeSliceIntervalMillis = SECONDS.toMillis(metricsConf.getInt
- (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 15));
+ (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30));
int checkpointCutOffMultiplier =
- metricsConf.getInt(CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+ metricsConf.getInt(CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
String inputTableName = METRICS_RECORD_TABLE_NAME;
String outputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
- String aggregatorDisabledParam = CLUSTER_AGGREGATOR_MINUTE_DISABLED;
+ String aggregatorDisabledParam = CLUSTER_AGGREGATOR_SECOND_DISABLED;
- // Minute based aggregation have added responsibility of time slicing
- return new TimelineMetricClusterAggregatorMinute(
+ // Second based aggregation has the added responsibility of time slicing
+ return new TimelineMetricClusterAggregatorSecond(
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -243,7 +254,56 @@ public class TimelineMetricAggregatorFactory {
}
/**
+ * Minute aggregation for cluster.
+ * Interval : 5 mins
+ */
+ public static TimelineMetricAggregator createTimelineClusterAggregatorMinute(
+ PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+ String checkpointDir = metricsConf.get(
+ TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+
+ String checkpointLocation = FilenameUtils.concat(checkpointDir,
+ CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_FILE);
+
+ long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+ (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l));
+
+ int checkpointCutOffMultiplier = metricsConf.getInt
+ (CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+
+ String inputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+ String outputTableName = METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
+ String aggregatorDisabledParam = CLUSTER_AGGREGATOR_MINUTE_DISABLED;
+
+ if (useGroupByAggregator(metricsConf)) {
+ return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
+ hBaseAccessor, metricsConf,
+ checkpointLocation,
+ sleepIntervalMillis,
+ checkpointCutOffMultiplier,
+ aggregatorDisabledParam,
+ inputTableName,
+ outputTableName,
+ 120000l
+ );
+ }
+
+ return new TimelineMetricClusterAggregator(
+ hBaseAccessor, metricsConf,
+ checkpointLocation,
+ sleepIntervalMillis,
+ checkpointCutOffMultiplier,
+ aggregatorDisabledParam,
+ inputTableName,
+ outputTableName,
+ 120000l
+ );
+ }
+
+ /**
* Hourly aggregation for cluster.
+ * Interval : 1 hour
*/
public static TimelineMetricAggregator createTimelineClusterAggregatorHourly(
PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
@@ -291,6 +351,7 @@ public class TimelineMetricAggregatorFactory {
/**
* Daily aggregation for cluster.
+ * Interval : 1 day
*/
public static TimelineMetricAggregator createTimelineClusterAggregatorDaily(
PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
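With this factory change, cluster aggregation becomes a two-stage pipeline: the second aggregator reads METRIC_RECORD and writes METRIC_AGGREGATE on a 2-minute cycle, and the new minute aggregator rolls METRIC_AGGREGATE into METRIC_AGGREGATE_MINUTE on a 5-minute cycle. A usage sketch, assuming hBaseAccessor and metricsConf are initialized as in HBaseTimelineMetricStore above:

    // Sketch: wiring both cluster aggregators, mirroring the metric store.
    TimelineMetricAggregator second = TimelineMetricAggregatorFactory
        .createTimelineClusterAggregatorSecond(hBaseAccessor, metricsConf);
    TimelineMetricAggregator minute = TimelineMetricAggregatorFactory
        .createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf);
    if (!second.isDisabled()) { new Thread(second).start(); }
    if (!minute.isDisabled()) { new Thread(minute).start(); }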
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
deleted file mode 100644
index 85bdbbc..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
-
-/**
- * Aggregates a metric across all hosts in the cluster. Reads metrics from
- * the precision table and saves into the aggregate.
- */
-public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggregator {
- private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregatorMinute.class);
- public Long timeSliceIntervalMillis;
- private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true);
- // Aggregator to perform app-level aggregates for host metrics
- private final TimelineMetricAppAggregator appAggregator;
- // 1 minute client side buffering adjustment
- private final Long serverTimeShiftAdjustment;
-
- public TimelineMetricClusterAggregatorMinute(PhoenixHBaseAccessor hBaseAccessor,
- Configuration metricsConf,
- String checkpointLocation,
- Long sleepIntervalMillis,
- Integer checkpointCutOffMultiplier,
- String aggregatorDisabledParam,
- String tableName,
- String outputTableName,
- Long nativeTimeRangeDelay,
- Long timeSliceInterval) {
- super(hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis,
- checkpointCutOffMultiplier, aggregatorDisabledParam, tableName,
- outputTableName, nativeTimeRangeDelay);
-
- appAggregator = new TimelineMetricAppAggregator(metricsConf);
- this.timeSliceIntervalMillis = timeSliceInterval;
- this.serverTimeShiftAdjustment = Long.parseLong(metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000"));
- }
-
- @Override
- protected void aggregate(ResultSet rs, long startTime, long endTime) throws SQLException, IOException {
- // Account for time shift due to client side buffering by shifting the
- // timestamps with the difference between server time and series start time
- List<Long[]> timeSlices = getTimeSlices(startTime - serverTimeShiftAdjustment, endTime);
- // Initialize app aggregates for host metrics
- appAggregator.init();
- Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
- aggregateMetricsFromResultSet(rs, timeSlices);
-
- LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
- hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
- appAggregator.cleanup();
- }
-
- @Override
- protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
- Condition condition = new DefaultCondition(null, null, null, null, startTime,
- endTime, null, null, true);
- condition.setNoLimit();
- condition.setFetchSize(resultsetFetchSize);
- condition.setStatement(String.format(GET_METRIC_SQL,
- PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
- METRICS_RECORD_TABLE_NAME));
- // Retaining order of the row-key avoids client side merge sort.
- condition.addOrderByColumn("METRIC_NAME");
- condition.addOrderByColumn("HOSTNAME");
- condition.addOrderByColumn("SERVER_TIME");
- condition.addOrderByColumn("APP_ID");
- return condition;
- }
-
- /**
- * Return time slices to normalize the timeseries data.
- */
- private List<Long[]> getTimeSlices(long startTime, long endTime) {
- List<Long[]> timeSlices = new ArrayList<Long[]>();
- long sliceStartTime = startTime;
- while (sliceStartTime < endTime) {
- timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis });
- sliceStartTime += timeSliceIntervalMillis;
- }
- return timeSlices;
- }
-
- private Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
- throws SQLException, IOException {
- Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
- new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
-
- TimelineMetric metric = null;
- if (rs.next()) {
- metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
-
- // Call slice after all rows for a host are read
- while (rs.next()) {
- TimelineMetric nextMetric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
- // If rows belong to same host combine them before slicing. This
- // avoids issues across rows that belong to same hosts but get
- // counted as coming from different ones.
- if (metric.equalsExceptTime(nextMetric)) {
- metric.addMetricValues(nextMetric.getMetricValues());
- } else {
- // Process the current metric
- processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
- metric = nextMetric;
- }
- }
- }
- // Process last metric
- if (metric != null) {
- processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
- }
-
- // Add app level aggregates to save
- aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());
- return aggregateClusterMetrics;
- }
-
- /**
- * Slice metric values into interval specified by :
- * timeline.metrics.cluster.aggregator.minute.timeslice.interval
- * Normalize value by averaging them within the interval
- */
- private void processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
- TimelineMetric metric, List<Long[]> timeSlices) {
- // Create time slices
- Map<TimelineClusterMetric, Double> clusterMetrics = sliceFromTimelineMetric(metric, timeSlices);
-
- if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
- for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
- clusterMetrics.entrySet()) {
-
- TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
- Double avgValue = clusterMetricEntry.getValue();
-
- MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
-
- if (aggregate == null) {
- aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue);
- aggregateClusterMetrics.put(clusterMetric, aggregate);
- } else {
- aggregate.updateSum(avgValue);
- aggregate.updateNumberOfHosts(1);
- aggregate.updateMax(avgValue);
- aggregate.updateMin(avgValue);
- }
- // Update app level aggregates
- appAggregator.processTimelineClusterMetric(clusterMetric, metric.getHostName(), avgValue);
- }
- }
- }
-
- private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
- TimelineMetric timelineMetric, List<Long[]> timeSlices) {
-
- if (timelineMetric.getMetricValues().isEmpty()) {
- return null;
- }
-
- Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
- new HashMap<TimelineClusterMetric, Double>();
-
- Long timeShift = timelineMetric.getTimestamp() - timelineMetric.getStartTime();
- if (timeShift < 0) {
- LOG.debug("Invalid time shift found, possible discrepancy in clocks. " +
- "timeShift = " + timeShift);
- timeShift = 0l;
- }
-
- for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) {
- // TODO: investigate null values - pre filter
- if (metric.getValue() == null) {
- continue;
- }
-
- Long timestamp = getSliceTimeForMetric(timeSlices, Long.parseLong(metric.getKey().toString()));
- if (timestamp != -1) {
- // Metric is within desired time range
- TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
- timelineMetric.getMetricName(),
- timelineMetric.getAppId(),
- timelineMetric.getInstanceId(),
- timestamp,
- timelineMetric.getType());
-
- // do a sum / count here to get average for all points in a slice
- int count = 1;
- Double sum;
- if (!timelineClusterMetricMap.containsKey(clusterMetric)) {
- sum = metric.getValue();
- } else {
- count++;
- Double oldValue = timelineClusterMetricMap.get(clusterMetric);
- sum = oldValue + metric.getValue();
- }
- timelineClusterMetricMap.put(clusterMetric, (sum / count));
- }
- }
-
- return timelineClusterMetricMap;
- }
-
- /**
- * Return beginning of the time slice into which the metric fits.
- */
- private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) {
- for (Long[] timeSlice : timeSlices) {
- if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) {
- return timeSlice[0];
- }
- }
- return -1l;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
new file mode 100644
index 0000000..1c7bf7f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
+
+/**
+ * Aggregates a metric across all hosts in the cluster. Reads metrics from
+ * the precision table and saves into the aggregate.
+ */
+public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggregator {
+ private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregatorSecond.class);
+ public Long timeSliceIntervalMillis;
+ private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true);
+ // Aggregator to perform app-level aggregates for host metrics
+ private final TimelineMetricAppAggregator appAggregator;
+ // 1 minute client side buffering adjustment
+ private final Long serverTimeShiftAdjustment;
+
+ public TimelineMetricClusterAggregatorSecond(PhoenixHBaseAccessor hBaseAccessor,
+ Configuration metricsConf,
+ String checkpointLocation,
+ Long sleepIntervalMillis,
+ Integer checkpointCutOffMultiplier,
+ String aggregatorDisabledParam,
+ String tableName,
+ String outputTableName,
+ Long nativeTimeRangeDelay,
+ Long timeSliceInterval) {
+ super(hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis,
+ checkpointCutOffMultiplier, aggregatorDisabledParam, tableName,
+ outputTableName, nativeTimeRangeDelay);
+
+ appAggregator = new TimelineMetricAppAggregator(metricsConf);
+ this.timeSliceIntervalMillis = timeSliceInterval;
+ this.serverTimeShiftAdjustment = Long.parseLong(metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000"));
+ }
+
+ @Override
+ protected void aggregate(ResultSet rs, long startTime, long endTime) throws SQLException, IOException {
+ // Account for time shift due to client side buffering by shifting the
+ // timestamps with the difference between server time and series start time
+ List<Long[]> timeSlices = getTimeSlices(startTime - serverTimeShiftAdjustment, endTime);
+ // Initialize app aggregates for host metrics
+ appAggregator.init();
+ Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
+ aggregateMetricsFromResultSet(rs, timeSlices);
+
+ LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
+ hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
+ appAggregator.cleanup();
+ }
+
+ @Override
+ protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+ Condition condition = new DefaultCondition(null, null, null, null, startTime,
+ endTime, null, null, true);
+ condition.setNoLimit();
+ condition.setFetchSize(resultsetFetchSize);
+ condition.setStatement(String.format(GET_METRIC_SQL,
+ PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+ METRICS_RECORD_TABLE_NAME));
+ // Retaining order of the row-key avoids client side merge sort.
+ condition.addOrderByColumn("METRIC_NAME");
+ condition.addOrderByColumn("HOSTNAME");
+ condition.addOrderByColumn("SERVER_TIME");
+ condition.addOrderByColumn("APP_ID");
+ return condition;
+ }
+
+ /**
+ * Return time slices to normalize the timeseries data.
+ */
+ private List<Long[]> getTimeSlices(long startTime, long endTime) {
+ List<Long[]> timeSlices = new ArrayList<Long[]>();
+ long sliceStartTime = startTime;
+ while (sliceStartTime < endTime) {
+ timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis });
+ sliceStartTime += timeSliceIntervalMillis;
+ }
+ return timeSlices;
+ }
+
+ private Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
+ throws SQLException, IOException {
+ Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
+ new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+ TimelineMetric metric = null;
+ if (rs.next()) {
+ metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
+
+ // Call slice after all rows for a host are read
+ while (rs.next()) {
+ TimelineMetric nextMetric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
+ // If rows belong to same host combine them before slicing. This
+ // avoids issues across rows that belong to same hosts but get
+ // counted as coming from different ones.
+ if (metric.equalsExceptTime(nextMetric)) {
+ metric.addMetricValues(nextMetric.getMetricValues());
+ } else {
+ // Process the current metric
+ processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
+ metric = nextMetric;
+ }
+ }
+ }
+ // Process last metric
+ if (metric != null) {
+ processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
+ }
+
+ // Add app level aggregates to save
+ aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());
+ return aggregateClusterMetrics;
+ }
+
+ /**
+ * Slice metric values into the interval specified by
+ * timeline.metrics.cluster.aggregator.second.timeslice.interval.
+ * Values are normalized by averaging them within each interval.
+ */
+ private void processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
+ TimelineMetric metric, List<Long[]> timeSlices) {
+ // Create time slices
+ Map<TimelineClusterMetric, Double> clusterMetrics = sliceFromTimelineMetric(metric, timeSlices);
+
+ if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
+ for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
+ clusterMetrics.entrySet()) {
+
+ TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
+ Double avgValue = clusterMetricEntry.getValue();
+
+ MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
+
+ if (aggregate == null) {
+ aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue);
+ aggregateClusterMetrics.put(clusterMetric, aggregate);
+ } else {
+ aggregate.updateSum(avgValue);
+ aggregate.updateNumberOfHosts(1);
+ aggregate.updateMax(avgValue);
+ aggregate.updateMin(avgValue);
+ }
+ // Update app level aggregates
+ appAggregator.processTimelineClusterMetric(clusterMetric, metric.getHostName(), avgValue);
+ }
+ }
+ }
+
+ private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
+ TimelineMetric timelineMetric, List<Long[]> timeSlices) {
+
+ if (timelineMetric.getMetricValues().isEmpty()) {
+ return null;
+ }
+
+ Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
+ new HashMap<TimelineClusterMetric, Double>();
+
+ Long timeShift = timelineMetric.getTimestamp() - timelineMetric.getStartTime();
+ if (timeShift < 0) {
+ LOG.debug("Invalid time shift found, possible discrepancy in clocks. " +
+ "timeShift = " + timeShift);
+ timeShift = 0l;
+ }
+
+ for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) {
+ // TODO: investigate null values - pre filter
+ if (metric.getValue() == null) {
+ continue;
+ }
+
+ Long timestamp = getSliceTimeForMetric(timeSlices, Long.parseLong(metric.getKey().toString()));
+ if (timestamp != -1) {
+ // Metric is within desired time range
+ TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
+ timelineMetric.getMetricName(),
+ timelineMetric.getAppId(),
+ timelineMetric.getInstanceId(),
+ timestamp,
+ timelineMetric.getType());
+
+ // do a sum / count here to get average for all points in a slice
+ int count = 1;
+ Double sum;
+ if (!timelineClusterMetricMap.containsKey(clusterMetric)) {
+ sum = metric.getValue();
+ } else {
+ count++;
+ Double oldValue = timelineClusterMetricMap.get(clusterMetric);
+ sum = oldValue + metric.getValue();
+ }
+ timelineClusterMetricMap.put(clusterMetric, (sum / count));
+ }
+ }
+
+ return timelineClusterMetricMap;
+ }
+
+ /**
+ * Return beginning of the time slice into which the metric fits.
+ */
+ private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) {
+ for (Long[] timeSlice : timeSlices) {
+ if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) {
+ return timeSlice[0];
+ }
+ }
+ return -1l;
+ }
+
+}
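The core of this class is the slicing step: each host's series is bucketed into fixed time slices (30 seconds by default) and averaged within a slice before being merged across hosts. A standalone sketch of that normalization, not taken from the commit; it floors timestamps to slice boundaries instead of walking an explicit slice list, and keeps a per-slice count so the mean stays exact for any number of points:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;

    public class TimeSliceSketch {
      // Average the points of one series within fixed-width slices.
      static Map<Long, Double> sliceAverage(Map<Long, Double> series, long sliceMillis) {
        Map<Long, Double> sums = new TreeMap<Long, Double>();
        Map<Long, Integer> counts = new HashMap<Long, Integer>();
        for (Map.Entry<Long, Double> point : series.entrySet()) {
          long sliceStart = (point.getKey() / sliceMillis) * sliceMillis;
          Double sum = sums.get(sliceStart);
          sums.put(sliceStart, (sum == null ? 0d : sum) + point.getValue());
          Integer count = counts.get(sliceStart);
          counts.put(sliceStart, (count == null ? 0 : count) + 1);
        }
        for (Map.Entry<Long, Double> slice : sums.entrySet()) {
          slice.setValue(slice.getValue() / counts.get(slice.getKey()));
        }
        return sums;
      }

      public static void main(String[] args) {
        Map<Long, Double> series = new TreeMap<Long, Double>();
        series.put(10000L, 1.0); // first 30s slice
        series.put(20000L, 2.0); // first 30s slice
        series.put(40000L, 4.0); // second 30s slice
        System.out.println(sliceAverage(series, 30000L)); // {0=1.5, 30000=4.0}
      }
    }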
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
index 092c983..92d59e2 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
@@ -86,7 +86,7 @@ public class PhoenixTransactSQL {
"TTL=%s, COMPRESSION='%s'";
// HOSTS_COUNT vs METRIC_COUNT
- public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL =
+ public static final String CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL =
"CREATE TABLE IF NOT EXISTS %s " +
"(METRIC_NAME VARCHAR, " +
"APP_ID VARCHAR, " +
@@ -248,6 +248,8 @@ public class PhoenixTransactSQL {
"METRIC_RECORD_DAILY";
public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME =
"METRIC_AGGREGATE";
+ public static final String METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME =
+ "METRIC_AGGREGATE_MINUTE";
public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME =
"METRIC_AGGREGATE_HOURLY";
public static final String METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME =
@@ -555,7 +557,10 @@ public class PhoenixTransactSQL {
metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL;
break;
- //TODO : Include MINUTE case after introducing CLUSTER_AGGREGATOR_MINUTE
+ case MINUTES:
+ metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
+ queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL;
+ break;
default:
metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
queryStmt = GET_CLUSTER_AGGREGATE_SQL;
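After this change a cluster read resolves its table by precision; the DAYS case is unchanged and omitted from the hunk above. A sketch of the mapping, using the Precision enum and the table names defined in PhoenixTransactSQL:

    // Sketch (not from the commit): which table a cluster read resolves to.
    static String clusterTableFor(Precision precision) {
      switch (precision) {
        case DAYS:    return "METRIC_AGGREGATE_DAILY";
        case HOURS:   return "METRIC_AGGREGATE_HOURLY";
        case MINUTES: return "METRIC_AGGREGATE_MINUTE"; // new in this commit
        default:      return "METRIC_AGGREGATE";        // raw second-level data
      }
    }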
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
index cbf0233..4ddecdc 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
@@ -42,15 +42,14 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
import static junit.framework.Assert.fail;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
@@ -101,7 +100,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
public void testShouldAggregateClusterProperly() throws Exception {
// GIVEN
TimelineMetricAggregator agg =
- TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false));
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
long startTime = System.currentTimeMillis();
@@ -111,7 +110,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
"disk_free", 1));
hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
"disk_free", 2));
- ctime += minute;
+ ctime += 2*minute;
hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
"disk_free", 2));
hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
@@ -153,7 +152,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
public void testShouldAggregateClusterIgnoringInstance() throws Exception {
// GIVEN
TimelineMetricAggregator agg =
- TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false));
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
long startTime = System.currentTimeMillis();
@@ -214,21 +213,21 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
if ("disk_free".equals(currentMetric.getMetricName())) {
System.out.println("OUTPUT: " + currentMetric + " - " + currentHostAggregate);
assertEquals(2, currentHostAggregate.getNumberOfHosts());
- assertEquals(5.0, currentHostAggregate.getSum());
+ assertEquals(5.0, Math.floor(currentHostAggregate.getSum()));
recordCount++;
} else {
fail("Unexpected entry");
}
}
- Assert.assertEquals(8, recordCount);
+ Assert.assertEquals(5, recordCount);
}
@Test
public void testShouldAggregateDifferentMetricsOnClusterProperly() throws Exception {
// GIVEN
TimelineMetricAggregator agg =
- TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false));
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
// here we put some metrics that will be aggregated
@@ -242,7 +241,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
"disk_used", 1));
- ctime += minute;
+ ctime += 2*minute;
hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
"disk_free", 2));
hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
@@ -334,6 +333,73 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
}
@Test
+ public void testShouldAggregateClusterOnMinuteProperly() throws Exception {
+
+ TimelineMetricAggregator agg =
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
+
+ long startTime = System.currentTimeMillis();
+ long ctime = startTime;
+ long second = 1000;
+ long minute = 60*second;
+
+ Map<TimelineClusterMetric, MetricClusterAggregate> records =
+ new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+ records.put(createEmptyTimelineClusterMetric(ctime),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ records.put(createEmptyTimelineClusterMetric(ctime += second),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ records.put(createEmptyTimelineClusterMetric(ctime += second),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ records.put(createEmptyTimelineClusterMetric(ctime += second),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+
+ hdb.saveClusterAggregateRecords(records);
+ agg.doWork(startTime, ctime + second);
+ long oldCtime = ctime + second;
+
+ //Next minute
+ ctime = startTime + minute;
+
+ records.put(createEmptyTimelineClusterMetric(ctime),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ records.put(createEmptyTimelineClusterMetric(ctime += second),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ records.put(createEmptyTimelineClusterMetric(ctime += second),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ records.put(createEmptyTimelineClusterMetric(ctime += second),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+
+ hdb.saveClusterAggregateRecords(records);
+ agg.doWork(oldCtime, ctime + second);
+
+ ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_MINUTE");
+ int count = 0;
+ long diff = 0;
+ while (rs.next()) {
+ assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME"));
+ assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+ assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
+ assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+ assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
+ assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
+ if (count == 0) {
+ diff+=rs.getLong("SERVER_TIME");
+ } else {
+ diff-=rs.getLong("SERVER_TIME");
+ if (diff < 0) {
+ diff*=-1;
+ }
+ assertTrue(diff == minute);
+ }
+ count++;
+ }
+
+ assertEquals("One hourly aggregated row expected ", 2, count);
+ }
+
+ @Test
public void testShouldAggregateClusterOnHourProperly() throws Exception {
// GIVEN
TimelineMetricAggregator agg =
@@ -444,7 +510,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
Configuration conf = getConfigurationForTest(false);
conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1");
TimelineMetricAggregator agg =
- TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, conf);
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, conf);
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
long startTime = System.currentTimeMillis();
@@ -483,7 +549,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
currentHostAggregate = readHelper.getMetricClusterAggregateFromResultSet(rs);
recordCount++;
}
- assertEquals(4, recordCount);
+ assertEquals(3, recordCount);
assertNotNull(currentMetric);
assertEquals("cpu_user", currentMetric.getMetricName());
assertEquals("app1", currentMetric.getAppId());
@@ -495,7 +561,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
@Test
public void testClusterAggregateMetricNormalization() throws Exception {
TimelineMetricAggregator agg =
- TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false));
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
// Sample data
@@ -565,7 +631,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
fail("Unexpected entry");
}
}
- Assert.assertEquals(9, recordCount);
+ Assert.assertEquals(5, recordCount);
}
@Test
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
index 89fee7c..5e7234c 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
@@ -48,7 +48,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createMetricHostAggregate;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
@@ -205,7 +204,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
public void testGetClusterMetricRecordsSeconds() throws Exception {
// GIVEN
TimelineMetricAggregator agg =
- TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration());
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, new Configuration());
long startTime = System.currentTimeMillis();
long ctime = startTime + 1;
@@ -236,7 +235,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
TimelineMetric metric = timelineMetrics.getMetrics().get(0);
assertEquals("disk_free", metric.getMetricName());
- assertEquals(8, metric.getMetricValues().size());
+ assertEquals(5, metric.getMetricValues().size());
assertEquals(1.5, metric.getMetricValues().values().iterator().next(), 0.00001);
}
@@ -244,7 +243,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
public void testGetClusterMetricRecordLatestWithFunction() throws Exception {
// GIVEN
TimelineMetricAggregator agg =
- TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration());
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, new Configuration());
long startTime = System.currentTimeMillis();
long ctime = startTime + 1;
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
index 7c7db9f..9100afd 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
@@ -40,6 +40,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import java.util.SortedMap;
import java.util.TreeMap;
@Singleton
@@ -250,24 +251,16 @@ public class TimelineMetricCacheEntryFactory implements UpdatingCacheEntryFactor
Long requestedStartTime, Long requestedEndTime, boolean removeAll) {
for (TimelineMetric existingMetric : existingMetrics.getMetrics()) {
- if(removeAll) {
+ if (removeAll) {
existingMetric.setMetricValues(new TreeMap<Long, Double>());
} else {
- Map<Long, Double> existingMetricValues = existingMetric.getMetricValues();
+ TreeMap<Long, Double> existingMetricValues = existingMetric.getMetricValues();
LOG.trace("Existing metric: " + existingMetric.getMetricName() +
" # " + existingMetricValues.size());
- Iterator<Map.Entry<Long, Double>> valueIterator = existingMetricValues.entrySet().iterator();
-
- // Remove old values
- // Assumption: All return value are millis
- while (valueIterator.hasNext()) {
- Map.Entry<Long, Double> metricEntry = valueIterator.next();
- if (metricEntry.getKey() < requestedStartTime
- || metricEntry.getKey() > requestedEndTime) {
- valueIterator.remove();
- }
- }
+ // Retain only the values that are within the [requestedStartTime, requestedEndTime] window
+ existingMetricValues.headMap(requestedStartTime, false).clear();
+ existingMetricValues.tailMap(requestedEndTime, false).clear();
}
}
}
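A minimal, self-contained sketch of the TreeMap windowing idiom used above, with hypothetical timestamps and values (illustrative only, not part of the patch). The headMap/tailMap views are backed by the map itself, so clearing them drops the out-of-window entries in place:

    import java.util.TreeMap;

    public class WindowTrimSketch {
      public static void main(String[] args) {
        TreeMap<Long, Double> values = new TreeMap<Long, Double>();
        values.put(100L, 1.0);
        values.put(200L, 2.0);
        values.put(300L, 3.0);
        values.put(400L, 4.0);

        long requestedStartTime = 200L;
        long requestedEndTime = 300L;

        // headMap(key, false) views keys strictly below the window, tailMap(key, false) keys strictly above it
        values.headMap(requestedStartTime, false).clear();
        values.tailMap(requestedEndTime, false).clear();

        System.out.println(values); // {200=2.0, 300=3.0}
      }
    }

Because both views exclude their boundary key, the retained range is inclusive on both ends, matching the [requestedStartTime, requestedEndTime] window described in the comment above.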
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
index 8f1a481..5d1e934 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
@@ -85,6 +85,7 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
private static final String KAFKA_BROKER = "kafka-broker";
private static final String AMS_ENV = "ams-env";
private static final String AMS_HBASE_ENV = "ams-hbase-env";
+ private static final String AMS_SITE = "ams-site";
private static final String HBASE_ENV_CONFIG = "hbase-env";
private static final String HIVE_SITE_CONFIG = "hive-site";
private static final String RANGER_ENV_CONFIG = "ranger-env";
@@ -647,7 +648,7 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
if (hostDiskUsageAlertDefinitionEntity != null) {
hostDiskUsageAlertDefinitionEntity.setDescription("This host-level alert is triggered if the amount of disk " +
- "space used goes above specific thresholds. The default threshold values are 50% for WARNING and 80% for CRITICAL");
+ "space used goes above specific thresholds. The default threshold values are 50% for WARNING and 80% for CRITICAL");
hostDiskUsageAlertDefinitionEntity.setLabel("Host Disk Usage");
alertDefinitionDAO.merge(hostDiskUsageAlertDefinitionEntity);
@@ -832,6 +833,27 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
newProperties.put("content", updateAmsHbaseEnvContent(content));
updateConfigurationPropertiesForCluster(cluster, AMS_HBASE_ENV, newProperties, true, true);
}
+ Config amsSite = cluster.getDesiredConfigByType(AMS_SITE);
+ if (amsSite != null) {
+ Map<String, String> newProperties = new HashMap<>();
+
+ //Interval
+ newProperties.put("timeline.metrics.cluster.aggregator.second.interval",String.valueOf(120));
+ newProperties.put("timeline.metrics.cluster.aggregator.minute.interval",String.valueOf(300));
+ newProperties.put("timeline.metrics.host.aggregator.minute.interval",String.valueOf(300));
+
+ //ttl
+ newProperties.put("timeline.metrics.cluster.aggregator.second.ttl", String.valueOf(2592000));
+ newProperties.put("timeline.metrics.cluster.aggregator.minute.ttl", String.valueOf(7776000));
+
+ //checkpoint
+ newProperties.put("timeline.metrics.cluster.aggregator.second.checkpointCutOffMultiplier", String.valueOf(2));
+
+ //disabled
+ newProperties.put("timeline.metrics.cluster.aggregator.second.disabled", String.valueOf(false));
+
+ updateConfigurationPropertiesForCluster(cluster, AMS_SITE, newProperties, true, true);
+ }
}
}
}
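For reference, the values seeded above work out as follows: 120 s = 2 minutes for the second cluster aggregator interval, 300 s = 5 minutes for the minute aggregators, 2592000 s / 86400 = 30 days for the second-resolution TTL, and 7776000 s / 86400 = 90 days for the minute-resolution TTL.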
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
index 89b584b..c73a401 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
@@ -59,11 +59,11 @@
</property>
<property>
<name>timeline.metrics.host.aggregator.minute.interval</name>
- <value>120</value>
+ <value>300</value>
<display-name>Minute host aggregator interval</display-name>
<description>
Time in seconds to sleep for the minute resolution host based
- aggregator. Default resolution is 2 minutes.
+ aggregator. Default resolution is 5 minutes.
</description>
<value-attributes>
<type>int</type>
@@ -111,10 +111,22 @@
</property>
<property>
<name>timeline.metrics.cluster.aggregator.minute.interval</name>
- <value>120</value>
+ <value>300</value>
<display-name>Minute cluster aggregator interval</display-name>
<description>
Time in seconds to sleep for the minute resolution cluster wide
+ aggregator. Default resolution is 5 minutes.
+ </description>
+ <value-attributes>
+ <type>int</type>
+ </value-attributes>
+ </property>
+ <property>
+ <name>timeline.metrics.cluster.aggregator.second.interval</name>
+ <value>120</value>
+ <display-name>Second cluster aggregator interval</display-name>
+ <description>
+ Time in seconds to sleep for the second resolution cluster wide
aggregator. Default resolution is 2 minutes.
</description>
<value-attributes>
@@ -170,6 +182,19 @@
</value-attributes>
</property>
<property>
+ <name>timeline.metrics.cluster.aggregator.second.checkpointCutOffMultiplier</name>
+ <value>2</value>
+ <display-name>Second cluster aggregator checkpoint cutOff multiplier</display-name>
+ <description>
+ Multiplier value * interval = Max allowed checkpoint lag. Effectively,
+ if the aggregator checkpoint lag is greater than the max allowed checkpoint delay,
+ the checkpoint will be discarded by the aggregator.
+ </description>
+ <value-attributes>
+ <type>int</type>
+ </value-attributes>
+ </property>
+ <property>
<name>timeline.metrics.cluster.aggregator.minute.checkpointCutOffMultiplier</name>
<value>2</value>
<display-name>Minute cluster aggregator checkpoint cutOff multiplier</display-name>
@@ -238,11 +263,19 @@
</description>
</property>
<property>
- <name>timeline.metrics.cluster.aggregator.minute.timeslice.interval</name>
+ <name>timeline.metrics.cluster.aggregator.second.disabled</name>
+ <value>false</value>
+ <display-name>Disable second cluster aggregator</display-name>
+ <description>
+ Disable cluster based second aggregations.
+ </description>
+ </property>
+ <property>
+ <name>timeline.metrics.cluster.aggregator.second.timeslice.interval</name>
<value>30</value>
- <display-name>Minute cluster aggregator timeslice interval</display-name>
+ <display-name>Second cluster aggregator timeslice interval</display-name>
<description>
- Lowest resolution of desired data for cluster level minute aggregates.
+ Lowest resolution of desired data for cluster level second aggregates.
</description>
<value-attributes>
<type>int</type>
@@ -270,9 +303,16 @@
</description>
</property>
<property>
- <name>timeline.metrics.cluster.aggregator.minute.ttl</name>
+ <name>timeline.metrics.cluster.aggregator.second.ttl</name>
<value>2592000</value>
<description>
+ Cluster wide second resolution data purge interval. Default is 30 days.
+ </description>
+ </property>
+ <property>
+ <name>timeline.metrics.cluster.aggregator.minute.ttl</name>
+ <value>7776000</value>
+ <description>
- Cluster wide minute resolution data purge interval. Default is 30 days.
+ Cluster wide minute resolution data purge interval. Default is 90 days.
</description>
</property>
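A minimal sketch of the cutoff arithmetic described by the checkpointCutOffMultiplier properties above, using the second cluster aggregator defaults (multiplier 2, interval 120 seconds). The class and variable names are illustrative, not the aggregator's actual fields:

    public class CheckpointCutOffSketch {
      public static void main(String[] args) {
        long intervalMillis = 120 * 1000L;    // timeline.metrics.cluster.aggregator.second.interval
        int cutOffMultiplier = 2;             // timeline.metrics.cluster.aggregator.second.checkpointCutOffMultiplier
        long maxAllowedLagMillis = cutOffMultiplier * intervalMillis; // 240,000 ms = 4 minutes

        // Pretend the last checkpoint was written 5 minutes ago
        long lastCheckpointTime = System.currentTimeMillis() - 5 * 60 * 1000L;
        long checkpointLagMillis = System.currentTimeMillis() - lastCheckpointTime;

        // A checkpoint older than the cutoff is discarded and aggregation starts from a fresh window
        boolean discardCheckpoint = checkpointLagMillis > maxAllowedLagMillis;
        System.out.println("discard checkpoint: " + discardCheckpoint); // true in this example
      }
    }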
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/split_points.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/split_points.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/split_points.py
index cd9c844..fa4deaf 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/split_points.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/split_points.py
@@ -27,7 +27,7 @@ import ast
metric_filename_ext = '.txt'
-# 5 regions for higher order aggregate tables
-other_region_static_count = 5
+# 6 regions for higher order aggregate tables
+other_region_static_count = 6
# Max equidistant points to return per service
max_equidistant_points = 50
http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
index eb4d677..64b7af0 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
@@ -18,6 +18,8 @@
package org.apache.ambari.server.upgrade;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
import com.google.inject.Guice;
@@ -26,9 +28,15 @@ import com.google.inject.Module;
import com.google.inject.Provider;
import com.google.inject.persist.PersistService;
import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.actionmanager.ActionManager;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.AmbariManagementControllerImpl;
+import org.apache.ambari.server.controller.ConfigurationRequest;
+import org.apache.ambari.server.controller.ConfigurationResponse;
+import org.apache.ambari.server.controller.KerberosHelper;
+import org.apache.ambari.server.controller.MaintenanceStateHelper;
import org.apache.ambari.server.orm.DBAccessor;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
@@ -78,6 +86,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import static org.easymock.EasyMock.anyLong;
import static org.easymock.EasyMock.anyObject;
@@ -91,6 +100,8 @@ import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
* {@link org.apache.ambari.server.upgrade.UpgradeCatalog213} unit tests.
@@ -667,6 +678,71 @@ public class UpgradeCatalog213Test {
}
@Test
+ public void testAmsSiteUpdateConfigs() throws Exception {
+
+ Map<String, String> oldPropertiesAmsSite = new HashMap<String, String>() {
+ {
+ //Including only those properties that might be present in an older version.
+ put("timeline.metrics.cluster.aggregator.minute.interval",String.valueOf(1000));
+ put("timeline.metrics.host.aggregator.minute.interval",String.valueOf(1000));
+ put("timeline.metrics.cluster.aggregator.minute.ttl", String.valueOf(1000));
+ }
+ };
+ Map<String, String> newPropertiesAmsSite = new HashMap<String, String>() {
+ {
+ put("timeline.metrics.cluster.aggregator.second.interval",String.valueOf(120));
+ put("timeline.metrics.cluster.aggregator.minute.interval",String.valueOf(300));
+ put("timeline.metrics.host.aggregator.minute.interval",String.valueOf(300));
+ put("timeline.metrics.cluster.aggregator.second.ttl", String.valueOf(2592000));
+ put("timeline.metrics.cluster.aggregator.minute.ttl", String.valueOf(7776000));
+ put("timeline.metrics.cluster.aggregator.second.checkpointCutOffMultiplier", String.valueOf(2));
+ put("timeline.metrics.cluster.aggregator.second.disabled", String.valueOf(false));
+ }
+ };
+ EasyMockSupport easyMockSupport = new EasyMockSupport();
+
+ Clusters clusters = easyMockSupport.createNiceMock(Clusters.class);
+ final Cluster cluster = easyMockSupport.createNiceMock(Cluster.class);
+ Config mockAmsSite = easyMockSupport.createNiceMock(Config.class);
+
+ expect(clusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{
+ put("normal", cluster);
+ }}).once();
+ expect(cluster.getDesiredConfigByType("ams-site")).andReturn(mockAmsSite).atLeastOnce();
+ expect(mockAmsSite.getProperties()).andReturn(oldPropertiesAmsSite).times(1);
+
+ Injector injector = easyMockSupport.createNiceMock(Injector.class);
+ expect(injector.getInstance(Gson.class)).andReturn(null).anyTimes();
+ expect(injector.getInstance(MaintenanceStateHelper.class)).andReturn(null).anyTimes();
+ expect(injector.getInstance(KerberosHelper.class)).andReturn(createNiceMock(KerberosHelper.class)).anyTimes();
+
+ replay(injector, clusters, mockAmsSite, cluster);
+
+ AmbariManagementControllerImpl controller = createMockBuilder(AmbariManagementControllerImpl.class)
+ .addMockedMethod("createConfiguration")
+ .addMockedMethod("getClusters", new Class[] { })
+ .withConstructor(createNiceMock(ActionManager.class), clusters, injector)
+ .createNiceMock();
+
+ Injector injector2 = easyMockSupport.createNiceMock(Injector.class);
+ Capture<ConfigurationRequest> configurationRequestCapture = EasyMock.newCapture();
+ ConfigurationResponse configurationResponseMock = easyMockSupport.createMock(ConfigurationResponse.class);
+
+ expect(injector2.getInstance(AmbariManagementController.class)).andReturn(controller).anyTimes();
+ expect(controller.getClusters()).andReturn(clusters).anyTimes();
+ expect(controller.createConfiguration(capture(configurationRequestCapture))).andReturn(configurationResponseMock).once();
+
+ replay(controller, injector2, configurationResponseMock);
+ new UpgradeCatalog213(injector2).updateAMSConfigs();
+ easyMockSupport.verifyAll();
+
+ ConfigurationRequest configurationRequest = configurationRequestCapture.getValue();
+ Map<String, String> updatedProperties = configurationRequest.getProperties();
+ assertTrue(Maps.difference(newPropertiesAmsSite, updatedProperties).areEqual());
+
+ }
+
+ @Test
public void testUpdateAlertDefinitions() {
EasyMockSupport easyMockSupport = new EasyMockSupport();
UpgradeCatalog213 upgradeCatalog213 = new UpgradeCatalog213(injector);