Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/03/08 12:13:24 UTC
svn commit: r1298357 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/conf/
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/
bookkeeper-server/src/test/java/org/apache/bookk...
Author: ivank
Date: Thu Mar 8 11:13:23 2012
New Revision: 1298357
URL: http://svn.apache.org/viewvc?rev=1298357&view=rev
Log:
BOOKKEEPER-160: bookie server needs to do compaction over entry log files to reclaim disk space (sijie via ivank)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1298357&r1=1298356&r2=1298357&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Mar 8 11:13:23 2012
@@ -76,6 +76,8 @@ Trunk (unreleased changes)
BOOKKEEPER-178: Delay ledger directory creation until the ledger index file was created (sijie via ivank)
+ BOOKKEEPER-160: bookie server needs to do compaction over entry log files to reclaim disk space (sijie via ivank)
+
hedwig-server/
BOOKKEEPER-77: Add a console client for hedwig (Sijie Guo via ivank)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf?rev=1298357&r1=1298356&r2=1298357&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf Thu Mar 8 11:13:23 2012
@@ -52,6 +52,28 @@ ledgerDirectories=/tmp/bk-data
# A new entry log file will be created when the old one reaches the file size limitation
# logSizeLimit=2147483648
+# Threshold of minor compaction
+# Entry log files whose remaining size percentage falls below this
+# threshold will be compacted in a minor compaction.
+# If it is set to zero or less, minor compaction is disabled.
+# minorCompactionThreshold=0.2
+
+# Interval to run minor compaction, in seconds
+# If it is set to zero or less, minor compaction is disabled.
+# minorCompactionInterval=3600
+
+# Threshold of major compaction
+# Entry log files whose remaining size percentage falls below this
+# threshold will be compacted in a major compaction.
+# Entry log files whose remaining size percentage is still higher than
+# this threshold will never be compacted.
+# If it is set to zero or less, major compaction is disabled.
+# majorCompactionThreshold=0.8
+
+# Interval to run major compaction, in seconds
+# If it is set to zero or less, major compaction is disabled.
+# majorCompactionInterval=86400
+
# Max file size of journal file, in megabytes
# A new journal file will be created when the old one reaches the file size limitation
#
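Note: for illustration, the same compaction settings can also be supplied programmatically through the ServerConfiguration setters introduced by this change. A minimal sketch, using the default values shown above (the class name is hypothetical, not part of the commit):

    import org.apache.bookkeeper.conf.ServerConfiguration;

    public class CompactionConfigSketch {
        public static void main(String[] args) {
            ServerConfiguration conf = new ServerConfiguration();
            // Minor compaction: run hourly, compacting logs below 20% live data.
            conf.setMinorCompactionThreshold(0.2);
            conf.setMinorCompactionInterval(3600);
            // Major compaction: run daily, compacting logs below 80% live data.
            conf.setMajorCompactionThreshold(0.8);
            conf.setMajorCompactionInterval(86400);
            // Setting a threshold or interval to zero or less disables
            // that type of compaction.
        }
    }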
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1298357&r1=1298356&r2=1298357&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Thu Mar 8 11:13:23 2012
@@ -136,6 +136,11 @@ public class Bookie extends Thread {
EntryLogger entryLogger;
LedgerCache ledgerCache;
+ // This is the thread that garbage collects entry logs that no longer
+ // contain any active ledgers, and compacts entry logs that have a low
+ // remaining percentage of live data, to reclaim disk space.
+ final GarbageCollectorThread gcThread;
+
/**
* SyncThread is a background thread which flushes ledger index pages periodically.
* Also it takes responsibility of garbage collecting journal files.
@@ -263,6 +268,28 @@ public class Bookie extends Thread {
}
}
+ /**
+ * Scanner used to do entry log compaction
+ */
+ class EntryLogCompactionScanner implements EntryLogger.EntryLogScanner {
+ @Override
+ public boolean accept(long ledgerId) {
+ // the bookie has no knowledge of which ledgers have been deleted,
+ // so just accept all ledgers.
+ return true;
+ }
+
+ @Override
+ public void process(long ledgerId, ByteBuffer buffer)
+ throws IOException {
+ try {
+ Bookie.this.addEntryByLedgerId(ledgerId, buffer);
+ } catch (BookieException be) {
+ throw new IOException(be);
+ }
+ }
+ }
+
public Bookie(ServerConfiguration conf)
throws IOException, KeeperException, InterruptedException, BookieException {
super("Bookie-" + conf.getBookiePort());
@@ -283,9 +310,11 @@ public class Bookie extends Thread {
ledgerManager = LedgerManagerFactory.newLedgerManager(conf, this.zk);
syncThread = new SyncThread(conf);
- entryLogger = new EntryLogger(conf, this);
+ entryLogger = new EntryLogger(conf);
ledgerCache = new LedgerCache(conf, ledgerManager);
-
+ gcThread = new GarbageCollectorThread(conf, this.zk, ledgerCache, entryLogger,
+ new EntryLogCompactionScanner());
+ // replay journals
readJournal();
}
@@ -390,7 +419,7 @@ public class Bookie extends Thread {
LOG.debug("I'm starting a bookie with journal directory " + journalDirectory.getName());
super.start();
syncThread.start();
- entryLogger.start();
+ gcThread.start();
// set running here.
// since bookie server use running as a flag to tell bookie server whether it is alive
// if setting it in bookie thread, the watcher might run before bookie thread.
@@ -907,6 +936,9 @@ public class Bookie extends Thread {
if (!running) { // avoid shutdown twice
return;
}
+ // shut down the gc thread first: it depends on the zookeeper client,
+ // and compaction writes entries back to the entry log files
+ gcThread.shutdown();
// Shutdown the ZK client
if(zk != null) zk.close();
this.interrupt();
@@ -957,6 +989,16 @@ public class Bookie extends Thread {
return l;
}
+ protected void addEntryByLedgerId(long ledgerId, ByteBuffer entry)
+ throws IOException, BookieException {
+ LedgerDescriptor handle = getHandle(ledgerId);
+ try {
+ handle.addEntry(entry);
+ } finally {
+ putHandle(handle);
+ }
+ }
+
/**
* Add an entry to a ledger as specified by handle.
*/
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=1298357&r1=1298356&r2=1298357&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java Thu Mar 8 11:13:23 2012
@@ -36,13 +36,13 @@ import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.LedgerManager;
/**
* This class manages the writing of the bookkeeper entries. All the new
@@ -54,9 +54,6 @@ import org.apache.bookkeeper.meta.Ledger
public class EntryLogger {
private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class);
private File dirs[];
- // This is a handle to the Bookie parent instance. We need this to get
- // access to the LedgerCache as well as the ZooKeeper client handle.
- private final Bookie bookie;
private long logId;
/**
@@ -74,24 +71,102 @@ public class EntryLogger {
// this indicates that a write has happened since the last flush
private volatile boolean somethingWritten = false;
- // Maps entry log files to the set of ledgers that comprise the file.
- private ConcurrentMap<Long, ConcurrentHashMap<Long, Boolean>> entryLogs2LedgersMap = new ConcurrentHashMap<Long, ConcurrentHashMap<Long, Boolean>>();
- // This is the thread that garbage collects the entry logs that do not
- // contain any active ledgers in them.
- GarbageCollectorThread gcThread = new GarbageCollectorThread();
- // This is how often we want to run the Garbage Collector Thread (in milliseconds).
- final long gcWaitTime;
+ final static long MB = 1024 * 1024;
+
+ /**
+ * Records the total size, remaining size and the set of ledgers that comprise an entry log.
+ */
+ static class EntryLogMetadata {
+ long entryLogId;
+ long totalSize;
+ long remainingSize;
+ ConcurrentHashMap<Long, Long> ledgersMap;
+
+ public EntryLogMetadata(long logId) {
+ this.entryLogId = logId;
+
+ totalSize = remainingSize = 0;
+ ledgersMap = new ConcurrentHashMap<Long, Long>();
+ }
+
+ public void addLedgerSize(long ledgerId, long size) {
+ totalSize += size;
+ remainingSize += size;
+ Long ledgerSize = ledgersMap.get(ledgerId);
+ if (null == ledgerSize) {
+ ledgerSize = 0L;
+ }
+ ledgerSize += size;
+ ledgersMap.put(ledgerId, ledgerSize);
+ }
+
+ public void removeLedger(long ledgerId) {
+ Long size = ledgersMap.remove(ledgerId);
+ if (null == size) {
+ return;
+ }
+ remainingSize -= size;
+ }
+
+ public boolean containsLedger(long ledgerId) {
+ return ledgersMap.containsKey(ledgerId);
+ }
+
+ public double getUsage() {
+ if (totalSize == 0L) {
+ return 0.0f;
+ }
+ return (double)remainingSize / totalSize;
+ }
+
+ public boolean isEmpty() {
+ return ledgersMap.isEmpty();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{ totalSize = ").append(totalSize).append(", remainingSize = ")
+ .append(remainingSize).append(", ledgersMap = ").append(ledgersMap).append(" }");
+ return sb.toString();
+ }
+ }
+
+ /**
+ * Scan entries in an entry log file.
+ */
+ static interface EntryLogScanner {
+ /**
+ * Tests whether the entries belonging to the specified ledger
+ * should be processed.
+ *
+ * @param ledgerId
+ * Ledger ID.
+ * @return true if and only if the entries of the ledger should be scanned.
+ */
+ public boolean accept(long ledgerId);
+
+ /**
+ * Process an entry.
+ *
+ * @param ledgerId
+ * Ledger ID.
+ * @param entry
+ * Entry ByteBuffer
+ * @throws IOException
+ */
+ public void process(long ledgerId, ByteBuffer entry) throws IOException;
+ }
/**
* Create an EntryLogger that stores its log files in the given
* directories.
*/
- public EntryLogger(ServerConfiguration conf, Bookie bookie) throws IOException {
+ public EntryLogger(ServerConfiguration conf) throws IOException {
this.dirs = conf.getLedgerDirs();
- this.bookie = bookie;
// log size limit
this.logSizeLimit = conf.getEntryLogSizeLimit();
- this.gcWaitTime = conf.getGcWaitTime();
+
// Initialize the entry log header buffer. This cannot be a static object
// since in our unit tests, we run multiple Bookies and thus EntryLoggers
// within the same JVM. All of these Bookie instances access this header
@@ -108,119 +183,12 @@ public class EntryLogger {
createLogId(logId);
}
- public void start() {
- // Start the Garbage Collector thread to prune unneeded entry logs.
- gcThread.start();
- }
-
/**
* Maps entry log files to open channels.
*/
private ConcurrentHashMap<Long, BufferedChannel> channels = new ConcurrentHashMap<Long, BufferedChannel>();
/**
- * This is the garbage collector thread that runs in the background to
- * remove any entry log files that no longer contains any active ledger.
- */
- class GarbageCollectorThread extends Thread {
- volatile boolean running = true;
-
- public GarbageCollectorThread() {
- super("GarbageCollectorThread");
- }
-
- @Override
- public void run() {
- while (running) {
- synchronized (this) {
- try {
- wait(gcWaitTime);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- continue;
- }
- }
- // Extract all of the ledger ID's that comprise all of the entry logs
- // (except for the current new one which is still being written to).
- try {
- extractLedgersFromEntryLogs();
- } catch (IOException ie) {
- LOG.warn("Exception when extracting ledgers from entry logs : ", ie);
- }
-
- // Initialization check. No need to run any logic if we are still starting up.
- if (bookie == null ||
- bookie.zk == null || bookie.ledgerCache == null) {
- continue;
- }
-
- // gc inactive/deleted ledgers
- doGcLedgers();
-
- // gc entry logs
- doGcEntryLogs();
- }
- }
-
- /**
- * Do garbage collection ledger index files
- */
- private void doGcLedgers() {
- bookie.ledgerCache.activeLedgerManager.garbageCollectLedgers(
- new LedgerManager.GarbageCollector() {
- @Override
- public void gc(long ledgerId) {
- try {
- bookie.ledgerCache.deleteLedger(ledgerId);
- } catch (IOException e) {
- LOG.error("Exception when deleting the ledger index file on the Bookie: ", e);
- }
- }
- });
- }
-
- /**
- * Garbage collect those entry loggers which are not associated with any active ledgers
- */
- private void doGcEntryLogs() {
- // Loop through all of the entry logs and remove the non-active ledgers.
- for (Long entryLogId : entryLogs2LedgersMap.keySet()) {
- ConcurrentHashMap<Long, Boolean> entryLogLedgers = entryLogs2LedgersMap.get(entryLogId);
- for (Long entryLogLedger : entryLogLedgers.keySet()) {
- // Remove the entry log ledger from the set if it isn't active.
- if (!bookie.ledgerCache.activeLedgerManager.containsActiveLedger(entryLogLedger)) {
- entryLogLedgers.remove(entryLogLedger);
- }
- }
- if (entryLogLedgers.isEmpty()) {
- // This means the entry log is not associated with any active ledgers anymore.
- // We can remove this entry log file now.
- LOG.info("Deleting entryLogId " + entryLogId + " as it has no active ledgers!");
- BufferedChannel bc = channels.remove(entryLogId);
- if (null != bc) {
- // close its underlying file channel, so it could be deleted really
- try {
- bc.getFileChannel().close();
- } catch (IOException ie) {
- LOG.warn("Exception while closing garbage collected entryLog file : ", ie);
- }
- }
- File entryLogFile;
- try {
- entryLogFile = findFile(entryLogId);
- } catch (FileNotFoundException e) {
- LOG.error("Trying to delete an entryLog file that could not be found: "
- + entryLogId + ".log");
- continue;
- }
- entryLogFile.delete();
- entryLogs2LedgersMap.remove(entryLogId);
- }
- }
- }
- }
-
- /**
* Creates a new log file with the given id.
*/
private void createLogId(long logId) throws IOException {
@@ -239,6 +207,34 @@ public class EntryLogger {
}
/**
+ * Remove entry log.
+ *
+ * @param entryLogId
+ * Entry Log File Id
+ */
+ protected boolean removeEntryLog(long entryLogId) {
+ BufferedChannel bc = channels.remove(entryLogId);
+ if (null != bc) {
+ // close its underlying file channel, so it could be deleted really
+ try {
+ bc.getFileChannel().close();
+ } catch (IOException ie) {
+ LOG.warn("Exception while closing garbage collected entryLog file : ", ie);
+ }
+ }
+ File entryLogFile;
+ try {
+ entryLogFile = findFile(entryLogId);
+ } catch (FileNotFoundException e) {
+ LOG.error("Trying to delete an entryLog file that could not be found: "
+ + entryLogId + ".log");
+ return false;
+ }
+ entryLogFile.delete();
+ return true;
+ }
+
+ /**
* writes the given id to the "lastId" file in the given directory.
*/
private void setLastLogId(File dir, long logId) throws IOException {
@@ -326,7 +322,7 @@ public class EntryLogger {
sizeBuff.flip();
int entrySize = sizeBuff.getInt();
// entrySize does not include the ledgerId
- if (entrySize > 1024*1024) {
+ if (entrySize > MB) {
LOG.error("Sanity check failed for entry size of " + entrySize + " at location " + pos + " in " + entryLogId);
}
@@ -390,84 +386,147 @@ public class EntryLogger {
}
/**
+ * A scanner used to extract entry log meta from entry log files.
+ */
+ class ExtractionScanner implements EntryLogScanner {
+ EntryLogMetadata meta;
+
+ public ExtractionScanner(EntryLogMetadata meta) {
+ this.meta = meta;
+ }
+
+ @Override
+ public boolean accept(long ledgerId) {
+ return true;
+ }
+ @Override
+ public void process(long ledgerId, ByteBuffer entry) {
+ // add new entry size of a ledger to entry log meta
+ meta.addLedgerSize(ledgerId, entry.limit() + 4);
+ }
+ }
+
+ /**
* Method to read in all of the entry logs (those that we haven't processed yet),
* and find the set of ledger IDs that make up each entry log file.
+ *
+ * @param entryLogMetaMap
+ * Map from entry log id to its extracted metadata
+ * @throws IOException
*/
- private void extractLedgersFromEntryLogs() throws IOException {
+ protected Map<Long, EntryLogMetadata> extractMetaFromEntryLogs(Map<Long, EntryLogMetadata> entryLogMetaMap) throws IOException {
// Extract it for every entry log except for the current one.
// Entry Log ID's are just a long value that starts at 0 and increments
// by 1 when the log fills up and we roll to a new one.
- ByteBuffer sizeBuff = ByteBuffer.allocate(4);
- BufferedChannel bc;
long curLogId = logId;
for (long entryLogId = 0; entryLogId < curLogId; entryLogId++) {
// Comb the current entry log file if it has not already been extracted.
- if (entryLogs2LedgersMap.containsKey(entryLogId)) {
+ if (entryLogMetaMap.containsKey(entryLogId)) {
continue;
}
- LOG.info("Extracting the ledgers from entryLogId: " + entryLogId);
- // Get the BufferedChannel for the current entry log file
+ LOG.info("Extracting entry log meta from entryLogId: " + entryLogId);
+ EntryLogMetadata entryLogMeta = new EntryLogMetadata(entryLogId);
+ ExtractionScanner scanner = new ExtractionScanner(entryLogMeta);
+ // Read through the entry log file and extract the entry log meta
try {
- bc = getChannelForLogId(entryLogId);
- } catch (FileNotFoundException e) {
- // If we can't find the entry log file, just log a warning message and continue.
- // This could be a deleted/garbage collected entry log.
- LOG.warn("Entry Log file not found in log directories: " + entryLogId + ".log");
- continue;
- }
- // Start the read position in the current entry log file to be after
- // the header where all of the ledger entries are.
- long pos = LOGFILE_HEADER_SIZE;
- ConcurrentHashMap<Long, Boolean> entryLogLedgers = new ConcurrentHashMap<Long, Boolean>();
- // Read through the entry log file and extract the ledger ID's.
- try {
- while (true) {
- // Check if we've finished reading the entry log file.
- if (pos >= bc.size()) {
- break;
- }
- if (bc.read(sizeBuff, pos) != sizeBuff.capacity()) {
- throw new IOException("Short read from entrylog " + entryLogId);
- }
- pos += 4;
- sizeBuff.flip();
- int entrySize = sizeBuff.getInt();
- if (entrySize > 1024 * 1024) {
- LOG.error("Sanity check failed for entry size of " + entrySize + " at location " + pos + " in "
- + entryLogId);
- }
- byte data[] = new byte[entrySize];
- ByteBuffer buff = ByteBuffer.wrap(data);
- int rc = bc.read(buff, pos);
- if (rc != data.length) {
- throw new IOException("Short read for entryLog " + entryLogId + "@" + pos + "(" + rc + "!="
- + data.length + ")");
- }
- buff.flip();
- long ledgerId = buff.getLong();
- entryLogLedgers.put(ledgerId, true);
- // Advance position to the next entry and clear sizeBuff.
- pos += entrySize;
- sizeBuff.clear();
- }
+ scanEntryLog(entryLogId, scanner);
+ LOG.info("Retrieved entry log meta data entryLogId: " + entryLogId + ", meta: " + entryLogMeta);
+ entryLogMetaMap.put(entryLogId, entryLogMeta);
} catch(IOException e) {
- LOG.info("Premature exception when processing " + entryLogId +
+ LOG.warn("Premature exception when processing " + entryLogId +
"recovery will take care of the problem", e);
}
- LOG.info("Retrieved all ledgers that comprise entryLogId: " + entryLogId + ", values: " + entryLogLedgers);
- entryLogs2LedgersMap.put(entryLogId, entryLogLedgers);
+
}
+ return entryLogMetaMap;
+ }
+
+ protected EntryLogMetadata extractMetaFromEntryLog(long entryLogId) {
+ EntryLogMetadata entryLogMeta = new EntryLogMetadata(entryLogId);
+ ExtractionScanner scanner = new ExtractionScanner(entryLogMeta);
+ // Read through the entry log file and extract the entry log meta
+ try {
+ scanEntryLog(entryLogId, scanner);
+ LOG.info("Retrieved entry log meta data entryLogId: " + entryLogId + ", meta: " + entryLogMeta);
+ } catch(IOException e) {
+ LOG.warn("Premature exception when processing " + entryLogId +
+ "recovery will take care of the problem", e);
+ }
+ return entryLogMeta;
}
/**
- * Shutdown method to gracefully stop all threads spawned in this class and exit.
+ * Scan entry log
*
- * @throws InterruptedException if there is an exception stopping threads.
+ * @param entryLogId
+ * Entry Log Id
+ * @param scanner
+ * Entry Log Scanner
+ * @throws IOException
+ */
+ protected void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOException {
+ ByteBuffer sizeBuff = ByteBuffer.allocate(4);
+ ByteBuffer lidBuff = ByteBuffer.allocate(8);
+ BufferedChannel bc;
+ // Get the BufferedChannel for the current entry log file
+ try {
+ bc = getChannelForLogId(entryLogId);
+ } catch (IOException e) {
+ LOG.warn("Failed to get channel to scan entry log: " + entryLogId + ".log");
+ throw e;
+ }
+ // Start the read position in the current entry log file to be after
+ // the header where all of the ledger entries are.
+ long pos = LOGFILE_HEADER_SIZE;
+ // Read through the entry log file and extract the ledger ID's.
+ while (true) {
+ // Check if we've finished reading the entry log file.
+ if (pos >= bc.size()) {
+ break;
+ }
+ if (bc.read(sizeBuff, pos) != sizeBuff.capacity()) {
+ throw new IOException("Short read for entry size from entrylog " + entryLogId);
+ }
+ pos += 4;
+ sizeBuff.flip();
+ int entrySize = sizeBuff.getInt();
+ if (entrySize > MB) {
+ LOG.warn("Found large size entry of " + entrySize + " at location " + pos + " in "
+ + entryLogId);
+ }
+ sizeBuff.clear();
+ // try to read ledger id first
+ if (bc.read(lidBuff, pos) != lidBuff.capacity()) {
+ throw new IOException("Short read for ledger id from entrylog " + entryLogId);
+ }
+ lidBuff.flip();
+ long lid = lidBuff.getLong();
+ lidBuff.clear();
+ if (!scanner.accept(lid)) {
+ // skip this entry
+ pos += entrySize;
+ continue;
+ }
+ // read the entry
+ byte data[] = new byte[entrySize];
+ ByteBuffer buff = ByteBuffer.wrap(data);
+ int rc = bc.read(buff, pos);
+ if (rc != data.length) {
+ throw new IOException("Short read for ledger entry from entryLog " + entryLogId
+ + "@" + pos + "(" + rc + "!=" + data.length + ")");
+ }
+ buff.flip();
+ // process the entry
+ scanner.process(lid, buff);
+ // Advance position to the next entry
+ pos += entrySize;
+ }
+ }
+
+ /**
+ * Shutdown method to gracefully stop entry logger.
*/
- public void shutdown() throws InterruptedException {
- gcThread.running = false;
- gcThread.interrupt();
- gcThread.join();
+ public void shutdown() {
// since logChannel is buffered channel, do flush when shutting down
try {
flush();
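Note: to make the EntryLogMetadata accounting above concrete, a small hypothetical walk-through (same-package access assumed; the class name is illustrative only):

    public class EntryLogMetadataSketch {
        public static void main(String[] args) {
            EntryLogger.EntryLogMetadata meta = new EntryLogger.EntryLogMetadata(0L);
            meta.addLedgerSize(1L, 400L); // ledger 1 occupies 400 bytes of log 0
            meta.addLedgerSize(2L, 600L); // ledger 2 occupies 600 bytes
            // totalSize == remainingSize == 1000, so getUsage() == 1.0
            meta.removeLedger(2L);        // ledger 2 was deleted
            // remainingSize == 400, so getUsage() == 0.4; below a major
            // compaction threshold of 0.8, this log becomes a candidate
            System.out.println(meta + ", usage = " + meta.getUsage());
        }
    }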
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java?rev=1298357&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java Thu Mar 8 11:13:23 2012
@@ -0,0 +1,378 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogMetadata;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * This is the garbage collector thread that runs in the background to
+ * remove any entry log files that no longer contain any active ledgers.
+ */
+public class GarbageCollectorThread extends Thread {
+ private static final Logger LOG = LoggerFactory.getLogger(GarbageCollectorThread.class);
+
+ private static final int SECOND = 1000;
+
+ // Maps entry log files to the set of ledgers that comprise the file and the size usage per ledger
+ private Map<Long, EntryLogMetadata> entryLogMetaMap = new ConcurrentHashMap<Long, EntryLogMetadata>();
+
+ // This is how often we want to run the Garbage Collector Thread (in milliseconds).
+ final long gcWaitTime;
+
+ // Compaction parameters
+ boolean enableMinorCompaction = false;
+ final double minorCompactionThreshold;
+ final long minorCompactionInterval;
+
+ boolean enableMajorCompaction = false;
+ final double majorCompactionThreshold;
+ final long majorCompactionInterval;
+
+ long lastMinorCompactionTime;
+ long lastMajorCompactionTime;
+
+ // Entry Logger Handle
+ final EntryLogger entryLogger;
+ final EntryLogScanner scanner;
+
+ // Ledger Cache Handle
+ final LedgerCache ledgerCache;
+
+ // ZooKeeper Client
+ final ZooKeeper zk;
+
+ // flag to ensure the gc thread will not be interrupted during compaction,
+ // to reduce the risk of corrupting the entry log
+ final AtomicBoolean compacting = new AtomicBoolean(false);
+
+ volatile boolean running = true;
+
+ /**
+ * A scanner wrapper to check whether a ledger is alive in an entry log file
+ */
+ class CompactionScanner implements EntryLogScanner {
+ EntryLogMetadata meta;
+
+ public CompactionScanner(EntryLogMetadata meta) {
+ this.meta = meta;
+ }
+
+ @Override
+ public boolean accept(long ledgerId) {
+ return meta.containsLedger(ledgerId) && scanner.accept(ledgerId);
+ }
+
+ @Override
+ public void process(long ledgerId, ByteBuffer entry) throws IOException {
+ scanner.process(ledgerId, entry);
+ }
+ }
+
+
+ /**
+ * Create a garbage collector thread.
+ *
+ * @param conf
+ * Server Configuration Object.
+ * @throws IOException
+ */
+ public GarbageCollectorThread(ServerConfiguration conf,
+ ZooKeeper zookeeper,
+ LedgerCache ledgerCache,
+ EntryLogger entryLogger,
+ EntryLogScanner scanner)
+ throws IOException {
+ super("GarbageCollectorThread");
+
+ this.zk = zookeeper;
+ this.ledgerCache = ledgerCache;
+ this.entryLogger = entryLogger;
+ this.scanner = scanner;
+
+ this.gcWaitTime = conf.getGcWaitTime();
+ // compaction parameters
+ minorCompactionThreshold = conf.getMinorCompactionThreshold();
+ minorCompactionInterval = conf.getMinorCompactionInterval() * SECOND;
+ majorCompactionThreshold = conf.getMajorCompactionThreshold();
+ majorCompactionInterval = conf.getMajorCompactionInterval() * SECOND;
+
+ if (minorCompactionInterval > 0 && minorCompactionThreshold > 0) {
+ if (minorCompactionThreshold > 1.0f) {
+ throw new IOException("Invalid minor compaction threshold "
+ + minorCompactionThreshold);
+ }
+ if (minorCompactionInterval <= gcWaitTime) {
+ throw new IOException("Too short minor compaction interval : "
+ + minorCompactionInterval);
+ }
+ enableMinorCompaction = true;
+ }
+
+ if (majorCompactionInterval > 0 && majorCompactionThreshold > 0) {
+ if (majorCompactionThreshold > 1.0f) {
+ throw new IOException("Invalid major compaction threshold "
+ + majorCompactionThreshold);
+ }
+ if (majorCompactionInterval <= gcWaitTime) {
+ throw new IOException("Too short major compaction interval : "
+ + majorCompactionInterval);
+ }
+ enableMajorCompaction = true;
+ }
+
+ if (enableMinorCompaction && enableMajorCompaction) {
+ if (minorCompactionInterval >= majorCompactionInterval ||
+ minorCompactionThreshold >= majorCompactionThreshold) {
+ throw new IOException("Invalid minor/major compaction settings : minor ("
+ + minorCompactionThreshold + ", " + minorCompactionInterval
+ + "), major (" + majorCompactionThreshold + ", "
+ + majorCompactionInterval + ")");
+ }
+ }
+
+ LOG.info("Minor Compaction : enabled=" + enableMinorCompaction + ", threshold="
+ + minorCompactionThreshold + ", interval=" + minorCompactionInterval);
+ LOG.info("Major Compaction : enabled=" + enableMajorCompaction + ", threshold="
+ + majorCompactionThreshold + ", interval=" + majorCompactionInterval);
+
+ lastMinorCompactionTime = lastMajorCompactionTime = System.currentTimeMillis();
+ }
+
+ @Override
+ public void run() {
+ while (running) {
+ synchronized (this) {
+ try {
+ wait(gcWaitTime);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ continue;
+ }
+ }
+
+ // Dependency check.
+ if (null == zk) {
+ continue;
+ }
+
+ // Extract all of the ledger ID's that comprise all of the entry logs
+ // (except for the current new one which is still being written to).
+ try {
+ entryLogMetaMap = entryLogger.extractMetaFromEntryLogs(entryLogMetaMap);
+ } catch (IOException ie) {
+ LOG.warn("Exception when extracting entry log meta from entry logs : ", ie);
+ }
+
+ // gc inactive/deleted ledgers
+ doGcLedgers();
+
+ // gc entry logs
+ doGcEntryLogs();
+
+ long curTime = System.currentTimeMillis();
+ if (enableMajorCompaction &&
+ curTime - lastMajorCompactionTime > majorCompactionInterval) {
+ // enter major compaction
+ LOG.info("Enter major compaction");
+ doCompactEntryLogs(majorCompactionThreshold);
+ lastMajorCompactionTime = System.currentTimeMillis();
+ // also move minor compaction time
+ lastMinorCompactionTime = lastMajorCompactionTime;
+ continue;
+ }
+
+ if (enableMinorCompaction &&
+ curTime - lastMinorCompactionTime > minorCompactionInterval) {
+ // enter minor compaction
+ LOG.info("Enter minor compaction");
+ doCompactEntryLogs(minorCompactionThreshold);
+ lastMinorCompactionTime = System.currentTimeMillis();
+ }
+ }
+ }
+
+ /**
+ * Do garbage collection ledger index files
+ */
+ private void doGcLedgers() {
+ ledgerCache.activeLedgerManager.garbageCollectLedgers(
+ new LedgerManager.GarbageCollector() {
+ @Override
+ public void gc(long ledgerId) {
+ try {
+ ledgerCache.deleteLedger(ledgerId);
+ } catch (IOException e) {
+ LOG.error("Exception when deleting the ledger index file on the Bookie: ", e);
+ }
+ }
+ });
+ }
+
+ /**
+ * Garbage collect those entry loggers which are not associated with any active ledgers
+ */
+ private void doGcEntryLogs() {
+ // Loop through all of the entry logs and remove the non-active ledgers.
+ for (Long entryLogId : entryLogMetaMap.keySet()) {
+ EntryLogMetadata meta = entryLogMetaMap.get(entryLogId);
+ for (Long entryLogLedger : meta.ledgersMap.keySet()) {
+ // Remove the entry log ledger from the set if it isn't active.
+ if (!ledgerCache.activeLedgerManager.containsActiveLedger(entryLogLedger)) {
+ meta.removeLedger(entryLogLedger);
+ }
+ }
+ if (meta.isEmpty()) {
+ // This means the entry log is not associated with any active ledgers anymore.
+ // We can remove this entry log file now.
+ LOG.info("Deleting entryLogId " + entryLogId + " as it has no active ledgers!");
+ removeEntryLog(entryLogId);
+ }
+ }
+ }
+
+ /**
+ * Compact entry logs if necessary.
+ *
+ * <p>
+ * Compaction is executed on the entry logs with the most unused space first.
+ * Entry log files whose remaining size percentage is higher than the
+ * threshold will not be compacted.
+ * </p>
+ */
+ private void doCompactEntryLogs(double threshold) {
+ LOG.info("Do compaction to compact those files lower than " + threshold);
+ // sort the ledger meta by occupied unused space
+ Comparator<EntryLogMetadata> sizeComparator = new Comparator<EntryLogMetadata>() {
+ @Override
+ public int compare(EntryLogMetadata m1, EntryLogMetadata m2) {
+ long unusedSize1 = m1.totalSize - m1.remainingSize;
+ long unusedSize2 = m2.totalSize - m2.remainingSize;
+ if (unusedSize1 > unusedSize2) {
+ return -1;
+ } else if (unusedSize1 < unusedSize2) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+ };
+ List<EntryLogMetadata> logsToCompact = new ArrayList<EntryLogMetadata>();
+ logsToCompact.addAll(entryLogMetaMap.values());
+ Collections.sort(logsToCompact, sizeComparator);
+ for (EntryLogMetadata meta : logsToCompact) {
+ if (meta.getUsage() >= threshold) {
+ break;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Compacting entry log " + meta.entryLogId + " below threshold "
+ + threshold + ".");
+ }
+ compactEntryLog(meta.entryLogId);
+ if (!running) { // if gc thread is not running, stop compaction
+ return;
+ }
+ }
+ }
+
+ /**
+ * Shutdown the garbage collector thread.
+ *
+ * @throws InterruptedException if there is an exception stopping gc thread.
+ */
+ public void shutdown() throws InterruptedException {
+ this.running = false;
+ if (compacting.compareAndSet(false, true)) {
+ // if setting compacting flag succeed, means gcThread is not compacting now
+ // it is safe to interrupt itself now
+ this.interrupt();
+ }
+ this.join();
+ }
+
+ /**
+ * Remove entry log.
+ *
+ * @param entryLogId
+ * Entry Log File Id
+ */
+ private void removeEntryLog(long entryLogId) {
+ // only drop the metadata if the entry log file was removed successfully
+ if (entryLogger.removeEntryLog(entryLogId)) {
+ entryLogMetaMap.remove(entryLogId);
+ }
+ }
+
+ /**
+ * Compact an entry log.
+ *
+ * @param entryLogId
+ * Entry Log File Id
+ */
+ protected void compactEntryLog(long entryLogId) {
+ EntryLogMetadata entryLogMeta = entryLogMetaMap.get(entryLogId);
+ if (null == entryLogMeta) {
+ LOG.warn("Can't get entry log meta when compacting entry log " + entryLogId + ".");
+ return;
+ }
+
+ // Similar to the sync thread, try to set the compacting flag to make
+ // sure compaction is not interrupted by shutdown; otherwise a
+ // ClosedByInterruptException may leave the index file and entry logger
+ // closed and corrupted.
+ if (!compacting.compareAndSet(false, true)) {
+ // failed to set the compacting flag, which means another thread
+ // wants to interrupt the gc thread and make it exit
+ return;
+ }
+
+ LOG.info("Compacting entry log : " + entryLogId);
+
+ try {
+ entryLogger.scanEntryLog(entryLogId, new CompactionScanner(entryLogMeta));
+ // after moving entries to new entry log, remove this old one
+ removeEntryLog(entryLogId);
+ } catch (IOException e) {
+ LOG.info("Premature exception when compacting " + entryLogId, e);
+ } finally {
+ // clear compacting flag
+ compacting.set(false);
+ }
+ }
+}
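Note: the selection policy in doCompactEntryLogs() can be illustrated standalone. A hypothetical sketch with three equally sized logs (all names and numbers are illustrative, not part of the commit):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    public class CompactionSelectionSketch {
        static class Meta {
            final long id, totalSize, remainingSize;
            Meta(long id, long totalSize, long remainingSize) {
                this.id = id; this.totalSize = totalSize; this.remainingSize = remainingSize;
            }
            double usage() {
                return totalSize == 0 ? 0.0 : (double) remainingSize / totalSize;
            }
        }

        public static void main(String[] args) {
            List<Meta> logs = new ArrayList<Meta>();
            logs.add(new Meta(0, 1000, 900)); // 90% live data
            logs.add(new Meta(1, 1000, 100)); // 10% live data
            logs.add(new Meta(2, 1000, 500)); // 50% live data
            // sort by unused space, largest first (mirrors sizeComparator above)
            Collections.sort(logs, new Comparator<Meta>() {
                @Override
                public int compare(Meta a, Meta b) {
                    long ua = a.totalSize - a.remainingSize;
                    long ub = b.totalSize - b.remainingSize;
                    return ua > ub ? -1 : (ua < ub ? 1 : 0);
                }
            });
            double threshold = 0.8; // e.g. the major compaction threshold
            for (Meta m : logs) {
                if (m.usage() >= threshold) {
                    break; // all remaining logs hold even more live data
                }
                System.out.println("would compact log " + m.id + ", usage " + m.usage());
            }
            // prints log 1 (0.1) and log 2 (0.5); log 0 (0.9) is left alone
        }
    }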
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1298357&r1=1298356&r2=1298357&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Thu Mar 8 11:13:23 2012
@@ -25,6 +25,10 @@ import java.io.File;
public class ServerConfiguration extends AbstractConfiguration {
// Entry Log Parameters
protected final static String ENTRY_LOG_SIZE_LIMIT = "logSizeLimit";
+ protected final static String MINOR_COMPACTION_INTERVAL = "minorCompactionInterval";
+ protected final static String MINOR_COMPACTION_THRESHOLD = "minorCompactionThreshold";
+ protected final static String MAJOR_COMPACTION_INTERVAL = "majorCompactionInterval";
+ protected final static String MAJOR_COMPACTION_THRESHOLD = "majorCompactionThreshold";
// Gc Parameters
protected final static String GC_WAIT_TIME = "gcWaitTime";
@@ -405,4 +409,110 @@ public class ServerConfiguration extends
setProperty(ENABLE_STATISTICS, Boolean.toString(enabled));
return this;
}
+
+ /**
+ * Get threshold of minor compaction.
+ *
+ * Entry log files whose remaining size percentage falls below this
+ * threshold will be compacted in a minor compaction.
+ *
+ * If it is set to zero or less, minor compaction is disabled.
+ *
+ * @return threshold of minor compaction
+ */
+ public double getMinorCompactionThreshold() {
+ return getDouble(MINOR_COMPACTION_THRESHOLD, 0.2f);
+ }
+
+ /**
+ * Set threshold of minor compaction
+ *
+ * @see #getMinorCompactionThreshold()
+ *
+ * @param threshold
+ * Threshold for minor compaction
+ * @return server configuration
+ */
+ public ServerConfiguration setMinorCompactionThreshold(double threshold) {
+ setProperty(MINOR_COMPACTION_THRESHOLD, threshold);
+ return this;
+ }
+
+ /**
+ * Get threshold of major compaction.
+ *
+ * Entry log files whose remaining size percentage falls below this
+ * threshold will be compacted in a major compaction.
+ *
+ * If it is set to zero or less, major compaction is disabled.
+ *
+ * @return threshold of major compaction
+ */
+ public double getMajorCompactionThreshold() {
+ return getDouble(MAJOR_COMPACTION_THRESHOLD, 0.8f);
+ }
+
+ /**
+ * Set threshold of major compaction.
+ *
+ * @see #getMajorCompactionThreshold()
+ *
+ * @param threshold
+ * Threshold of major compaction
+ * @return server configuration
+ */
+ public ServerConfiguration setMajorCompactionThreshold(double threshold) {
+ setProperty(MAJOR_COMPACTION_THRESHOLD, threshold);
+ return this;
+ }
+
+ /**
+ * Get interval to run minor compaction, in seconds.
+ *
+ * If it is set to zero or less, minor compaction is disabled.
+ *
+ * @return interval to run minor compaction, in seconds
+ */
+ public long getMinorCompactionInterval() {
+ return getLong(MINOR_COMPACTION_INTERVAL, 3600);
+ }
+
+ /**
+ * Set interval to run minor compaction
+ *
+ * @see #getMinorCompactionInterval()
+ *
+ * @param interval
+ * Interval to run minor compaction
+ * @return server configuration
+ */
+ public ServerConfiguration setMinorCompactionInterval(long interval) {
+ setProperty(MINOR_COMPACTION_INTERVAL, interval);
+ return this;
+ }
+
+ /**
+ * Get interval to run major compaction, in seconds.
+ *
+ * If it is set to zero or less, major compaction is disabled.
+ *
+ * @return interval to run major compaction, in seconds
+ */
+ public long getMajorCompactionInterval() {
+ return getLong(MAJOR_COMPACTION_INTERVAL, 86400);
+ }
+
+ /**
+ * Set interval to run major compaction.
+ *
+ * @see #getMajorCompactionInterval()
+ *
+ * @param interval
+ * Interval to run major compaction
+ * @return server configuration
+ */
+ public ServerConfiguration setMajorCompactionInterval(long interval) {
+ setProperty(MAJOR_COMPACTION_INTERVAL, interval);
+ return this;
+ }
}
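Note: the GarbageCollectorThread constructor above enforces relationships between these settings. For reference, a hypothetical standalone check that mirrors that validation (intervals are configured in seconds but compared against gcWaitTime in milliseconds; the class name is illustrative only):

    import org.apache.bookkeeper.conf.ServerConfiguration;

    public class CompactionSettingsCheck {
        public static void check(ServerConfiguration conf) {
            long gcWaitTime = conf.getGcWaitTime(); // milliseconds
            double minorT = conf.getMinorCompactionThreshold();
            long minorI = conf.getMinorCompactionInterval() * 1000;
            double majorT = conf.getMajorCompactionThreshold();
            long majorI = conf.getMajorCompactionInterval() * 1000;
            boolean minorOn = minorI > 0 && minorT > 0;
            boolean majorOn = majorI > 0 && majorT > 0;
            if (minorOn && (minorT > 1.0 || minorI <= gcWaitTime)) {
                throw new IllegalArgumentException("invalid minor compaction settings");
            }
            if (majorOn && (majorT > 1.0 || majorI <= gcWaitTime)) {
                throw new IllegalArgumentException("invalid major compaction settings");
            }
            // minor compaction must run more often and use a lower
            // threshold than major compaction
            if (minorOn && majorOn
                    && (minorI >= majorI || minorT >= majorT)) {
                throw new IllegalArgumentException(
                        "minor compaction settings must stay below major ones");
            }
        }
    }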
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java?rev=1298357&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java Thu Mar 8 11:13:23 2012
@@ -0,0 +1,317 @@
+package org.apache.bookkeeper.bookie;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+import java.io.File;
+import java.util.Arrays;
+import java.util.Enumeration;
+
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.test.BaseTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This class tests the entry log compaction functionality.
+ */
+public class CompactionTest extends BaseTestCase {
+ static Logger LOG = LoggerFactory.getLogger(CompactionTest.class);
+ DigestType digestType;
+
+ static int ENTRY_SIZE = 1024;
+
+ int numEntries;
+ int gcWaitTime;
+ double minorCompactionThreshold;
+ double majorCompactionThreshold;
+ long minorCompactionInterval;
+ long majorCompactionInterval;
+
+ String msg;
+
+ public CompactionTest(DigestType digestType) {
+ super(3);
+ this.digestType = digestType;
+
+ numEntries = 2048;
+ gcWaitTime = 1000;
+ minorCompactionThreshold = 0.1f;
+ majorCompactionThreshold = 0.5f;
+ minorCompactionInterval = 2 * gcWaitTime / 1000;
+ majorCompactionInterval = 4 * gcWaitTime / 1000;
+
+ // a dummy message
+ StringBuilder msgSB = new StringBuilder();
+ for (int i = 0; i < ENTRY_SIZE; i++) {
+ msgSB.append("a");
+ }
+ msg = msgSB.toString();
+ }
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ // Set up the configuration properties needed.
+ baseConf.setEntryLogSizeLimit(numEntries * ENTRY_SIZE);
+ baseConf.setGcWaitTime(gcWaitTime);
+ baseConf.setMinorCompactionThreshold(minorCompactionThreshold);
+ baseConf.setMajorCompactionThreshold(majorCompactionThreshold);
+ baseConf.setMinorCompactionInterval(minorCompactionInterval);
+ baseConf.setMajorCompactionInterval(majorCompactionInterval);
+
+ super.setUp();
+ }
+
+ LedgerHandle[] prepareData(int numEntryLogs, boolean changeNum)
+ throws Exception {
+ // since an entry log file can hold at most 2048 entries:
+ // the first ledger writes 2 entries, which is fewer than the low water mark
+ int num1 = 2;
+ // the third ledger writes more entries than the high water mark
+ int num3 = (int)(numEntries * 0.7f);
+ // the second ledger writes the remaining entries, which is more than the
+ // low water mark and fewer than the high water mark
+ int num2 = numEntries - num3 - num1;
+
+ LedgerHandle[] lhs = new LedgerHandle[3];
+ for (int i=0; i<3; ++i) {
+ lhs[i] = bkc.createLedger(3, 3, digestType, "".getBytes());
+ }
+
+ for (int n = 0; n < numEntryLogs; n++) {
+ for (int k = 0; k < num1; k++) {
+ lhs[0].addEntry(msg.getBytes());
+ }
+ for (int k = 0; k < num2; k++) {
+ lhs[1].addEntry(msg.getBytes());
+ }
+ for (int k = 0; k < num3; k++) {
+ lhs[2].addEntry(msg.getBytes());
+ }
+ if (changeNum) {
+ --num2;
+ ++num3;
+ }
+ }
+
+ return lhs;
+ }
+
+ private void verifyLedger(long lid, long startEntryId, long endEntryId) throws Exception {
+ LedgerHandle lh = bkc.openLedger(lid, digestType, "".getBytes());
+ Enumeration<LedgerEntry> entries = lh.readEntries(startEntryId, endEntryId);
+ while (entries.hasMoreElements()) {
+ LedgerEntry entry = entries.nextElement();
+ assertEquals(msg, new String(entry.getEntry()));
+ }
+ }
+
+ private boolean[] checkLogFiles(File ledgerDirectory, int numFiles) {
+ boolean[] hasLogFiles = new boolean[numFiles];
+ Arrays.fill(hasLogFiles, false);
+ for (File f : ledgerDirectory.listFiles()) {
+ LOG.info("Checking file : " + f);
+ if (f.isFile()) {
+ String name = f.getName();
+ if (!name.endsWith(".log")) {
+ continue;
+ }
+ String idString = name.split("\\.")[0];
+ int id = Integer.parseInt(idString, 16);
+ if (id >= numFiles) {
+ continue;
+ }
+ hasLogFiles[id] = true;
+ }
+ }
+ return hasLogFiles;
+ }
+
+ @Test
+ public void testDisableCompaction() throws Exception {
+ // prepare data
+ LedgerHandle[] lhs = prepareData(3, false);
+
+ // disable compaction
+ baseConf.setMinorCompactionThreshold(0.0f);
+ baseConf.setMajorCompactionThreshold(0.0f);
+
+ // restart bookies
+ restartBookies();
+
+ // remove ledger2 and ledger3
+ // so entry log 1 and 2 would have ledger1 entries left
+ bkc.deleteLedger(lhs[1].getId());
+ bkc.deleteLedger(lhs[2].getId());
+ LOG.info("Finished deleting the ledgers contains most entries.");
+ Thread.sleep(baseConf.getMajorCompactionInterval() * 1000
+ + baseConf.getGcWaitTime());
+
+ // entry logs ([0,1].log) should not be compacted.
+ for (File ledgerDirectory : tmpDirs) {
+ boolean[] hasLogFiles = checkLogFiles(ledgerDirectory, 2);
+ assertTrue("Not Found entry log file ([0,1].log that should have been compacted in ledgerDirectory: " + ledgerDirectory, hasLogFiles[0] & hasLogFiles[1]);
+ }
+ }
+
+ @Test
+ public void testMinorCompaction() throws Exception {
+ // prepare data
+ LedgerHandle[] lhs = prepareData(3, false);
+
+ for (LedgerHandle lh : lhs) {
+ lh.close();
+ }
+
+ // disable major compaction
+ baseConf.setMajorCompactionThreshold(0.0f);
+
+ // restart bookies
+ restartBookies();
+
+ // remove ledger2 and ledger3
+ bkc.deleteLedger(lhs[1].getId());
+ bkc.deleteLedger(lhs[2].getId());
+
+ LOG.info("Finished deleting the ledgers contains most entries.");
+ Thread.sleep(baseConf.getMinorCompactionInterval() * 1000
+ + baseConf.getGcWaitTime());
+
+ // entry logs ([0,1,2].log) should be compacted.
+ for (File ledgerDirectory : tmpDirs) {
+ boolean[] hasLog = checkLogFiles(ledgerDirectory, 3);
+ assertFalse("Found entry log file ([0,1,2].log that should have not been compacted in ledgerDirectory: " + ledgerDirectory, hasLog[0] | hasLog[1] | hasLog[2]);
+ }
+
+ // even though the entry log files are removed, we can still access the
+ // entries of ledger1, since they have been compacted into a new entry log
+ verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed());
+ }
+
+ @Test
+ public void testMajorCompaction() throws Exception {
+
+ // prepare data
+ LedgerHandle[] lhs = prepareData(3, true);
+
+ for (LedgerHandle lh : lhs) {
+ lh.close();
+ }
+
+ // disable minor compaction
+ baseConf.setMinorCompactionThreshold(0.0f);
+
+ // restart bookies
+ restartBookies();
+
+ // remove ledger1 and ledger3
+ bkc.deleteLedger(lhs[0].getId());
+ bkc.deleteLedger(lhs[2].getId());
+ LOG.info("Finished deleting the ledgers contains most entries.");
+
+ Thread.sleep(baseConf.getMajorCompactionInterval() * 1000
+ + baseConf.getGcWaitTime());
+
+ // entry logs ([0,1,2].log) should be compacted
+ for (File ledgerDirectory : tmpDirs) {
+ boolean[] hasLogFiles = checkLogFiles(ledgerDirectory, 3);
+ assertFalse("Found entry log file ([0,1,2].log that should have not been compacted in ledgerDirectory: "
+ + ledgerDirectory, hasLogFiles[0] | hasLogFiles[1] | hasLogFiles[2]);
+ }
+
+ // even though the entry log files are removed, we can still access the
+ // entries of ledger2, since they have been compacted into a new entry log
+ verifyLedger(lhs[1].getId(), 0, lhs[1].getLastAddConfirmed());
+ }
+
+ @Test
+ public void testMajorCompactionAboveThreshold() throws Exception {
+ // prepare data
+ LedgerHandle[] lhs = prepareData(3, false);
+
+ for (LedgerHandle lh : lhs) {
+ lh.close();
+ }
+
+ // remove ledger1 and ledger2
+ bkc.deleteLedger(lhs[0].getId());
+ bkc.deleteLedger(lhs[1].getId());
+ LOG.info("Finished deleting the ledgers contains less entries.");
+ Thread.sleep(baseConf.getMajorCompactionInterval() * 1000
+ + baseConf.getGcWaitTime());
+
+ // entry logs ([0,1,2].log) should not be compacted
+ for (File ledgerDirectory : tmpDirs) {
+ boolean[] hasLogFiles = checkLogFiles(ledgerDirectory, 3);
+ assertTrue("Not Found entry log file ([1,2].log that should have been compacted in ledgerDirectory: "
+ + ledgerDirectory, hasLogFiles[0] & hasLogFiles[1] & hasLogFiles[2]);
+ }
+ }
+
+ @Test
+ public void testCompactionSmallEntryLogs() throws Exception {
+
+ // create a ledger to write a few entries
+ LedgerHandle alh = bkc.createLedger(3, 3, digestType, "".getBytes());
+ for (int i=0; i<3; i++) {
+ alh.addEntry(msg.getBytes());
+ }
+ alh.close();
+
+ // restart bookie to roll entry log files
+ restartBookies();
+
+ // prepare data
+ LedgerHandle[] lhs = prepareData(3, false);
+
+ for (LedgerHandle lh : lhs) {
+ lh.close();
+ }
+
+ // remove ledger2 and ledger3
+ bkc.deleteLedger(lhs[1].getId());
+ bkc.deleteLedger(lhs[2].getId());
+ LOG.info("Finished deleting the ledgers contains most entries.");
+ Thread.sleep(baseConf.getMajorCompactionInterval() * 1000
+ + baseConf.getGcWaitTime());
+
+ // entry logs (0.log) should not be compacted
+ // entry logs ([1,2,3].log) should be compacted.
+ for (File ledgerDirectory : tmpDirs) {
+ boolean[] hasLog = checkLogFiles(ledgerDirectory, 4);
+
+ assertTrue("Not Found entry log file ([0].log that should have been compacted in ledgerDirectory: "
+ + ledgerDirectory, hasLog[0]);
+ assertFalse("Found entry log file ([1,2,3].log that should have not been compacted in ledgerDirectory: "
+ + ledgerDirectory, hasLog[1] | hasLog[2] | hasLog[3]);
+ }
+
+ // even though the entry log files are removed, we can still access the
+ // entries of ledger1, since they have been compacted into a new entry log
+ verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed());
+ }
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java?rev=1298357&r1=1298356&r2=1298357&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java Thu Mar 8 11:13:23 2012
@@ -23,12 +23,13 @@ package org.apache.bookkeeper.bookie;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
-import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.Map;
+import java.util.HashMap;
import junit.framework.TestCase;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogMetadata;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.junit.After;
import org.junit.Before;
@@ -54,7 +55,7 @@ public class EntryLogTest extends TestCa
conf.setGcWaitTime(gcWaitTime);
conf.setLedgerDirNames(new String[] {tmpDir.toString()});
// create some entries
- EntryLogger logger = new EntryLogger(conf, null);
+ EntryLogger logger = new EntryLogger(conf);
logger.addEntry(1, generateEntry(1, 1));
logger.addEntry(3, generateEntry(3, 1));
logger.addEntry(2, generateEntry(2, 1));
@@ -65,18 +66,14 @@ public class EntryLogTest extends TestCa
raf.setLength(raf.length()-10);
raf.close();
// now see which ledgers are in the log
- logger = new EntryLogger(conf, null);
- logger.start();
-
- Thread.sleep(2 * gcWaitTime);
- Field entryLogs2LedgersMapField = logger.getClass().getDeclaredField("entryLogs2LedgersMap");
- entryLogs2LedgersMapField.setAccessible(true);
- @SuppressWarnings("unchecked")
- Map<Long, Map<Long, Boolean>> ledgersMap = (Map<Long, Map<Long, Boolean>>) entryLogs2LedgersMapField.get(logger);
- LOG.info("LedgersMap.get(0) {}", ledgersMap.get(0L));
- assertNotNull(ledgersMap.get(0L).get(1L));
- assertNull(ledgersMap.get(0L).get(2L));
- assertNotNull(ledgersMap.get(0L).get(3L));
+ logger = new EntryLogger(conf);
+ EntryLogMetadata meta =
+ logger.extractMetaFromEntryLog(0L);
+
+ LOG.info("Extracted Meta From Entry Log {}", meta);
+ assertNotNull(meta.ledgersMap.get(1L));
+ assertNull(meta.ledgersMap.get(2L));
+ assertNotNull(meta.ledgersMap.get(3L));
}
private ByteBuffer generateEntry(long ledger, long entry) {