You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/05/07 19:09:33 UTC
[3/3] hadoop git commit: YARN-3448. Added a rolling time-to-live
LevelDB timeline store implementation. Contributed by Jonathan Eagles.
YARN-3448. Added a rolling time-to-live LevelDB timeline store implementation. Contributed by Jonathan Eagles.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/daf3e4ef
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/daf3e4ef
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/daf3e4ef
Branch: refs/heads/trunk
Commit: daf3e4ef8bf73cbe4a799d51b4765809cd81089f
Parents: 8e991f4
Author: Zhijie Shen <zj...@apache.org>
Authored: Thu May 7 10:01:51 2015 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Thu May 7 10:01:51 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../records/timeline/TimelinePutResponse.java | 6 +
.../hadoop/yarn/conf/YarnConfiguration.java | 47 +-
.../pom.xml | 5 +
.../yarn/server/timeline/RollingLevelDB.java | 420 ++++
.../timeline/RollingLevelDBTimelineStore.java | 1807 ++++++++++++++++++
.../server/timeline/TimelineDataManager.java | 44 +-
.../yarn/server/timeline/util/LeveldbUtils.java | 73 +-
.../timeline/TestLeveldbTimelineStore.java | 29 +-
.../server/timeline/TestRollingLevelDB.java | 100 +
.../TestRollingLevelDBTimelineStore.java | 427 +++++
.../server/timeline/TimelineStoreTestUtils.java | 12 +-
12 files changed, 2907 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/daf3e4ef/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 55c65f5..ff7921c 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -108,6 +108,9 @@ Release 2.8.0 - UNRELEASED
YARN-2619. Added NodeManager support for disk io isolation through cgroups.
(Varun Vasudev and Wei Yan via vinodkv)
+ YARN-3448. Added a rolling time-to-live LevelDB timeline store implementation.
+ (Jonathan Eagles via zjshen)
+
IMPROVEMENTS
YARN-1880. Cleanup TestApplicationClientProtocolOnHA
http://git-wip-us.apache.org/repos/asf/hadoop/blob/daf3e4ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java
index a56d4d4..abe106f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java
@@ -129,6 +129,12 @@ public class TimelinePutResponse {
*/
public static final int FORBIDDEN_RELATION = 6;
+ /**
+ * Error code returned if the entity start time is before the eviction
+ * period of old data.
+ */
+ public static final int EXPIRED_ENTITY = 7;
+
private String entityId;
private String entityType;
private int errorCode;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/daf3e4ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 70b87f3..3bf25ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1431,6 +1431,18 @@ public class YarnConfiguration extends Configuration {
public static final long DEFAULT_TIMELINE_SERVICE_TTL_MS =
1000 * 60 * 60 * 24 * 7;
+ /** Timeline service rolling period. Valid values are daily, half_daily,
+ * quarter_daily, and hourly. */
+ public static final String TIMELINE_SERVICE_ROLLING_PERIOD =
+ TIMELINE_SERVICE_PREFIX + "rolling-period";
+
+ /** Roll a new database each hour. */
+ public static final String DEFAULT_TIMELINE_SERVICE_ROLLING_PERIOD =
+ "hourly";
+
+ /** Implementation specific configuration prefix for Timeline Service
+ * leveldb.
+ */
public static final String TIMELINE_SERVICE_LEVELDB_PREFIX =
TIMELINE_SERVICE_PREFIX + "leveldb-timeline-store.";
@@ -1438,13 +1450,36 @@ public class YarnConfiguration extends Configuration {
public static final String TIMELINE_SERVICE_LEVELDB_PATH =
TIMELINE_SERVICE_LEVELDB_PREFIX + "path";
- /** Timeline service leveldb read cache (uncompressed blocks) */
+ /** Timeline service leveldb read cache (uncompressed blocks). This is
+ * per rolling instance so should be tuned if using rolling leveldb
+ * timeline store */
public static final String TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE =
TIMELINE_SERVICE_LEVELDB_PREFIX + "read-cache-size";
+ /** Default leveldb read cache size if no configuration is specified. */
public static final long DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE =
100 * 1024 * 1024;
+ /** Timeline service leveldb write buffer size. */
+ public static final String TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE =
+ TIMELINE_SERVICE_LEVELDB_PREFIX + "write-buffer-size";
+
+ /** Default leveldb write buffer size if no configuration is specified. This
+ * is per rolling instance so should be tuned if using rolling leveldb
+ * timeline store. */
+ public static final int DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE =
+ 16 * 1024 * 1024;
+
+ /** Timeline service leveldb write batch size. This value can be tuned down
+ * to reduce lock time for ttl eviction. */
+ public static final String
+ TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE =
+ TIMELINE_SERVICE_LEVELDB_PREFIX + "write-batch-size";
+
+ /** Default leveldb write batch size is no configuration is specified */
+ public static final int
+ DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE = 10000;
+
/** Timeline service leveldb start time read cache (number of entities) */
public static final String
TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE =
@@ -1468,6 +1503,16 @@ public class YarnConfiguration extends Configuration {
public static final long DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS =
1000 * 60 * 5;
+ /** Timeline service leveldb number of concurrent open files. Tuned this
+ * configuration to stay within system limits. This is per rolling instance
+ * so should be tuned if using rolling leveldb timeline store. */
+ public static final String TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES =
+ TIMELINE_SERVICE_LEVELDB_PREFIX + "max-open-files";
+
+ /** Default leveldb max open files if no configuration is specified. */
+ public static final int DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES =
+ 1000;
+
/** The Kerberos principal for the timeline server.*/
public static final String TIMELINE_SERVICE_PRINCIPAL =
TIMELINE_SERVICE_PREFIX + "principal";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/daf3e4ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
index 287a45a..d60e21c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
@@ -180,6 +180,11 @@
<artifactId>bcprov-jdk16</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>de.ruedigermoeller</groupId>
+ <artifactId>fst</artifactId>
+ <version>2.24</version>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/daf3e4ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDB.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDB.java
new file mode 100644
index 0000000..6d10671
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDB.java
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Contains the logic to lookup a leveldb by timestamp so that multiple smaller
+ * databases can roll according to the configured period and evicted efficiently
+ * via operating system directory removal.
+ */
+class RollingLevelDB {
+
+ /** Logger for this class. */
+ private static final Log LOG = LogFactory.getLog(RollingLevelDB.class);
+ /** Factory to open and create new leveldb instances. */
+ private static JniDBFactory factory = new JniDBFactory();
+ /** Thread safe date formatter. */
+ private FastDateFormat fdf;
+ /** Date parser. */
+ private SimpleDateFormat sdf;
+ /** Calendar to calculate the current and next rolling period. */
+ private GregorianCalendar cal = new GregorianCalendar(
+ TimeZone.getTimeZone("GMT"));
+ /** Collection of all active rolling leveldb instances. */
+ private final TreeMap<Long, DB> rollingdbs;
+ /** Collection of all rolling leveldb instances to evict. */
+ private final TreeMap<Long, DB> rollingdbsToEvict;
+ /** Name of this rolling level db. */
+ private final String name;
+ /** Calculated timestamp of when to roll a new leveldb instance. */
+ private volatile long nextRollingCheckMillis = 0;
+ /** File system instance to find and create new leveldb instances. */
+ private FileSystem lfs = null;
+ /** Directory to store rolling leveldb instances. */
+ private Path rollingDBPath;
+ /** Configuration for this object. */
+ private Configuration conf;
+ /** Rolling period. */
+ private RollingPeriod rollingPeriod;
+ /**
+ * Rolling leveldb instances are evicted when their endtime is earlier than
+ * the current time minus the time to live value.
+ */
+ private long ttl;
+ /** Whether time to live is enabled. */
+ private boolean ttlEnabled;
+
+ /** Encapsulates the rolling period to date format lookup. */
+ enum RollingPeriod {
+ DAILY {
+ @Override
+ public String dateFormat() {
+ return "yyyy-MM-dd";
+ }
+ },
+ HALF_DAILY {
+ @Override
+ public String dateFormat() {
+ return "yyyy-MM-dd-HH";
+ }
+ },
+ QUARTER_DAILY {
+ @Override
+ public String dateFormat() {
+ return "yyyy-MM-dd-HH";
+ }
+ },
+ HOURLY {
+ @Override
+ public String dateFormat() {
+ return "yyyy-MM-dd-HH";
+ }
+ },
+ MINUTELY {
+ @Override
+ public String dateFormat() {
+ return "yyyy-MM-dd-HH-mm";
+ }
+ };
+ public abstract String dateFormat();
+ }
+
+ /**
+ * Convenience class for associating a write batch with its rolling leveldb
+ * instance.
+ */
+ public static class RollingWriteBatch {
+ /** Leveldb object. */
+ private final DB db;
+ /** Write batch for the db object. */
+ private final WriteBatch writeBatch;
+
+ public RollingWriteBatch(final DB db, final WriteBatch writeBatch) {
+ this.db = db;
+ this.writeBatch = writeBatch;
+ }
+
+ public DB getDB() {
+ return db;
+ }
+
+ public WriteBatch getWriteBatch() {
+ return writeBatch;
+ }
+
+ public void write() {
+ db.write(writeBatch);
+ }
+
+ public void close() {
+ IOUtils.cleanup(LOG, writeBatch);
+ }
+ }
+
+ RollingLevelDB(String name) {
+ this.name = name;
+ this.rollingdbs = new TreeMap<Long, DB>();
+ this.rollingdbsToEvict = new TreeMap<Long, DB>();
+ }
+
+ protected String getName() {
+ return name;
+ }
+
+ protected long currentTimeMillis() {
+ return System.currentTimeMillis();
+ }
+
+ public long getNextRollingTimeMillis() {
+ return nextRollingCheckMillis;
+ }
+
+ public long getTimeToLive() {
+ return ttl;
+ }
+
+ public boolean getTimeToLiveEnabled() {
+ return ttlEnabled;
+ }
+
+ protected void setNextRollingTimeMillis(final long timestamp) {
+ this.nextRollingCheckMillis = timestamp;
+ LOG.info("Next rolling time for " + getName() + " is "
+ + fdf.format(nextRollingCheckMillis));
+ }
+
+ public void init(final Configuration config) throws Exception {
+ LOG.info("Initializing RollingLevelDB for " + getName());
+ this.conf = config;
+ this.ttl = conf.getLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS);
+ this.ttlEnabled = conf.getBoolean(
+ YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, true);
+ this.rollingDBPath = new Path(
+ conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH),
+ RollingLevelDBTimelineStore.FILENAME);
+ initFileSystem();
+ initRollingPeriod();
+ initHistoricalDBs();
+ }
+
+ protected void initFileSystem() throws IOException {
+ lfs = FileSystem.getLocal(conf);
+ boolean success = lfs.mkdirs(rollingDBPath,
+ RollingLevelDBTimelineStore.LEVELDB_DIR_UMASK);
+ if (!success) {
+ throw new IOException("Failed to create leveldb root directory "
+ + rollingDBPath);
+ }
+ }
+
+ protected synchronized void initRollingPeriod() {
+ final String lcRollingPeriod = conf.get(
+ YarnConfiguration.TIMELINE_SERVICE_ROLLING_PERIOD,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ROLLING_PERIOD);
+ this.rollingPeriod = RollingPeriod.valueOf(lcRollingPeriod
+ .toUpperCase(Locale.ENGLISH));
+ fdf = FastDateFormat.getInstance(rollingPeriod.dateFormat(),
+ TimeZone.getTimeZone("GMT"));
+ sdf = new SimpleDateFormat(rollingPeriod.dateFormat());
+ sdf.setTimeZone(fdf.getTimeZone());
+ }
+
+ protected synchronized void initHistoricalDBs() throws IOException {
+ Path rollingDBGlobPath = new Path(rollingDBPath, getName() + ".*");
+ FileStatus[] statuses = lfs.globStatus(rollingDBGlobPath);
+ for (FileStatus status : statuses) {
+ String dbName = FilenameUtils.getExtension(status.getPath().toString());
+ try {
+ Long dbStartTime = sdf.parse(dbName).getTime();
+ initRollingLevelDB(dbStartTime, status.getPath());
+ } catch (ParseException pe) {
+ LOG.warn("Failed to initialize rolling leveldb " + dbName + " for "
+ + getName());
+ }
+ }
+ }
+
+ private void initRollingLevelDB(Long dbStartTime,
+ Path rollingInstanceDBPath) {
+ if (rollingdbs.containsKey(dbStartTime)) {
+ return;
+ }
+ Options options = new Options();
+ options.createIfMissing(true);
+ options.cacheSize(conf.getLong(
+ YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
+ options.maxOpenFiles(conf.getInt(
+ YarnConfiguration.TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES));
+ options.writeBufferSize(conf.getInt(
+ YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE));
+ LOG.info("Initializing rolling leveldb instance :" + rollingInstanceDBPath
+ + " for start time: " + dbStartTime);
+ DB db = null;
+ try {
+ db = factory.open(
+ new File(rollingInstanceDBPath.toUri().getPath()), options);
+ rollingdbs.put(dbStartTime, db);
+ String dbName = fdf.format(dbStartTime);
+ LOG.info("Added rolling leveldb instance " + dbName + " to " + getName());
+ } catch (IOException ioe) {
+ LOG.warn("Failed to open rolling leveldb instance :"
+ + new File(rollingInstanceDBPath.toUri().getPath()), ioe);
+ }
+ }
+
+ synchronized DB getPreviousDB(DB db) {
+ Iterator<DB> iterator = rollingdbs.values().iterator();
+ DB prev = null;
+ while (iterator.hasNext()) {
+ DB cur = iterator.next();
+ if (cur == db) {
+ break;
+ }
+ prev = cur;
+ }
+ return prev;
+ }
+
+ synchronized long getStartTimeFor(DB db) {
+ long startTime = -1;
+ for (Map.Entry<Long, DB> entry : rollingdbs.entrySet()) {
+ if (entry.getValue() == db) {
+ startTime = entry.getKey();
+ }
+ }
+ return startTime;
+ }
+
+ public synchronized DB getDBForStartTime(long startTime) {
+ // make sure we sanitize this input
+ startTime = Math.min(startTime, currentTimeMillis());
+
+ if (startTime >= getNextRollingTimeMillis()) {
+ roll(startTime);
+ }
+ Entry<Long, DB> entry = rollingdbs.floorEntry(startTime);
+ if (entry == null) {
+ return null;
+ }
+ return entry.getValue();
+ }
+
+ private void roll(long startTime) {
+ LOG.info("Rolling new DB instance for " + getName());
+ long currentStartTime = computeCurrentCheckMillis(startTime);
+ setNextRollingTimeMillis(computeNextCheckMillis(currentStartTime));
+ String currentRollingDBInstance = fdf.format(currentStartTime);
+ String currentRollingDBName = getName() + "." + currentRollingDBInstance;
+ Path currentRollingDBPath = new Path(rollingDBPath, currentRollingDBName);
+ if (getTimeToLiveEnabled()) {
+ scheduleOldDBsForEviction();
+ }
+ initRollingLevelDB(currentStartTime, currentRollingDBPath);
+ }
+
+ private synchronized void scheduleOldDBsForEviction() {
+ // keep at least time to live amount of data
+ long evictionThreshold = computeCurrentCheckMillis(currentTimeMillis()
+ - getTimeToLive());
+
+ LOG.info("Scheduling " + getName() + " DBs older than "
+ + fdf.format(evictionThreshold) + " for eviction");
+ Iterator<Entry<Long, DB>> iterator = rollingdbs.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry<Long, DB> entry = iterator.next();
+ // parse this in gmt time
+ if (entry.getKey() < evictionThreshold) {
+ LOG.info("Scheduling " + getName() + " eviction for "
+ + fdf.format(entry.getKey()));
+ iterator.remove();
+ rollingdbsToEvict.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ public synchronized void evictOldDBs() {
+ LOG.info("Evicting " + getName() + " DBs scheduled for eviction");
+ Iterator<Entry<Long, DB>> iterator = rollingdbsToEvict.entrySet()
+ .iterator();
+ while (iterator.hasNext()) {
+ Entry<Long, DB> entry = iterator.next();
+ IOUtils.cleanup(LOG, entry.getValue());
+ String dbName = fdf.format(entry.getKey());
+ Path path = new Path(rollingDBPath, getName() + "." + dbName);
+ try {
+ LOG.info("Removing old db directory contents in " + path);
+ lfs.delete(path, true);
+ } catch (IOException ioe) {
+ LOG.warn("Failed to evict old db " + path, ioe);
+ }
+ iterator.remove();
+ }
+ }
+
+ public void stop() throws Exception {
+ for (DB db : rollingdbs.values()) {
+ IOUtils.cleanup(LOG, db);
+ }
+ IOUtils.cleanup(LOG, lfs);
+ }
+
+ private long computeNextCheckMillis(long now) {
+ return computeCheckMillis(now, true);
+ }
+
+ public long computeCurrentCheckMillis(long now) {
+ return computeCheckMillis(now, false);
+ }
+
+ private synchronized long computeCheckMillis(long now, boolean next) {
+ // needs to be called synchronously due to shared Calendar
+ cal.setTimeInMillis(now);
+ cal.set(Calendar.SECOND, 0);
+ cal.set(Calendar.MILLISECOND, 0);
+
+ if (rollingPeriod == RollingPeriod.DAILY) {
+ cal.set(Calendar.HOUR_OF_DAY, 0);
+ cal.set(Calendar.MINUTE, 0);
+ if (next) {
+ cal.add(Calendar.DATE, 1);
+ }
+ } else if (rollingPeriod == RollingPeriod.HALF_DAILY) {
+ // round down to 12 hour interval
+ int hour = (cal.get(Calendar.HOUR) / 12) * 12;
+ cal.set(Calendar.HOUR, hour);
+ cal.set(Calendar.MINUTE, 0);
+ if (next) {
+ cal.add(Calendar.HOUR_OF_DAY, 12);
+ }
+ } else if (rollingPeriod == RollingPeriod.QUARTER_DAILY) {
+ // round down to 6 hour interval
+ int hour = (cal.get(Calendar.HOUR) / 6) * 6;
+ cal.set(Calendar.HOUR, hour);
+ cal.set(Calendar.MINUTE, 0);
+ if (next) {
+ cal.add(Calendar.HOUR_OF_DAY, 6);
+ }
+ } else if (rollingPeriod == RollingPeriod.HOURLY) {
+ cal.set(Calendar.MINUTE, 0);
+ if (next) {
+ cal.add(Calendar.HOUR_OF_DAY, 1);
+ }
+ } else if (rollingPeriod == RollingPeriod.MINUTELY) {
+ // round down to 5 minute interval
+ int minute = (cal.get(Calendar.MINUTE) / 5) * 5;
+ cal.set(Calendar.MINUTE, minute);
+ if (next) {
+ cal.add(Calendar.MINUTE, 5);
+ }
+ }
+ return cal.getTimeInMillis();
+ }
+}