You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/12/17 22:03:07 UTC

ambari git commit: AMBARI-8769. Aggregator checkpoint logic should take into account the checkPointDelay.

Repository: ambari
Updated Branches:
  refs/heads/trunk 0dcbcc1ba -> cd3fb17f4


AMBARI-8769. Aggregator checkpoint logic should take into account the checkPointDelay.


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/cd3fb17f
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/cd3fb17f
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/cd3fb17f

Branch: refs/heads/trunk
Commit: cd3fb17f4d770fb354c76f0eb1df738aa66848d5
Parents: 0dcbcc1
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Wed Dec 17 12:53:50 2014 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Wed Dec 17 12:53:50 2014 -0800

----------------------------------------------------------------------
 .../timeline/AbstractTimelineAggregator.java    | 135 ++++++-----
 .../AbstractTimelineAggregatorTest.java         | 222 +++++++++++++++++++
 2 files changed, 303 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/cd3fb17f/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
index a123e57..f169003 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
@@ -21,6 +21,8 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
 
 import java.io.File;
 import java.io.IOException;
@@ -37,18 +39,26 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 public abstract class AbstractTimelineAggregator implements Runnable {
   protected final PhoenixHBaseAccessor hBaseAccessor;
   private final Log LOG;
+
+  private Clock clock;
   protected final long checkpointDelayMillis;
   protected final Integer resultsetFetchSize;
   protected Configuration metricsConf;
 
   public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
                                     Configuration metricsConf) {
+    this(hBaseAccessor, metricsConf, new SystemClock());
+  }
+
+  public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
+                                    Configuration metricsConf, Clock clk) {
     this.hBaseAccessor = hBaseAccessor;
     this.metricsConf = metricsConf;
     this.checkpointDelayMillis = SECONDS.toMillis(
       metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120));
     this.resultsetFetchSize = metricsConf.getInt(RESULTSET_FETCH_SIZE, 2000);
     this.LOG = LogFactory.getLog(this.getClass());
+    this.clock = clk;
   }
 
   @Override
@@ -57,75 +67,93 @@ public abstract class AbstractTimelineAggregator implements Runnable {
     Long SLEEP_INTERVAL = getSleepIntervalMillis();
 
     while (true) {
-      long currentTime = System.currentTimeMillis();
-      long lastCheckPointTime = -1;
+      long sleepTime = runOnce(SLEEP_INTERVAL);
 
       try {
-        lastCheckPointTime = readCheckPoint();
-        if (isLastCheckPointTooOld(lastCheckPointTime)) {
-          LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " +
-            "lastCheckPointTime = " + lastCheckPointTime);
-          lastCheckPointTime = -1;
-        }
-        if (lastCheckPointTime == -1) {
-          // Assuming first run, save checkpoint and sleep.
-          // Set checkpoint to 2 minutes in the past to allow the
-          // agents/collectors to catch up
-          saveCheckPoint(currentTime - checkpointDelayMillis);
-        }
-      } catch (IOException io) {
-        LOG.warn("Unable to write last checkpoint time. Resuming sleep.", io);
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException e) {
+        LOG.info("Sleep interrupted, continuing with aggregation.");
       }
-      long sleepTime = SLEEP_INTERVAL;
-
-      if (lastCheckPointTime != -1) {
-        LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: "
-          + ((System.currentTimeMillis() - lastCheckPointTime) / 1000)
-          + " seconds.");
-
-        long startTime = System.currentTimeMillis();
-        boolean success = doWork(lastCheckPointTime,
-          lastCheckPointTime + SLEEP_INTERVAL);
-        long executionTime = System.currentTimeMillis() - startTime;
-        long delta = SLEEP_INTERVAL - executionTime;
-
-        if (delta > 0) {
-          // Sleep for (configured sleep - time to execute task)
-          sleepTime = delta;
-        } else {
-          // No sleep because last run took too long to execute
-          LOG.info("Aggregator execution took too long, " +
-            "cancelling sleep. executionTime = " + executionTime);
-          sleepTime = 1;
-        }
+    }
+  }
 
-        LOG.debug("Aggregator sleep interval = " + sleepTime);
+  /**
+   * Runs one iteration of the aggregation loop and returns the time to sleep, in millis. Access relaxed for tests
+   */
+  protected long runOnce(Long SLEEP_INTERVAL) {
+    long currentTime = clock.getTime();
+    long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime);
+    long sleepTime = SLEEP_INTERVAL;
+
+    if (lastCheckPointTime != -1) {
+      LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: "
+        + ((clock.getTime() - lastCheckPointTime) / 1000)
+        + " seconds.");
+
+      long startTime = clock.getTime();
+      boolean success = doWork(lastCheckPointTime,
+        lastCheckPointTime + SLEEP_INTERVAL);
+      long executionTime = clock.getTime() - startTime;
+      long delta = SLEEP_INTERVAL - executionTime;
+
+      if (delta > 0) {
+        // Sleep for (configured sleep - time to execute task)
+        sleepTime = delta;
+      } else {
+        // No sleep because last run took too long to execute
+        LOG.info("Aggregator execution took too long, " +
+          "cancelling sleep. executionTime = " + executionTime);
+        sleepTime = 1;
+      }
 
-        if (success) {
-          try {
-            saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL);
-          } catch (IOException io) {
-            LOG.warn("Error saving checkpoint, restarting aggregation at " +
-              "previous checkpoint.");
-          }
+      LOG.debug("Aggregator sleep interval = " + sleepTime);
+
+      if (success) {
+        try {
+          saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL);
+        } catch (IOException io) {
+          LOG.warn("Error saving checkpoint, restarting aggregation at " +
+            "previous checkpoint.");
         }
       }
+    }
 
-      try {
-        Thread.sleep(sleepTime);
-      } catch (InterruptedException e) {
-        LOG.info("Sleep interrupted, continuing with aggregation.");
+    return sleepTime;
+  }
+
+  private long readLastCheckpointSavingOnFirstRun(long currentTime) {
+    long lastCheckPointTime = -1;
+
+    try {
+      lastCheckPointTime = readCheckPoint();
+      if (isLastCheckPointTooOld(lastCheckPointTime)) {
+        LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " +
+          "lastCheckPointTime = " + lastCheckPointTime);
+        lastCheckPointTime = -1;
       }
+      if (lastCheckPointTime == -1) {
+        // Assuming first run, save checkpoint and sleep.
+        // Set checkpoint checkpointDelayMillis (2 minutes by default) in the
+        // past to allow the agents/collectors to catch up
+        LOG.info("Saving checkpoint time on first run." +
+          (currentTime - checkpointDelayMillis));
+        saveCheckPoint(currentTime - checkpointDelayMillis);
+      }
+    } catch (IOException io) {
+      LOG.warn("Unable to write last checkpoint time. Resuming sleep.", io);
     }
+    return lastCheckPointTime;
   }
 
   private boolean isLastCheckPointTooOld(long checkpoint) {
+    // first checkpoint is saved checkpointDelayMillis in the past,
+    // so here we also need to take it into account
     return checkpoint != -1 &&
-      ((System.currentTimeMillis() - checkpoint) >
+      ((clock.getTime() - checkpoint - checkpointDelayMillis) >
         getCheckpointCutOffIntervalMillis());
   }
 
-  private long readCheckPoint() {
+  protected long readCheckPoint() {
     try {
       File checkpoint = new File(getCheckpointLocation());
       if (checkpoint.exists()) {
@@ -140,7 +168,7 @@ public abstract class AbstractTimelineAggregator implements Runnable {
     return -1;
   }
 
-  private void saveCheckPoint(long checkpointTime) throws IOException {
+  protected void saveCheckPoint(long checkpointTime) throws IOException {
     File checkpoint = new File(getCheckpointLocation());
     if (!checkpoint.exists()) {
       boolean done = checkpoint.createNewFile();
@@ -225,5 +253,4 @@ public abstract class AbstractTimelineAggregator implements Runnable {
   protected abstract boolean isDisabled();
 
   protected abstract String getCheckpointLocation();
-
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/cd3fb17f/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java
new file mode 100644
index 0000000..c274c61
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java
@@ -0,0 +1,222 @@
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.util.Clock;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE;
+
+public class AbstractTimelineAggregatorTest {
+
+  private AbstractTimelineAggregator agg;
+  TestClock clock = new TestClock();
+
+  AtomicLong startTimeInDoWork;
+  AtomicLong endTimeInDoWork;
+  AtomicLong checkPoint;
+  int actualRuns;
+
+  long sleepIntervalMillis;
+  int checkpointCutOffMultiplier;
+
+  @Before
+  public void setUp() throws Exception {
+    sleepIntervalMillis = 30000l;
+    checkpointCutOffMultiplier = 2;
+
+    Configuration metricsConf = new Configuration();
+    metricsConf.setInt(AGGREGATOR_CHECKPOINT_DELAY, 0);
+    metricsConf.setInt(RESULTSET_FETCH_SIZE, 2000);
+
+    startTimeInDoWork = new AtomicLong(0);
+    endTimeInDoWork = new AtomicLong(0);
+    checkPoint = new AtomicLong(-1);
+    actualRuns = 0;
+
+    agg = new AbstractTimelineAggregator(
+      null, metricsConf, clock) {
+      @Override
+      protected boolean doWork(long startTime, long endTime) {
+        startTimeInDoWork.set(startTime);
+        endTimeInDoWork.set(endTime);
+        actualRuns++;
+
+        return true;
+      }
+
+      @Override
+      protected PhoenixTransactSQL.Condition
+      prepareMetricQueryCondition(long startTime, long endTime) {
+        return null;
+      }
+
+      @Override
+      protected void aggregate(ResultSet rs, long startTime,
+                               long endTime) throws IOException, SQLException {
+      }
+
+      @Override
+      protected Long getSleepIntervalMillis() {
+        return sleepIntervalMillis;
+      }
+
+      @Override
+      protected Integer getCheckpointCutOffMultiplier() {
+        return checkpointCutOffMultiplier;
+      }
+
+      @Override
+      protected boolean isDisabled() {
+        return false;
+      }
+
+      @Override
+      protected String getCheckpointLocation() {
+        return "dummy_ckptFile";
+      }
+
+      protected long readCheckPoint() {
+        return checkPoint.get();
+      }
+
+      @Override
+      protected void saveCheckPoint(long checkpointTime) throws IOException {
+        checkPoint.set(checkpointTime);
+      }
+    };
+
+
+  }
+
+  @Test
+  public void testDoWorkOnZeroDelay() throws Exception {
+
+    // starting at time 0;
+    clock.setTime(0);
+
+    long sleep = agg.runOnce(sleepIntervalMillis);
+    Assert.assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
+    Assert.assertEquals("endTime  should be zero", 0, endTimeInDoWork.get());
+    Assert.assertEquals(0, checkPoint.get());
+    Assert.assertEquals(sleep, sleepIntervalMillis);
+    Assert.assertEquals("Do not aggregate on first run", 0, actualRuns);
+
+    // exactly one sleepInterval
+    clock.setTime(clock.getTime() + sleepIntervalMillis);
+    sleep = agg.runOnce(sleepIntervalMillis);
+    Assert.assertEquals("startTime", clock.getTime() -
+        sleepIntervalMillis,
+      startTimeInDoWork.get());
+    Assert.assertEquals("endTime", clock.getTime(),
+      endTimeInDoWork.get());
+    Assert.assertEquals(clock.getTime(), checkPoint.get());
+    Assert.assertEquals(sleep, sleepIntervalMillis);
+    Assert.assertEquals(1, actualRuns);
+
+    // exactly one sleepInterval
+    clock.setTime(clock.getTime() + sleepIntervalMillis);
+    sleep = agg.runOnce(sleepIntervalMillis);
+    Assert.assertEquals("startTime", clock.getTime() -
+        sleepIntervalMillis,
+      startTimeInDoWork.get());
+    Assert.assertEquals("endTime", clock.getTime(),
+      endTimeInDoWork.get());
+    Assert.assertEquals(clock.getTime(), checkPoint.get());
+    Assert.assertEquals(sleep, sleepIntervalMillis);
+    Assert.assertEquals(2, actualRuns);
+
+    // checkpointCutOffMultiplier x sleepInterval - should pass,
+    // it will aggregate only first part of the whole 2x interval
+    // and sleep as usual (don't we need to skip some sleep?)
+    //
+    // effectively checkpoint will be one interval in the past,
+    // so next run will aggregate the previous (lagging) interval
+    clock.setTime(clock.getTime() + (checkpointCutOffMultiplier *
+      sleepIntervalMillis));
+    sleep = agg.runOnce(sleepIntervalMillis);
+    Assert.assertEquals("startTime after 2xinterval", clock.getTime() -
+        (checkpointCutOffMultiplier * sleepIntervalMillis),
+      startTimeInDoWork.get());
+    Assert.assertEquals("endTime after 2xinterval", clock.getTime() -
+        sleepIntervalMillis,
+      endTimeInDoWork.get());
+    Assert.assertEquals("checkpoint after 2xinterval", clock.getTime() -
+      sleepIntervalMillis, checkPoint.get());
+    Assert.assertEquals(sleep, sleepIntervalMillis);
+    Assert.assertEquals(3, actualRuns);
+
+    // exactly one sleepInterval after one that lagged by one whole interval,
+    // so it will do the previous one... and sleep as usual
+    // no way to keep up
+    clock.setTime(clock.getTime() + sleepIntervalMillis);
+    sleep = agg.runOnce(sleepIntervalMillis);
+    Assert.assertEquals("startTime ", clock.getTime() -
+        (checkpointCutOffMultiplier * sleepIntervalMillis),
+      startTimeInDoWork.get());
+    Assert.assertEquals("endTime  ", clock.getTime() -
+        sleepIntervalMillis,
+      endTimeInDoWork.get());
+    Assert.assertEquals("checkpoint ", clock.getTime() - sleepIntervalMillis,
+      checkPoint.get());
+    Assert.assertEquals(sleep, sleepIntervalMillis);
+    Assert.assertEquals(4, actualRuns);
+
+
+    // checkpointCutOffMultiplier x sleepInterval - in normal state should pass,
+    // but the checkpoint already lags by one interval, so it is now too old:
+    // aggregation is skipped and the checkpoint is just reset to currentTime
+    clock.setTime(clock.getTime() + (checkpointCutOffMultiplier *
+      sleepIntervalMillis));
+    sleep = agg.runOnce(sleepIntervalMillis);
+    Assert.assertEquals(4, actualRuns);
+    Assert.assertEquals("checkpoint after too much lag is reset to " +
+        "current clock time",
+      clock.getTime(), checkPoint.get());
+    Assert.assertEquals(sleep, sleepIntervalMillis);
+
+
+  }
+
+  //testDoWorkOnInterruptedruns
+// 1. On interrupted it can skip some metrics
+//  testOnInterruption:
+//    // if sleep is interrupted.. is it ok?
+//    clock.setTime(10000);
+//  sleep = agg.runOnce(sleepIntervalMillis);
+//  Assert.assertEquals("startTime should be zero", 0,
+//    startTimeInDoWork.get());
+//  Assert.assertEquals("endTime  should be zero", 0, endTimeInDoWork.get()
+//    + sleepIntervalMillis);
+//
+//  //if it is interrupted again:
+//  clock.setTime(30000);
+//  sleep = agg.runOnce(sleepIntervalMillis);
+  //
+  // 2. if it lags it can skip??
+  //
+
+  private static class TestClock implements Clock {
+
+    private long time;
+
+    public void setTime(long time) {
+      this.time = time;
+    }
+
+    @Override
+    public long getTime() {
+      return time;
+    }
+  }
+}
\ No newline at end of file