You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/12/17 22:03:07 UTC
ambari git commit: AMBARI-8769. Aggregator checkpoint logic should
take into account the checkPointDelay.
Repository: ambari
Updated Branches:
refs/heads/trunk 0dcbcc1ba -> cd3fb17f4
AMBARI-8769. Aggregator checkpoint logic should take into account the checkPointDelay.
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/cd3fb17f
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/cd3fb17f
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/cd3fb17f
Branch: refs/heads/trunk
Commit: cd3fb17f4d770fb354c76f0eb1df738aa66848d5
Parents: 0dcbcc1
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Wed Dec 17 12:53:50 2014 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Wed Dec 17 12:53:50 2014 -0800
----------------------------------------------------------------------
.../timeline/AbstractTimelineAggregator.java | 135 ++++++-----
.../AbstractTimelineAggregatorTest.java | 222 +++++++++++++++++++
2 files changed, 303 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/cd3fb17f/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
index a123e57..f169003 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
@@ -21,6 +21,8 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
import java.io.File;
import java.io.IOException;
@@ -37,18 +39,26 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
public abstract class AbstractTimelineAggregator implements Runnable {
protected final PhoenixHBaseAccessor hBaseAccessor;
private final Log LOG;
+
+ private Clock clock;
protected final long checkpointDelayMillis;
protected final Integer resultsetFetchSize;
protected Configuration metricsConf;
public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
Configuration metricsConf) {
+ this(hBaseAccessor, metricsConf, new SystemClock());
+ }
+
+ public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
+ Configuration metricsConf, Clock clk) {
this.hBaseAccessor = hBaseAccessor;
this.metricsConf = metricsConf;
this.checkpointDelayMillis = SECONDS.toMillis(
metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120));
this.resultsetFetchSize = metricsConf.getInt(RESULTSET_FETCH_SIZE, 2000);
this.LOG = LogFactory.getLog(this.getClass());
+ this.clock = clk;
}
@Override
@@ -57,75 +67,93 @@ public abstract class AbstractTimelineAggregator implements Runnable {
Long SLEEP_INTERVAL = getSleepIntervalMillis();
while (true) {
- long currentTime = System.currentTimeMillis();
- long lastCheckPointTime = -1;
+ long sleepTime = runOnce(SLEEP_INTERVAL);
try {
- lastCheckPointTime = readCheckPoint();
- if (isLastCheckPointTooOld(lastCheckPointTime)) {
- LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " +
- "lastCheckPointTime = " + lastCheckPointTime);
- lastCheckPointTime = -1;
- }
- if (lastCheckPointTime == -1) {
- // Assuming first run, save checkpoint and sleep.
- // Set checkpoint to 2 minutes in the past to allow the
- // agents/collectors to catch up
- saveCheckPoint(currentTime - checkpointDelayMillis);
- }
- } catch (IOException io) {
- LOG.warn("Unable to write last checkpoint time. Resuming sleep.", io);
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ LOG.info("Sleep interrupted, continuing with aggregation.");
}
- long sleepTime = SLEEP_INTERVAL;
-
- if (lastCheckPointTime != -1) {
- LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: "
- + ((System.currentTimeMillis() - lastCheckPointTime) / 1000)
- + " seconds.");
-
- long startTime = System.currentTimeMillis();
- boolean success = doWork(lastCheckPointTime,
- lastCheckPointTime + SLEEP_INTERVAL);
- long executionTime = System.currentTimeMillis() - startTime;
- long delta = SLEEP_INTERVAL - executionTime;
-
- if (delta > 0) {
- // Sleep for (configured sleep - time to execute task)
- sleepTime = delta;
- } else {
- // No sleep because last run took too long to execute
- LOG.info("Aggregator execution took too long, " +
- "cancelling sleep. executionTime = " + executionTime);
- sleepTime = 1;
- }
+ }
+ }
- LOG.debug("Aggregator sleep interval = " + sleepTime);
+ /**
+ * Access relaxed for tests
+ */
+ protected long runOnce(Long SLEEP_INTERVAL) {
+ long currentTime = clock.getTime();
+ long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime);
+ long sleepTime = SLEEP_INTERVAL;
+
+ if (lastCheckPointTime != -1) {
+ LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: "
+ + ((clock.getTime() - lastCheckPointTime) / 1000)
+ + " seconds.");
+
+ long startTime = clock.getTime();
+ boolean success = doWork(lastCheckPointTime,
+ lastCheckPointTime + SLEEP_INTERVAL);
+ long executionTime = clock.getTime() - startTime;
+ long delta = SLEEP_INTERVAL - executionTime;
+
+ if (delta > 0) {
+ // Sleep for (configured sleep - time to execute task)
+ sleepTime = delta;
+ } else {
+ // No sleep because last run took too long to execute
+ LOG.info("Aggregator execution took too long, " +
+ "cancelling sleep. executionTime = " + executionTime);
+ sleepTime = 1;
+ }
- if (success) {
- try {
- saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL);
- } catch (IOException io) {
- LOG.warn("Error saving checkpoint, restarting aggregation at " +
- "previous checkpoint.");
- }
+ LOG.debug("Aggregator sleep interval = " + sleepTime);
+
+ if (success) {
+ try {
+ saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL);
+ } catch (IOException io) {
+ LOG.warn("Error saving checkpoint, restarting aggregation at " +
+ "previous checkpoint.");
}
}
+ }
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- LOG.info("Sleep interrupted, continuing with aggregation.");
+ return sleepTime;
+ }
+
+ private long readLastCheckpointSavingOnFirstRun(long currentTime) {
+ long lastCheckPointTime = -1;
+
+ try {
+ lastCheckPointTime = readCheckPoint();
+ if (isLastCheckPointTooOld(lastCheckPointTime)) {
+ LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " +
+ "lastCheckPointTime = " + lastCheckPointTime);
+ lastCheckPointTime = -1;
}
+ if (lastCheckPointTime == -1) {
+ // Assuming first run, save checkpoint and sleep.
+ // Set checkpoint checkpointDelayMillis (2 minutes by default) in the
+ // past to allow the agents/collectors to catch up
+ LOG.info("Saving checkpoint time on first run." +
+ (currentTime - checkpointDelayMillis));
+ saveCheckPoint(currentTime - checkpointDelayMillis);
+ }
+ } catch (IOException io) {
+ LOG.warn("Unable to write last checkpoint time. Resuming sleep.", io);
}
+ return lastCheckPointTime;
}
private boolean isLastCheckPointTooOld(long checkpoint) {
+ // first checkpoint is saved checkpointDelayMillis in the past,
+ // so here we also need to take it into account
return checkpoint != -1 &&
- ((System.currentTimeMillis() - checkpoint) >
+ ((clock.getTime() - checkpoint - checkpointDelayMillis) >
getCheckpointCutOffIntervalMillis());
}
- private long readCheckPoint() {
+ protected long readCheckPoint() {
try {
File checkpoint = new File(getCheckpointLocation());
if (checkpoint.exists()) {
@@ -140,7 +168,7 @@ public abstract class AbstractTimelineAggregator implements Runnable {
return -1;
}
- private void saveCheckPoint(long checkpointTime) throws IOException {
+ protected void saveCheckPoint(long checkpointTime) throws IOException {
File checkpoint = new File(getCheckpointLocation());
if (!checkpoint.exists()) {
boolean done = checkpoint.createNewFile();
@@ -225,5 +253,4 @@ public abstract class AbstractTimelineAggregator implements Runnable {
protected abstract boolean isDisabled();
protected abstract String getCheckpointLocation();
-
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/cd3fb17f/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java
new file mode 100644
index 0000000..c274c61
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java
@@ -0,0 +1,222 @@
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.util.Clock;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE;
+
+public class AbstractTimelineAggregatorTest {
+
+ private AbstractTimelineAggregator agg;
+ TestClock clock = new TestClock();
+
+ AtomicLong startTimeInDoWork;
+ AtomicLong endTimeInDoWork;
+ AtomicLong checkPoint;
+ int actualRuns;
+
+ long sleepIntervalMillis;
+ int checkpointCutOffMultiplier;
+
+ @Before
+ public void setUp() throws Exception {
+ sleepIntervalMillis = 30000l;
+ checkpointCutOffMultiplier = 2;
+
+ Configuration metricsConf = new Configuration();
+ metricsConf.setInt(AGGREGATOR_CHECKPOINT_DELAY, 0);
+ metricsConf.setInt(RESULTSET_FETCH_SIZE, 2000);
+
+ startTimeInDoWork = new AtomicLong(0);
+ endTimeInDoWork = new AtomicLong(0);
+ checkPoint = new AtomicLong(-1);
+ actualRuns = 0;
+
+ agg = new AbstractTimelineAggregator(
+ null, metricsConf, clock) {
+ @Override
+ protected boolean doWork(long startTime, long endTime) {
+ startTimeInDoWork.set(startTime);
+ endTimeInDoWork.set(endTime);
+ actualRuns++;
+
+ return true;
+ }
+
+ @Override
+ protected PhoenixTransactSQL.Condition
+ prepareMetricQueryCondition(long startTime, long endTime) {
+ return null;
+ }
+
+ @Override
+ protected void aggregate(ResultSet rs, long startTime,
+ long endTime) throws IOException, SQLException {
+ }
+
+ @Override
+ protected Long getSleepIntervalMillis() {
+ return sleepIntervalMillis;
+ }
+
+ @Override
+ protected Integer getCheckpointCutOffMultiplier() {
+ return checkpointCutOffMultiplier;
+ }
+
+ @Override
+ protected boolean isDisabled() {
+ return false;
+ }
+
+ @Override
+ protected String getCheckpointLocation() {
+ return "dummy_ckptFile";
+ }
+
+ protected long readCheckPoint() {
+ return checkPoint.get();
+ }
+
+ @Override
+ protected void saveCheckPoint(long checkpointTime) throws IOException {
+ checkPoint.set(checkpointTime);
+ }
+ };
+
+
+ }
+
+ @Test
+ public void testDoWorkOnZeroDelay() throws Exception {
+
+ // starting at time 0;
+ clock.setTime(0);
+
+ long sleep = agg.runOnce(sleepIntervalMillis);
+ Assert.assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
+ Assert.assertEquals("endTime should be zero", 0, endTimeInDoWork.get());
+ Assert.assertEquals(0, checkPoint.get());
+ Assert.assertEquals(sleep, sleepIntervalMillis);
+ Assert.assertEquals("Do not aggregate on first run", 0, actualRuns);
+
+ // exactly one sleepInterval
+ clock.setTime(clock.getTime() + sleepIntervalMillis);
+ sleep = agg.runOnce(sleepIntervalMillis);
+ Assert.assertEquals("startTime", clock.getTime() -
+ sleepIntervalMillis,
+ startTimeInDoWork.get());
+ Assert.assertEquals("endTime", clock.getTime(),
+ endTimeInDoWork.get());
+ Assert.assertEquals(clock.getTime(), checkPoint.get());
+ Assert.assertEquals(sleep, sleepIntervalMillis);
+ Assert.assertEquals(1, actualRuns);
+
+ // exactly one sleepInterval
+ clock.setTime(clock.getTime() + sleepIntervalMillis);
+ sleep = agg.runOnce(sleepIntervalMillis);
+ Assert.assertEquals("startTime", clock.getTime() -
+ sleepIntervalMillis,
+ startTimeInDoWork.get());
+ Assert.assertEquals("endTime", clock.getTime(),
+ endTimeInDoWork.get());
+ Assert.assertEquals(clock.getTime(), checkPoint.get());
+ Assert.assertEquals(sleep, sleepIntervalMillis);
+ Assert.assertEquals(2, actualRuns);
+
+ // checkpointCutOffMultiplier x sleepInterval - should pass,
+ // it will aggregate only first part of the whole 2x interval
+ // and sleep as usual (don't we need to skip some sleep?)
+ //
+ // effectively checkpoint will be one interval in the past,
+ // so next run will aggregate the previous interval, not the current one
+ clock.setTime(clock.getTime() + (checkpointCutOffMultiplier *
+ sleepIntervalMillis));
+ sleep = agg.runOnce(sleepIntervalMillis);
+ Assert.assertEquals("startTime after 2xinterval", clock.getTime() -
+ (checkpointCutOffMultiplier * sleepIntervalMillis),
+ startTimeInDoWork.get());
+ Assert.assertEquals("endTime after 2xinterval", clock.getTime() -
+ sleepIntervalMillis,
+ endTimeInDoWork.get());
+ Assert.assertEquals("checkpoint after 2xinterval", clock.getTime() -
+ sleepIntervalMillis, checkPoint.get());
+ Assert.assertEquals(sleep, sleepIntervalMillis);
+ Assert.assertEquals(3, actualRuns);
+
+ // exactly one sleepInterval after one that lagged by one whole interval,
+ // so it will do the previous one... and sleep as usual
+ // no way to keep up
+ clock.setTime(clock.getTime() + sleepIntervalMillis);
+ sleep = agg.runOnce(sleepIntervalMillis);
+ Assert.assertEquals("startTime ", clock.getTime() -
+ (checkpointCutOffMultiplier * sleepIntervalMillis),
+ startTimeInDoWork.get());
+ Assert.assertEquals("endTime ", clock.getTime() -
+ sleepIntervalMillis,
+ endTimeInDoWork.get());
+ Assert.assertEquals("checkpoint ", clock.getTime() - sleepIntervalMillis,
+ checkPoint.get());
+ Assert.assertEquals(sleep, sleepIntervalMillis);
+ Assert.assertEquals(4, actualRuns);
+
+
+ // checkpointCutOffMultiplier x sleepInterval - in normal state should pass,
+ // but the checkpoint already lags by one interval, so it is now too old:
+ // no aggregation is executed, the checkpoint is just reset to currentTime
+ clock.setTime(clock.getTime() + (checkpointCutOffMultiplier *
+ sleepIntervalMillis));
+ sleep = agg.runOnce(sleepIntervalMillis);
+ Assert.assertEquals(4, actualRuns);
+ Assert.assertEquals("checkpoint after too much lag is reset to " +
+ "current clock time",
+ clock.getTime(), checkPoint.get());
+ Assert.assertEquals(sleep, sleepIntervalMillis);
+
+
+ }
+
+ //testDoWorkOnInterruptedruns
+// 1. On interrupted it can skip some metrics
+// testOnInterruption:
+// // if sleep is interrupted.. is it ok?
+// clock.setTime(10000);
+// sleep = agg.runOnce(sleepIntervalMillis);
+// Assert.assertEquals("startTime should be zero", 0,
+// startTimeInDoWork.get());
+// Assert.assertEquals("endTime should be zero", 0, endTimeInDoWork.get()
+// + sleepIntervalMillis);
+//
+// //if it is interrupted again:
+// clock.setTime(30000);
+// sleep = agg.runOnce(sleepIntervalMillis);
+ //
+ // 2. if it lags it can skip??
+ //
+
+ private static class TestClock implements Clock {
+
+ private long time;
+
+ public void setTime(long time) {
+ this.time = time;
+ }
+
+ @Override
+ public long getTime() {
+ return time;
+ }
+ }
+}
\ No newline at end of file