Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/05/07 19:09:33 UTC

[3/3] hadoop git commit: YARN-3448. Added a rolling time-to-live LevelDB timeline store implementation. Contributed by Jonathan Eagles.

YARN-3448. Added a rolling time-to-live LevelDB timeline store implementation. Contributed by Jonathan Eagles.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/daf3e4ef
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/daf3e4ef
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/daf3e4ef

Branch: refs/heads/trunk
Commit: daf3e4ef8bf73cbe4a799d51b4765809cd81089f
Parents: 8e991f4
Author: Zhijie Shen <zj...@apache.org>
Authored: Thu May 7 10:01:51 2015 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Thu May 7 10:01:51 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |    3 +
 .../records/timeline/TimelinePutResponse.java   |    6 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |   47 +-
 .../pom.xml                                     |    5 +
 .../yarn/server/timeline/RollingLevelDB.java    |  420 ++++
 .../timeline/RollingLevelDBTimelineStore.java   | 1807 ++++++++++++++++++
 .../server/timeline/TimelineDataManager.java    |   44 +-
 .../yarn/server/timeline/util/LeveldbUtils.java |   73 +-
 .../timeline/TestLeveldbTimelineStore.java      |   29 +-
 .../server/timeline/TestRollingLevelDB.java     |  100 +
 .../TestRollingLevelDBTimelineStore.java        |  427 +++++
 .../server/timeline/TimelineStoreTestUtils.java |   12 +-
 12 files changed, 2907 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
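
For reference, a minimal sketch of how the new store might be enabled, assuming the existing yarn.timeline-service.store-class key is still how a store implementation is selected (that key is not part of this patch); the rolling period and TTL values below are illustrative only:

    // Hedged sketch: select the rolling store and pick a rolling period.
    // The store-class key name is an assumption; the other constants are the
    // ones added or used by this patch in YarnConfiguration.
    YarnConfiguration conf = new YarnConfiguration();
    conf.set("yarn.timeline-service.store-class",
        "org.apache.hadoop.yarn.server.timeline.RollingLevelDBTimelineStore");
    conf.set(YarnConfiguration.TIMELINE_SERVICE_ROLLING_PERIOD, "half_daily");
    conf.setLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS,
        7L * 24 * 60 * 60 * 1000);  // keep roughly one week of data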


http://git-wip-us.apache.org/repos/asf/hadoop/blob/daf3e4ef/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 55c65f5..ff7921c 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -108,6 +108,9 @@ Release 2.8.0 - UNRELEASED
     YARN-2619. Added NodeManager support for disk io isolation through cgroups.
     (Varun Vasudev and Wei Yan via vinodkv)
 
+    YARN-3448. Added a rolling time-to-live LevelDB timeline store implementation.
+    (Jonathan Eagles via zjshen)
+
   IMPROVEMENTS
 
     YARN-1880. Cleanup TestApplicationClientProtocolOnHA

http://git-wip-us.apache.org/repos/asf/hadoop/blob/daf3e4ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java
index a56d4d4..abe106f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java
@@ -129,6 +129,12 @@ public class TimelinePutResponse {
      */
     public static final int FORBIDDEN_RELATION = 6;
 
+    /**
+     * Error code returned if the entity start time is before the eviction
+     * period of old data.
+     */
+    public static final int EXPIRED_ENTITY = 7;
+
     private String entityId;
     private String entityType;
     private int errorCode;
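
The new EXPIRED_ENTITY code surfaces to writers through the normal put path. A hedged sketch, assuming the existing TimelineClient/TimelinePutResponse API (only the error code itself is added by this patch); timelineClient, entity, and LOG are assumed to exist in the caller:

    // Sketch: detect entities rejected because their start time falls before
    // the eviction window of the rolling store.
    TimelinePutResponse response = timelineClient.putEntities(entity);
    for (TimelinePutResponse.TimelinePutError error : response.getErrors()) {
      if (error.getErrorCode()
          == TimelinePutResponse.TimelinePutError.EXPIRED_ENTITY) {
        LOG.warn("Entity " + error.getEntityId()
            + " is older than the eviction period and was not stored");
      }
    }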

http://git-wip-us.apache.org/repos/asf/hadoop/blob/daf3e4ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 70b87f3..3bf25ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1431,6 +1431,18 @@ public class YarnConfiguration extends Configuration {
   public static final long DEFAULT_TIMELINE_SERVICE_TTL_MS =
       1000 * 60 * 60 * 24 * 7;
 
+  /** Timeline service rolling period. Valid values are daily, half_daily,
+   * quarter_daily, and hourly. */
+  public static final String TIMELINE_SERVICE_ROLLING_PERIOD =
+      TIMELINE_SERVICE_PREFIX + "rolling-period";
+
+  /** Roll a new database each hour. */
+  public static final String DEFAULT_TIMELINE_SERVICE_ROLLING_PERIOD =
+      "hourly";
+
+  /** Implementation specific configuration prefix for Timeline Service
+   * leveldb.
+   */
   public static final String TIMELINE_SERVICE_LEVELDB_PREFIX =
       TIMELINE_SERVICE_PREFIX + "leveldb-timeline-store.";
 
@@ -1438,13 +1450,36 @@ public class YarnConfiguration extends Configuration {
   public static final String TIMELINE_SERVICE_LEVELDB_PATH =
       TIMELINE_SERVICE_LEVELDB_PREFIX + "path";
 
-  /** Timeline service leveldb read cache (uncompressed blocks) */
+  /** Timeline service leveldb read cache (uncompressed blocks). This is
+   * per rolling instance and should be tuned if using the rolling leveldb
+   * timeline store. */
   public static final String TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE =
       TIMELINE_SERVICE_LEVELDB_PREFIX + "read-cache-size";
 
+  /** Default leveldb read cache size if no configuration is specified. */
   public static final long DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE =
       100 * 1024 * 1024;
 
+  /** Timeline service leveldb write buffer size. */
+  public static final String TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE =
+      TIMELINE_SERVICE_LEVELDB_PREFIX + "write-buffer-size";
+
+  /** Default leveldb write buffer size if no configuration is specified. This
+   * is per rolling instance and should be tuned if using the rolling leveldb
+   * timeline store. */
+  public static final int DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE =
+      16 * 1024 * 1024;
+
+  /** Timeline service leveldb write batch size. This value can be tuned down
+   * to reduce lock time for ttl eviction. */
+  public static final String
+      TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE =
+      TIMELINE_SERVICE_LEVELDB_PREFIX + "write-batch-size";
+
+  /** Default leveldb write batch size if no configuration is specified. */
+  public static final int
+      DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE = 10000;
+
   /** Timeline service leveldb start time read cache (number of entities) */
   public static final String
       TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE =
@@ -1468,6 +1503,16 @@ public class YarnConfiguration extends Configuration {
   public static final long DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS =
       1000 * 60 * 5;
 
+  /** Timeline service leveldb number of concurrent open files. Tune this
+   * configuration to stay within system limits. This is per rolling instance
+   * and should be tuned if using the rolling leveldb timeline store. */
+  public static final String TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES =
+      TIMELINE_SERVICE_LEVELDB_PREFIX + "max-open-files";
+
+  /** Default leveldb max open files if no configuration is specified. */
+  public static final int DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES =
+      1000;
+
   /** The Kerberos principal for the timeline server.*/
   public static final String TIMELINE_SERVICE_PRINCIPAL =
       TIMELINE_SERVICE_PREFIX + "principal";
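
Since each rolling instance gets its own read cache, write buffer, and open-file budget, aggregate resource usage grows with the number of live instances (roughly the TTL divided by the rolling period). A hedged sketch of tuning the new per-instance knobs down; the values are illustrative, not recommendations:

    // Sketch: per-instance leveldb settings added by this patch. With many
    // live rolling instances, smaller per-instance values keep totals bounded.
    YarnConfiguration conf = new YarnConfiguration();
    conf.setLong(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
        32 * 1024 * 1024);                    // 32 MB per instance
    conf.setInt(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
        8 * 1024 * 1024);                     // 8 MB per instance
    conf.setInt(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE,
        5000);                                // smaller batches, shorter locks
    conf.setInt(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES, 500);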

http://git-wip-us.apache.org/repos/asf/hadoop/blob/daf3e4ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
index 287a45a..d60e21c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
@@ -180,6 +180,11 @@
       <artifactId>bcprov-jdk16</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>de.ruedigermoeller</groupId>
+      <artifactId>fst</artifactId>
+      <version>2.24</version>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/daf3e4ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDB.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDB.java
new file mode 100644
index 0000000..6d10671
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDB.java
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Contains the logic to look up a leveldb by timestamp so that multiple
+ * smaller databases can roll according to the configured period and be
+ * evicted efficiently via operating-system directory removal.
+ */
+class RollingLevelDB {
+
+  /** Logger for this class. */
+  private static final Log LOG = LogFactory.getLog(RollingLevelDB.class);
+  /** Factory to open and create new leveldb instances. */
+  private static JniDBFactory factory = new JniDBFactory();
+  /** Thread safe date formatter. */
+  private FastDateFormat fdf;
+  /** Date parser. */
+  private SimpleDateFormat sdf;
+  /** Calendar to calculate the current and next rolling period. */
+  private GregorianCalendar cal = new GregorianCalendar(
+      TimeZone.getTimeZone("GMT"));
+  /** Collection of all active rolling leveldb instances. */
+  private final TreeMap<Long, DB> rollingdbs;
+  /** Collection of all rolling leveldb instances to evict. */
+  private final TreeMap<Long, DB> rollingdbsToEvict;
+  /** Name of this rolling level db. */
+  private final String name;
+  /** Calculated timestamp of when to roll a new leveldb instance. */
+  private volatile long nextRollingCheckMillis = 0;
+  /** File system instance to find and create new leveldb instances. */
+  private FileSystem lfs = null;
+  /** Directory to store rolling leveldb instances. */
+  private Path rollingDBPath;
+  /** Configuration for this object. */
+  private Configuration conf;
+  /** Rolling period. */
+  private RollingPeriod rollingPeriod;
+  /**
+   * Rolling leveldb instances are evicted when their end time is earlier
+   * than the current time minus the time-to-live value.
+   */
+  private long ttl;
+  /** Whether time to live is enabled. */
+  private boolean ttlEnabled;
+
+  /** Encapsulates the rolling period to date format lookup. */
+  enum RollingPeriod {
+    DAILY {
+      @Override
+      public String dateFormat() {
+        return "yyyy-MM-dd";
+      }
+    },
+    HALF_DAILY {
+      @Override
+      public String dateFormat() {
+        return "yyyy-MM-dd-HH";
+      }
+    },
+    QUARTER_DAILY {
+      @Override
+      public String dateFormat() {
+        return "yyyy-MM-dd-HH";
+      }
+    },
+    HOURLY {
+      @Override
+      public String dateFormat() {
+        return "yyyy-MM-dd-HH";
+      }
+    },
+    MINUTELY {
+      @Override
+      public String dateFormat() {
+        return "yyyy-MM-dd-HH-mm";
+      }
+    };
+    public abstract String dateFormat();
+  }
+
+  /**
+   * Convenience class for associating a write batch with its rolling leveldb
+   * instance.
+   */
+  public static class RollingWriteBatch {
+    /** Leveldb object. */
+    private final DB db;
+    /** Write batch for the db object. */
+    private final WriteBatch writeBatch;
+
+    public RollingWriteBatch(final DB db, final WriteBatch writeBatch) {
+      this.db = db;
+      this.writeBatch = writeBatch;
+    }
+
+    public DB getDB() {
+      return db;
+    }
+
+    public WriteBatch getWriteBatch() {
+      return writeBatch;
+    }
+
+    public void write() {
+      db.write(writeBatch);
+    }
+
+    public void close() {
+      IOUtils.cleanup(LOG, writeBatch);
+    }
+  }
+
+  RollingLevelDB(String name) {
+    this.name = name;
+    this.rollingdbs = new TreeMap<Long, DB>();
+    this.rollingdbsToEvict = new TreeMap<Long, DB>();
+  }
+
+  protected String getName() {
+    return name;
+  }
+
+  protected long currentTimeMillis() {
+    return System.currentTimeMillis();
+  }
+
+  public long getNextRollingTimeMillis() {
+    return nextRollingCheckMillis;
+  }
+
+  public long getTimeToLive() {
+    return ttl;
+  }
+
+  public boolean getTimeToLiveEnabled() {
+    return ttlEnabled;
+  }
+
+  protected void setNextRollingTimeMillis(final long timestamp) {
+    this.nextRollingCheckMillis = timestamp;
+    LOG.info("Next rolling time for " + getName() + " is "
+        + fdf.format(nextRollingCheckMillis));
+  }
+
+  public void init(final Configuration config) throws Exception {
+    LOG.info("Initializing RollingLevelDB for " + getName());
+    this.conf = config;
+    this.ttl = conf.getLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS);
+    this.ttlEnabled = conf.getBoolean(
+        YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, true);
+    this.rollingDBPath = new Path(
+        conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH),
+        RollingLevelDBTimelineStore.FILENAME);
+    initFileSystem();
+    initRollingPeriod();
+    initHistoricalDBs();
+  }
+
+  protected void initFileSystem() throws IOException {
+    lfs = FileSystem.getLocal(conf);
+    boolean success = lfs.mkdirs(rollingDBPath,
+        RollingLevelDBTimelineStore.LEVELDB_DIR_UMASK);
+    if (!success) {
+      throw new IOException("Failed to create leveldb root directory "
+          + rollingDBPath);
+    }
+  }
+
+  protected synchronized void initRollingPeriod() {
+    final String lcRollingPeriod = conf.get(
+        YarnConfiguration.TIMELINE_SERVICE_ROLLING_PERIOD,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ROLLING_PERIOD);
+    this.rollingPeriod = RollingPeriod.valueOf(lcRollingPeriod
+        .toUpperCase(Locale.ENGLISH));
+    fdf = FastDateFormat.getInstance(rollingPeriod.dateFormat(),
+        TimeZone.getTimeZone("GMT"));
+    sdf = new SimpleDateFormat(rollingPeriod.dateFormat());
+    sdf.setTimeZone(fdf.getTimeZone());
+  }
+
+  protected synchronized void initHistoricalDBs() throws IOException {
+    Path rollingDBGlobPath = new Path(rollingDBPath, getName() + ".*");
+    FileStatus[] statuses = lfs.globStatus(rollingDBGlobPath);
+    for (FileStatus status : statuses) {
+      String dbName = FilenameUtils.getExtension(status.getPath().toString());
+      try {
+        Long dbStartTime = sdf.parse(dbName).getTime();
+        initRollingLevelDB(dbStartTime, status.getPath());
+      } catch (ParseException pe) {
+        LOG.warn("Failed to initialize rolling leveldb " + dbName + " for "
+            + getName());
+      }
+    }
+  }
+
+  private void initRollingLevelDB(Long dbStartTime,
+      Path rollingInstanceDBPath) {
+    if (rollingdbs.containsKey(dbStartTime)) {
+      return;
+    }
+    Options options = new Options();
+    options.createIfMissing(true);
+    options.cacheSize(conf.getLong(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
+    options.maxOpenFiles(conf.getInt(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES));
+    options.writeBufferSize(conf.getInt(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE));
+    LOG.info("Initializing rolling leveldb instance :" + rollingInstanceDBPath
+        + " for start time: " + dbStartTime);
+    DB db = null;
+    try {
+      db = factory.open(
+          new File(rollingInstanceDBPath.toUri().getPath()), options);
+      rollingdbs.put(dbStartTime, db);
+      String dbName = fdf.format(dbStartTime);
+      LOG.info("Added rolling leveldb instance " + dbName + " to " + getName());
+    } catch (IOException ioe) {
+      LOG.warn("Failed to open rolling leveldb instance :"
+          + new File(rollingInstanceDBPath.toUri().getPath()), ioe);
+    }
+  }
+
+  synchronized DB getPreviousDB(DB db) {
+    Iterator<DB> iterator = rollingdbs.values().iterator();
+    DB prev = null;
+    while (iterator.hasNext()) {
+      DB cur = iterator.next();
+      if (cur == db) {
+        break;
+      }
+      prev = cur;
+    }
+    return prev;
+  }
+
+  synchronized long getStartTimeFor(DB db) {
+    long startTime = -1;
+    for (Map.Entry<Long, DB> entry : rollingdbs.entrySet()) {
+      if (entry.getValue() == db) {
+        startTime = entry.getKey();
+      }
+    }
+    return startTime;
+  }
+
+  public synchronized DB getDBForStartTime(long startTime) {
+    // make sure we sanitize this input
+    startTime = Math.min(startTime, currentTimeMillis());
+
+    if (startTime >= getNextRollingTimeMillis()) {
+      roll(startTime);
+    }
+    Entry<Long, DB> entry = rollingdbs.floorEntry(startTime);
+    if (entry == null) {
+      return null;
+    }
+    return entry.getValue();
+  }
+
+  private void roll(long startTime) {
+    LOG.info("Rolling new DB instance for " + getName());
+    long currentStartTime = computeCurrentCheckMillis(startTime);
+    setNextRollingTimeMillis(computeNextCheckMillis(currentStartTime));
+    String currentRollingDBInstance = fdf.format(currentStartTime);
+    String currentRollingDBName = getName() + "." + currentRollingDBInstance;
+    Path currentRollingDBPath = new Path(rollingDBPath, currentRollingDBName);
+    if (getTimeToLiveEnabled()) {
+      scheduleOldDBsForEviction();
+    }
+    initRollingLevelDB(currentStartTime, currentRollingDBPath);
+  }
+
+  private synchronized void scheduleOldDBsForEviction() {
+    // keep at least time to live amount of data
+    long evictionThreshold = computeCurrentCheckMillis(currentTimeMillis()
+        - getTimeToLive());
+
+    LOG.info("Scheduling " + getName() + " DBs older than "
+        + fdf.format(evictionThreshold) + " for eviction");
+    Iterator<Entry<Long, DB>> iterator = rollingdbs.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Entry<Long, DB> entry = iterator.next();
+      // parse this in gmt time
+      if (entry.getKey() < evictionThreshold) {
+        LOG.info("Scheduling " + getName() + " eviction for "
+            + fdf.format(entry.getKey()));
+        iterator.remove();
+        rollingdbsToEvict.put(entry.getKey(), entry.getValue());
+      }
+    }
+  }
+
+  public synchronized void evictOldDBs() {
+    LOG.info("Evicting " + getName() + " DBs scheduled for eviction");
+    Iterator<Entry<Long, DB>> iterator = rollingdbsToEvict.entrySet()
+        .iterator();
+    while (iterator.hasNext()) {
+      Entry<Long, DB> entry = iterator.next();
+      IOUtils.cleanup(LOG, entry.getValue());
+      String dbName = fdf.format(entry.getKey());
+      Path path = new Path(rollingDBPath, getName() + "." + dbName);
+      try {
+        LOG.info("Removing old db directory contents in " + path);
+        lfs.delete(path, true);
+      } catch (IOException ioe) {
+        LOG.warn("Failed to evict old db " + path, ioe);
+      }
+      iterator.remove();
+    }
+  }
+
+  public void stop() throws Exception {
+    for (DB db : rollingdbs.values()) {
+      IOUtils.cleanup(LOG, db);
+    }
+    IOUtils.cleanup(LOG, lfs);
+  }
+
+  private long computeNextCheckMillis(long now) {
+    return computeCheckMillis(now, true);
+  }
+
+  public long computeCurrentCheckMillis(long now) {
+    return computeCheckMillis(now, false);
+  }
+
+  private synchronized long computeCheckMillis(long now, boolean next) {
+    // needs to be called synchronously due to shared Calendar
+    cal.setTimeInMillis(now);
+    cal.set(Calendar.SECOND, 0);
+    cal.set(Calendar.MILLISECOND, 0);
+
+    if (rollingPeriod == RollingPeriod.DAILY) {
+      cal.set(Calendar.HOUR_OF_DAY, 0);
+      cal.set(Calendar.MINUTE, 0);
+      if (next) {
+        cal.add(Calendar.DATE, 1);
+      }
+    } else if (rollingPeriod == RollingPeriod.HALF_DAILY) {
+      // round down to 12 hour interval
+      int hour = (cal.get(Calendar.HOUR) / 12) * 12;
+      cal.set(Calendar.HOUR, hour);
+      cal.set(Calendar.MINUTE, 0);
+      if (next) {
+        cal.add(Calendar.HOUR_OF_DAY, 12);
+      }
+    } else if (rollingPeriod == RollingPeriod.QUARTER_DAILY) {
+      // round down to 6 hour interval
+      int hour = (cal.get(Calendar.HOUR) / 6) * 6;
+      cal.set(Calendar.HOUR, hour);
+      cal.set(Calendar.MINUTE, 0);
+      if (next) {
+        cal.add(Calendar.HOUR_OF_DAY, 6);
+      }
+    } else if (rollingPeriod == RollingPeriod.HOURLY) {
+      cal.set(Calendar.MINUTE, 0);
+      if (next) {
+        cal.add(Calendar.HOUR_OF_DAY, 1);
+      }
+    } else if (rollingPeriod == RollingPeriod.MINUTELY) {
+      // round down to 5 minute interval
+      int minute = (cal.get(Calendar.MINUTE) / 5) * 5;
+      cal.set(Calendar.MINUTE, minute);
+      if (next) {
+        cal.add(Calendar.MINUTE, 5);
+      }
+    }
+    return cal.getTimeInMillis();
+  }
+}
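
This message does not include RollingLevelDBTimelineStore.java itself (1807 lines in the diffstat), but the RollingLevelDB surface above is small enough to illustrate the intended write/evict cycle. A hedged sketch, placed in the same package since the class is package-private; the instance name, path, and key/value bytes are made up for illustration, and the real key layout lives in RollingLevelDBTimelineStore:

    // Sketch of the rolling write/evict cycle (assumes the same imports as
    // RollingLevelDB.java above and an enclosing method that throws Exception).
    Configuration conf = new YarnConfiguration();
    conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH, "/tmp/timeline");
    RollingLevelDB entityDB = new RollingLevelDB("entity");  // name illustrative
    entityDB.init(conf);

    long entityStartTime = System.currentTimeMillis();
    // Looks up the instance whose period covers this start time, rolling a new
    // database (and scheduling expired ones for eviction) when needed.
    DB db = entityDB.getDBForStartTime(entityStartTime);
    if (db != null) {
      RollingLevelDB.RollingWriteBatch rwb =
          new RollingLevelDB.RollingWriteBatch(db, db.createWriteBatch());
      try {
        rwb.getWriteBatch().put(keyBytes, valueBytes);  // hypothetical payload
        rwb.write();
      } finally {
        rwb.close();
      }
    }

    entityDB.evictOldDBs();  // drops whole expired databases via directory removal
    entityDB.stop();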