Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/03/08 12:13:24 UTC

svn commit: r1298357 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/conf/ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ bookkeeper-server/src/test/java/org/apache/bookk...

Author: ivank
Date: Thu Mar  8 11:13:23 2012
New Revision: 1298357

URL: http://svn.apache.org/viewvc?rev=1298357&view=rev
Log:
BOOKKEEPER-160: bookie server needs to do compaction over entry log files to reclaim disk space (sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1298357&r1=1298356&r2=1298357&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Mar  8 11:13:23 2012
@@ -76,6 +76,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-178: Delay ledger directory creation until the ledger index file was created (sijie via ivank)
 
+        BOOKKEEPER-160: bookie server needs to do compaction over entry log files to reclaim disk space (sijie via ivank)
+
       hedwig-server/
 
         BOOKKEEPER-77: Add a console client for hedwig (Sijie Guo via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf?rev=1298357&r1=1298356&r2=1298357&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf Thu Mar  8 11:13:23 2012
@@ -52,6 +52,28 @@ ledgerDirectories=/tmp/bk-data
 # A new entry log file will be created when the old one reaches the file size limitation
 # logSizeLimit=2147483648
 
+# Threshold of minor compaction
+# Entry log files whose remaining size percentage falls below this
+# threshold will be compacted in a minor compaction.
+# If set to zero or less, minor compaction is disabled.
+# minorCompactionThreshold=0.2
+
+# Interval to run minor compaction, in seconds
+# If set to zero or less, minor compaction is disabled.
+# minorCompactionInterval=3600
+
+# Threshold of major compaction
+# Entry log files whose remaining size percentage falls below this
+# threshold will be compacted in a major compaction.
+# Entry log files whose remaining size percentage stays above this
+# threshold will never be compacted.
+# If set to zero or less, major compaction is disabled.
+# majorCompactionThreshold=0.8
+
+# Interval to run major compaction, in seconds
+# If set to zero or less, major compaction is disabled.
+# majorCompactionInterval=86400
+
 # Max file size of journal file, in megabytes
 # A new journal file will be created when the old one reaches the file size limitation
 #

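These four settings map one-to-one onto the new ServerConfiguration accessors introduced below. A minimal sketch of configuring them programmatically (the wrapper class is hypothetical; the values mirror the defaults above):

    import org.apache.bookkeeper.conf.ServerConfiguration;

    public class CompactionConfigExample {
        public static void main(String[] args) {
            ServerConfiguration conf = new ServerConfiguration();
            // minor compaction: rewrite entry logs below 20% usage, hourly
            conf.setMinorCompactionThreshold(0.2);
            conf.setMinorCompactionInterval(3600);
            // major compaction: rewrite entry logs below 80% usage, daily
            conf.setMajorCompactionThreshold(0.8);
            conf.setMajorCompactionInterval(86400);
        }
    }

With these defaults, an entry log whose live data has dropped to 15% of its original size qualifies for both compaction levels, one at 60% qualifies only for major compaction, and one above 80% is never compacted.
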
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1298357&r1=1298356&r2=1298357&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Thu Mar  8 11:13:23 2012
@@ -136,6 +136,11 @@ public class Bookie extends Thread {
 
     EntryLogger entryLogger;
     LedgerCache ledgerCache;
+    // This is the thread that garbage collects entry logs that no longer
+    // contain any active ledgers, and compacts entry logs whose remaining
+    // data percentage is low, to reclaim disk space.
+    final GarbageCollectorThread gcThread;
+
     /**
      * SyncThread is a background thread which flushes ledger index pages periodically.
      * Also it takes responsibility of garbage collecting journal files.
@@ -263,6 +268,28 @@ public class Bookie extends Thread {
         }
     }
 
+    /**
+     * Scanner used to do entry log compaction
+     */
+    class EntryLogCompactionScanner implements EntryLogger.EntryLogScanner {
+        @Override
+        public boolean accept(long ledgerId) {
+            // the bookie has no knowledge of which ledgers are deleted,
+            // so just accept all ledgers.
+            return true;
+        }
+
+        @Override
+        public void process(long ledgerId, ByteBuffer buffer)
+            throws IOException {
+            try {
+                Bookie.this.addEntryByLedgerId(ledgerId, buffer);
+            } catch (BookieException be) {
+                throw new IOException(be);
+            }
+        }
+    }
+
     public Bookie(ServerConfiguration conf) 
             throws IOException, KeeperException, InterruptedException, BookieException {
         super("Bookie-" + conf.getBookiePort());
@@ -283,9 +310,11 @@ public class Bookie extends Thread {
         ledgerManager = LedgerManagerFactory.newLedgerManager(conf, this.zk);
 
         syncThread = new SyncThread(conf);
-        entryLogger = new EntryLogger(conf, this);
+        entryLogger = new EntryLogger(conf);
         ledgerCache = new LedgerCache(conf, ledgerManager);
-
+        gcThread = new GarbageCollectorThread(conf, this.zk, ledgerCache, entryLogger,
+                                              new EntryLogCompactionScanner());
+        // replay journals
         readJournal();
     }
 
@@ -390,7 +419,7 @@ public class Bookie extends Thread {
         LOG.debug("I'm starting a bookie with journal directory " + journalDirectory.getName());
         super.start();
         syncThread.start();
-        entryLogger.start();
+        gcThread.start();
         // set running here.
         // since bookie server use running as a flag to tell bookie server whether it is alive
         // if setting it in bookie thread, the watcher might run before bookie thread.
@@ -907,6 +936,9 @@ public class Bookie extends Thread {
         if (!running) { // avoid shutdown twice
             return;
         }
+        // shut down the gc thread, which depends on the zookeeper client;
+        // also, compaction writes entries back to the entry log file
+        gcThread.shutdown();
         // Shutdown the ZK client
         if(zk != null) zk.close();
         this.interrupt();
@@ -957,6 +989,16 @@ public class Bookie extends Thread {
         return l;
     }
 
+    protected void addEntryByLedgerId(long ledgerId, ByteBuffer entry)
+        throws IOException, BookieException {
+        LedgerDescriptor handle = getHandle(ledgerId);
+        try {
+            handle.addEntry(entry);
+        } finally {
+            putHandle(handle);
+        }
+    }
+
     /**
      * Add an entry to a ledger as specified by handle. 
      */

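The EntryLogCompactionScanner above is one implementation of the new EntryLogger.EntryLogScanner interface: accept() lets a scanner skip all entries of a ledger before they are read, and process() receives each accepted entry. A minimal sketch of another implementation, to illustrate the contract (this class is hypothetical, and it assumes the org.apache.bookkeeper.bookie package since the interface is package-private):

    package org.apache.bookkeeper.bookie;

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;

    // Counts how many entries each ledger has in a given entry log file.
    class EntryCountingScanner implements EntryLogger.EntryLogScanner {
        final Map<Long, Integer> counts = new HashMap<Long, Integer>();

        @Override
        public boolean accept(long ledgerId) {
            return true; // scan the entries of every ledger
        }

        @Override
        public void process(long ledgerId, ByteBuffer entry) throws IOException {
            Integer c = counts.get(ledgerId);
            counts.put(ledgerId, (c == null) ? 1 : c + 1);
        }
    }

Such a scanner would be driven by entryLogger.scanEntryLog(logId, scanner), the same entry point the compaction path uses.
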
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=1298357&r1=1298356&r2=1298357&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java Thu Mar  8 11:13:23 2012
@@ -36,13 +36,13 @@ import java.nio.channels.FileChannel;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.LedgerManager;
 
 /**
  * This class manages the writing of the bookkeeper entries. All the new
@@ -54,9 +54,6 @@ import org.apache.bookkeeper.meta.Ledger
 public class EntryLogger {
     private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class);
     private File dirs[];
-    // This is a handle to the Bookie parent instance. We need this to get
-    // access to the LedgerCache as well as the ZooKeeper client handle.
-    private final Bookie bookie;
 
     private long logId;
     /**
@@ -74,24 +71,102 @@ public class EntryLogger {
     // this indicates that a write has happened since the last flush
     private volatile boolean somethingWritten = false;
 
-    // Maps entry log files to the set of ledgers that comprise the file.
-    private ConcurrentMap<Long, ConcurrentHashMap<Long, Boolean>> entryLogs2LedgersMap = new ConcurrentHashMap<Long, ConcurrentHashMap<Long, Boolean>>();
-    // This is the thread that garbage collects the entry logs that do not
-    // contain any active ledgers in them.
-    GarbageCollectorThread gcThread = new GarbageCollectorThread();
-    // This is how often we want to run the Garbage Collector Thread (in milliseconds).
-    final long gcWaitTime;
+    final static long MB = 1024 * 1024;
+
+    /**
+     * Records the total size, remaining size, and the set of ledgers that comprise an entry log.
+     */
+    static class EntryLogMetadata {
+        long entryLogId;
+        long totalSize;
+        long remainingSize;
+        ConcurrentHashMap<Long, Long> ledgersMap;
+
+        public EntryLogMetadata(long logId) {
+            this.entryLogId = logId;
+
+            totalSize = remainingSize = 0;
+            ledgersMap = new ConcurrentHashMap<Long, Long>();
+        }
+
+        public void addLedgerSize(long ledgerId, long size) {
+            totalSize += size;
+            remainingSize += size;
+            Long ledgerSize = ledgersMap.get(ledgerId);
+            if (null == ledgerSize) {
+                ledgerSize = 0L;
+            }
+            ledgerSize += size;
+            ledgersMap.put(ledgerId, ledgerSize);
+        }
+
+        public void removeLedger(long ledgerId) {
+            Long size = ledgersMap.remove(ledgerId);
+            if (null == size) {
+                return;
+            }
+            remainingSize -= size;
+        }
+
+        public boolean containsLedger(long ledgerId) {
+            return ledgersMap.containsKey(ledgerId);
+        }
+
+        public double getUsage() {
+            if (totalSize == 0L) {
+                return 0.0f;
+            }
+            return (double)remainingSize / totalSize;
+        }
+
+        public boolean isEmpty() {
+            return ledgersMap.isEmpty();
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            sb.append("{ totalSize = ").append(totalSize).append(", remainingSize = ")
+              .append(remainingSize).append(", ledgersMap = ").append(ledgersMap).append(" }");
+            return sb.toString();
+        }
+    }
+
+    /**
+     * Scan entries in an entry log file.
+     */
+    static interface EntryLogScanner {
+        /**
+         * Tests whether the entries belonging to the specified ledger
+         * should be processed.
+         *
+         * @param ledgerId
+         *          Ledger ID.
+         * @return true if and only if the entries of the ledger should be scanned.
+         */
+        public boolean accept(long ledgerId);
+
+        /**
+         * Process an entry.
+         *
+         * @param ledgerId
+         *          Ledger ID.
+         * @param entry
+         *          Entry ByteBuffer
+         * @throws IOException
+         */
+        public void process(long ledgerId, ByteBuffer entry) throws IOException;
+    }
 
     /**
     * Create an EntryLogger that stores its log files in the given
      * directories
      */
-    public EntryLogger(ServerConfiguration conf, Bookie bookie) throws IOException {
+    public EntryLogger(ServerConfiguration conf) throws IOException {
         this.dirs = conf.getLedgerDirs();
-        this.bookie = bookie;
         // log size limit
         this.logSizeLimit = conf.getEntryLogSizeLimit();
-        this.gcWaitTime = conf.getGcWaitTime();
+
         // Initialize the entry log header buffer. This cannot be a static object
         // since in our unit tests, we run multiple Bookies and thus EntryLoggers
         // within the same JVM. All of these Bookie instances access this header
@@ -108,119 +183,12 @@ public class EntryLogger {
         createLogId(logId);
     }
 
-    public void start() {
-        // Start the Garbage Collector thread to prune unneeded entry logs.
-        gcThread.start();
-    }
-
     /**
      * Maps entry log files to open channels.
      */
     private ConcurrentHashMap<Long, BufferedChannel> channels = new ConcurrentHashMap<Long, BufferedChannel>();
 
     /**
-     * This is the garbage collector thread that runs in the background to
-     * remove any entry log files that no longer contains any active ledger.
-     */
-    class GarbageCollectorThread extends Thread {
-        volatile boolean running = true;
-
-        public GarbageCollectorThread() {
-            super("GarbageCollectorThread");
-        }
-
-        @Override
-        public void run() {
-            while (running) {
-                synchronized (this) {
-                    try {
-                        wait(gcWaitTime);
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        continue;
-                    }
-                }
-                // Extract all of the ledger ID's that comprise all of the entry logs
-                // (except for the current new one which is still being written to).
-                try {
-                    extractLedgersFromEntryLogs();
-                } catch (IOException ie) {
-                    LOG.warn("Exception when extracting ledgers from entry logs : ", ie);
-                }
-
-                // Initialization check. No need to run any logic if we are still starting up.
-                if (bookie == null ||
-                    bookie.zk == null || bookie.ledgerCache == null) {
-                    continue;
-                }
-
-                // gc inactive/deleted ledgers
-                doGcLedgers();
-
-                // gc entry logs
-                doGcEntryLogs();
-            }
-        }
-
-        /**
-         * Do garbage collection ledger index files
-         */
-        private void doGcLedgers() {
-            bookie.ledgerCache.activeLedgerManager.garbageCollectLedgers(
-            new LedgerManager.GarbageCollector() {
-                @Override
-                public void gc(long ledgerId) {
-                    try {
-                        bookie.ledgerCache.deleteLedger(ledgerId);
-                    } catch (IOException e) {
-                        LOG.error("Exception when deleting the ledger index file on the Bookie: ", e);
-                    }
-                }
-            });
-        }
-
-        /**
-         * Garbage collect those entry loggers which are not associated with any active ledgers
-         */
-        private void doGcEntryLogs() {
-            // Loop through all of the entry logs and remove the non-active ledgers.
-            for (Long entryLogId : entryLogs2LedgersMap.keySet()) {
-                ConcurrentHashMap<Long, Boolean> entryLogLedgers = entryLogs2LedgersMap.get(entryLogId);
-                for (Long entryLogLedger : entryLogLedgers.keySet()) {
-                    // Remove the entry log ledger from the set if it isn't active.
-                    if (!bookie.ledgerCache.activeLedgerManager.containsActiveLedger(entryLogLedger)) {
-                        entryLogLedgers.remove(entryLogLedger);
-                    }
-                }
-                if (entryLogLedgers.isEmpty()) {
-                    // This means the entry log is not associated with any active ledgers anymore.
-                    // We can remove this entry log file now.
-                    LOG.info("Deleting entryLogId " + entryLogId + " as it has no active ledgers!");
-                    BufferedChannel bc = channels.remove(entryLogId);
-                    if (null != bc) {
-                        // close its underlying file channel, so it could be deleted really
-                        try {
-                            bc.getFileChannel().close();
-                        } catch (IOException ie) {
-                            LOG.warn("Exception while closing garbage collected entryLog file : ", ie);
-                        }
-                    }
-                    File entryLogFile;
-                    try {
-                        entryLogFile = findFile(entryLogId);
-                    } catch (FileNotFoundException e) {
-                        LOG.error("Trying to delete an entryLog file that could not be found: "
-                                + entryLogId + ".log");
-                        continue;
-                    }
-                    entryLogFile.delete();
-                    entryLogs2LedgersMap.remove(entryLogId);
-                }
-            }
-        }
-    }
-
-    /**
      * Creates a new log file with the given id.
      */
     private void createLogId(long logId) throws IOException {
@@ -239,6 +207,34 @@ public class EntryLogger {
     }
 
     /**
+     * Remove entry log.
+     *
+     * @param entryLogId
+     *          Entry Log File Id
+     */
+    protected boolean removeEntryLog(long entryLogId) {
+        BufferedChannel bc = channels.remove(entryLogId);
+        if (null != bc) {
+            // close its underlying file channel so the file can really be deleted
+            try {
+                bc.getFileChannel().close();
+            } catch (IOException ie) {
+                LOG.warn("Exception while closing garbage collected entryLog file : ", ie);
+            }
+        }
+        File entryLogFile;
+        try {
+            entryLogFile = findFile(entryLogId);
+        } catch (FileNotFoundException e) {
+            LOG.error("Trying to delete an entryLog file that could not be found: "
+                    + entryLogId + ".log");
+            return false;
+        }
+        entryLogFile.delete();
+        return true;
+    }
+
+    /**
      * writes the given id to the "lastId" file in the given directory.
      */
     private void setLastLogId(File dir, long logId) throws IOException {
@@ -326,7 +322,7 @@ public class EntryLogger {
         sizeBuff.flip();
         int entrySize = sizeBuff.getInt();
         // entrySize does not include the ledgerId
-        if (entrySize > 1024*1024) {
+        if (entrySize > MB) {
             LOG.error("Sanity check failed for entry size of " + entrySize + " at location " + pos + " in " + entryLogId);
 
         }
@@ -390,84 +386,147 @@ public class EntryLogger {
     }
 
     /**
+     * A scanner used to extract entry log meta from entry log files.
+     */
+    class ExtractionScanner implements EntryLogScanner {
+        EntryLogMetadata meta;
+
+        public ExtractionScanner(EntryLogMetadata meta) {
+            this.meta = meta;
+        }
+
+        @Override
+        public boolean accept(long ledgerId) {
+            return true;
+        }
+        @Override
+        public void process(long ledgerId, ByteBuffer entry) {
+            // add new entry size of a ledger to entry log meta
+            meta.addLedgerSize(ledgerId, entry.limit() + 4);
+        }
+    }
+
+    /**
      * Method to read in all of the entry logs (those that we haven't done so yet),
      * and find the set of ledger ID's that make up each entry log file.
+     *
+     * @param entryLogMetaMap
+     *          Map from entry log id to existing entry log metadata
+     * @throws IOException
      */
-    private void extractLedgersFromEntryLogs() throws IOException {
+    protected Map<Long, EntryLogMetadata> extractMetaFromEntryLogs(Map<Long, EntryLogMetadata> entryLogMetaMap) throws IOException {
         // Extract it for every entry log except for the current one.
         // Entry Log ID's are just a long value that starts at 0 and increments
         // by 1 when the log fills up and we roll to a new one.
-        ByteBuffer sizeBuff = ByteBuffer.allocate(4);
-        BufferedChannel bc;
         long curLogId = logId;
         for (long entryLogId = 0; entryLogId < curLogId; entryLogId++) {
             // Comb the current entry log file if it has not already been extracted.
-            if (entryLogs2LedgersMap.containsKey(entryLogId)) {
+            if (entryLogMetaMap.containsKey(entryLogId)) {
                 continue;
             }
-            LOG.info("Extracting the ledgers from entryLogId: " + entryLogId);
-            // Get the BufferedChannel for the current entry log file
+            LOG.info("Extracting entry log meta from entryLogId: " + entryLogId);
+            EntryLogMetadata entryLogMeta = new EntryLogMetadata(entryLogId);
+            ExtractionScanner scanner = new ExtractionScanner(entryLogMeta);
+            // Read through the entry log file and extract the entry log meta
             try {
-                bc = getChannelForLogId(entryLogId);
-            } catch (FileNotFoundException e) {
-                // If we can't find the entry log file, just log a warning message and continue.
-                // This could be a deleted/garbage collected entry log.
-                LOG.warn("Entry Log file not found in log directories: " + entryLogId + ".log");
-                continue;
-            }
-            // Start the read position in the current entry log file to be after
-            // the header where all of the ledger entries are.
-            long pos = LOGFILE_HEADER_SIZE;
-            ConcurrentHashMap<Long, Boolean> entryLogLedgers = new ConcurrentHashMap<Long, Boolean>();
-            // Read through the entry log file and extract the ledger ID's.
-            try {
-                while (true) {
-                    // Check if we've finished reading the entry log file.
-                    if (pos >= bc.size()) {
-                        break;
-                    }
-                    if (bc.read(sizeBuff, pos) != sizeBuff.capacity()) {
-                        throw new IOException("Short read from entrylog " + entryLogId);
-                    }
-                    pos += 4;
-                    sizeBuff.flip();
-                    int entrySize = sizeBuff.getInt();
-                    if (entrySize > 1024 * 1024) {
-                        LOG.error("Sanity check failed for entry size of " + entrySize + " at location " + pos + " in "
-                                + entryLogId);
-                    }
-                    byte data[] = new byte[entrySize];
-                    ByteBuffer buff = ByteBuffer.wrap(data);
-                    int rc = bc.read(buff, pos);
-                    if (rc != data.length) {
-                        throw new IOException("Short read for entryLog " + entryLogId + "@" + pos + "(" + rc + "!="
-                                + data.length + ")");
-                    }
-                    buff.flip();
-                    long ledgerId = buff.getLong();
-                    entryLogLedgers.put(ledgerId, true);
-                    // Advance position to the next entry and clear sizeBuff.
-                    pos += entrySize;
-                    sizeBuff.clear();
-                }
+                scanEntryLog(entryLogId, scanner);
+                LOG.info("Retrieved entry log meta data entryLogId: " + entryLogId + ", meta: " + entryLogMeta);
+                entryLogMetaMap.put(entryLogId, entryLogMeta);
             } catch(IOException e) {
-              LOG.info("Premature exception when processing " + entryLogId + 
+              LOG.warn("Premature exception when processing " + entryLogId +
                        "recovery will take care of the problem", e);
             }
-            LOG.info("Retrieved all ledgers that comprise entryLogId: " + entryLogId + ", values: " + entryLogLedgers);
-            entryLogs2LedgersMap.put(entryLogId, entryLogLedgers);
+
         }
+        return entryLogMetaMap;
+    }
+
+    protected EntryLogMetadata extractMetaFromEntryLog(long entryLogId) {
+        EntryLogMetadata entryLogMeta = new EntryLogMetadata(entryLogId);
+        ExtractionScanner scanner = new ExtractionScanner(entryLogMeta);
+        // Read through the entry log file and extract the entry log meta
+        try {
+            scanEntryLog(entryLogId, scanner);
+            LOG.info("Retrieved entry log meta data entryLogId: " + entryLogId + ", meta: " + entryLogMeta);
+        } catch(IOException e) {
+          LOG.warn("Premature exception when processing " + entryLogId +
+                   "recovery will take care of the problem", e);
+        }
+        return entryLogMeta;
     }
 
     /**
-     * Shutdown method to gracefully stop all threads spawned in this class and exit.
+     * Scan entry log
      *
-     * @throws InterruptedException if there is an exception stopping threads.
+     * @param entryLogId
+     *          Entry Log Id
+     * @param scanner
+     *          Entry Log Scanner
+     * @throws IOException
+     */
+    protected void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOException {
+        ByteBuffer sizeBuff = ByteBuffer.allocate(4);
+        ByteBuffer lidBuff = ByteBuffer.allocate(8);
+        BufferedChannel bc;
+        // Get the BufferedChannel for the current entry log file
+        try {
+            bc = getChannelForLogId(entryLogId);
+        } catch (IOException e) {
+            LOG.warn("Failed to get channel to scan entry log: " + entryLogId + ".log");
+            throw e;
+        }
+        // Start the read position in the current entry log file to be after
+        // the header where all of the ledger entries are.
+        long pos = LOGFILE_HEADER_SIZE;
+        // Read through the entry log file and extract the ledger ID's.
+        while (true) {
+            // Check if we've finished reading the entry log file.
+            if (pos >= bc.size()) {
+                break;
+            }
+            if (bc.read(sizeBuff, pos) != sizeBuff.capacity()) {
+                throw new IOException("Short read for entry size from entrylog " + entryLogId);
+            }
+            pos += 4;
+            sizeBuff.flip();
+            int entrySize = sizeBuff.getInt();
+            if (entrySize > MB) {
+                LOG.warn("Found large size entry of " + entrySize + " at location " + pos + " in "
+                        + entryLogId);
+            }
+            sizeBuff.clear();
+            // try to read ledger id first
+            if (bc.read(lidBuff, pos) != lidBuff.capacity()) {
+                throw new IOException("Short read for ledger id from entrylog " + entryLogId);
+            }
+            lidBuff.flip();
+            long lid = lidBuff.getLong();
+            lidBuff.clear();
+            if (!scanner.accept(lid)) {
+                // skip this entry
+                pos += entrySize;
+                continue;
+            }
+            // read the entry
+            byte data[] = new byte[entrySize];
+            ByteBuffer buff = ByteBuffer.wrap(data);
+            int rc = bc.read(buff, pos);
+            if (rc != data.length) {
+                throw new IOException("Short read for ledger entry from entryLog " + entryLogId
+                                    + "@" + pos + "(" + rc + "!=" + data.length + ")");
+            }
+            buff.flip();
+            // process the entry
+            scanner.process(lid, buff);
+            // Advance position to the next entry
+            pos += entrySize;
+        }
+    }
+
+    /**
+     * Shutdown method to gracefully stop entry logger.
      */
-    public void shutdown() throws InterruptedException {
-        gcThread.running = false;
-        gcThread.interrupt();
-        gcThread.join();
+    public void shutdown() {
         // since logChannel is buffered channel, do flush when shutting down
         try {
             flush();

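To make the bookkeeping in EntryLogMetadata concrete, a small worked fragment (values made up; assumes the same package, since the class is package-private):

    EntryLogger.EntryLogMetadata meta = new EntryLogger.EntryLogMetadata(0L);
    meta.addLedgerSize(1L, 60L);    // totalSize = 60,  remainingSize = 60
    meta.addLedgerSize(2L, 40L);    // totalSize = 100, remainingSize = 100
    meta.removeLedger(2L);          // remainingSize drops back to 60
    double usage = meta.getUsage(); // 60.0 / 100 = 0.6

At 0.6 usage this log sits above the default minor threshold (0.2) but below the default major threshold (0.8), so only a major compaction would rewrite it. The "entry.limit() + 4" in ExtractionScanner.process accounts for the 4-byte size header that precedes each entry on disk, so the accumulated sizes match the file layout that scanEntryLog walks.
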
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java?rev=1298357&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java Thu Mar  8 11:13:23 2012
@@ -0,0 +1,378 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogMetadata;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * This is the garbage collector thread that runs in the background to
+ * remove any entry log files that no longer contain any active ledgers,
+ * and to compact entry log files with a low remaining data percentage.
+ */
+public class GarbageCollectorThread extends Thread {
+    private static final Logger LOG = LoggerFactory.getLogger(GarbageCollectorThread.class);
+
+    private static final int SECOND = 1000;
+
+    // Maps entry log files to the set of ledgers that comprise the file and the size usage per ledger
+    private Map<Long, EntryLogMetadata> entryLogMetaMap = new ConcurrentHashMap<Long, EntryLogMetadata>();
+
+    // This is how often we want to run the Garbage Collector Thread (in milliseconds).
+    final long gcWaitTime;
+
+    // Compaction parameters
+    boolean enableMinorCompaction = false;
+    final double minorCompactionThreshold;
+    final long minorCompactionInterval;
+
+    boolean enableMajorCompaction = false;
+    final double majorCompactionThreshold;
+    final long majorCompactionInterval;
+
+    long lastMinorCompactionTime;
+    long lastMajorCompactionTime;
+
+    // Entry Logger Handle
+    final EntryLogger entryLogger;
+    final EntryLogScanner scanner;
+
+    // Ledger Cache Handle
+    final LedgerCache ledgerCache;
+
+    // ZooKeeper Client
+    final ZooKeeper zk;
+
+    // flag to ensure the gc thread is not interrupted during compaction,
+    // to reduce the risk of corrupting the entry log
+    final AtomicBoolean compacting = new AtomicBoolean(false);
+
+    volatile boolean running = true;
+
+    /**
+     * A scanner wrapper to check whether a ledger is alive in an entry log file
+     */
+    class CompactionScanner implements EntryLogScanner {
+        EntryLogMetadata meta;
+
+        public CompactionScanner(EntryLogMetadata meta) {
+            this.meta = meta;
+        }
+
+        @Override
+        public boolean accept(long ledgerId) {
+            return meta.containsLedger(ledgerId) && scanner.accept(ledgerId);
+        }
+
+        @Override
+        public void process(long ledgerId, ByteBuffer entry) throws IOException {
+            scanner.process(ledgerId, entry);
+        }
+    }
+
+
+    /**
+     * Create a garbage collector thread.
+     *
+     * @param conf
+     *          Server Configuration Object.
+     * @throws IOException
+     */
+    public GarbageCollectorThread(ServerConfiguration conf,
+                                  ZooKeeper zookeeper,
+                                  LedgerCache ledgerCache,
+                                  EntryLogger entryLogger,
+                                  EntryLogScanner scanner)
+        throws IOException {
+        super("GarbageCollectorThread");
+
+        this.zk = zookeeper;
+        this.ledgerCache = ledgerCache;
+        this.entryLogger = entryLogger;
+        this.scanner = scanner;
+
+        this.gcWaitTime = conf.getGcWaitTime();
+        // compaction parameters
+        minorCompactionThreshold = conf.getMinorCompactionThreshold();
+        minorCompactionInterval = conf.getMinorCompactionInterval() * SECOND;
+        majorCompactionThreshold = conf.getMajorCompactionThreshold();
+        majorCompactionInterval = conf.getMajorCompactionInterval() * SECOND;
+
+        if (minorCompactionInterval > 0 && minorCompactionThreshold > 0) {
+            if (minorCompactionThreshold > 1.0f) {
+                throw new IOException("Invalid minor compaction threshold "
+                                    + minorCompactionThreshold);
+            }
+            if (minorCompactionInterval <= gcWaitTime) {
+                throw new IOException("Too short minor compaction interval : "
+                                    + minorCompactionInterval);
+            }
+            enableMinorCompaction = true;
+        }
+
+        if (majorCompactionInterval > 0 && majorCompactionThreshold > 0) {
+            if (majorCompactionThreshold > 1.0f) {
+                throw new IOException("Invalid major compaction threshold "
+                                    + majorCompactionThreshold);
+            }
+            if (majorCompactionInterval <= gcWaitTime) {
+                throw new IOException("Too short major compaction interval : "
+                                    + majorCompactionInterval);
+            }
+            enableMajorCompaction = true;
+        }
+
+        if (enableMinorCompaction && enableMajorCompaction) {
+            if (minorCompactionInterval >= majorCompactionInterval ||
+                minorCompactionThreshold >= majorCompactionThreshold) {
+                throw new IOException("Invalid minor/major compaction settings : minor ("
+                                    + minorCompactionThreshold + ", " + minorCompactionInterval
+                                    + "), major (" + majorCompactionThreshold + ", "
+                                    + majorCompactionInterval + ")");
+            }
+        }
+
+        LOG.info("Minor Compaction : enabled=" + enableMinorCompaction + ", threshold="
+               + minorCompactionThreshold + ", interval=" + minorCompactionInterval);
+        LOG.info("Major Compaction : enabled=" + enableMajorCompaction + ", threshold="
+               + majorCompactionThreshold + ", interval=" + majorCompactionInterval);
+
+        lastMinorCompactionTime = lastMajorCompactionTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public void run() {
+        while (running) {
+            synchronized (this) {
+                try {
+                    wait(gcWaitTime);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    continue;
+                }
+            }
+
+            // Dependency check.
+            if (null == zk) {
+                continue;
+            }
+
+            // Extract all of the ledger ID's that comprise all of the entry logs
+            // (except for the current new one which is still being written to).
+            try {
+                entryLogMetaMap = entryLogger.extractMetaFromEntryLogs(entryLogMetaMap);
+            } catch (IOException ie) {
+                LOG.warn("Exception when extracting entry log meta from entry logs : ", ie);
+            }
+
+            // gc inactive/deleted ledgers
+            doGcLedgers();
+
+            // gc entry logs
+            doGcEntryLogs();
+
+            long curTime = System.currentTimeMillis();
+            if (enableMajorCompaction &&
+                curTime - lastMajorCompactionTime > majorCompactionInterval) {
+                // enter major compaction
+                LOG.info("Enter major compaction");
+                doCompactEntryLogs(majorCompactionThreshold);
+                lastMajorCompactionTime = System.currentTimeMillis();
+                // also move minor compaction time
+                lastMinorCompactionTime = lastMajorCompactionTime;
+                continue;
+            }
+
+            if (enableMinorCompaction &&
+                curTime - lastMinorCompactionTime > minorCompactionInterval) {
+                // enter minor compaction
+                LOG.info("Enter minor compaction");
+                doCompactEntryLogs(minorCompactionThreshold);
+                lastMinorCompactionTime = System.currentTimeMillis();
+            }
+        }
+    }
+
+    /**
+     * Do garbage collection ledger index files
+     */
+    private void doGcLedgers() {
+        ledgerCache.activeLedgerManager.garbageCollectLedgers(
+        new LedgerManager.GarbageCollector() {
+            @Override
+            public void gc(long ledgerId) {
+                try {
+                    ledgerCache.deleteLedger(ledgerId);
+                } catch (IOException e) {
+                    LOG.error("Exception when deleting the ledger index file on the Bookie: ", e);
+                }
+            }
+        });
+    }
+
+    /**
+     * Garbage collect those entry loggers which are not associated with any active ledgers
+     */
+    private void doGcEntryLogs() {
+        // Loop through all of the entry logs and remove the non-active ledgers.
+        for (Long entryLogId : entryLogMetaMap.keySet()) {
+            EntryLogMetadata meta = entryLogMetaMap.get(entryLogId);
+            for (Long entryLogLedger : meta.ledgersMap.keySet()) {
+                // Remove the entry log ledger from the set if it isn't active.
+                if (!ledgerCache.activeLedgerManager.containsActiveLedger(entryLogLedger)) {
+                    meta.removeLedger(entryLogLedger);
+                }
+            }
+            if (meta.isEmpty()) {
+                // This means the entry log is not associated with any active ledgers anymore.
+                // We can remove this entry log file now.
+                LOG.info("Deleting entryLogId " + entryLogId + " as it has no active ledgers!");
+                removeEntryLog(entryLogId);
+            }
+        }
+    }
+
+    /**
+     * Compact entry logs if necessary.
+     *
+     * <p>
+     * Compaction runs over the entry logs from most unused space to least.
+     * Entry log files whose remaining size percentage is higher than the
+     * threshold will not be compacted.
+     * </p>
+     */
+    private void doCompactEntryLogs(double threshold) {
+        LOG.info("Do compaction to compact those files lower than " + threshold);
+        // sort the ledger meta by occupied unused space
+        Comparator<EntryLogMetadata> sizeComparator = new Comparator<EntryLogMetadata>() {
+            @Override
+            public int compare(EntryLogMetadata m1, EntryLogMetadata m2) {
+                long unusedSize1 = m1.totalSize - m1.remainingSize;
+                long unusedSize2 = m2.totalSize - m2.remainingSize;
+                if (unusedSize1 > unusedSize2) {
+                    return -1;
+                } else if (unusedSize1 < unusedSize2) {
+                    return 1;
+                } else {
+                    return 0;
+                }
+            }
+        };
+        List<EntryLogMetadata> logsToCompact = new ArrayList<EntryLogMetadata>();
+        logsToCompact.addAll(entryLogMetaMap.values());
+        Collections.sort(logsToCompact, sizeComparator);
+        for (EntryLogMetadata meta : logsToCompact) {
+            if (meta.getUsage() >= threshold) {
+                break;
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Compacting entry log " + meta.entryLogId + " below threshold "
+                        + threshold + ".");
+            }
+            compactEntryLog(meta.entryLogId);
+            if (!running) { // if gc thread is not running, stop compaction
+                return;
+            }
+        }
+    }
+
+    /**
+     * Shutdown the garbage collector thread.
+     *
+     * @throws InterruptedException if there is an exception stopping gc thread.
+     */
+    public void shutdown() throws InterruptedException {
+        this.running = false;
+        if (compacting.compareAndSet(false, true)) {
+            // if setting the compacting flag succeeded, the gc thread is not
+            // compacting now, so it is safe to interrupt it
+            this.interrupt();
+        }
+        this.join();
+    }
+
+    /**
+     * Remove entry log.
+     *
+     * @param entryLogId
+     *          Entry Log File Id
+     */
+    private void removeEntryLog(long entryLogId) {
+        // only drop the metadata if the entry log file was actually removed
+        if (entryLogger.removeEntryLog(entryLogId)) {
+            entryLogMetaMap.remove(entryLogId);
+        }
+    }
+
+    /**
+     * Compact an entry log.
+     *
+     * @param entryLogId
+     *          Entry Log File Id
+     */
+    protected void compactEntryLog(long entryLogId) {
+        EntryLogMetadata entryLogMeta = entryLogMetaMap.get(entryLogId);
+        if (null == entryLogMeta) {
+            LOG.warn("Can't get entry log meta when compacting entry log " + entryLogId + ".");
+            return;
+        }
+
+        // Similar to the sync thread, try to set the compacting flag to make
+        // sure the gc thread is not interrupted by shutdown during compaction.
+        // Otherwise it may receive a ClosedByInterruptException, which could
+        // leave the index file and entry log closed and corrupted.
+        if (!compacting.compareAndSet(false, true)) {
+            // failed to set the compacting flag: it is already true, which
+            // means another thread wants to interrupt the gc thread to exit
+            return;
+        }
+
+        LOG.info("Compacting entry log : " + entryLogId);
+
+        try {
+            entryLogger.scanEntryLog(entryLogId, new CompactionScanner(entryLogMeta));
+            // after moving entries to new entry log, remove this old one
+            removeEntryLog(entryLogId);
+        } catch (IOException e) {
+            LOG.info("Premature exception when compacting " + entryLogId, e);
+        } finally {
+            // clear compacting flag
+            compacting.set(false);
+        }
+    }
+}

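The shutdown() and compactEntryLog() methods above coordinate through a single compare-and-set handshake on the compacting flag: whichever side flips it first wins, so a compaction in flight is never interrupted and a pending shutdown suppresses new compactions. The pattern in miniature (a simplified sketch, not the committed code):

    import java.util.concurrent.atomic.AtomicBoolean;

    final AtomicBoolean compacting = new AtomicBoolean(false);

    // gc thread, around each compaction:
    if (compacting.compareAndSet(false, true)) {
        try {
            // ... rewrite live entries; safe from interrupt here ...
        } finally {
            compacting.set(false);
        }
    } // else: shutdown already claimed the flag, so skip compacting

    // shutdown path, from another thread: interrupt the gc thread only if
    // compareAndSet(false, true) succeeds, then join() it either way
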
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1298357&r1=1298356&r2=1298357&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Thu Mar  8 11:13:23 2012
@@ -25,6 +25,10 @@ import java.io.File;
 public class ServerConfiguration extends AbstractConfiguration {
     // Entry Log Parameters
     protected final static String ENTRY_LOG_SIZE_LIMIT = "logSizeLimit";
+    protected final static String MINOR_COMPACTION_INTERVAL = "minorCompactionInterval";
+    protected final static String MINOR_COMPACTION_THRESHOLD = "minorCompactionThreshold";
+    protected final static String MAJOR_COMPACTION_INTERVAL = "majorCompactionInterval";
+    protected final static String MAJOR_COMPACTION_THRESHOLD = "majorCompactionThreshold";
 
     // Gc Parameters
     protected final static String GC_WAIT_TIME = "gcWaitTime";
@@ -405,4 +409,110 @@ public class ServerConfiguration extends
         setProperty(ENABLE_STATISTICS, Boolean.toString(enabled));
         return this;
     }
+
+    /**
+     * Get threshold of minor compaction.
+     *
+     * Entry log files whose remaining size percentage falls below this
+     * threshold will be compacted in a minor compaction.
+     *
+     * If set to zero or less, minor compaction is disabled.
+     *
+     * @return threshold of minor compaction
+     */
+    public double getMinorCompactionThreshold() {
+        return getDouble(MINOR_COMPACTION_THRESHOLD, 0.2f);
+    }
+
+    /**
+     * Set threshold of minor compaction
+     *
+     * @see #getMinorCompactionThreshold()
+     *
+     * @param threshold
+     *          Threshold for minor compaction
+     * @return server configuration
+     */
+    public ServerConfiguration setMinorCompactionThreshold(double threshold) {
+        setProperty(MINOR_COMPACTION_THRESHOLD, threshold);
+        return this;
+    }
+
+    /**
+     * Get threshold of major compaction.
+     *
+     * Entry log files whose remaining size percentage falls below this
+     * threshold will be compacted in a major compaction.
+     *
+     * If set to zero or less, major compaction is disabled.
+     *
+     * @return threshold of major compaction
+     */
+    public double getMajorCompactionThreshold() {
+        return getDouble(MAJOR_COMPACTION_THRESHOLD, 0.8f);
+    }
+
+    /**
+     * Set threshold of major compaction.
+     *
+     * @see #getMajorCompactionThreshold()
+     *
+     * @param threshold
+     *          Threshold of major compaction
+     * @return server configuration
+     */
+    public ServerConfiguration setMajorCompactionThreshold(double threshold) {
+        setProperty(MAJOR_COMPACTION_THRESHOLD, threshold);
+        return this;
+    }
+
+    /**
+     * Get interval to run minor compaction, in seconds.
+     *
+     * If set to zero or less, minor compaction is disabled.
+     *
+     * @return interval to run minor compaction, in seconds
+     */
+    public long getMinorCompactionInterval() {
+        return getLong(MINOR_COMPACTION_INTERVAL, 3600);
+    }
+
+    /**
+     * Set interval to run minor compaction
+     *
+     * @see #getMinorCompactionInterval()
+     *
+     * @param interval
+     *          Interval to run minor compaction
+     * @return server configuration
+     */
+    public ServerConfiguration setMinorCompactionInterval(long interval) {
+        setProperty(MINOR_COMPACTION_INTERVAL, interval);
+        return this;
+    }
+
+    /**
+     * Get interval to run major compaction, in seconds.
+     *
+     * If set to zero or less, major compaction is disabled.
+     *
+     * @return interval to run major compaction, in seconds
+     */
+    public long getMajorCompactionInterval() {
+        return getLong(MAJOR_COMPACTION_INTERVAL, 86400);
+    }
+
+    /**
+     * Set interval to run major compaction.
+     *
+     * @see #getMajorCompactionInterval()
+     *
+     * @param interval
+     *          Interval to run major compaction
+     * @return server configuration
+     */
+    public ServerConfiguration setMajorCompactionInterval(long interval) {
+        setProperty(MAJOR_COMPACTION_INTERVAL, interval);
+        return this;
+    }
 }

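Note that GarbageCollectorThread validates these settings at construction time: a threshold must not exceed 1.0, an interval must be longer than gcWaitTime, and when both levels are enabled the minor threshold and interval must be strictly smaller than the major ones. A sketch of a combination the constructor would reject (setup abbreviated; values hypothetical):

    ServerConfiguration conf = new ServerConfiguration();
    conf.setGcWaitTime(1000);               // 1 second
    conf.setMinorCompactionThreshold(0.9);  // above the major default of 0.8
    conf.setMinorCompactionInterval(3600);
    // constructing GarbageCollectorThread with this configuration throws
    // IOException("Invalid minor/major compaction settings : ...")
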
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java?rev=1298357&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java Thu Mar  8 11:13:23 2012
@@ -0,0 +1,317 @@
+package org.apache.bookkeeper.bookie;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+import java.io.File;
+import java.util.Arrays;
+import java.util.Enumeration;
+
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.test.BaseTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This class tests the entry log compaction functionality.
+ */
+public class CompactionTest extends BaseTestCase {
+    static Logger LOG = LoggerFactory.getLogger(CompactionTest.class);
+    DigestType digestType;
+
+    static int ENTRY_SIZE = 1024;
+
+    int numEntries;
+    int gcWaitTime;
+    double minorCompactionThreshold;
+    double majorCompactionThreshold;
+    long minorCompactionInterval;
+    long majorCompactionInterval;
+
+    String msg;
+
+    public CompactionTest(DigestType digestType) {
+        super(3);
+        this.digestType = digestType;
+
+        numEntries = 2048;
+        gcWaitTime = 1000;
+        minorCompactionThreshold = 0.1f;
+        majorCompactionThreshold = 0.5f;
+        minorCompactionInterval = 2 * gcWaitTime / 1000;
+        majorCompactionInterval = 4 * gcWaitTime / 1000;
+
+        // a dummy message
+        StringBuilder msgSB = new StringBuilder();
+        for (int i = 0; i < ENTRY_SIZE; i++) {
+            msgSB.append("a");
+        }
+        msg = msgSB.toString();
+    }
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        // Set up the configuration properties needed.
+        baseConf.setEntryLogSizeLimit(numEntries * ENTRY_SIZE);
+        baseConf.setGcWaitTime(gcWaitTime);
+        baseConf.setMinorCompactionThreshold(minorCompactionThreshold);
+        baseConf.setMajorCompactionThreshold(majorCompactionThreshold);
+        baseConf.setMinorCompactionInterval(minorCompactionInterval);
+        baseConf.setMajorCompactionInterval(majorCompactionInterval);
+
+        super.setUp();
+    }
+
+    LedgerHandle[] prepareData(int numEntryLogs, boolean changeNum)
+        throws Exception {
+        // since an entry log file can hold at most 2048 entries:
+        // the first ledger writes 2 entries, which is less than the low water mark
+        int num1 = 2;
+        // the third ledger writes more entries than the high water mark
+        int num3 = (int)(numEntries * 0.7f);
+        // the second ledger writes the remaining entries, which is above the
+        // low water mark and below the high water mark
+        int num2 = numEntries - num3 - num1;
+
+        LedgerHandle[] lhs = new LedgerHandle[3];
+        for (int i=0; i<3; ++i) {
+            lhs[i] = bkc.createLedger(3, 3, digestType, "".getBytes());
+        }
+
+        for (int n = 0; n < numEntryLogs; n++) {
+            for (int k = 0; k < num1; k++) {
+                lhs[0].addEntry(msg.getBytes());
+            }
+            for (int k = 0; k < num2; k++) {
+                lhs[1].addEntry(msg.getBytes());
+            }
+            for (int k = 0; k < num3; k++) {
+                lhs[2].addEntry(msg.getBytes());
+            }
+            if (changeNum) {
+                --num2;
+                ++num3;
+            }
+        }
+
+        return lhs;
+    }
+
+    private void verifyLedger(long lid, long startEntryId, long endEntryId) throws Exception {
+        LedgerHandle lh = bkc.openLedger(lid, digestType, "".getBytes());
+        Enumeration<LedgerEntry> entries = lh.readEntries(startEntryId, endEntryId);
+        while (entries.hasMoreElements()) {
+            LedgerEntry entry = entries.nextElement();
+            assertEquals(msg, new String(entry.getEntry()));
+        }
+    }
+
+    private boolean[] checkLogFiles(File ledgerDirectory, int numFiles) {
+        boolean[] hasLogFiles = new boolean[numFiles];
+        Arrays.fill(hasLogFiles, false);
+        for (File f : ledgerDirectory.listFiles()) {
+            LOG.info("Checking file : " + f);
+            if (f.isFile()) {
+                String name = f.getName();
+                if (!name.endsWith(".log")) {
+                    continue;
+                }
+                String idString = name.split("\\.")[0];
+                int id = Integer.parseInt(idString, 16);
+                if (id >= numFiles) {
+                    continue;
+                }
+                hasLogFiles[id] = true; 
+            }
+        }
+        return hasLogFiles;
+    }
+
+    @Test
+    public void testDisableCompaction() throws Exception {
+        // prepare data
+        LedgerHandle[] lhs = prepareData(3, false);
+
+        // disable compaction
+        baseConf.setMinorCompactionThreshold(0.0f);
+        baseConf.setMajorCompactionThreshold(0.0f);
+
+        // restart bookies
+        restartBookies();
+
+        // remove ledger2 and ledger3
+        // so entry log 1 and 2 would have ledger1 entries left
+        bkc.deleteLedger(lhs[1].getId());
+        bkc.deleteLedger(lhs[2].getId());
+        LOG.info("Finished deleting the ledgers contains most entries.");
+        Thread.sleep(baseConf.getMajorCompactionInterval() * 1000
+                   + baseConf.getGcWaitTime());
+
+        // entry logs ([0,1].log) should not be compacted.
+        for (File ledgerDirectory : tmpDirs) {
+            boolean[] hasLogFiles = checkLogFiles(ledgerDirectory, 2);
+            assertTrue("Not Found entry log file ([0,1].log that should have been compacted in ledgerDirectory: " + ledgerDirectory, hasLogFiles[0] & hasLogFiles[1]);
+        }
+    }
+
+    @Test
+    public void testMinorCompaction() throws Exception {
+        // prepare data
+        LedgerHandle[] lhs = prepareData(3, false);
+
+        for (LedgerHandle lh : lhs) {
+            lh.close();
+        }
+
+        // disable major compaction
+        baseConf.setMajorCompactionThreshold(0.0f);
+
+        // restart bookies
+        restartBookies();
+
+        // remove ledger2 and ledger3
+        bkc.deleteLedger(lhs[1].getId());
+        bkc.deleteLedger(lhs[2].getId());
+
+        LOG.info("Finished deleting the ledgers contains most entries.");
+        Thread.sleep(baseConf.getMinorCompactionInterval() * 1000
+                   + baseConf.getGcWaitTime());
+
+        // entry logs ([0,1,2].log) should be compacted.
+        for (File ledgerDirectory : tmpDirs) {
+            boolean[] hasLog = checkLogFiles(ledgerDirectory, 3); 
+            assertFalse("Found entry log file ([0,1,2].log that should have not been compacted in ledgerDirectory: " + ledgerDirectory, hasLog[0] | hasLog[1] | hasLog[2]);
+        }
+
+        // even though the entry log files have been removed, we can still access
+        // the entries of ledger1, since they have been compacted into a new entry log
+        verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed());
+    }
+
+    @Test
+    public void testMajorCompaction() throws Exception {
+
+        // prepare data
+        LedgerHandle[] lhs = prepareData(3, true);
+
+        for (LedgerHandle lh : lhs) {
+            lh.close();
+        }
+
+        // disable minor compaction
+        baseConf.setMinorCompactionThreshold(0.0f);
+
+        // restart bookies
+        restartBookies();
+
+        // remove ledger1 and ledger3
+        bkc.deleteLedger(lhs[0].getId());
+        bkc.deleteLedger(lhs[2].getId());
+        LOG.info("Finished deleting the ledgers contains most entries.");
+
+        Thread.sleep(baseConf.getMajorCompactionInterval() * 1000
+                   + baseConf.getGcWaitTime());
+
+        // entry logs ([0,1,2].log) should be compacted
+        for (File ledgerDirectory : tmpDirs) {
+            boolean[] hasLogFiles = checkLogFiles(ledgerDirectory, 3); 
+            assertFalse("Found entry log file ([0,1,2].log that should have not been compacted in ledgerDirectory: "
+                      + ledgerDirectory, hasLogFiles[0] | hasLogFiles[1] | hasLogFiles[2]);
+        }
+
+        // even though the entry log files have been removed, we can still access
+        // the entries of ledger2, since they have been compacted into a new entry log
+        verifyLedger(lhs[1].getId(), 0, lhs[1].getLastAddConfirmed());
+    }
+
+    @Test
+    public void testMajorCompactionAboveThreshold() throws Exception {
+        // prepare data
+        LedgerHandle[] lhs = prepareData(3, false);
+
+        for (LedgerHandle lh : lhs) {
+            lh.close();
+        }
+
+        // remove ledger1 and ledger2
+        bkc.deleteLedger(lhs[0].getId());
+        bkc.deleteLedger(lhs[1].getId());
+        LOG.info("Finished deleting the ledgers contains less entries.");
+        Thread.sleep(baseConf.getMajorCompactionInterval() * 1000
+                   + baseConf.getGcWaitTime());
+
+        // entry logs ([0,1,2].log) should not be compacted
+        for (File ledgerDirectory : tmpDirs) {
+            boolean[] hasLogFiles = checkLogFiles(ledgerDirectory, 3); 
+            assertTrue("Not Found entry log file ([1,2].log that should have been compacted in ledgerDirectory: "
+                     + ledgerDirectory, hasLogFiles[0] & hasLogFiles[1] & hasLogFiles[2]);
+        }
+    }
+
+    @Test
+    public void testCompactionSmallEntryLogs() throws Exception {
+
+        // create a ledger to write a few entries
+        LedgerHandle alh = bkc.createLedger(3, 3, digestType, "".getBytes());
+        for (int i=0; i<3; i++) {
+           alh.addEntry(msg.getBytes());
+        }
+        alh.close();
+
+        // restart bookie to roll entry log files
+        restartBookies();
+
+        // prepare data
+        LedgerHandle[] lhs = prepareData(3, false);
+
+        for (LedgerHandle lh : lhs) {
+            lh.close();
+        }
+
+        // remove ledger2 and ledger3
+        bkc.deleteLedger(lhs[1].getId());
+        bkc.deleteLedger(lhs[2].getId());
+        LOG.info("Finished deleting the ledgers contains most entries.");
+        Thread.sleep(baseConf.getMajorCompactionInterval() * 1000
+                   + baseConf.getGcWaitTime());
+
+        // entry logs (0.log) should not be compacted
+        // entry logs ([1,2,3].log) should be compacted.
+        for (File ledgerDirectory : tmpDirs) {
+            boolean[] hasLog = checkLogFiles(ledgerDirectory, 4);
+
+            assertTrue("Did not find entry log file (0.log) that should not have been compacted in ledgerDirectory: "
+                     + ledgerDirectory, hasLog[0]);
+            assertFalse("Found entry log files ([1,2,3].log) that should have been compacted in ledgerDirectory: "
+                      + ledgerDirectory, hasLog[1] | hasLog[2] | hasLog[3]);
+        }
+
+        // even though the entry log files have been removed, we can still access
+        // the entries of ledger1, since they have been compacted into a new entry log
+        verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed());
+    }
+}

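A rough sketch of how the compaction knobs exercised above fit together; the threshold setters appear in the tests, while the interval setters are assumed to mirror the getMinorCompactionInterval()/getMajorCompactionInterval() getters used there, and all values are illustrative only:

    import org.apache.bookkeeper.conf.ServerConfiguration;

    public class CompactionConfigSketch {
        public static void main(String[] args) {
            ServerConfiguration conf = new ServerConfiguration();
            // minor compaction: rewrite entry logs whose remaining live data
            // falls below the threshold, checking once an hour
            conf.setMinorCompactionThreshold(0.2f);
            conf.setMinorCompactionInterval(3600);   // assumed setter, in seconds
            // major compaction: same idea with a higher threshold, once a day
            conf.setMajorCompactionThreshold(0.8f);
            conf.setMajorCompactionInterval(86400);  // assumed setter, in seconds
            // a threshold of zero disables that compaction type, as
            // testDisableCompaction above relies on
        }
    }
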
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java?rev=1298357&r1=1298356&r2=1298357&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java Thu Mar  8 11:13:23 2012
@@ -23,12 +23,13 @@ package org.apache.bookkeeper.bookie;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.util.Map;
+import java.util.HashMap;
 
 import junit.framework.TestCase;
 
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogMetadata;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.junit.After;
 import org.junit.Before;
@@ -54,7 +55,7 @@ public class EntryLogTest extends TestCa
         conf.setGcWaitTime(gcWaitTime);
         conf.setLedgerDirNames(new String[] {tmpDir.toString()});
         // create some entries
-        EntryLogger logger = new EntryLogger(conf, null);
+        EntryLogger logger = new EntryLogger(conf);
         logger.addEntry(1, generateEntry(1, 1));
         logger.addEntry(3, generateEntry(3, 1));
         logger.addEntry(2, generateEntry(2, 1));
@@ -65,18 +66,14 @@ public class EntryLogTest extends TestCa
         raf.setLength(raf.length()-10);
         raf.close();
         // now see which ledgers are in the log
-        logger = new EntryLogger(conf, null);
-        logger.start();
-
-        Thread.sleep(2 * gcWaitTime);
-        Field entryLogs2LedgersMapField = logger.getClass().getDeclaredField("entryLogs2LedgersMap");
-        entryLogs2LedgersMapField.setAccessible(true);
-        @SuppressWarnings("unchecked")
-        Map<Long, Map<Long, Boolean>> ledgersMap = (Map<Long, Map<Long, Boolean>>) entryLogs2LedgersMapField.get(logger);
-        LOG.info("LedgersMap.get(0) {}", ledgersMap.get(0L));
-        assertNotNull(ledgersMap.get(0L).get(1L));
-        assertNull(ledgersMap.get(0L).get(2L));
-        assertNotNull(ledgersMap.get(0L).get(3L));
+        logger = new EntryLogger(conf);
+        EntryLogMetadata meta =
+            logger.extractMetaFromEntryLog(0L);
+
+        LOG.info("Extracted Meta From Entry Log {}", meta);
+        assertNotNull(meta.ledgersMap.get(1L));
+        assertNull(meta.ledgersMap.get(2L));
+        assertNotNull(meta.ledgersMap.get(3L));
     }
 
     private ByteBuffer generateEntry(long ledger, long entry) {