You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink target was not preserved in this plain-text copy).
Posted to commits@ambari.apache.org by sw...@apache.org on 2015/05/18 22:15:17 UTC
ambari git commit: AMBARI-11184. AMS: Incorrect value obtained for a datapoint in the metric data series queried from AMS. (swagle)
Repository: ambari
Updated Branches:
refs/heads/trunk 2015be59a -> 8b0c964a8
AMBARI-11184. AMS: Incorrect value obtained for a datapoint in the metric data series queried from AMS. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8b0c964a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8b0c964a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8b0c964a
Branch: refs/heads/trunk
Commit: 8b0c964a89ffd4d352d0deac0756923242a252e9
Parents: 2015be5
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Mon May 18 13:07:06 2015 -0700
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Mon May 18 13:07:06 2015 -0700
----------------------------------------------------------------------
.../metrics/timeline/PhoenixHBaseAccessor.java | 8 +-
.../timeline/TimelineMetricConfiguration.java | 3 +
.../aggregators/AbstractTimelineAggregator.java | 6 +-
.../TimelineMetricClusterAggregatorMinute.java | 131 +++++++++++++------
.../aggregators/TimelineMetricReadHelper.java | 4 +-
.../timeline/AbstractMiniHBaseClusterTest.java | 80 ++++++++++-
.../metrics/timeline/ITClusterAggregator.java | 130 ++++++++++++++----
.../0.1.0/configuration/ams-site.xml | 2 +-
8 files changed, 290 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index b890171..bf1ae66 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -313,8 +313,7 @@ public class PhoenixHBaseAccessor {
}
}
- public void insertMetricRecords(TimelineMetrics metrics)
- throws SQLException, IOException {
+ public void insertMetricRecords(TimelineMetrics metrics) throws SQLException, IOException {
List<TimelineMetric> timelineMetrics = metrics.getMetrics();
if (timelineMetrics == null || timelineMetrics.isEmpty()) {
@@ -351,9 +350,8 @@ public class PhoenixHBaseAccessor {
metricRecordStmt.setDouble(8, aggregates[0]);
metricRecordStmt.setDouble(9, aggregates[1]);
metricRecordStmt.setDouble(10, aggregates[2]);
- metricRecordStmt.setLong(11, (long)aggregates[3]);
- String json =
- TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
+ metricRecordStmt.setLong(11, (long) aggregates[3]);
+ String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
metricRecordStmt.setString(12, json);
try {
http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 0595c20..0461261 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -157,6 +157,9 @@ public class TimelineMetricConfiguration {
public static final String CLUSTER_AGGREGATOR_APP_IDS =
"timeline.metrics.service.cluster.aggregator.appIds";
+ public static final String SERVER_SIDE_TIMESIFT_ADJUSTMENT =
+ "timeline.metrics.service.cluster.aggregator.timeshift.adjustment";
+
public static final String HOST_APP_ID = "HOST";
private Configuration hbaseConf;
http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
index 415471d..37fb088 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
@@ -165,15 +165,15 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
lastCheckPointTime = readCheckPoint();
if (isLastCheckPointTooOld(lastCheckPointTime)) {
LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " +
- "lastCheckPointTime = " + lastCheckPointTime);
+ "lastCheckPointTime = " + new Date(lastCheckPointTime));
lastCheckPointTime = -1;
}
if (lastCheckPointTime == -1) {
// Assuming first run, save checkpoint and sleep.
// Set checkpoint to 2 minutes in the past to allow the
// agents/collectors to catch up
- LOG.info("Saving checkpoint time on first run." +
- (currentTime - checkpointDelayMillis));
+ LOG.info("Saving checkpoint time on first run. " +
+ new Date((currentTime - checkpointDelayMillis)));
saveCheckPoint(currentTime - checkpointDelayMillis);
}
} catch (IOException io) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
index 293608e..9b51f98 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
@@ -33,9 +33,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
@@ -49,6 +48,8 @@ public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggre
private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true);
// Aggregator to perform app-level aggregates for host metrics
private final TimelineMetricAppAggregator appAggregator;
+ // 1 minute client side buffering adjustment
+ private final Long serverTimeShiftAdjustment;
public TimelineMetricClusterAggregatorMinute(PhoenixHBaseAccessor hBaseAccessor,
Configuration metricsConf,
@@ -66,11 +67,14 @@ public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggre
appAggregator = new TimelineMetricAppAggregator(metricsConf);
this.timeSliceIntervalMillis = timeSliceInterval;
+ this.serverTimeShiftAdjustment = Long.parseLong(metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000"));
}
@Override
protected void aggregate(ResultSet rs, long startTime, long endTime) throws SQLException, IOException {
- List<Long[]> timeSlices = getTimeSlices(startTime, endTime);
+ // Account for time shift due to client side buffering by shifting the
+ // timestamps with the difference between server time and series start time
+ List<Long[]> timeSlices = getTimeSlices(startTime - serverTimeShiftAdjustment, endTime);
// Initialize app aggregates for host metrics
appAggregator.init();
Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
@@ -91,17 +95,20 @@ public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggre
PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
METRICS_RECORD_TABLE_NAME));
condition.addOrderByColumn("METRIC_NAME");
+ condition.addOrderByColumn("HOSTNAME");
condition.addOrderByColumn("APP_ID");
- condition.addOrderByColumn("INSTANCE_ID");
condition.addOrderByColumn("SERVER_TIME");
return condition;
}
+ /**
+ * Return time slices to normalize the timeseries data.
+ */
private List<Long[]> getTimeSlices(long startTime, long endTime) {
List<Long[]> timeSlices = new ArrayList<Long[]>();
long sliceStartTime = startTime;
while (sliceStartTime < endTime) {
- timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis});
+ timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis });
sliceStartTime += timeSliceIntervalMillis;
}
return timeSlices;
@@ -111,45 +118,72 @@ public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggre
throws SQLException, IOException {
Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
- // Create time slices
- while (rs.next()) {
- TimelineMetric metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
-
- Map<TimelineClusterMetric, Double> clusterMetrics =
- sliceFromTimelineMetric(metric, timeSlices);
-
- if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
- for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
- clusterMetrics.entrySet()) {
-
- TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
- Double avgValue = clusterMetricEntry.getValue();
-
- MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
-
- if (aggregate == null) {
- aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue);
- aggregateClusterMetrics.put(clusterMetric, aggregate);
- } else {
- aggregate.updateSum(avgValue);
- aggregate.updateNumberOfHosts(1);
- aggregate.updateMax(avgValue);
- aggregate.updateMin(avgValue);
- }
- // Update app level aggregates
- appAggregator.processTimelineClusterMetric(clusterMetric,
- metric.getHostName(), avgValue);
+ TimelineMetric metric = null;
+ if (rs.next()) {
+ metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
+
+ // Call slice after all rows for a host are read
+ while (rs.next()) {
+ TimelineMetric nextMetric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
+ // If rows belong to same host combine them before slicing. This
+ // avoids issues across rows that belong to same hosts but get
+ // counted as coming from different ones.
+ if (metric.equalsExceptTime(nextMetric)) {
+ metric.addMetricValues(nextMetric.getMetricValues());
+ } else {
+ // Process the current metric
+ processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
+ metric = nextMetric;
}
}
}
+ // Process last metric
+ if (metric != null) {
+ processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
+ }
+
// Add app level aggregates to save
aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());
return aggregateClusterMetrics;
}
+ /**
+ * Slice metric values into interval specified by :
+ * timeline.metrics.cluster.aggregator.minute.timeslice.interval
+ * Normalize value by averaging them within the interval
+ */
+ private void processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
+ TimelineMetric metric, List<Long[]> timeSlices) {
+ // Create time slices
+ Map<TimelineClusterMetric, Double> clusterMetrics = sliceFromTimelineMetric(metric, timeSlices);
+
+ if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
+ for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
+ clusterMetrics.entrySet()) {
+
+ TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
+ Double avgValue = clusterMetricEntry.getValue();
+
+ MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
+
+ if (aggregate == null) {
+ aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue);
+ aggregateClusterMetrics.put(clusterMetric, aggregate);
+ } else {
+ aggregate.updateSum(avgValue);
+ aggregate.updateNumberOfHosts(1);
+ aggregate.updateMax(avgValue);
+ aggregate.updateMin(avgValue);
+ }
+ // Update app level aggregates
+ appAggregator.processTimelineClusterMetric(clusterMetric, metric.getHostName(), avgValue);
+ }
+ }
+ }
+
private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
- TimelineMetric timelineMetric, List<Long[]> timeSlices) {
+ TimelineMetric timelineMetric, List<Long[]> timeSlices) {
if (timelineMetric.getMetricValues().isEmpty()) {
return null;
@@ -158,13 +192,20 @@ public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggre
Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
new HashMap<TimelineClusterMetric, Double>();
+ Long timeShift = timelineMetric.getTimestamp() - timelineMetric.getStartTime();
+ if (timeShift < 0) {
+ LOG.debug("Invalid time shift found, possible discrepancy in clocks. " +
+ "timeShift = " + timeShift);
+ timeShift = 0l;
+ }
+
for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) {
// TODO: investigate null values - pre filter
if (metric.getValue() == null) {
continue;
}
- Long timestamp = getSliceTimeForMetric(timeSlices,
- Long.parseLong(metric.getKey().toString()));
+
+ Long timestamp = getSliceTimeForMetric(timeSlices, Long.parseLong(metric.getKey().toString()));
if (timestamp != -1) {
// Metric is within desired time range
TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
@@ -173,12 +214,24 @@ public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggre
timelineMetric.getInstanceId(),
timestamp,
timelineMetric.getType());
+
+ // do a sum / count here to get average for all points in a slice
+ int count = 1;
+ Double sum;
if (!timelineClusterMetricMap.containsKey(clusterMetric)) {
- timelineClusterMetricMap.put(clusterMetric, metric.getValue());
+ sum = metric.getValue();
} else {
+ count++;
Double oldValue = timelineClusterMetricMap.get(clusterMetric);
- Double newValue = (oldValue + metric.getValue()) / 2;
- timelineClusterMetricMap.put(clusterMetric, newValue);
+ sum = oldValue + metric.getValue();
+ }
+ timelineClusterMetricMap.put(clusterMetric, (sum / count));
+ } else {
+ if (timelineMetric.getMetricName().equals("tserver.general.entries")) {
+ LOG.info("--- Fallen off: serverTs = " + timelineMetric.getTimestamp() +
+ ", timeShift: " + timeShift +
+ ", timestamp: " + Long.parseLong(metric.getKey().toString()) +
+ ", host = " + timelineMetric.getHostName());
}
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
index 398f4c3..573e09d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
@@ -54,7 +54,9 @@ public class TimelineMetricReadHelper {
TimelineMetric metric = new TimelineMetric();
metric.setMetricName(rs.getString("METRIC_NAME"));
metric.setAppId(rs.getString("APP_ID"));
- if (!ignoreInstance) metric.setInstanceId(rs.getString("INSTANCE_ID"));
+ if (!ignoreInstance) {
+ metric.setInstanceId(rs.getString("INSTANCE_ID"));
+ }
metric.setHostName(rs.getString("HOSTNAME"));
metric.setTimestamp(rs.getLong("SERVER_TIME"));
metric.setStartTime(rs.getLong("START_TIME"));
http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
index 6cfaa2e..643e5cc 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
@@ -20,7 +20,11 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
@@ -30,16 +34,22 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+
+import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.LOG;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.assertj.core.api.Assertions.assertThat;
@@ -116,8 +126,7 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
protected PhoenixHBaseAccessor createTestableHBaseAccessor() {
Configuration metricsConf = new Configuration();
- metricsConf.set(
- TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME, "NONE");
+ metricsConf.set(TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME, "NONE");
return
new PhoenixHBaseAccessor(
@@ -136,4 +145,71 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
}
});
}
+
+ protected void insertMetricRecords(Connection conn, TimelineMetrics metrics, long currentTime)
+ throws SQLException, IOException {
+
+ List<TimelineMetric> timelineMetrics = metrics.getMetrics();
+ if (timelineMetrics == null || timelineMetrics.isEmpty()) {
+ LOG.debug("Empty metrics insert request.");
+ return;
+ }
+
+ PreparedStatement metricRecordStmt = null;
+
+ try {
+ metricRecordStmt = conn.prepareStatement(String.format(
+ UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME));
+
+ for (TimelineMetric metric : timelineMetrics) {
+ metricRecordStmt.clearParameters();
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("host: " + metric.getHostName() + ", " +
+ "metricName = " + metric.getMetricName() + ", " +
+ "values: " + metric.getMetricValues());
+ }
+ double[] aggregates = AggregatorUtils.calculateAggregates(
+ metric.getMetricValues());
+
+ metricRecordStmt.setString(1, metric.getMetricName());
+ metricRecordStmt.setString(2, metric.getHostName());
+ metricRecordStmt.setString(3, metric.getAppId());
+ metricRecordStmt.setString(4, metric.getInstanceId());
+ metricRecordStmt.setLong(5, currentTime);
+ metricRecordStmt.setLong(6, metric.getStartTime());
+ metricRecordStmt.setString(7, metric.getType());
+ metricRecordStmt.setDouble(8, aggregates[0]);
+ metricRecordStmt.setDouble(9, aggregates[1]);
+ metricRecordStmt.setDouble(10, aggregates[2]);
+ metricRecordStmt.setLong(11, (long) aggregates[3]);
+ String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
+ metricRecordStmt.setString(12, json);
+
+ try {
+ metricRecordStmt.executeUpdate();
+ } catch (SQLException sql) {
+ LOG.error(sql);
+ }
+ }
+
+ conn.commit();
+
+ } finally {
+ if (metricRecordStmt != null) {
+ try {
+ metricRecordStmt.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException sql) {
+ // Ignore
+ }
+ }
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
index fb3bc30..fb3bd31 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
@@ -18,7 +18,10 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
@@ -37,8 +40,10 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static junit.framework.Assert.assertEquals;
@@ -48,8 +53,10 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
@@ -113,8 +120,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
- PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
- (conn, condition);
+ PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
ResultSet rs = pstmt.executeQuery();
int recordCount = 0;
@@ -144,7 +150,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
long startTime = System.currentTimeMillis();
long ctime = startTime;
- long minute = 60 * 1000;
+ long minute = 60 * 1000 * 2;
/**
* Here we have two nodes with two instances each:
@@ -153,27 +159,33 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
* instance i2 | 3 | 4 |
*
*/
- hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+ // Four 1's at ctime - 100
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local1",
"i1", "disk_free", 1));
- hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+ // Four 2's at ctime - 100: different host
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local2",
"i1", "disk_free", 2));
- hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+ // Avoid overwrite
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local1",
"i2", "disk_free", 3));
- hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local2",
"i2", "disk_free", 4));
+
ctime += minute;
- hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+ // Four 1's at ctime + 2 min
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local1",
"i1", "disk_free", 1));
- hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+ // Four 1's at ctime + 2 min - different host
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local2",
"i1", "disk_free", 3));
- hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local1",
"i2", "disk_free", 2));
- hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local2",
"i2", "disk_free", 4));
// WHEN
long endTime = ctime + minute;
- boolean success = agg.doWork(startTime, endTime);
+ boolean success = agg.doWork(startTime - 1000, endTime + 1000);
//THEN
Condition condition = new DefaultCondition(null, null, null, null, startTime,
@@ -182,29 +194,26 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
- PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
- (conn, condition);
+ PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
ResultSet rs = pstmt.executeQuery();
int recordCount = 0;
while (rs.next()) {
TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
-// PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet(rs);
MetricClusterAggregate currentHostAggregate =
readHelper.getMetricClusterAggregateFromResultSet(rs);
if ("disk_free".equals(currentMetric.getMetricName())) {
- System.out.println("OUTPUT: " + currentMetric+" - " +
- ""+currentHostAggregate);
- assertEquals(4, currentHostAggregate.getNumberOfHosts());
- assertEquals(4.0, currentHostAggregate.getMax());
- assertEquals(1.0, currentHostAggregate.getMin());
- assertEquals(10.0, currentHostAggregate.getSum());
+ System.out.println("OUTPUT: " + currentMetric + " - " + currentHostAggregate);
+ assertEquals(2, currentHostAggregate.getNumberOfHosts());
+ assertEquals(5.0, currentHostAggregate.getSum());
recordCount++;
} else {
fail("Unexpected entry");
}
}
+
+ Assert.assertEquals(8, recordCount);
}
@Test
@@ -244,8 +253,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
- PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
- (conn, condition);
+ PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
ResultSet rs = pstmt.executeQuery();
int recordCount = 0;
@@ -476,6 +484,82 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
assertEquals(1.0d, currentHostAggregate.getSum());
}
+ @Test
+ public void testClusterAggregateMetricNormalization() throws Exception {
+ TimelineMetricAggregator agg =
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration());
+ TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
+
+ // Sample data
+ TimelineMetric metric1 = new TimelineMetric();
+ metric1.setMetricName("yarn.ClusterMetrics.NumActiveNMs");
+ metric1.setAppId("resourcemanager");
+ metric1.setHostName("h1");
+ metric1.setStartTime(1431372311811l);
+ metric1.setMetricValues(new HashMap<Long, Double>() {{
+ put(1431372311811l, 1.0);
+ put(1431372321811l, 1.0);
+ put(1431372331811l, 1.0);
+ put(1431372341811l, 1.0);
+ put(1431372351811l, 1.0);
+ put(1431372361811l, 1.0);
+ put(1431372371810l, 1.0);
+ }});
+
+ TimelineMetric metric2 = new TimelineMetric();
+ metric2.setMetricName("yarn.ClusterMetrics.NumActiveNMs");
+ metric2.setAppId("resourcemanager");
+ metric2.setHostName("h1");
+ metric2.setStartTime(1431372381810l);
+ metric2.setMetricValues(new HashMap<Long, Double>() {{
+ put(1431372381810l, 1.0);
+ put(1431372391811l, 1.0);
+ put(1431372401811l, 1.0);
+ put(1431372411811l, 1.0);
+ put(1431372421811l, 1.0);
+ put(1431372431811l, 1.0);
+ put(1431372441810l, 1.0);
+ }});
+
+ TimelineMetrics metrics = new TimelineMetrics();
+ metrics.setMetrics(Collections.singletonList(metric1));
+ insertMetricRecords(conn, metrics, 1431372371810l);
+
+ metrics.setMetrics(Collections.singletonList(metric2));
+ insertMetricRecords(conn, metrics, 1431372441810l);
+
+ long startTime = 1431372055000l;
+ long endTime = 1431372655000l;
+
+ agg.doWork(startTime, endTime);
+
+ Condition condition = new DefaultCondition(null, null, null, null, startTime,
+ endTime, null, null, true);
+ condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
+ PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+ METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
+
+ PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+ ResultSet rs = pstmt.executeQuery();
+
+ int recordCount = 0;
+ while (rs.next()) {
+ TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
+ MetricClusterAggregate currentHostAggregate = readHelper.getMetricClusterAggregateFromResultSet(rs);
+
+ if ("yarn.ClusterMetrics.NumActiveNMs".equals(currentMetric.getMetricName())) {
+ assertEquals(1, currentHostAggregate.getNumberOfHosts());
+ assertEquals(1.0, currentHostAggregate.getMax());
+ assertEquals(1.0, currentHostAggregate.getMin());
+ assertEquals(1.0, currentHostAggregate.getSum());
+ recordCount++;
+ } else {
+ fail("Unexpected entry");
+ }
+ }
+ Assert.assertEquals(9, recordCount);
+ }
+
private ResultSet executeQuery(String query) throws SQLException {
Connection conn = getConnection(getUrl());
Statement stmt = conn.createStatement();
http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
index fc52e5a..c716bea 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
@@ -198,7 +198,7 @@
</property>
<property>
<name>timeline.metrics.cluster.aggregator.minute.timeslice.interval</name>
- <value>15</value>
+ <value>30</value>
<description>
Lowest resolution of desired data for cluster level minute aggregates.
</description>