You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2016/03/04 19:58:10 UTC

ambari git commit: AMBARI-15267 : Metrics aggregate times should be tied to aggregation period instead of AMS start time (avijayan)

Repository: ambari
Updated Branches:
  refs/heads/branch-2.2 07da1862a -> bda06cfbf


AMBARI-15267 : Metrics aggregate times should be tied to aggregation period instead of AMS start time (avijayan)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/bda06cfb
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/bda06cfb
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/bda06cfb

Branch: refs/heads/branch-2.2
Commit: bda06cfbf8a742b9dbe9a0b1fc0c78dd7f2e4e63
Parents: 07da186
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Fri Mar 4 10:57:43 2016 -0800
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Fri Mar 4 10:57:54 2016 -0800

----------------------------------------------------------------------
 .../timeline/HBaseTimelineMetricStore.java      |  50 ++---
 .../aggregators/AbstractTimelineAggregator.java | 117 +++++------
 .../aggregators/TimelineClusterMetric.java      |   4 +
 .../aggregators/TimelineMetricAggregator.java   |   9 +-
 .../TimelineMetricClusterAggregator.java        |   8 +-
 .../TimelineMetricClusterAggregatorSecond.java  |   6 +-
 .../TimelineMetricHostAggregator.java           |   6 +-
 .../v2/TimelineMetricClusterAggregator.java     |   2 +-
 .../v2/TimelineMetricHostAggregator.java        |   2 +-
 .../timeline/query/PhoenixTransactSQL.java      |   4 +-
 .../AbstractTimelineAggregatorTest.java         | 204 +++++--------------
 11 files changed, 157 insertions(+), 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/bda06cfb/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index 37e4796..f460292 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -48,6 +48,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
 
 public class HBaseTimelineMetricStore extends AbstractService implements TimelineMetricStore {
@@ -92,58 +94,37 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
       // Start the cluster aggregator second
       TimelineMetricAggregator secondClusterAggregator =
         TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hBaseAccessor, metricsConf, metricMetadataManager);
-      if (!secondClusterAggregator.isDisabled()) {
-        Thread aggregatorThread = new Thread(secondClusterAggregator);
-        aggregatorThread.start();
-      }
+      scheduleAggregatorThread(secondClusterAggregator, metricsConf);
 
-      // Start the minute cluster aggregator
+//      // Start the minute cluster aggregator
       TimelineMetricAggregator minuteClusterAggregator =
         TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf);
-      if (!minuteClusterAggregator.isDisabled()) {
-        Thread aggregatorThread = new Thread(minuteClusterAggregator);
-        aggregatorThread.start();
-      }
+      scheduleAggregatorThread(minuteClusterAggregator, metricsConf);
 
       // Start the hourly cluster aggregator
       TimelineMetricAggregator hourlyClusterAggregator =
         TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hBaseAccessor, metricsConf);
-      if (!hourlyClusterAggregator.isDisabled()) {
-        Thread aggregatorThread = new Thread(hourlyClusterAggregator);
-        aggregatorThread.start();
-      }
+      scheduleAggregatorThread(hourlyClusterAggregator, metricsConf);
 
       // Start the daily cluster aggregator
       TimelineMetricAggregator dailyClusterAggregator =
         TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hBaseAccessor, metricsConf);
-      if (!dailyClusterAggregator.isDisabled()) {
-        Thread aggregatorThread = new Thread(dailyClusterAggregator);
-        aggregatorThread.start();
-      }
+      scheduleAggregatorThread(dailyClusterAggregator, metricsConf);
 
       // Start the minute host aggregator
       TimelineMetricAggregator minuteHostAggregator =
         TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hBaseAccessor, metricsConf);
-      if (!minuteHostAggregator.isDisabled()) {
-        Thread minuteAggregatorThread = new Thread(minuteHostAggregator);
-        minuteAggregatorThread.start();
-      }
+      scheduleAggregatorThread(minuteHostAggregator, metricsConf);
 
       // Start the hourly host aggregator
       TimelineMetricAggregator hourlyHostAggregator =
         TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hBaseAccessor, metricsConf);
-      if (!hourlyHostAggregator.isDisabled()) {
-        Thread aggregatorHourlyThread = new Thread(hourlyHostAggregator);
-        aggregatorHourlyThread.start();
-      }
+      scheduleAggregatorThread(hourlyHostAggregator, metricsConf);
 
       // Start the daily host aggregator
       TimelineMetricAggregator dailyHostAggregator =
         TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hBaseAccessor, metricsConf);
-      if (!dailyHostAggregator.isDisabled()) {
-        Thread aggregatorDailyThread = new Thread(dailyHostAggregator);
-        aggregatorDailyThread.start();
-      }
+      scheduleAggregatorThread(dailyHostAggregator, metricsConf);
 
       if (!configuration.isTimelineMetricsServiceWatcherDisabled()) {
         int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay();
@@ -350,4 +331,15 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
   public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException {
     return metricMetadataManager.getHostedAppsCache();
   }
+
+  private void scheduleAggregatorThread(TimelineMetricAggregator aggregator,
+                                        Configuration metricsConf) {
+    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+    if (!aggregator.isDisabled()) {
+      executorService.scheduleAtFixedRate(aggregator,
+        SECONDS.toMillis(metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120)),
+        aggregator.getSleepIntervalMillis(),
+        TimeUnit.MILLISECONDS);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/bda06cfb/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
index fce5a39..f8ec516 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
@@ -22,8 +22,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.SystemClock;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
 import java.io.File;
@@ -44,7 +42,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 public abstract class AbstractTimelineAggregator implements TimelineMetricAggregator {
   protected final PhoenixHBaseAccessor hBaseAccessor;
   protected final Logger LOG;
-  private Clock clock;
   protected final long checkpointDelayMillis;
   protected final Integer resultsetFetchSize;
   protected Configuration metricsConf;
@@ -55,25 +52,20 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
   protected String tableName;
   protected String outputTableName;
   protected Long nativeTimeRangeDelay;
+  protected Long lastAggregatedEndTime = -1l;
+
   // Explicitly name aggregators for logging needs
   private final String aggregatorName;
 
   AbstractTimelineAggregator(String aggregatorName,
                              PhoenixHBaseAccessor hBaseAccessor,
-                             Configuration metricsConf, Clock clk) {
+                             Configuration metricsConf) {
     this.aggregatorName = aggregatorName;
     this.hBaseAccessor = hBaseAccessor;
     this.metricsConf = metricsConf;
     this.checkpointDelayMillis = SECONDS.toMillis(metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120));
     this.resultsetFetchSize = metricsConf.getInt(RESULTSET_FETCH_SIZE, 2000);
     this.LOG = LoggerFactory.getLogger(aggregatorName);
-    this.clock = clk;
-  }
-
-  AbstractTimelineAggregator(String aggregatorName,
-                             PhoenixHBaseAccessor hBaseAccessor,
-                             Configuration metricsConf) {
-    this(aggregatorName, hBaseAccessor, metricsConf, new SystemClock());
   }
 
   public AbstractTimelineAggregator(String aggregatorName,
@@ -100,84 +92,68 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
   public void run() {
     LOG.info("Started Timeline aggregator thread @ " + new Date());
     Long SLEEP_INTERVAL = getSleepIntervalMillis();
-
-    while (true) {
-      long sleepTime = runOnce(SLEEP_INTERVAL);
-
-      try {
-        Thread.sleep(sleepTime);
-      } catch (InterruptedException e) {
-        LOG.info("Sleep interrupted, continuing with aggregation.");
-      }
-    }
+    runOnce(SLEEP_INTERVAL);
+    this.lastAggregatedEndTime = this.lastAggregatedEndTime + SLEEP_INTERVAL;
   }
 
   /**
    * Access relaxed for tests
    */
-  public long runOnce(Long SLEEP_INTERVAL) {
-    long currentTime = clock.getTime();
-    long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime);
-    long sleepTime = SLEEP_INTERVAL;
+  public void runOnce(Long SLEEP_INTERVAL) {
+    long lastCheckPointTime = readLastCheckpointSavingOnFirstRun();
 
     if (lastCheckPointTime != -1) {
       LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: "
-        + ((clock.getTime() - lastCheckPointTime) / 1000)
+        + ((lastAggregatedEndTime - lastCheckPointTime) / 1000)
         + " seconds.");
 
-      long startTime = clock.getTime();
       boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL);
-      long executionTime = clock.getTime() - startTime;
-      long delta = SLEEP_INTERVAL - executionTime;
-
-      if (delta > 0) {
-        // Sleep for (configured sleep - time to execute task)
-        sleepTime = delta;
-      } else {
-        // No sleep because last run took too long to execute
-        LOG.info("Aggregator execution took too long, " +
-          "cancelling sleep. executionTime = " + executionTime);
-        sleepTime = 1;
-      }
-
-      LOG.debug("Aggregator sleep interval = " + sleepTime);
 
       if (success) {
         try {
-          // Comment to bug fix:
-          // cannot just save lastCheckPointTime + SLEEP_INTERVAL,
-          // it has to be verified so it is not a time in the future
-          // checkpoint says what was aggregated, and there is no way
-          // the future metrics were aggregated!
-          saveCheckPoint(Math.min(currentTime, lastCheckPointTime +
-            SLEEP_INTERVAL));
+          saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL);
         } catch (IOException io) {
           LOG.warn("Error saving checkpoint, restarting aggregation at " +
             "previous checkpoint.");
         }
       }
     }
-
-    return sleepTime;
   }
 
-  private long readLastCheckpointSavingOnFirstRun(long currentTime) {
+  private long readLastCheckpointSavingOnFirstRun() {
     long lastCheckPointTime = -1;
 
     try {
       lastCheckPointTime = readCheckPoint();
+      LOG.info("Last Checkpoint read : " + new Date(lastCheckPointTime));
+
+      if (lastAggregatedEndTime == -1l) {
+        lastAggregatedEndTime = getRoundedAggregateTimeMillis(getSleepIntervalMillis());
+      }
+
       if (isLastCheckPointTooOld(lastCheckPointTime)) {
         LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " +
           "lastCheckPointTime = " + new Date(lastCheckPointTime));
         lastCheckPointTime = -1;
       }
+
+      if (lastCheckPointTime > 0) {
+        lastCheckPointTime = getRoundedCheckPointTimeMillis(lastCheckPointTime, getSleepIntervalMillis());
+        LOG.info("Rounded off checkpoint : " + new Date(lastCheckPointTime));
+      }
+
+      if (isLastCheckPointTooYoung(lastCheckPointTime)) {
+        LOG.info("Last checkpoint too recent for aggregation. Sleeping for 1 cycle.");
+        lastCheckPointTime = -1;
+      }
+
       if (lastCheckPointTime == -1) {
         // Assuming first run, save checkpoint and sleep.
-        // Set checkpoint to 2 minutes in the past to allow the
+        // Set checkpoint to rounded time in the past to allow the
         // agents/collectors to catch up
         LOG.info("Saving checkpoint time on first run. " +
-          new Date((currentTime - checkpointDelayMillis)));
-        saveCheckPoint(currentTime - checkpointDelayMillis);
+          new Date((lastAggregatedEndTime)));
+        saveCheckPoint(lastAggregatedEndTime);
       }
     } catch (IOException io) {
       LOG.warn("Unable to write last checkpoint time. Resuming sleep.", io);
@@ -189,8 +165,12 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
     // first checkpoint is saved checkpointDelayMillis in the past,
     // so here we also need to take it into account
     return checkpoint != -1 &&
-      ((clock.getTime() - checkpoint - checkpointDelayMillis) >
-        getCheckpointCutOffIntervalMillis());
+      ((lastAggregatedEndTime - checkpoint) > getCheckpointCutOffIntervalMillis());
+  }
+
+  private boolean isLastCheckPointTooYoung(long checkpoint) {
+    return checkpoint != -1 &&
+      ((lastAggregatedEndTime <= checkpoint));
   }
 
   protected long readCheckPoint() {
@@ -227,7 +207,6 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
    * @param startTime Sample start time
    * @param endTime Sample end time
    */
-  @Override
   public boolean doWork(long startTime, long endTime) {
     LOG.info("Start aggregation cycle @ " + new Date() + ", " +
       "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
@@ -292,10 +271,14 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
 
   protected abstract void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException;
 
-  protected Long getSleepIntervalMillis() {
+  public Long getSleepIntervalMillis() {
     return sleepIntervalMillis;
   }
 
+  public void setSleepIntervalMillis(Long sleepIntervalMillis) {
+    this.sleepIntervalMillis = sleepIntervalMillis;
+  }
+
   protected Integer getCheckpointCutOffMultiplier() {
     return checkpointCutOffMultiplier;
   }
@@ -311,4 +294,22 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
   protected String getCheckpointLocation() {
     return checkpointLocation;
   }
+
+  protected void setLastAggregatedEndTime(long lastAggregatedEndTime) {
+    this.lastAggregatedEndTime = lastAggregatedEndTime;
+  }
+
+  protected long getLastAggregatedEndTime() {
+    return lastAggregatedEndTime;
+  }
+
+  public static long getRoundedCheckPointTimeMillis(long referenceTime, long aggregatorPeriod) {
+    return referenceTime - (referenceTime % aggregatorPeriod);
+  }
+
+  public static long getRoundedAggregateTimeMillis(long aggregatorPeriod) {
+    long currentTime = System.currentTimeMillis();
+    return currentTime - (currentTime % aggregatorPeriod);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/bda06cfb/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java
index 3c30a6f..b7d9110 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java
@@ -94,4 +94,8 @@ public class TimelineClusterMetric {
       ", timestamp=" + timestamp +
       '}';
   }
+
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/bda06cfb/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
index 96be48d..295db0e 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
@@ -31,4 +31,11 @@ public interface TimelineMetricAggregator extends Runnable {
    * @return true/false
    */
   public boolean isDisabled();
-}
+
+  /**
+   * Return aggregator Interval
+   * @return Interval in Millis
+   */
+  public Long getSleepIntervalMillis();
+
+  }

http://git-wip-us.apache.org/repos/asf/ambari/blob/bda06cfb/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
index 1c1c4b6..b7609c0 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
@@ -81,13 +79,13 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
 
   @Override
   protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
-    Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs);
+    Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime);
 
     LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
     hBaseAccessor.saveClusterTimeAggregateRecords(hostAggregateMap, outputTableName);
   }
 
-  private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs)
+  private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs, long endTime)
     throws IOException, SQLException {
 
     TimelineClusterMetric existingMetric = null;
@@ -106,6 +104,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
       if (existingMetric == null) {
         // First row
         existingMetric = currentMetric;
+        currentMetric.setTimestamp(endTime);
         hostAggregate = new MetricHostAggregate();
         hostAggregateMap.put(currentMetric, hostAggregate);
       }
@@ -117,6 +116,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
       } else {
         // Switched over to a new metric - save existing
         hostAggregate = new MetricHostAggregate();
+        currentMetric.setTimestamp(endTime);
         updateAggregatesFromHost(hostAggregate, currentHostAggregate);
         hostAggregateMap.put(currentMetric, hostAggregate);
         existingMetric = currentMetric;

http://git-wip-us.apache.org/repos/asf/ambari/blob/bda06cfb/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
index 1f9b2ec..e8e16a7 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -237,12 +237,12 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
   }
 
   /**
-   * Return beginning of the time slice into which the metric fits.
+   * Return end of the time slice into which the metric fits.
    */
   private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) {
     for (Long[] timeSlice : timeSlices) {
-      if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) {
-        return timeSlice[0];
+      if (timestamp > timeSlice[0] && timestamp <= timeSlice[1]) {
+        return timeSlice[1];
       }
     }
     return -1l;

http://git-wip-us.apache.org/repos/asf/ambari/blob/bda06cfb/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
index e0fa26e..a976faf 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
@@ -54,7 +54,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
   @Override
   protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
 
-    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs);
+    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime);
 
     LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
     hBaseAccessor.saveHostAggregateRecords(hostAggregateMap, outputTableName);
@@ -78,7 +78,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
     return condition;
   }
 
-  private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs)
+  private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs, long endTime)
       throws IOException, SQLException {
     TimelineMetric existingMetric = null;
     MetricHostAggregate hostAggregate = null;
@@ -94,6 +94,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
       if (existingMetric == null) {
         // First row
         existingMetric = currentMetric;
+        currentMetric.setTimestamp(endTime);
         hostAggregate = new MetricHostAggregate();
         hostAggregateMap.put(currentMetric, hostAggregate);
       }
@@ -103,6 +104,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
         hostAggregate.updateAggregates(currentHostAggregate);
       } else {
         // Switched over to a new metric - save existing - create new aggregate
+        currentMetric.setTimestamp(endTime);
         hostAggregate = new MetricHostAggregate();
         hostAggregate.updateAggregates(currentHostAggregate);
         hostAggregateMap.put(currentMetric, hostAggregate);

http://git-wip-us.apache.org/repos/asf/ambari/blob/bda06cfb/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
index 5257412..6991b74 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
@@ -72,7 +72,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
 
     condition.setStatement(String.format(GET_AGGREGATED_APP_METRIC_GROUPBY_SQL,
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay),
-      outputTableName, aggregateColumnName, tableName,
+      outputTableName, endTime, aggregateColumnName, tableName,
       startTime, endTime));
 
     if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/bda06cfb/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
index 1c46642..7ce03c1 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
@@ -65,7 +65,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
 
     condition.setStatement(String.format(GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay),
-      outputTableName, tableName, startTime, endTime));
+      outputTableName, endTime, tableName, startTime, endTime));
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Condition: " + condition.toString());

http://git-wip-us.apache.org/repos/asf/ambari/blob/bda06cfb/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
index 2fc6c34..e67a5b8 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
@@ -259,7 +259,7 @@ public class PhoenixTransactSQL {
   public static final String GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL = "UPSERT %s " +
     "INTO %s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, UNITS, " +
     "METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) " +
-    "SELECT METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, MAX(SERVER_TIME), UNITS, " +
+    "SELECT METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, %s AS SERVER_TIME, UNITS, " +
     "SUM(METRIC_SUM), SUM(METRIC_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN) " +
     "FROM %s WHERE SERVER_TIME >= %s AND SERVER_TIME < %s " +
     "GROUP BY METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, UNITS";
@@ -271,7 +271,7 @@ public class PhoenixTransactSQL {
   public static final String GET_AGGREGATED_APP_METRIC_GROUPBY_SQL = "UPSERT %s " +
     "INTO %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, UNITS, " +
     "METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) SELECT METRIC_NAME, APP_ID, " +
-    "INSTANCE_ID, MAX(SERVER_TIME), UNITS, SUM(METRIC_SUM), SUM(%s), " +
+    "INSTANCE_ID, %s AS SERVER_TIME, UNITS, SUM(METRIC_SUM), SUM(%s), " +
     "MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE SERVER_TIME >= %s AND " +
     "SERVER_TIME < %s GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS";
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/bda06cfb/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
index 2b29469..8f7320b 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
@@ -18,10 +18,7 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import org.apache.hadoop.yarn.util.Clock;
 import org.junit.Before;
 import org.junit.Test;
 import java.io.IOException;
@@ -35,7 +32,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 public class AbstractTimelineAggregatorTest {
 
   private AbstractTimelineAggregator agg;
-  TestClock clock = new TestClock();
 
   AtomicLong startTimeInDoWork;
   AtomicLong endTimeInDoWork;
@@ -47,7 +43,7 @@ public class AbstractTimelineAggregatorTest {
 
   @Before
   public void setUp() throws Exception {
-    sleepIntervalMillis = 30000l;
+    sleepIntervalMillis = 2*2*30000l; //2 minutes
     checkpointCutOffMultiplier = 2;
 
     Configuration metricsConf = new Configuration();
@@ -59,7 +55,7 @@ public class AbstractTimelineAggregatorTest {
     checkPoint = new AtomicLong(-1);
     actualRuns = 0;
 
-    agg = new AbstractTimelineAggregator("TimelineAggregatorTest", null, metricsConf, clock) {
+    agg = new AbstractTimelineAggregator("TimelineAggregatorTest", null, metricsConf) {
       @Override
       public boolean doWork(long startTime, long endTime) {
         startTimeInDoWork.set(startTime);
@@ -81,7 +77,7 @@ public class AbstractTimelineAggregatorTest {
       }
 
       @Override
-      protected Long getSleepIntervalMillis() {
+      public Long getSleepIntervalMillis() {
         return sleepIntervalMillis;
       }
 
@@ -110,167 +106,67 @@ public class AbstractTimelineAggregatorTest {
       }
     };
 
-
   }
 
   @Test
   public void testDoWorkOnZeroDelay() throws Exception {
 
-    // starting at time 0;
-    clock.setTime(0);
+    long currentTime = System.currentTimeMillis();
+    long roundedOffAggregatorTime = AbstractTimelineAggregator.getRoundedAggregateTimeMillis(currentTime);
 
-    long sleep = agg.runOnce(sleepIntervalMillis);
+    //Test first run of aggregator with no checkpoint
+    agg.setLastAggregatedEndTime(roundedOffAggregatorTime);
+    agg.runOnce(sleepIntervalMillis);
     assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
     assertEquals("endTime  should be zero", 0, endTimeInDoWork.get());
-    assertEquals(0, checkPoint.get());
-    assertEquals(sleep, sleepIntervalMillis);
+    assertEquals(roundedOffAggregatorTime, checkPoint.get());
     assertEquals("Do not aggregate on first run", 0, actualRuns);
 
-    // exactly one sleepInterval
-    clock.setTime(clock.getTime() + sleepIntervalMillis);
-    sleep = agg.runOnce(sleepIntervalMillis);
-    assertEquals("startTime", clock.getTime() -
-        sleepIntervalMillis,
-      startTimeInDoWork.get());
-    assertEquals("endTime", clock.getTime(),
-      endTimeInDoWork.get());
-    assertEquals(clock.getTime(), checkPoint.get());
-    assertEquals(sleep, sleepIntervalMillis);
-    assertEquals(1, actualRuns);
-
-    // exactly one sleepInterval
-    clock.setTime(clock.getTime() + sleepIntervalMillis);
-    sleep = agg.runOnce(sleepIntervalMillis);
-    assertEquals("startTime", clock.getTime() -
-        sleepIntervalMillis,
-      startTimeInDoWork.get());
-    assertEquals("endTime", clock.getTime(),
-      endTimeInDoWork.get());
-    assertEquals(clock.getTime(), checkPoint.get());
-    assertEquals(sleep, sleepIntervalMillis);
-    assertEquals(2, actualRuns);
-
-    // checkpointCutOffMultiplier x sleepInterval - should pass,
-    // it will aggregate only first part of the whole 2x interval
-    // and sleep as usual (don't we need to skip some sleep?)
-    //
-    // effectively checkpoint will be one interval in the past,
-    // so next run will
-    clock.setTime(clock.getTime() + (checkpointCutOffMultiplier *
-      sleepIntervalMillis));
-    sleep = agg.runOnce(sleepIntervalMillis);
-    assertEquals("startTime after 2xinterval", clock.getTime() -
-        (checkpointCutOffMultiplier * sleepIntervalMillis),
-      startTimeInDoWork.get());
-    assertEquals("endTime after 2xinterval", clock.getTime() -
-        sleepIntervalMillis,
-      endTimeInDoWork.get());
-    assertEquals("checkpoint after 2xinterval", clock.getTime() -
-      sleepIntervalMillis, checkPoint.get());
-    assertEquals(sleep, sleepIntervalMillis);
-    assertEquals(3, actualRuns);
-
-    // exactly one sleepInterval after one that lagged by one whole interval,
-    // so it will do the previous one... and sleep as usual
-    // no way to keep up
-    clock.setTime(clock.getTime() + sleepIntervalMillis);
-    sleep = agg.runOnce(sleepIntervalMillis);
-    assertEquals("startTime ", clock.getTime() -
-        (checkpointCutOffMultiplier * sleepIntervalMillis),
-      startTimeInDoWork.get());
-    assertEquals("endTime  ", clock.getTime() -
-        sleepIntervalMillis,
-      endTimeInDoWork.get());
-    assertEquals("checkpoint ", clock.getTime() - sleepIntervalMillis,
-      checkPoint.get());
-    assertEquals(sleep, sleepIntervalMillis);
-    assertEquals(4, actualRuns);
-
-
-    // checkpointCutOffMultiplier x sleepInterval - in normal state should pass,
-    // but the clock lags too much, so this will not execute aggregation
-    // just update checkpoint to currentTime
-    clock.setTime(clock.getTime() + (checkpointCutOffMultiplier *
-      sleepIntervalMillis));
-    sleep = agg.runOnce(sleepIntervalMillis);
-    assertEquals(4, actualRuns);
-    assertEquals("checkpoint after too much lag is reset to " +
-        "current clock time",
-      clock.getTime(), checkPoint.get());
-    assertEquals(sleep, sleepIntervalMillis);
-  }
-
-  @Test
-  public void testDoWorkOnInterruptedRuns() throws Exception {
-    // start at some non-zero arbitrarily selected time;
-    int startingTime = 10000;
-
-    // 1.
-    clock.setTime(startingTime);
-    long timeOfFirstStep = clock.getTime();
-    long sleep = agg.runOnce(sleepIntervalMillis);
+    //Test first run with Too Old checkpoint
+    checkPoint.set(currentTime - 5*60*1000); //Old checkpoint
+    agg.runOnce(sleepIntervalMillis);
     assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
     assertEquals("endTime  should be zero", 0, endTimeInDoWork.get());
-    assertEquals("do not aggregate on first run", 0, actualRuns);
-    assertEquals("first checkpoint set on current time", timeOfFirstStep,
-      checkPoint.get());
-    assertEquals(sleep, sleepIntervalMillis);
-
-    // 2.
-    // the doWork was fast, and sleep was interrupted (e.g. restart)
-    // Q: do we want to aggregate just part of the system? maybe we should
-    // sleep up to next cycle start!!
-    clock.setTime(timeOfFirstStep + 1);
-    long timeOfSecondStep = clock.getTime();
-    sleep = agg.runOnce(sleepIntervalMillis);
-    assertEquals("startTime should be on previous checkpoint since it did not" +
-        " run yet",
-      timeOfFirstStep, startTimeInDoWork.get());
-
-    assertEquals("endTime can be start + interval",
-      startingTime + sleepIntervalMillis,
-      endTimeInDoWork.get());
-    assertEquals("should aggregate", 1, actualRuns);
-    assertEquals("checkpoint here should be set to min(endTime,currentTime), " +
-        "it is currentTime in our scenario",
-      timeOfSecondStep, checkPoint.get());
-
-    assertEquals(sleep, sleepIntervalMillis);
-
-    //3.
-    // and again not a full sleep passed, so only small part was aggregated
-    clock.setTime(startingTime + 2);
-    long timeOfThirdStep = clock.getTime();
+    assertEquals(roundedOffAggregatorTime, checkPoint.get());
+    assertEquals("Do not aggregate on first run", 0, actualRuns);
 
-    sleep = agg.runOnce(sleepIntervalMillis);
-    // startTime and endTime are both be in the future, makes no sens,
-    // query will not work!!
-    assertEquals("startTime should be previous checkpoint",
-      timeOfSecondStep, startTimeInDoWork.get());
+    //Test first run with too "recent" checkpoint
+    currentTime = System.currentTimeMillis();
+    checkPoint.set(currentTime - 30000);
+    agg.setLastAggregatedEndTime(-1l);
+    agg.setSleepIntervalMillis(sleepIntervalMillis);
+    agg.runOnce(sleepIntervalMillis);
+    assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
+    assertEquals("endTime  should be zero", 0, endTimeInDoWork.get());
+    assertEquals(agg.getLastAggregatedEndTime(), checkPoint.get());
+    assertEquals("Do not aggregate on first run", 0, actualRuns);
 
-    assertEquals("endTime  can be start + interval",
-      timeOfSecondStep + sleepIntervalMillis,
-      endTimeInDoWork.get());
-    assertEquals("should aggregate", 2, actualRuns);
-    assertEquals("checkpoint here should be set to min(endTime,currentTime), " +
-        "it is currentTime in our scenario",
-      timeOfThirdStep,
+    //Test first run with perfect checkpoint (sleepIntervalMillis back)
+    long checkPointTime = roundedOffAggregatorTime - sleepIntervalMillis;
+    long expectedCheckPoint = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(checkPointTime, sleepIntervalMillis);
+    checkPoint.set(checkPointTime);
+    agg.setLastAggregatedEndTime(-1l);
+    agg.runOnce(sleepIntervalMillis);
+    assertEquals("startTime should be the lower rounded time of the checkpoint time",
+      expectedCheckPoint, startTimeInDoWork.get());
+    assertEquals("endTime should be the lower rounded time of the checkpoint time + sleepIntervalMillis",
+      expectedCheckPoint + sleepIntervalMillis, endTimeInDoWork.get());
+    assertEquals(expectedCheckPoint + sleepIntervalMillis,
       checkPoint.get());
-    assertEquals(sleep, sleepIntervalMillis);
-
-  }
-
-  private static class TestClock implements Clock {
-
-    private long time;
-
-    public void setTime(long time) {
-      this.time = time;
-    }
+    assertEquals("Aggregate on first run", 1, actualRuns);
+
+    //Test edge case for checkpoint (2 x sleepIntervalMillis)
+    checkPointTime = roundedOffAggregatorTime - 2*sleepIntervalMillis;
+    expectedCheckPoint = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(checkPointTime, sleepIntervalMillis);
+    checkPoint.set(checkPointTime);
+    agg.runOnce(sleepIntervalMillis);
+    assertEquals("startTime should be the lower rounded time of the checkpoint time",
+      expectedCheckPoint, startTimeInDoWork.get());
+    assertEquals("endTime should be the lower rounded time of the checkpoint time + sleepIntervalMillis",
+      expectedCheckPoint + sleepIntervalMillis, endTimeInDoWork.get());
+    assertEquals(expectedCheckPoint + sleepIntervalMillis,
+      checkPoint.get());
+    assertEquals("Aggregate again on the edge case checkpoint", 2, actualRuns);
 
-    @Override
-    public long getTime() {
-      return time;
-    }
-  }
+ }
 }
\ No newline at end of file