You are viewing a plain-text version of this content. The canonical version is on the mailing-list archive page; its "here" hyperlink was lost in this plain-text extraction.
Posted to commits@ambari.apache.org by av...@apache.org on 2016/03/30 20:00:35 UTC
ambari git commit: AMBARI-15621 : Cluster Second aggregator taking more than 2 mins to execute on large clusters, thereby causing lag (avijayan)
Repository: ambari
Updated Branches:
refs/heads/branch-2.2 77e1988bb -> d0b96cd95
AMBARI-15621 : Cluster Second aggregator taking more than 2 mins to execute on large clusters, thereby causing lag (avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/d0b96cd9
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/d0b96cd9
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/d0b96cd9
Branch: refs/heads/branch-2.2
Commit: d0b96cd95f5a5c9e3d672e820b2551a85c8ca41e
Parents: 77e1988
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Wed Mar 30 10:53:44 2016 -0700
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Wed Mar 30 10:53:44 2016 -0700
----------------------------------------------------------------------
.../timeline/HBaseTimelineMetricStore.java | 19 +++--
.../aggregators/AbstractTimelineAggregator.java | 82 +++++++++-----------
.../AbstractTimelineAggregatorTest.java | 48 ++++++------
.../stacks/HDP/2.0.6/services/stack_advisor.py | 2 +
4 files changed, 71 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/d0b96cd9/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index 2f080e3..a32e206 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -95,37 +95,37 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
// Start the cluster aggregator second
TimelineMetricAggregator secondClusterAggregator =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hBaseAccessor, metricsConf, metricMetadataManager);
- scheduleAggregatorThread(secondClusterAggregator, metricsConf);
+ scheduleAggregatorThread(secondClusterAggregator);
// Start the minute cluster aggregator
TimelineMetricAggregator minuteClusterAggregator =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf);
- scheduleAggregatorThread(minuteClusterAggregator, metricsConf);
+ scheduleAggregatorThread(minuteClusterAggregator);
// Start the hourly cluster aggregator
TimelineMetricAggregator hourlyClusterAggregator =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hBaseAccessor, metricsConf);
- scheduleAggregatorThread(hourlyClusterAggregator, metricsConf);
+ scheduleAggregatorThread(hourlyClusterAggregator);
// Start the daily cluster aggregator
TimelineMetricAggregator dailyClusterAggregator =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hBaseAccessor, metricsConf);
- scheduleAggregatorThread(dailyClusterAggregator, metricsConf);
+ scheduleAggregatorThread(dailyClusterAggregator);
// Start the minute host aggregator
TimelineMetricAggregator minuteHostAggregator =
TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hBaseAccessor, metricsConf);
- scheduleAggregatorThread(minuteHostAggregator, metricsConf);
+ scheduleAggregatorThread(minuteHostAggregator);
// Start the hourly host aggregator
TimelineMetricAggregator hourlyHostAggregator =
TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hBaseAccessor, metricsConf);
- scheduleAggregatorThread(hourlyHostAggregator, metricsConf);
+ scheduleAggregatorThread(hourlyHostAggregator);
// Start the daily host aggregator
TimelineMetricAggregator dailyHostAggregator =
TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hBaseAccessor, metricsConf);
- scheduleAggregatorThread(dailyHostAggregator, metricsConf);
+ scheduleAggregatorThread(dailyHostAggregator);
if (!configuration.isTimelineMetricsServiceWatcherDisabled()) {
int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay();
@@ -333,12 +333,11 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
return metricMetadataManager.getHostedAppsCache();
}
- private void scheduleAggregatorThread(TimelineMetricAggregator aggregator,
- Configuration metricsConf) {
+ private void scheduleAggregatorThread(TimelineMetricAggregator aggregator) {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
if (!aggregator.isDisabled()) {
executorService.scheduleAtFixedRate(aggregator,
- SECONDS.toMillis(metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120)),
+ 0l,
aggregator.getSleepIntervalMillis(),
TimeUnit.MILLISECONDS);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/d0b96cd9/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
index f8ec516..c576a40 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
@@ -52,7 +52,6 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
protected String tableName;
protected String outputTableName;
protected Long nativeTimeRangeDelay;
- protected Long lastAggregatedEndTime = -1l;
// Explicitly name aggregators for logging needs
private final String aggregatorName;
@@ -93,18 +92,19 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
LOG.info("Started Timeline aggregator thread @ " + new Date());
Long SLEEP_INTERVAL = getSleepIntervalMillis();
runOnce(SLEEP_INTERVAL);
- this.lastAggregatedEndTime = this.lastAggregatedEndTime + SLEEP_INTERVAL;
}
/**
* Access relaxed for tests
*/
public void runOnce(Long SLEEP_INTERVAL) {
- long lastCheckPointTime = readLastCheckpointSavingOnFirstRun();
+
+ long currentTime = System.currentTimeMillis();
+ long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime);
if (lastCheckPointTime != -1) {
LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: "
- + ((lastAggregatedEndTime - lastCheckPointTime) / 1000)
+ + ((currentTime - lastCheckPointTime) / 1000)
+ " seconds.");
boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL);
@@ -120,40 +120,40 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
}
}
- private long readLastCheckpointSavingOnFirstRun() {
+ private long readLastCheckpointSavingOnFirstRun(long currentTime) {
long lastCheckPointTime = -1;
try {
lastCheckPointTime = readCheckPoint();
- LOG.info("Last Checkpoint read : " + new Date(lastCheckPointTime));
-
- if (lastAggregatedEndTime == -1l) {
- lastAggregatedEndTime = getRoundedAggregateTimeMillis(getSleepIntervalMillis());
- }
-
- if (isLastCheckPointTooOld(lastCheckPointTime)) {
- LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " +
- "lastCheckPointTime = " + new Date(lastCheckPointTime));
- lastCheckPointTime = -1;
- }
-
- if (lastCheckPointTime > 0) {
- lastCheckPointTime = getRoundedCheckPointTimeMillis(lastCheckPointTime, getSleepIntervalMillis());
- LOG.info("Rounded off checkpoint : " + new Date(lastCheckPointTime));
- }
-
- if (isLastCheckPointTooYoung(lastCheckPointTime)) {
- LOG.info("Last checkpoint too recent for aggregation. Sleeping for 1 cycle.");
- lastCheckPointTime = -1;
- }
-
- if (lastCheckPointTime == -1) {
- // Assuming first run, save checkpoint and sleep.
- // Set checkpoint to rounded time in the past to allow the
- // agents/collectors to catch up
- LOG.info("Saving checkpoint time on first run. " +
- new Date((lastAggregatedEndTime)));
- saveCheckPoint(lastAggregatedEndTime);
+ if (lastCheckPointTime != -1) {
+ LOG.info("Last Checkpoint read : " + new Date(lastCheckPointTime));
+ if (isLastCheckPointTooOld(currentTime, lastCheckPointTime)) {
+ LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " +
+ "lastCheckPointTime = " + new Date(lastCheckPointTime));
+ lastCheckPointTime = getRoundedAggregateTimeMillis(getSleepIntervalMillis()) - getSleepIntervalMillis();
+ LOG.info("Saving checkpoint time. " + new Date((lastCheckPointTime)));
+ saveCheckPoint(lastCheckPointTime);
+
+ } else {
+
+ if (lastCheckPointTime > 0) {
+ lastCheckPointTime = getRoundedCheckPointTimeMillis(lastCheckPointTime, getSleepIntervalMillis());
+ LOG.info("Rounded off checkpoint : " + new Date(lastCheckPointTime));
+ }
+
+ if (isLastCheckPointTooYoung(lastCheckPointTime)) {
+ LOG.info("Last checkpoint too recent for aggregation. Sleeping for 1 cycle.");
+ return -1; //Skip Aggregation this time around
+ }
+ }
+ } else {
+ /*
+ No checkpoint. Save current rounded checkpoint and sleep for 1 cycle.
+ */
+ LOG.info("No checkpoint found");
+ long firstCheckPoint = getRoundedAggregateTimeMillis(getSleepIntervalMillis());
+ LOG.info("Saving checkpoint time. " + new Date((firstCheckPoint)));
+ saveCheckPoint(firstCheckPoint);
}
} catch (IOException io) {
LOG.warn("Unable to write last checkpoint time. Resuming sleep.", io);
@@ -161,16 +161,16 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
return lastCheckPointTime;
}
- private boolean isLastCheckPointTooOld(long checkpoint) {
+ private boolean isLastCheckPointTooOld(long currentTime, long checkpoint) {
// first checkpoint is saved checkpointDelayMillis in the past,
// so here we also need to take it into account
return checkpoint != -1 &&
- ((lastAggregatedEndTime - checkpoint) > getCheckpointCutOffIntervalMillis());
+ ((currentTime - checkpoint) > getCheckpointCutOffIntervalMillis());
}
private boolean isLastCheckPointTooYoung(long checkpoint) {
return checkpoint != -1 &&
- ((lastAggregatedEndTime <= checkpoint));
+ ((getRoundedAggregateTimeMillis(getSleepIntervalMillis()) <= checkpoint));
}
protected long readCheckPoint() {
@@ -295,14 +295,6 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
return checkpointLocation;
}
- protected void setLastAggregatedEndTime(long lastAggregatedEndTime) {
- this.lastAggregatedEndTime = lastAggregatedEndTime;
- }
-
- protected long getLastAggregatedEndTime() {
- return lastAggregatedEndTime;
- }
-
public static long getRoundedCheckPointTimeMillis(long referenceTime, long aggregatorPeriod) {
return referenceTime - (referenceTime % aggregatorPeriod);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/d0b96cd9/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
index 21b9839..827f399 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.HBaseTimelineMetricStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.junit.Before;
import org.junit.Test;
@@ -114,45 +115,42 @@ public class AbstractTimelineAggregatorTest {
long currentTime = System.currentTimeMillis();
long roundedOffAggregatorTime = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(currentTime,
sleepIntervalMillis);
-
+
//Test first run of aggregator with no checkpoint
checkPoint.set(-1);
- agg.setLastAggregatedEndTime(-1l);
agg.runOnce(sleepIntervalMillis);
assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
assertEquals("endTime should be zero", 0, endTimeInDoWork.get());
assertEquals(roundedOffAggregatorTime, checkPoint.get());
assertEquals("Do not aggregate on first run", 0, actualRuns);
- //Test first run with Too Old checkpoint
+// //Test first run with too "recent" checkpoint
currentTime = System.currentTimeMillis();
- checkPoint.set(currentTime - 16*60*1000); //Old checkpoint
- agg.setLastAggregatedEndTime(-1l);
+ checkPoint.set(currentTime);
+ agg.setSleepIntervalMillis(sleepIntervalMillis);
agg.runOnce(sleepIntervalMillis);
assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
assertEquals("endTime should be zero", 0, endTimeInDoWork.get());
- assertEquals(roundedOffAggregatorTime, checkPoint.get());
assertEquals("Do not aggregate on first run", 0, actualRuns);
- //Test first run with too "recent" checkpoint
+ //Test first run with Too Old checkpoint
currentTime = System.currentTimeMillis();
- checkPoint.set(currentTime);
- agg.setLastAggregatedEndTime(-1l);
- agg.setSleepIntervalMillis(sleepIntervalMillis);
+ checkPoint.set(currentTime - 16*60*1000); //Old checkpoint
agg.runOnce(sleepIntervalMillis);
- assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
- assertEquals("endTime should be zero", 0, endTimeInDoWork.get());
- assertEquals(agg.getLastAggregatedEndTime(), checkPoint.get());
- assertEquals("Do not aggregate on first run", 0, actualRuns);
+ long checkPointTime = AbstractTimelineAggregator.getRoundedAggregateTimeMillis(sleepIntervalMillis);
+ assertEquals("startTime should be zero", checkPointTime - sleepIntervalMillis, startTimeInDoWork.get());
+ assertEquals("endTime should be zero", checkPointTime, endTimeInDoWork.get());
+ assertEquals(roundedOffAggregatorTime, checkPoint.get());
+ assertEquals("Do not aggregate on first run", 1, actualRuns);
+
- //Test first run with perfect checkpoint (sleepIntervalMillis back)
+// //Test first run with perfect checkpoint (sleepIntervalMillis back)
currentTime = System.currentTimeMillis();
roundedOffAggregatorTime = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(currentTime,
sleepIntervalMillis);
- long checkPointTime = roundedOffAggregatorTime - sleepIntervalMillis;
+ checkPointTime = roundedOffAggregatorTime - sleepIntervalMillis;
long expectedCheckPoint = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(checkPointTime, sleepIntervalMillis);
checkPoint.set(checkPointTime);
- agg.setLastAggregatedEndTime(-1l);
agg.runOnce(sleepIntervalMillis);
assertEquals("startTime should the lower rounded time of the checkpoint time",
expectedCheckPoint, startTimeInDoWork.get());
@@ -160,20 +158,20 @@ public class AbstractTimelineAggregatorTest {
expectedCheckPoint + sleepIntervalMillis, endTimeInDoWork.get());
assertEquals(expectedCheckPoint + sleepIntervalMillis,
checkPoint.get());
- assertEquals("Aggregate on first run", 1, actualRuns);
+ assertEquals("Aggregate on first run", 2, actualRuns);
//Test edge case for checkpoint (2 x sleepIntervalMillis)
- checkPointTime = roundedOffAggregatorTime - 2*sleepIntervalMillis;
- expectedCheckPoint = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(checkPointTime, sleepIntervalMillis);
- checkPoint.set(checkPointTime);
+ currentTime = System.currentTimeMillis();
+ checkPoint.set(currentTime - 2*sleepIntervalMillis + 5000);
agg.runOnce(sleepIntervalMillis);
+ long expectedStartTime = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(currentTime - 2*sleepIntervalMillis + 5000, sleepIntervalMillis);
assertEquals("startTime should the lower rounded time of the checkpoint time",
- expectedCheckPoint, startTimeInDoWork.get());
+ expectedStartTime, startTimeInDoWork.get());
assertEquals("startTime should the lower rounded time of the checkpoint time + sleepIntervalMillis",
- expectedCheckPoint + sleepIntervalMillis, endTimeInDoWork.get());
- assertEquals(expectedCheckPoint + sleepIntervalMillis,
+ expectedStartTime + sleepIntervalMillis, endTimeInDoWork.get());
+ assertEquals(expectedStartTime + sleepIntervalMillis,
checkPoint.get());
- assertEquals("Aggregate on second run", 2, actualRuns);
+ assertEquals("Aggregate on second run", 3, actualRuns);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/d0b96cd9/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
index b8e2c45..a007817 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
@@ -592,12 +592,14 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
putAmsHbaseSiteProperty("phoenix.query.maxGlobalMemoryPercentage", 20)
putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 81920000)
putAmsSiteProperty("phoenix.query.maxGlobalMemoryPercentage", 30)
+ putAmsSiteProperty("timeline.metrics.service.resultset.fetchSize", 10000)
elif total_sinks_count >= 500:
putAmsHbaseSiteProperty("hbase.regionserver.handler.count", 60)
putAmsHbaseSiteProperty("hbase.regionserver.hlog.blocksize", 134217728)
putAmsHbaseSiteProperty("hbase.regionserver.maxlogs", 64)
putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 268435456)
putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 40960000)
+ putAmsSiteProperty("timeline.metrics.service.resultset.fetchSize", 5000)
else:
putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 20480000)
pass