Posted to commits@ambari.apache.org by sw...@apache.org on 2014/11/14 20:10:22 UTC
ambari git commit: AMBARI-5707. Missing Implementation of TimelineMetricClusterAggregatorHourly.
Repository: ambari
Updated Branches:
refs/heads/branch-metrics-dev e5542aa5f -> 3c23bee67
AMBARI-5707. Missing Implementation of TimelineMetricClusterAggregatorHourly.
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/3c23bee6
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/3c23bee6
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/3c23bee6
Branch: refs/heads/branch-metrics-dev
Commit: 3c23bee67d0391dba57bf38f8a2e4c80fdec6286
Parents: e5542aa
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Fri Nov 14 11:09:39 2014 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Fri Nov 14 11:09:39 2014 -0800
----------------------------------------------------------------------
.../timeline/AbstractTimelineAggregator.java | 50 +++-
.../timeline/HBaseTimelineMetricStore.java | 1 +
.../metrics/timeline/PhoenixHBaseAccessor.java | 129 ++++++++-
.../metrics/timeline/PhoenixTransactSQL.java | 28 +-
.../TimelineMetricAggregatorHourly.java | 128 +++++----
.../TimelineMetricClusterAggregator.java | 45 ++-
.../TimelineMetricClusterAggregatorHourly.java | 172 ++++++++++--
.../metrics/timeline/TestClusterAggregator.java | 275 +++++++++++++++++++
.../metrics/timeline/TestHBaseAccessor.java | 114 +++++---
9 files changed, 800 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/3c23bee6/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
index a13b591..6bd6507 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
@@ -15,7 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
@@ -32,8 +33,10 @@ import java.io.File;
import java.io.IOException;
import java.util.Date;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE;
public abstract class AbstractTimelineAggregator implements Runnable {
protected final PhoenixHBaseAccessor hBaseAccessor;
@@ -51,7 +54,8 @@ public abstract class AbstractTimelineAggregator implements Runnable {
Configuration metricsConf) {
this.hBaseAccessor = hBaseAccessor;
this.metricsConf = metricsConf;
- this.checkpointDelay = metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120000);
+ this.checkpointDelay = metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY,
+ 120000);
this.resultsetFetchSize = metricsConf.getInt(RESULTSET_FETCH_SIZE, 2000);
this.LOG = LogFactory.getLog(this.getClass());
}
@@ -86,10 +90,11 @@ public abstract class AbstractTimelineAggregator implements Runnable {
if (lastCheckPointTime != -1) {
LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: "
+ ((System.currentTimeMillis() - lastCheckPointTime) / 1000)
- + " seconds." );
+ + " seconds.");
long startTime = System.currentTimeMillis();
- boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL);
+ boolean success = doWork(lastCheckPointTime,
+ lastCheckPointTime + SLEEP_INTERVAL);
long executionTime = System.currentTimeMillis() - startTime;
long delta = SLEEP_INTERVAL - executionTime;
@@ -125,7 +130,8 @@ public abstract class AbstractTimelineAggregator implements Runnable {
private boolean isLastCheckPointTooOld(long checkpoint) {
return checkpoint != -1 &&
- ((System.currentTimeMillis() - checkpoint) > getCheckpointCutOffInterval());
+ ((System.currentTimeMillis() - checkpoint) >
+ getCheckpointCutOffInterval());
}
private long readCheckPoint() {
@@ -170,8 +176,8 @@ public abstract class AbstractTimelineAggregator implements Runnable {
protected abstract String getCheckpointLocation();
- @JsonSubTypes({ @JsonSubTypes.Type(value = MetricClusterAggregate.class),
- @JsonSubTypes.Type(value = MetricHostAggregate.class) })
+ @JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class),
+ @JsonSubTypes.Type(value = MetricHostAggregate.class)})
@InterfaceAudience.Public
@InterfaceStability.Unstable
public static class MetricAggregate {
@@ -180,9 +186,11 @@ public abstract class AbstractTimelineAggregator implements Runnable {
protected Double max = Double.MIN_VALUE;
protected Double min = Double.MAX_VALUE;
- public MetricAggregate() {}
+ public MetricAggregate() {
+ }
- protected MetricAggregate(Double sum, Double deviation, Double max, Double min) {
+ protected MetricAggregate(Double sum, Double deviation, Double max,
+ Double min) {
this.sum = sum;
this.deviation = deviation;
this.max = max;
@@ -250,7 +258,8 @@ public abstract class AbstractTimelineAggregator implements Runnable {
private int numberOfHosts;
@JsonCreator
- public MetricClusterAggregate() {}
+ public MetricClusterAggregate() {
+ }
MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
Double max, Double min) {
@@ -271,6 +280,16 @@ public abstract class AbstractTimelineAggregator implements Runnable {
this.numberOfHosts = numberOfHosts;
}
+ /**
+ * Find and update min, max, sum and host count for a minute
+ */
+ void updateAggregates(MetricClusterAggregate hostAggregate) {
+ updateMax(hostAggregate.getMax());
+ updateMin(hostAggregate.getMin());
+ updateSum(hostAggregate.getSum());
+ updateNumberOfHosts(hostAggregate.getNumberOfHosts());
+ }
+
@Override
public String toString() {
// MetricClusterAggregate
@@ -297,6 +316,13 @@ public abstract class AbstractTimelineAggregator implements Runnable {
super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE);
}
+ public MetricHostAggregate(Double sum, int numberOfSamples,
+ Double deviation,
+ Double max, Double min) {
+ super(sum, deviation, max, min);
+ this.numberOfSamples = numberOfSamples;
+ }
+
@JsonProperty("numberOfSamples")
long getNumberOfSamples() {
return numberOfSamples == 0 ? 1 : numberOfSamples;
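
For readers following the aggregation logic, below is a minimal standalone sketch of the fold the new updateAggregates() helper performs when per-host minute aggregates are rolled into a cluster aggregate. The class is a hypothetical stand-in, not the nested MetricClusterAggregate type itself; only the update semantics mirror the patch.

// Illustrative stand-in for AbstractTimelineAggregator.MetricClusterAggregate;
// the update semantics follow the patch, the class itself is hypothetical.
public class ClusterAggregateSketch {
  double sum = 0.0;
  double max = Double.MIN_VALUE;
  double min = Double.MAX_VALUE;
  int numberOfHosts = 0;

  void updateAggregates(double hostSum, double hostMax, double hostMin, int hosts) {
    max = Math.max(max, hostMax);     // updateMax()
    min = Math.min(min, hostMin);     // updateMin()
    sum += hostSum;                   // updateSum()
    numberOfHosts += hosts;           // updateNumberOfHosts()
  }

  public static void main(String[] args) {
    ClusterAggregateSketch agg = new ClusterAggregateSketch();
    // Two hosts reporting the same metric within the same minute
    agg.updateAggregates(3.0, 2.0, 1.0, 1);
    agg.updateAggregates(5.0, 4.0, 0.5, 1);
    System.out.println(agg.sum + " " + agg.max + " " + agg.min + " " + agg.numberOfHosts);
    // Expected: 8.0 4.0 0.5 2
  }
}
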
http://git-wip-us.apache.org/repos/asf/ambari/blob/3c23bee6/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index aae8555..d2b96ec 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -121,6 +121,7 @@ public class HBaseTimelineMetricStore extends AbstractService
super.serviceStop();
}
+ // TODO: update to work with HOSTS_COUNT and METRIC_COUNT
@Override
public TimelineMetrics getTimelineMetrics(List<String> metricNames,
String hostname, String applicationId, String instanceId,
http://git-wip-us.apache.org/repos/asf/ambari/blob/3c23bee6/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 2d53a0b..0851d07 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -28,7 +28,6 @@ import org.codehaus.jackson.type.TypeReference;
import java.io.IOException;
import java.sql.Connection;
-import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -37,7 +36,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractTimelineAggregator.MetricClusterAggregate;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.AbstractTimelineAggregator.MetricClusterAggregate;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractTimelineAggregator.MetricHostAggregate;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL;
@@ -50,6 +51,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_METRICS_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricClusterAggregator.TimelineClusterMetric;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
@@ -163,6 +165,32 @@ public class PhoenixHBaseAccessor {
return metricHostAggregate;
}
+ static TimelineClusterMetric
+ getTimelineMetricClusterKeyFromResultSet(ResultSet rs)
+ throws SQLException, IOException {
+ TimelineClusterMetric metric = new TimelineClusterMetric(
+ rs.getString("METRIC_NAME"),
+ rs.getString("APP_ID"),
+ rs.getString("INSTANCE_ID"),
+ rs.getLong("SERVER_TIME"),
+ rs.getString("UNITS"));
+
+ return metric;
+ }
+
+ static MetricClusterAggregate
+ getMetricClusterAggregateFromResultSet(ResultSet rs)
+ throws SQLException {
+ MetricClusterAggregate agg = new MetricClusterAggregate();
+ agg.setSum(rs.getDouble("METRIC_SUM"));
+ agg.setMax(rs.getDouble("METRIC_MAX"));
+ agg.setMin(rs.getDouble("METRIC_MIN"));
+ agg.setNumberOfHosts(rs.getInt("HOSTS_COUNT"));
+
+ agg.setDeviation(0.0);
+
+ return agg;
+ }
protected void initMetricSchema() {
Connection conn = getConnection();
@@ -429,8 +457,90 @@ public class PhoenixHBaseAccessor {
*
* @throws SQLException
*/
- public void saveClusterAggregateRecords(Map<TimelineClusterMetric,
- MetricClusterAggregate> records) throws SQLException {
+ public void saveClusterAggregateRecords(
+ Map<TimelineClusterMetric, MetricClusterAggregate> records)
+ throws SQLException {
+
+ if (records == null || records.isEmpty()) {
+ LOG.debug("Empty aggregate records.");
+ return;
+ }
+
+ long start = System.currentTimeMillis();
+
+ Connection conn = getConnection();
+ PreparedStatement stmt = null;
+ try {
+ stmt = conn.prepareStatement(UPSERT_CLUSTER_AGGREGATE_SQL);
+ int rowCount = 0;
+
+ for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate>
+ aggregateEntry : records.entrySet()) {
+ TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
+ MetricClusterAggregate aggregate = aggregateEntry.getValue();
+
+ LOG.trace("clusterMetric = " + clusterMetric + ", " +
+ "aggregate = " + aggregate);
+
+ rowCount++;
+ stmt.clearParameters();
+ stmt.setString(1, clusterMetric.getMetricName());
+ stmt.setString(2, clusterMetric.getAppId());
+ stmt.setString(3, clusterMetric.getInstanceId());
+ stmt.setLong(4, clusterMetric.getTimestamp());
+ stmt.setString(5, clusterMetric.getType());
+ stmt.setDouble(6, aggregate.getSum());
+ stmt.setInt(7, aggregate.getNumberOfHosts());
+ stmt.setDouble(8, aggregate.getMax());
+ stmt.setDouble(9, aggregate.getMin());
+
+ try {
+ stmt.executeUpdate();
+ } catch (SQLException sql) {
+ // TODO: Why is this exception swallowed?
+ LOG.error(sql);
+ }
+
+ if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+ conn.commit();
+ rowCount = 0;
+ }
+ }
+
+ conn.commit();
+
+ } finally {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException sql) {
+ // Ignore
+ }
+ }
+ }
+ long end = System.currentTimeMillis();
+ if ((end - start) > 60000l) {
+ LOG.info("Time to save: " + (end - start) + ", " +
+ "thread = " + Thread.currentThread().getName());
+ }
+ }
+
+ /**
+ * Save Metric aggregate records.
+ *
+ * @throws SQLException
+ */
+ public void saveClusterAggregateHourlyRecords(
+ Map<TimelineClusterMetric, MetricHostAggregate> records,
+ String tableName)
+ throws SQLException {
if (records == null || records.isEmpty()) {
LOG.debug("Empty aggregate records.");
return;
@@ -441,13 +551,14 @@ public class PhoenixHBaseAccessor {
Connection conn = getConnection();
PreparedStatement stmt = null;
try {
- stmt = conn.prepareStatement(UPSERT_CLUSTER_AGGREGATE_SQL);
+ stmt = conn.prepareStatement(String.format
+ (UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName));
int rowCount = 0;
- for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate>
+ for (Map.Entry<TimelineClusterMetric, MetricHostAggregate>
aggregateEntry : records.entrySet()) {
TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
- MetricClusterAggregate aggregate = aggregateEntry.getValue();
+ MetricHostAggregate aggregate = aggregateEntry.getValue();
LOG.trace("clusterMetric = " + clusterMetric + ", " +
"aggregate = " + aggregate);
@@ -460,13 +571,15 @@ public class PhoenixHBaseAccessor {
stmt.setLong(4, clusterMetric.getTimestamp());
stmt.setString(5, clusterMetric.getType());
stmt.setDouble(6, aggregate.getSum());
- stmt.setInt(7, aggregate.getNumberOfHosts());
+// stmt.setInt(7, aggregate.getNumberOfHosts());
+ stmt.setLong(7, aggregate.getNumberOfSamples());
stmt.setDouble(8, aggregate.getMax());
stmt.setDouble(9, aggregate.getMin());
try {
stmt.executeUpdate();
} catch (SQLException sql) {
+ // We have no way to verify whether the upsert succeeded
LOG.error(sql);
}
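
The new saveClusterAggregateRecords() and saveClusterAggregateHourlyRecords() methods share one batching pattern: bind parameters per record and commit before Phoenix's mutation state limit is reached. A hedged, self-contained sketch of that pattern follows; the helper class and its generic Object[] row shape are illustrative only, not the patch's API.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

// Hypothetical helper illustrating the commit-every-N-rows pattern used by
// saveClusterAggregateRecords() / saveClusterAggregateHourlyRecords().
public class BatchedUpsertSketch {
  static void upsertInBatches(Connection conn, String upsertSql,
                              List<Object[]> rows, int maxMutationStateSize)
      throws SQLException {
    try (PreparedStatement stmt = conn.prepareStatement(upsertSql)) {
      int rowCount = 0;
      for (Object[] row : rows) {
        stmt.clearParameters();
        for (int i = 0; i < row.length; i++) {
          stmt.setObject(i + 1, row[i]);  // METRIC_NAME, APP_ID, ..., METRIC_MIN
        }
        stmt.executeUpdate();
        if (++rowCount >= maxMutationStateSize - 1) {
          conn.commit();                  // flush before hitting the Phoenix mutation limit
          rowCount = 0;
        }
      }
      conn.commit();                      // flush the final partial batch
    }
  }
}
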
http://git-wip-us.apache.org/repos/asf/ambari/blob/3c23bee6/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
index 8264c41..ed8f978 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
@@ -109,7 +109,8 @@ public class PhoenixTransactSQL {
"INSTANCE_ID VARCHAR, " +
"SERVER_TIME UNSIGNED_LONG NOT NULL, " +
"UNITS CHAR(20), " +
- "METRIC_AVG DOUBLE, " +
+ "METRIC_SUM DOUBLE, " +
+ "METRIC_COUNT UNSIGNED_INT, " +
"METRIC_MAX DOUBLE, " +
"METRIC_MIN DOUBLE " +
"CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
@@ -138,6 +139,16 @@ public class PhoenixTransactSQL {
"METRIC_MIN) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+ public static final String UPSERT_CLUSTER_AGGREGATE_TIME_SQL = "UPSERT INTO" +
+ " %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
+ "UNITS, " +
+ "METRIC_SUM, " +
+ "METRIC_COUNT, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN) " +
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+
public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " +
"%s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
"SERVER_TIME, " +
@@ -172,17 +183,32 @@ public class PhoenixTransactSQL {
public static final String GET_CLUSTER_AGGREGATE_SQL =
"SELECT METRIC_NAME, APP_ID, " +
"INSTANCE_ID, SERVER_TIME, " +
+ "UNITS, " +
"METRIC_SUM, " +
"HOSTS_COUNT, " +
"METRIC_MAX, " +
"METRIC_MIN " +
"FROM METRIC_AGGREGATE";
+ public static final String GET_CLUSTER_AGGREGATE_TIME_SQL =
+ "SELECT METRIC_NAME, APP_ID, " +
+ "INSTANCE_ID, SERVER_TIME, " +
+ "UNITS, " +
+ "METRIC_SUM, " +
+ "METRIC_COUNT, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN " +
+ "FROM METRIC_AGGREGATE";
+
public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD";
public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME =
"METRIC_RECORD_MINUTE";
public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME =
"METRIC_RECORD_HOURLY";
+ public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME =
+ "METRIC_AGGREGATE";
+ public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME =
+ "METRIC_AGGREGATE_HOURLY";
public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY";
public static final String DEFAULT_ENCODING = "FAST_DIFF";
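
Because the new UPSERT_CLUSTER_AGGREGATE_TIME_SQL carries a %s table placeholder, the same statement text can target either cluster aggregate table. A small sketch of how the hourly writer is expected to format it; the statement and table name are copied from this patch, the wrapper class is hypothetical.

public class UpsertSqlFormatSketch {
  public static void main(String[] args) {
    // UPSERT_CLUSTER_AGGREGATE_TIME_SQL and the hourly table name, as defined in this patch
    String upsertClusterAggregateTimeSql = "UPSERT INTO" +
        " %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
        "UNITS, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) " +
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
    String table = "METRIC_AGGREGATE_HOURLY";  // METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME
    // saveClusterAggregateHourlyRecords() prepares its statement from this text
    System.out.println(String.format(upsertClusterAggregateTimeSql, table));
  }
}
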
http://git-wip-us.apache.org/repos/asf/ambari/blob/3c23bee6/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java
index 5569422..6a84024 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java
@@ -16,13 +16,15 @@
* limitations under the License.
*/
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -31,18 +33,25 @@ import java.sql.SQLException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.PhoenixTransactSQL.*;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration.*;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration
+ .HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration
+ .TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
public class TimelineMetricAggregatorHourly extends AbstractTimelineAggregator {
- private static final Log LOG = LogFactory.getLog(TimelineMetricAggregatorHourly.class);
+ private static final Log LOG = LogFactory.getLog
+ (TimelineMetricAggregatorHourly.class);
private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE =
"timeline-metrics-host-aggregator-hourly-checkpoint";
private final String checkpointLocation;
@@ -60,7 +69,8 @@ public class TimelineMetricAggregatorHourly extends AbstractTimelineAggregator {
checkpointLocation = FilenameUtils.concat(checkpointDir,
MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE);
- sleepInterval = metricsConf.getLong(HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600000l);
+ sleepInterval = metricsConf.getLong(HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL,
+ 3600000l);
checkpointCutOffMultiplier =
metricsConf.getInt(HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
}
@@ -75,56 +85,18 @@ public class TimelineMetricAggregatorHourly extends AbstractTimelineAggregator {
LOG.info("Start aggregation cycle @ " + new Date());
boolean success = true;
- Condition condition = new Condition(null, null, null, null, startTime,
- endTime, null, true);
- condition.setNoLimit();
- condition.setFetchSize(resultsetFetchSize);
- condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
- METRICS_AGGREGATE_MINUTE_TABLE_NAME));
- condition.addOrderByColumn("METRIC_NAME");
- condition.addOrderByColumn("HOSTNAME");
- condition.addOrderByColumn("APP_ID");
- condition.addOrderByColumn("INSTANCE_ID");
- condition.addOrderByColumn("SERVER_TIME");
+ Condition condition = prepareMetricQueryCondition(startTime, endTime);
Connection conn = null;
PreparedStatement stmt = null;
try {
conn = hBaseAccessor.getConnection();
- stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+ stmt = prepareGetMetricsSqlStmt(conn, condition);
ResultSet rs = stmt.executeQuery();
- TimelineMetric existingMetric = null;
- MetricHostAggregate hostAggregate = null;
Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
- new HashMap<TimelineMetric, MetricHostAggregate>();
-
- while (rs.next()) {
- TimelineMetric currentMetric =
- PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
- MetricHostAggregate currentHostAggregate =
- PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
-
- if (existingMetric == null) {
- // First row
- existingMetric = currentMetric;
- hostAggregate = new MetricHostAggregate();
- hostAggregateMap.put(currentMetric, hostAggregate);
- }
-
- if (existingMetric.equalsExceptTime(currentMetric)) {
- // Recalculate totals with current metric
- hostAggregate.updateAggregates(currentHostAggregate);
-
- } else {
- // Switched over to a new metric - save existing
- hostAggregate = new MetricHostAggregate();
- hostAggregate.updateAggregates(currentHostAggregate);
- hostAggregateMap.put(currentMetric, hostAggregate);
- existingMetric = currentMetric;
- }
- }
+ aggregateMetricsFromResultSet(rs);
LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
@@ -158,6 +130,56 @@ public class TimelineMetricAggregatorHourly extends AbstractTimelineAggregator {
return success;
}
+ private Condition prepareMetricQueryCondition(long startTime, long endTime) {
+ Condition condition = new Condition(null, null, null, null, startTime,
+ endTime, null, true);
+ condition.setNoLimit();
+ condition.setFetchSize(resultsetFetchSize);
+ condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+ METRICS_AGGREGATE_MINUTE_TABLE_NAME));
+ condition.addOrderByColumn("METRIC_NAME");
+ condition.addOrderByColumn("HOSTNAME");
+ condition.addOrderByColumn("APP_ID");
+ condition.addOrderByColumn("INSTANCE_ID");
+ condition.addOrderByColumn("SERVER_TIME");
+ return condition;
+ }
+
+ private Map<TimelineMetric, MetricHostAggregate>
+ aggregateMetricsFromResultSet(ResultSet rs) throws SQLException, IOException {
+ TimelineMetric existingMetric = null;
+ MetricHostAggregate hostAggregate = null;
+ Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
+ new HashMap<TimelineMetric, MetricHostAggregate>();
+
+ while (rs.next()) {
+ TimelineMetric currentMetric =
+ PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+ MetricHostAggregate currentHostAggregate =
+ PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+ if (existingMetric == null) {
+ // First row
+ existingMetric = currentMetric;
+ hostAggregate = new MetricHostAggregate();
+ hostAggregateMap.put(currentMetric, hostAggregate);
+ }
+
+ if (existingMetric.equalsExceptTime(currentMetric)) {
+ // Recalculate totals with current metric
+ hostAggregate.updateAggregates(currentHostAggregate);
+
+ } else {
+ // Switched over to a new metric - save existing
+ hostAggregate = new MetricHostAggregate();
+ hostAggregate.updateAggregates(currentHostAggregate);
+ hostAggregateMap.put(currentMetric, hostAggregate);
+ existingMetric = currentMetric;
+ }
+ }
+ return hostAggregateMap;
+ }
+
@Override
protected Long getSleepInterval() {
return sleepInterval;
http://git-wip-us.apache.org/repos/asf/ambari/blob/3c23bee6/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
index bdc3b53..080703e 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
@@ -46,7 +46,8 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
/**
- * Aggregates a metric across all hosts in the cluster.
+ * Aggregates a metric across all hosts in the cluster. Reads metrics from
+ * the precision table and saves into the aggregate.
*/
public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregator.class);
@@ -90,16 +91,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
"startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
boolean success = true;
- Condition condition = new Condition(null, null, null, null, startTime,
- endTime, null, true);
- condition.setFetchSize(resultsetFetchSize);
- condition.setNoLimit();
- condition.setStatement(String.format(GET_METRIC_SQL,
- METRICS_RECORD_TABLE_NAME));
- condition.addOrderByColumn("METRIC_NAME");
- condition.addOrderByColumn("APP_ID");
- condition.addOrderByColumn("INSTANCE_ID");
- condition.addOrderByColumn("SERVER_TIME");
+ Condition condition = prepareMetricQueryCondition(startTime, endTime);
Connection conn;
PreparedStatement stmt;
@@ -164,6 +156,21 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
return success;
}
+ private Condition prepareMetricQueryCondition(long startTime, long endTime) {
+ Condition condition = new Condition(null, null, null, null, startTime,
+ endTime, null, true);
+ condition.setFetchSize(resultsetFetchSize);
+ condition.setNoLimit();
+ condition.setStatement(String.format(GET_METRIC_SQL,
+ METRICS_RECORD_TABLE_NAME));
+ condition.addOrderByColumn("METRIC_NAME");
+ condition.addOrderByColumn("APP_ID");
+ condition.addOrderByColumn("INSTANCE_ID");
+ condition.addOrderByColumn("SERVER_TIME");
+
+ return condition;
+ }
+
@Override
protected Long getSleepInterval() {
return sleepInterval;
@@ -199,9 +206,11 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
if (timestamp != -1) {
// Metric is within desired time range
TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
- timelineMetric.getMetricName(), timelineMetric.getAppId(),
- timelineMetric.getInstanceId(), timestamp, timelineMetric.getType());
-
+ timelineMetric.getMetricName(),
+ timelineMetric.getAppId(),
+ timelineMetric.getInstanceId(),
+ timestamp,
+ timelineMetric.getType());
if (!timelineClusterMetricMap.containsKey(clusterMetric)) {
timelineClusterMetricMap.put(clusterMetric, metric.getValue());
} else {
@@ -278,6 +287,14 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
return true;
}
+ public boolean equalsExceptTime(TimelineClusterMetric metric) {
+ if (!metricName.equals(metric.metricName)) return false;
+ if (!appId.equals(metric.appId)) return false;
+ if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
+ return false;
+
+ return true;
+ }
@Override
public int hashCode() {
int result = metricName.hashCode();
http://git-wip-us.apache.org/repos/asf/ambari/blob/3c23bee6/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
index 1d70318..c380adb 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
@@ -15,29 +15,57 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
-public class TimelineMetricClusterAggregatorHourly extends AbstractTimelineAggregator {
- private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregatorHourly.class);
- private final long sleepInterval;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.PhoenixTransactSQL.*;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricClusterAggregator.TimelineClusterMetric;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration.*;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration
+ .CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration
+ .TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+
+public class TimelineMetricClusterAggregatorHourly extends
+ AbstractTimelineAggregator {
+ private static final Log LOG = LogFactory.getLog
+ (TimelineMetricClusterAggregatorHourly.class);
private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE =
"timeline-metrics-cluster-aggregator-hourly-checkpoint";
private final String checkpointLocation;
+ private final long sleepInterval;
private final Integer checkpointCutOffMultiplier;
+ private long checkpointCutOffInterval;
- public TimelineMetricClusterAggregatorHourly(PhoenixHBaseAccessor hBaseAccessor,
- Configuration metricsConf) {
+ public TimelineMetricClusterAggregatorHourly(
+ PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
super(hBaseAccessor, metricsConf);
String checkpointDir = metricsConf.get(
@@ -46,9 +74,11 @@ public class TimelineMetricClusterAggregatorHourly extends AbstractTimelineAggre
checkpointLocation = FilenameUtils.concat(checkpointDir,
CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE);
- sleepInterval = metricsConf.getLong(CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600000l);
- checkpointCutOffMultiplier =
- metricsConf.getInt(CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+ sleepInterval = metricsConf.getLong
+ (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600000l);
+ checkpointCutOffInterval = 7200000l;
+ checkpointCutOffMultiplier = metricsConf.getInt
+ (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
}
@Override
@@ -58,7 +88,117 @@ public class TimelineMetricClusterAggregatorHourly extends AbstractTimelineAggre
@Override
protected boolean doWork(long startTime, long endTime) {
- return false;
+ LOG.info("Start aggregation cycle @ " + new Date() + ", " +
+ "startTime = " + new Date(startTime) + ", endTime = " + new Date
+ (endTime));
+
+ boolean success = true;
+ Condition condition = prepareMetricQueryCondition(startTime, endTime);
+
+ Connection conn = null;
+ PreparedStatement stmt = null;
+
+ try {
+ conn = hBaseAccessor.getConnection();
+ stmt = prepareGetMetricsSqlStmt(conn, condition);
+
+ ResultSet rs = stmt.executeQuery();
+ Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
+ aggregateMetricsFromResultSet(rs);
+
+ LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
+
+ hBaseAccessor.saveClusterAggregateHourlyRecords(
+ hostAggregateMap,
+ METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
+
+ } catch (SQLException e) {
+ LOG.error("Exception during aggregating metrics.", e);
+ success = false;
+ } catch (IOException e) {
+ LOG.error("Exception during aggregating metrics.", e);
+ success = false;
+ } finally {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException sql) {
+ // Ignore
+ }
+ }
+ }
+
+ LOG.info("End aggregation cycle @ " + new Date());
+ return success;
+ }
+
+ // TODO: should rewrite from host aggregate to cluster aggregate
+ //
+ private Map<TimelineClusterMetric, MetricHostAggregate>
+ aggregateMetricsFromResultSet(ResultSet rs) throws IOException, SQLException {
+
+ TimelineClusterMetric existingMetric = null;
+ MetricHostAggregate hostAggregate = null;
+ Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
+ new HashMap<TimelineClusterMetric, MetricHostAggregate>();
+
+ while (rs.next()) {
+ TimelineClusterMetric currentMetric =
+ getTimelineMetricClusterKeyFromResultSet(rs);
+ MetricClusterAggregate currentHostAggregate =
+ getMetricClusterAggregateFromResultSet(rs);
+
+ if (existingMetric == null) {
+ // First row
+ existingMetric = currentMetric;
+ hostAggregate = new MetricHostAggregate();
+ hostAggregateMap.put(currentMetric, hostAggregate);
+ }
+
+ if (existingMetric.equalsExceptTime(currentMetric)) {
+ // Recalculate totals with current metric
+ updateAggregatesFromHost(hostAggregate, currentHostAggregate);
+
+ } else {
+ // Switched over to a new metric - save existing
+ hostAggregate = new MetricHostAggregate();
+ updateAggregatesFromHost(hostAggregate, currentHostAggregate);
+ hostAggregateMap.put(currentMetric, hostAggregate);
+ existingMetric = currentMetric;
+ }
+
+ }
+
+ return hostAggregateMap;
+ }
+
+ private void updateAggregatesFromHost(
+ MetricHostAggregate agg,
+ MetricClusterAggregate currentClusterAggregate) {
+ agg.updateMax(currentClusterAggregate.getMax());
+ agg.updateMin(currentClusterAggregate.getMin());
+ agg.updateSum(currentClusterAggregate.getSum());
+ agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts());
+ }
+
+ private Condition prepareMetricQueryCondition(long startTime, long endTime) {
+ Condition condition = new Condition
+ (null, null, null, null, startTime, endTime, null, true);
+ condition.setNoLimit();
+ condition.setFetchSize(resultsetFetchSize);
+ condition.setStatement(GET_CLUSTER_AGGREGATE_SQL);
+ condition.addOrderByColumn("METRIC_NAME");
+ condition.addOrderByColumn("APP_ID");
+ condition.addOrderByColumn("INSTANCE_ID");
+ condition.addOrderByColumn("SERVER_TIME");
+ return condition;
}
@Override
@@ -73,7 +213,7 @@ public class TimelineMetricClusterAggregatorHourly extends AbstractTimelineAggre
@Override
protected Long getCheckpointCutOffInterval() {
- return 7200000l;
+ return checkpointCutOffInterval;
}
@Override
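
aggregateMetricsFromResultSet() above depends on the query's ORDER BY: rows arrive grouped by metric key, so a fresh aggregate is opened whenever the key changes (SERVER_TIME is ignored via equalsExceptTime). A simplified, self-contained sketch of that streaming group-by, with plain strings and doubles standing in for TimelineClusterMetric and MetricHostAggregate:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the loop in aggregateMetricsFromResultSet();
// the real code keys on TimelineClusterMetric and folds MetricClusterAggregate
// rows into MetricHostAggregate values.
public class StreamingGroupBySketch {
  public static void main(String[] args) {
    // Rows already sorted by metric key, as the ORDER BY columns guarantee
    List<String[]> rows = new ArrayList<String[]>();
    rows.add(new String[]{"disk_used", "4.0"});
    rows.add(new String[]{"disk_used", "4.0"});
    rows.add(new String[]{"mem_free", "2.0"});

    Map<String, Double> aggregates = new LinkedHashMap<String, Double>();
    String existingKey = null;
    double runningSum = 0.0;

    for (String[] row : rows) {
      String currentKey = row[0];
      double value = Double.parseDouble(row[1]);
      if (existingKey == null || !existingKey.equals(currentKey)) {
        existingKey = currentKey;   // switched to a new metric: start a fresh aggregate
        runningSum = 0.0;
      }
      runningSum += value;          // updateAggregatesFromHost() analogue
      aggregates.put(currentKey, runningSum);
    }
    System.out.println(aggregates); // {disk_used=8.0, mem_free=2.0}
  }
}
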
http://git-wip-us.apache.org/repos/asf/ambari/blob/3c23bee6/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterAggregator.java
new file mode 100644
index 0000000..ac14ca3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterAggregator.java
@@ -0,0 +1,275 @@
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.*;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+
+import static junit.framework.Assert.*;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.AbstractTimelineAggregator.MetricClusterAggregate;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.AbstractTimelineAggregator.MetricHostAggregate;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.PhoenixTransactSQL.*;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricClusterAggregator.TimelineClusterMetric;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestClusterAggregator {
+ private static String MY_LOCAL_URL =
+ "jdbc:phoenix:c6503.ambari.apache.org:" + 2181 + ":/hbase";
+ private Connection conn;
+ private PhoenixHBaseAccessor hdb;
+
+
+ @Before
+ public void setUp() throws Exception {
+ hdb = createTestableHBaseAccessor();
+ conn = getConnection(getUrl());
+ Statement stmt = conn.createStatement();
+
+ hdb.initMetricSchema();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ Connection conn = getConnection(getUrl());
+ Statement stmt = conn.createStatement();
+
+ stmt.execute("delete from METRIC_AGGREGATE");
+ stmt.execute("delete from METRIC_AGGREGATE_HOURLY");
+ stmt.execute("delete from METRIC_RECORD");
+ stmt.execute("delete from METRIC_RECORD_HOURLY");
+ stmt.execute("delete from METRIC_RECORD_MINUTE");
+ conn.commit();
+
+ stmt.close();
+ conn.close();
+ }
+
+ /**
+ * A canary test.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testClusterOK() throws Exception {
+ Connection conn = getConnection(getUrl());
+ Statement stmt = conn.createStatement();
+ String sampleDDL = "CREATE TABLE TEST_METRICS (TEST_COLUMN VARCHAR " +
+ "CONSTRAINT pk PRIMARY KEY (TEST_COLUMN)) " +
+ "DATA_BLOCK_ENCODING='FAST_DIFF', " +
+ "IMMUTABLE_ROWS=true, TTL=86400, COMPRESSION='SNAPPY'";
+
+ stmt.executeUpdate(sampleDDL);
+ ResultSet rs = stmt.executeQuery(
+ "SELECT COUNT(TEST_COLUMN) FROM TEST_METRICS");
+
+ rs.next();
+ long l = rs.getLong(1);
+ assertThat(l).isGreaterThanOrEqualTo(0);
+
+ stmt.execute("DROP TABLE TEST_METRICS");
+ }
+
+ @Test
+ public void testShouldAggregateClusterProperly() throws Exception {
+ // GIVEN
+ TimelineMetricClusterAggregator agg =
+ new TimelineMetricClusterAggregator(hdb, new Configuration());
+
+ long startTime = System.currentTimeMillis();
+ long ctime = startTime;
+ long minute = 60 * 1000;
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", 1));
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", 2));
+ ctime += minute;
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", 2));
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", 1));
+
+ // WHEN
+ long endTime = ctime + minute;
+ boolean success = agg.doWork(startTime, endTime);
+
+ //THEN
+ Condition condition = new Condition(null, null, null, null, startTime,
+ endTime, null, true);
+ condition.setStatement(GET_CLUSTER_AGGREGATE_SQL);
+
+ PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
+ (conn, condition);
+ ResultSet rs = pstmt.executeQuery();
+ MetricHostAggregate expectedAggregate =
+ createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+
+ int recordCount = 0;
+ while (rs.next()) {
+ TimelineClusterMetric currentMetric =
+ PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet(rs);
+ MetricClusterAggregate currentHostAggregate =
+ PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs);
+
+ if ("disk_free".equals(currentMetric.getMetricName())) {
+ assertEquals(2, currentHostAggregate.getNumberOfHosts());
+ assertEquals(2.0, currentHostAggregate.getMax());
+ assertEquals(1.0, currentHostAggregate.getMin());
+ assertEquals(3.0, currentHostAggregate.getSum());
+ recordCount++;
+ } else {
+ fail("Unexpected entry");
+ }
+ }
+ }
+
+ @Test
+ public void testShouldAggregateClusterOnHourProperly() throws Exception {
+ // GIVEN
+ TimelineMetricClusterAggregatorHourly agg =
+ new TimelineMetricClusterAggregatorHourly(hdb, new Configuration());
+
+ // TODO: this time could be virtualized or made independent of the real clock
+ long startTime = System.currentTimeMillis();
+ long ctime = startTime;
+ long minute = 60 * 1000;
+
+ Map<TimelineClusterMetric, MetricClusterAggregate> records =
+ new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+ records.put(createEmptyTimelineMetric(ctime),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ records.put(createEmptyTimelineMetric(ctime += minute),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ records.put(createEmptyTimelineMetric(ctime += minute),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ records.put(createEmptyTimelineMetric(ctime += minute),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+
+ hdb.saveClusterAggregateRecords(records);
+
+ // WHEN
+ agg.doWork(startTime, ctime + minute);
+
+ // THEN
+ ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
+ int count = 0;
+ while (rs.next()) {
+ assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME"));
+ assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+ assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
+ assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+ assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
+ assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
+ count++;
+ }
+
+ assertEquals("One hourly aggregated row expected ",1, count);
+
+ System.out.println(rs);
+
+
+ }
+
+ private ResultSet executeQuery(String query) throws SQLException {
+ Connection conn = getConnection(getUrl());
+ Statement stmt = conn.createStatement();
+ return stmt.executeQuery(query);
+ }
+
+ private TimelineClusterMetric createEmptyTimelineMetric(long startTime) {
+ TimelineClusterMetric metric = new TimelineClusterMetric("disk_used",
+ "test_app", null, startTime, null);
+
+ return metric;
+ }
+
+ private MetricHostAggregate
+ createMetricHostAggregate(double max, double min, int numberOfSamples,
+ double sum) {
+ MetricHostAggregate expectedAggregate =
+ new MetricHostAggregate();
+ expectedAggregate.setMax(max);
+ expectedAggregate.setMin(min);
+ expectedAggregate.setNumberOfSamples(numberOfSamples);
+ expectedAggregate.setSum(sum);
+
+ return expectedAggregate;
+ }
+
+ private PhoenixHBaseAccessor createTestableHBaseAccessor() {
+ return
+ new PhoenixHBaseAccessor(
+ new Configuration(),
+ new Configuration(),
+ new ConnectionProvider() {
+ @Override
+ public Connection getConnection() {
+ Connection connection = null;
+ try {
+ connection = DriverManager.getConnection(getUrl());
+ } catch (SQLException e) {
+ LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+ }
+ return connection;
+ }
+ });
+ }
+
+ private final static Comparator<TimelineMetric> TIME_IGNORING_COMPARATOR =
+ new Comparator<TimelineMetric>() {
+ @Override
+ public int compare(TimelineMetric o1, TimelineMetric o2) {
+ return o1.equalsExceptTime(o2) ? 0 : 1;
+ }
+ };
+
+ private TimelineMetrics prepareSingleTimelineMetric(long startTime,
+ String host,
+ double val) {
+ TimelineMetrics m = new TimelineMetrics();
+ m.setMetrics(Arrays.asList(
+ createTimelineMetric(startTime, "disk_free", host, val)));
+
+ return m;
+ }
+
+ private TimelineMetric createTimelineMetric(long startTime,
+ String metricName,
+ String host,
+ double val) {
+ TimelineMetric m = new TimelineMetric();
+ m.setAppId("host");
+ m.setHostName(host);
+ m.setMetricName(metricName);
+ m.setStartTime(startTime);
+ Map<Long, Double> vals = new HashMap<Long, Double>();
+ vals.put(startTime + 15000l, val);
+ vals.put(startTime + 30000l, val);
+ vals.put(startTime + 45000l, val);
+ vals.put(startTime + 60000l, val);
+
+ m.setMetricValues(vals);
+
+ return m;
+ }
+
+
+ protected static String getUrl() {
+ return MY_LOCAL_URL;
+// return TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL;
+ }
+
+ private static Connection getConnection(String url) throws SQLException {
+ return DriverManager.getConnection(url);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/3c23bee6/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestHBaseAccessor.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestHBaseAccessor.java
index 7958055..a10cfeb 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestHBaseAccessor.java
@@ -4,9 +4,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.junit.After;
-import org.junit.Ignore;
-import org.junit.Test;
+import org.junit.*;
import java.sql.*;
import java.util.*;
@@ -16,23 +14,37 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
.timeline.PhoenixTransactSQL.*;
import static org.assertj.core.api.Assertions.assertThat;
-@Ignore
public class TestHBaseAccessor {
private static String MY_LOCAL_URL =
"jdbc:phoenix:c6503.ambari.apache.org:" + 2181 + ":/hbase";
+ private Connection conn;
+ private PhoenixHBaseAccessor hdb;
+
+
+ @Before
+ public void setUp() throws Exception {
+ hdb = createTestableHBaseAccessor();
+ conn = getConnection(getUrl());
+ Statement stmt = conn.createStatement();
+
+ hdb.initMetricSchema();
+ }
@After
public void tearDown() throws Exception {
Connection conn = getConnection(getUrl());
Statement stmt = conn.createStatement();
- stmt.execute("DROP TABLE METRIC_AGGREGATE");
- stmt.execute("DROP TABLE METRIC_AGGREGATE_HOURLY");
- stmt.execute("DROP TABLE METRIC_RECORD");
- stmt.execute("DROP TABLE METRIC_RECORD_HOURLY");
- stmt.execute("DROP TABLE METRIC_RECORD_MINUTE");
- }
+ stmt.execute("delete from METRIC_AGGREGATE");
+ stmt.execute("delete from METRIC_AGGREGATE_HOURLY");
+ stmt.execute("delete from METRIC_RECORD");
+ stmt.execute("delete from METRIC_RECORD_HOURLY");
+ stmt.execute("delete from METRIC_RECORD_MINUTE");
+ conn.commit();
+ stmt.close();
+ conn.close();
+ }
/**
* A canary test.
*
@@ -58,19 +70,18 @@ public class TestHBaseAccessor {
stmt.execute("DROP TABLE TEST_METRICS");
}
- @Test
+ // @Test
public void testShouldInsertMetrics() throws Exception {
// GIVEN
- PhoenixHBaseAccessor sut = createTestableHBaseAccessor();
// WHEN
long startTime = System.currentTimeMillis();
- TimelineMetrics metricsSent = prepareTimelineMetrics(startTime);
- sut.insertMetricRecords(metricsSent);
+ TimelineMetrics metricsSent = prepareTimelineMetrics(startTime, "local");
+ hdb.insertMetricRecords(metricsSent);
Condition queryCondition = new Condition(null, "local", null, null,
startTime, startTime + (15 * 60 * 1000), null, false);
- TimelineMetrics recordRead = sut.getMetricRecords(queryCondition);
+ TimelineMetrics recordRead = hdb.getMetricRecords(queryCondition);
// THEN
assertThat(recordRead.getMetrics()).hasSize(2)
@@ -82,27 +93,24 @@ public class TestHBaseAccessor {
.containsExactlyElementsOf(recordRead.getMetrics());
}
- @Test
+ // @Test
public void testShouldAggregateMinuteProperly() throws Exception {
// GIVEN
- PhoenixHBaseAccessor hdb = createTestableHBaseAccessor();
- Connection conn = getConnection(getUrl());
- hdb.initMetricSchema();
TimelineMetricAggregatorMinute aggregatorMinute =
new TimelineMetricAggregatorMinute(hdb, new Configuration());
long startTime = System.currentTimeMillis();
- TimelineMetrics metricsSent = prepareTimelineMetrics(startTime);
- hdb.insertMetricRecords(metricsSent);
- hdb.insertMetricRecords(prepareTimelineMetrics(startTime + 1 * 60 * 1000));
- hdb.insertMetricRecords(prepareTimelineMetrics(startTime + 2 * 60 * 1000));
- hdb.insertMetricRecords(prepareTimelineMetrics(startTime + 3 * 60 * 1000));
- hdb.insertMetricRecords(prepareTimelineMetrics(startTime + 4 * 60 * 1000));
+ long ctime = startTime;
+ long minute = 60 * 1000;
+ hdb.insertMetricRecords(prepareTimelineMetrics(startTime, "local"));
+ hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+ hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+ hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+ hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
// WHEN
long endTime = startTime + 1000 * 60 * 4;
- boolean success = aggregatorMinute.doWork(startTime,
- endTime);
+ boolean success = aggregatorMinute.doWork(startTime, endTime);
//THEN
Condition condition = new Condition(null, null, null, null, startTime,
@@ -129,12 +137,14 @@ public class TestHBaseAccessor {
assertEquals(20, currentHostAggregate.getNumberOfSamples());
assertEquals(15.0, currentHostAggregate.getSum());
assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ count++;
} else if ("mem_free".equals(currentMetric.getMetricName())) {
assertEquals(2.0, currentHostAggregate.getMax());
assertEquals(0.0, currentHostAggregate.getMin());
assertEquals(20, currentHostAggregate.getNumberOfSamples());
assertEquals(15.0, currentHostAggregate.getSum());
assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ count++;
} else {
fail("Unexpected entry");
}
@@ -142,12 +152,9 @@ public class TestHBaseAccessor {
assertEquals("Two aggregated entries expected", 2, count);
}
- @Test
+ // @Test
public void testShouldAggregateHourProperly() throws Exception {
// GIVEN
- PhoenixHBaseAccessor hdb = createTestableHBaseAccessor();
- Connection conn = getConnection(getUrl());
- hdb.initMetricSchema();
TimelineMetricAggregatorHourly aggregator =
new TimelineMetricAggregatorHourly(hdb, new Configuration());
long startTime = System.currentTimeMillis();
@@ -256,20 +263,51 @@ public class TestHBaseAccessor {
}
};
- private TimelineMetrics prepareTimelineMetrics(long startTime) {
+ private TimelineMetrics prepareSingleTimelineMetric(long startTime,
+ String host,
+ double val) {
+ TimelineMetrics m = new TimelineMetrics();
+ m.setMetrics(Arrays.asList(
+ createTimelineMetric(startTime, "disk_free", host, val)));
+
+ return m;
+ }
+
+ private TimelineMetric createTimelineMetric(long startTime,
+ String metricName,
+ String host,
+ double val) {
+ TimelineMetric m = new TimelineMetric();
+ m.setAppId("host");
+ m.setHostName(host);
+ m.setMetricName(metricName);
+ m.setStartTime(startTime);
+ Map<Long, Double> vals = new HashMap<Long, Double>();
+ vals.put(startTime + 15000l, val);
+ vals.put(startTime + 30000l, val);
+ vals.put(startTime + 45000l, val);
+ vals.put(startTime + 60000l, val);
+
+ m.setMetricValues(vals);
+
+ return m;
+ }
+
+ private TimelineMetrics prepareTimelineMetrics(long startTime, String host) {
TimelineMetrics metrics = new TimelineMetrics();
- List<TimelineMetric> allMetrics = new ArrayList<TimelineMetric>();
- allMetrics.add(createMetric(startTime, "disk_free"));
- allMetrics.add(createMetric(startTime, "mem_free"));
- metrics.setMetrics(allMetrics);
+ metrics.setMetrics(Arrays.asList(
+ createMetric(startTime, "disk_free", host),
+ createMetric(startTime, "mem_free", host)));
return metrics;
}
- private TimelineMetric createMetric(long startTime, String metricName) {
+ private TimelineMetric createMetric(long startTime,
+ String metricName,
+ String host) {
TimelineMetric m = new TimelineMetric();
m.setAppId("host");
- m.setHostName("local");
+ m.setHostName(host);
m.setMetricName(metricName);
m.setStartTime(startTime);
Map<Long, Double> vals = new HashMap<Long, Double>();