Posted to commits@zookeeper.apache.org by si...@apache.org on 2013/10/22 07:44:14 UTC

svn commit: r1534503 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/conf/ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ bookkeeper-server/src/test/java/org/apache/bookk...

Author: sijie
Date: Tue Oct 22 05:44:13 2013
New Revision: 1534503

URL: http://svn.apache.org/r1534503
Log:
BOOKKEEPER-664: Compaction increases latency on journal writes (ivank via sijie)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Oct 22 05:44:13 2013
@@ -176,6 +176,8 @@ Trunk (unreleased changes)
 
       BOOKKEEPER-657: Journal Improvement (Robin Dhamankar via sijie)
 
+      BOOKKEEPER-664: Compaction increases latency on journal writes (ivank via sijie)
+
     NEW FEATURE:
 
       BOOKKEEPER-562: Ability to tell if a ledger is closed or not (fpj)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf Tue Oct 22 05:44:13 2013
@@ -91,6 +91,19 @@ ledgerDirectories=/tmp/bk-data
 # If it is set to less than zero, the major compaction is disabled. 
 # majorCompactionInterval=86400 
 
+# Set the maximum number of entries which can be compacted without flushing.
+# When compacting, the entries are written to the entrylog and the new offsets
+# are cached in memory. Once the entrylog is flushed the index is updated with
+# the new offsets. This parameter controls the number of entries added to the
+# entrylog before a flush is forced. A higher value for this parameter means
+# more memory will be used for offsets. Each offset consists of 3 longs.
+# This parameter should _not_ be modified unless you know what you're doing.
+# The default is 100,000.
+#compactionMaxOutstandingRequests=100000
+
+# Set the rate at which compaction will re-add entries. The unit is adds per second.
+#compactionRate=1000
+
 # Max file size of journal file, in mega bytes
 # A new journal file will be created when the old one reaches the file size limitation
 #

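To put the defaults in perspective: with compactionMaxOutstandingRequests at its default of 100,000 and each cached offset holding 3 longs (8 bytes each), the in-memory offset cache is bounded at roughly 100,000 x 3 x 8 bytes = 2.4 MB per compaction pass, while compactionRate caps how quickly those entries are re-added to the entry log (1,000 adds per second by default).
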
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Tue Oct 22 05:44:13 2013
@@ -41,7 +41,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.bookkeeper.bookie.GarbageCollectorThread.SafeEntryAdder;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
@@ -420,8 +419,7 @@ public class Bookie extends BookieThread
         // instantiate the journal
         journal = new Journal(conf, ledgerDirsManager);
         ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager,
-                                                     ledgerDirsManager, journal,
-                                                     new BookieSafeEntryAdder());
+                                                     ledgerDirsManager, journal);
         syncThread = new SyncThread(conf, getLedgerDirsListener(),
                                     ledgerStorage, journal);
 
@@ -1087,37 +1085,6 @@ public class Bookie extends BookieThread
         return true;
     }
 
-    private class BookieSafeEntryAdder implements SafeEntryAdder {
-        @Override
-        public void safeAddEntry(final long ledgerId, final ByteBuffer buffer,
-                                 final GenericCallback<Void> cb) {
-            journal.logAddEntry(buffer, new WriteCallback() {
-                    @Override
-                    public void writeComplete(int rc, long ledgerId2, long entryId,
-                                              InetSocketAddress addr, Object ctx) {
-                        if (rc != BookieException.Code.OK) {
-                            LOG.error("Error rewriting to journal (ledger {}, entry {})", ledgerId2, entryId);
-                            cb.operationComplete(rc, null);
-                            return;
-                        }
-                        try {
-                            addEntryByLedgerId(ledgerId, buffer);
-                            cb.operationComplete(rc, null);
-                        } catch (IOException ioe) {
-                            LOG.error("Error adding to ledger storage (ledger " + ledgerId2
-                                      + ", entry " + entryId + ")", ioe);
-                            // couldn't add to ledger storage
-                            cb.operationComplete(BookieException.Code.IllegalOpException, null);
-                        } catch (BookieException bke) {
-                            LOG.error("Bookie error adding to ledger storage (ledger " + ledgerId2
-                                      + ", entry " + entryId + ")", bke);
-                            // couldn't add to ledger storage
-                            cb.operationComplete(bke.getCode(), null);
-                        }
-                    }
-                }, null);
-        }
-    }
     /**
      * @param args
      * @throws IOException

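The removed BookieSafeEntryAdder was the hook by which compaction re-wrote entries through the journal; since the journal is shared with client writes, that re-write traffic is what drove up journal write latency (the subject of this issue). With this change, compacted entries bypass the journal entirely and are appended directly to a new entry log, as seen in GarbageCollectorThread below.
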
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java Tue Oct 22 05:44:13 2013
@@ -43,6 +43,7 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -73,7 +74,8 @@ public class EntryLogger {
     final long logSizeLimit;
     private List<BufferedChannel> logChannelsToFlush;
     private volatile BufferedChannel logChannel;
-    private final EntryLogListener listener;
+    private final CopyOnWriteArrayList<EntryLogListener> listeners
+        = new CopyOnWriteArrayList<EntryLogListener>();
 
     /**
      * The 1K block at the head of the entry logger file
@@ -136,7 +138,9 @@ public class EntryLogger {
             LedgerDirsManager ledgerDirsManager, EntryLogListener listener)
                     throws IOException {
         this.ledgerDirsManager = ledgerDirsManager;
-        this.listener = listener;
+        if (listener != null) {
+            addListener(listener);
+        }
         // log size limit
         this.logSizeLimit = conf.getEntryLogSizeLimit();
 
@@ -163,6 +167,12 @@ public class EntryLogger {
         initialize();
     }
 
+    void addListener(EntryLogListener listener) {
+        if (null != listener) {
+            listeners.add(listener);
+        }
+    }
+
     /**
      * Maps entry log files to open channels.
      */
@@ -236,7 +246,7 @@ public class EntryLogger {
             // so the readers could access the data from filesystem.
             logChannel.flush(false);
             logChannelsToFlush.add(logChannel);
-            if (null != listener) {
+            for (EntryLogListener listener : listeners) {
                 listener.onRotateEntryLog();
             }
         }
@@ -432,12 +442,16 @@ public class EntryLogger {
         return (logId << 32L) | pos;
     }
 
+    static long logIdForOffset(long offset) {
+        return offset >> 32L;
+    }
+
     synchronized boolean reachEntryLogLimit(long size) {
         return logChannel.position() + size > logSizeLimit;
     }
 
     byte[] readEntry(long ledgerId, long entryId, long location) throws IOException, Bookie.NoEntryException {
-        long entryLogId = location >> 32L;
+        long entryLogId = logIdForOffset(location);
         long pos = location & 0xffffffffL;
         ByteBuffer sizeBuff = ByteBuffer.allocate(4);
         pos -= 4; // we want to get the ledgerId and length to check

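The new logIdForOffset helper makes the existing location encoding explicit: addEntry packs the entry log id into the high 32 bits of a long and the byte position within that log into the low 32 bits. A minimal standalone sketch of the round trip (the method bodies mirror the expressions in the diff; the class and method names outside EntryLogger are illustrative):

    public class OffsetEncodingDemo {
        // pack log id (high 32 bits) and position (low 32 bits), as in EntryLogger.addEntry
        static long encode(long logId, long pos) {
            return (logId << 32L) | pos;
        }
        // recover the log id, as in the new EntryLogger.logIdForOffset
        static long logIdForOffset(long offset) {
            return offset >> 32L;
        }
        // recover the position, as in EntryLogger.readEntry
        static long posForOffset(long offset) {
            return offset & 0xffffffffL;
        }
        public static void main(String[] args) {
            long location = encode(7, 1024);
            System.out.println(logIdForOffset(location)); // prints 7
            System.out.println(posForOffset(location));   // prints 1024
        }
    }

This is what lets the compaction code below compare logIdForOffset(lastOffset.offset) against getLeastUnflushedLogId() to decide whether an entry's new home has reached disk.
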
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java Tue Oct 22 05:44:13 2013
@@ -30,9 +30,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import com.google.common.util.concurrent.RateLimiter;
+
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
 import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -48,7 +48,6 @@ import org.slf4j.LoggerFactory;
  */
 public class GarbageCollectorThread extends Thread {
     private static final Logger LOG = LoggerFactory.getLogger(GarbageCollectorThread.class);
-    private static final int COMPACTION_MAX_OUTSTANDING_REQUESTS = 1000;
     private static final int SECOND = 1000;
 
     // Maps entry log files to the set of ledgers that comprise the file and the size usage per ledger
@@ -69,9 +68,12 @@ public class GarbageCollectorThread exte
     long lastMinorCompactionTime;
     long lastMajorCompactionTime;
 
+    final int maxOutstandingRequests;
+    final int compactionRate;
+    final CompactionScannerFactory scannerFactory;
+
     // Entry Logger Handle
     final EntryLogger entryLogger;
-    final SafeEntryAdder safeEntryAdder;
 
     // Ledger Cache Handle
     final LedgerCache ledgerCache;
@@ -89,77 +91,90 @@ public class GarbageCollectorThread exte
     final GarbageCollector garbageCollector;
     final GarbageCleaner garbageCleaner;
 
-
-    /**
-     * Interface for adding entries. When the write callback is triggered, the
-     * entry must be guaranteed to be presisted.
-     */
-    interface SafeEntryAdder {
-        public void safeAddEntry(long ledgerId, ByteBuffer buffer, GenericCallback<Void> cb);
+    private static class Offset {
+        final long ledger;
+        final long entry;
+        final long offset;
+
+        Offset(long ledger, long entry, long offset) {
+            this.ledger = ledger;
+            this.entry = entry;
+            this.offset = offset;
+        }
     }
 
     /**
      * A scanner wrapper to check whether a ledger is alive in an entry log file
      */
-    class CompactionScanner implements EntryLogScanner {
-        EntryLogMetadata meta;
-        Object completionLock = new Object();
-        AtomicInteger outstandingRequests = new AtomicInteger(0);
-        AtomicBoolean allSuccessful = new AtomicBoolean(true);
+    class CompactionScannerFactory implements EntryLogger.EntryLogListener {
+        List<Offset> offsets = new ArrayList<Offset>();
 
-        public CompactionScanner(EntryLogMetadata meta) {
-            this.meta = meta;
-        }
+        EntryLogScanner newScanner(final EntryLogMetadata meta) {
+            final RateLimiter rateLimiter = RateLimiter.create(compactionRate);
+            return new EntryLogScanner() {
+                @Override
+                public boolean accept(long ledgerId) {
+                    return meta.containsLedger(ledgerId);
+                }
 
-        @Override
-        public boolean accept(long ledgerId) {
-            return meta.containsLedger(ledgerId);
+                @Override
+                public void process(final long ledgerId, long offset, ByteBuffer entry)
+                        throws IOException {
+                    rateLimiter.acquire();
+                    synchronized (CompactionScannerFactory.this) {
+                        if (offsets.size() > maxOutstandingRequests) {
+                            waitEntrylogFlushed();
+                        }
+                        entry.getLong(); // discard ledger id, we already have it
+                        long entryId = entry.getLong();
+                        entry.rewind();
+
+                        long newoffset = entryLogger.addEntry(ledgerId, entry);
+                        offsets.add(new Offset(ledgerId, entryId, newoffset));
+                    }
+                }
+            };
         }
 
+        Object flushLock = new Object();
+
         @Override
-        public void process(final long ledgerId, long offset, ByteBuffer entry)
-            throws IOException {
-            if (!allSuccessful.get()) {
-                return;
+        public void onRotateEntryLog() {
+            synchronized (flushLock) {
+                flushLock.notifyAll();
             }
+        }
 
-            outstandingRequests.incrementAndGet();
-            synchronized (completionLock) {
-                while (outstandingRequests.get() >= COMPACTION_MAX_OUTSTANDING_REQUESTS) {
-                    try {
-                        completionLock.wait();
-                    } catch (InterruptedException ie) {
-                        LOG.error("Interrupted while waiting to re-add entry", ie);
-                        Thread.currentThread().interrupt();
-                        throw new IOException("Interrupted while waiting to re-add entry", ie);
+        synchronized private void waitEntrylogFlushed() throws IOException {
+            try {
+                synchronized (flushLock) {
+                    Offset lastOffset = offsets.get(offsets.size()-1);
+                    long lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.offset);
+                    while (lastOffsetLogId < entryLogger.getLeastUnflushedLogId() && running) {
+                        flushLock.wait(1000);
+
+                        lastOffset = offsets.get(offsets.size()-1);
+                        lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.offset);
+                    }
+                    if (lastOffsetLogId >= entryLogger.getLeastUnflushedLogId() && !running) {
+                        throw new IOException("Shutdown before flushed");
                     }
                 }
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                throw new IOException("Interrupted waiting for flush", ie);
             }
-            safeEntryAdder.safeAddEntry(ledgerId, entry, new GenericCallback<Void>() {
-                    @Override
-                    public void operationComplete(int rc, Void result) {
-                        if (rc != BookieException.Code.OK) {
-                            LOG.error("Error {} re-adding entry for ledger {})",
-                                      rc, ledgerId);
-                            allSuccessful.set(false);
-                        }
-                        synchronized(completionLock) {
-                            outstandingRequests.decrementAndGet();
-                            completionLock.notifyAll();
-                        }
-                    }
-                });
-        }
 
-        void awaitComplete() throws InterruptedException, IOException {
-            synchronized(completionLock) {
-                while (outstandingRequests.get() > 0) {
-                    completionLock.wait();
-                }
-                if (allSuccessful.get() == false) {
-                    throw new IOException("Couldn't re-add all entries");
-                }
+            for (Offset o : offsets) {
+                ledgerCache.putEntryOffset(o.ledger, o.entry, o.offset);
             }
+            offsets.clear();
+        }
+
+        synchronized void flush() throws IOException {
+            waitEntrylogFlushed();
+
+            ledgerCache.flushLedger(true);
         }
     }
 
@@ -175,7 +190,6 @@ public class GarbageCollectorThread exte
                                   final LedgerCache ledgerCache,
                                   EntryLogger entryLogger,
                                   SnapshotMap<Long, Boolean> activeLedgers,
-                                  SafeEntryAdder safeEntryAdder,
                                   LedgerManager ledgerManager)
         throws IOException {
         super("GarbageCollectorThread");
@@ -183,9 +197,12 @@ public class GarbageCollectorThread exte
         this.ledgerCache = ledgerCache;
         this.entryLogger = entryLogger;
         this.activeLedgers = activeLedgers;
-        this.safeEntryAdder = safeEntryAdder;
 
         this.gcWaitTime = conf.getGcWaitTime();
+        this.maxOutstandingRequests = conf.getCompactionMaxOutstandingRequests();
+        this.compactionRate = conf.getCompactionRate();
+        this.scannerFactory = new CompactionScannerFactory();
+        entryLogger.addListener(this.scannerFactory);
 
         this.garbageCleaner = new GarbageCollector.GarbageCleaner() {
             @Override
@@ -354,16 +371,42 @@ public class GarbageCollectorThread exte
         List<EntryLogMetadata> logsToCompact = new ArrayList<EntryLogMetadata>();
         logsToCompact.addAll(entryLogMetaMap.values());
         Collections.sort(logsToCompact, sizeComparator);
+        List<Long> toRemove = new ArrayList<Long>();
+
         for (EntryLogMetadata meta : logsToCompact) {
             if (meta.getUsage() >= threshold) {
                 break;
             }
             LOG.debug("Compacting entry log {} below threshold {}.", meta.entryLogId, threshold);
-            compactEntryLog(meta.entryLogId);
+            try {
+                compactEntryLog(scannerFactory, meta);
+                toRemove.add(meta.entryLogId);
+            } catch (LedgerDirsManager.NoWritableLedgerDirException nwlde) {
+                LOG.warn("No writable ledger directory available, aborting compaction", nwlde);
+                break;
+            } catch (IOException ioe) {
+                // if compacting an entry log throws an IOException, we don't want to remove
+                // that entry log. However, if some entries from that log have been re-added
+                // to a new entry log and their offsets updated, it is still safe to flush those offsets
+                LOG.error("Error compacting entry log. Log won't be deleted", ioe);
+            }
+
             if (!running) { // if gc thread is not running, stop compaction
                 return;
             }
         }
+        try {
+            // compaction finished, flush any outstanding offsets
+            scannerFactory.flush();
+        } catch (IOException ioe) {
+            LOG.error("Cannot flush compacted entries, skip removal", ioe);
+            return;
+        }
+
+        // offsets have been flushed, it's now safe to remove the old entrylogs
+        for (Long l : toRemove) {
+            removeEntryLog(l);
+        }
     }
 
     /**
@@ -401,13 +444,8 @@ public class GarbageCollectorThread exte
      * @param entryLogId
      *          Entry Log File Id
      */
-    protected void compactEntryLog(long entryLogId) {
-        EntryLogMetadata entryLogMeta = entryLogMetaMap.get(entryLogId);
-        if (null == entryLogMeta) {
-            LOG.warn("Can't get entry log meta when compacting entry log " + entryLogId + ".");
-            return;
-        }
-
+    protected void compactEntryLog(CompactionScannerFactory scannerFactory,
+                                   EntryLogMetadata entryLogMeta) throws IOException {
         // Similar with Sync Thread
         // try to mark compacting flag to make sure it would not be interrupted
         // by shutdown during compaction. otherwise it will receive
@@ -419,19 +457,11 @@ public class GarbageCollectorThread exte
             return;
         }
 
-        LOG.info("Compacting entry log : " + entryLogId);
+        LOG.info("Compacting entry log : {}", entryLogMeta.entryLogId);
 
         try {
-            CompactionScanner scanner = new CompactionScanner(entryLogMeta);
-            entryLogger.scanEntryLog(entryLogId, scanner);
-            scanner.awaitComplete();
-            // after moving entries to new entry log, remove this old one
-            removeEntryLog(entryLogId);
-        } catch (IOException e) {
-            LOG.info("Premature exception when compacting " + entryLogId, e);
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-            LOG.warn("Interrupted while compacting", ie);
+            entryLogger.scanEntryLog(entryLogMeta.entryLogId,
+                                     scannerFactory.newScanner(entryLogMeta));
         } finally {
             // clear compacting flag
             compacting.set(false);

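Taken together, the new flow is: rate-limit each compacted entry (via Guava's RateLimiter), append it to the current entry log, buffer its (ledger, entry, offset) triple in memory, wait until the entry log holding it has been flushed, and only then update the ledger index and delete the old log. A condensed, single-threaded sketch of that ordering, using in-memory stand-ins rather than the real EntryLogger/LedgerCache APIs:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import com.google.common.util.concurrent.RateLimiter;

    public class CompactionFlowSketch {
        // Hypothetical stand-in for an entry log: appends entries, tracks a flush horizon.
        static final class FakeEntryLog {
            private final List<byte[]> entries = new ArrayList<byte[]>();
            private long flushedUpTo = 0; // offsets below this are durable
            long addEntry(byte[] entry) { entries.add(entry); return entries.size() - 1; }
            void flush() { flushedUpTo = entries.size(); }
            boolean isFlushed(long offset) { return offset < flushedUpTo; }
        }

        public static void main(String[] args) {
            FakeEntryLog log = new FakeEntryLog();
            Map<Long, Long> index = new HashMap<Long, Long>(); // stand-in for LedgerCache offsets
            List<long[]> pending = new ArrayList<long[]>();    // buffered (entryId, offset) pairs
            RateLimiter limiter = RateLimiter.create(1000);    // compactionRate: adds per second

            for (long entryId = 0; entryId < 5; entryId++) {
                limiter.acquire();                             // throttle compaction adds
                long offset = log.addEntry(new byte[] {(byte) entryId});
                pending.add(new long[] {entryId, offset});     // offset cached, index untouched
            }
            log.flush();                                       // entry log must be durable first
            for (long[] p : pending) {
                if (log.isFlushed(p[1])) {
                    index.put(p[0], p[1]);                     // only now update the index
                }
            }
            pending.clear();
            // and only after the index is updated is the old entry log safe to delete
            System.out.println("indexed: " + index);
        }
    }

The design point is the same one the new test below exercises: because compacted entries no longer pass through the journal, durability must come from the entry log flush itself, so the index update is deferred until that flush has been observed.
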
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java Tue Oct 22 05:44:13 2013
@@ -80,14 +80,14 @@ class InterleavedLedgerStorage implement
     private volatile boolean somethingWritten = false;
 
     InterleavedLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
-            LedgerDirsManager ledgerDirsManager, CheckpointSource checkpointSource,
-            GarbageCollectorThread.SafeEntryAdder safeEntryAdder) throws IOException {
+                             LedgerDirsManager ledgerDirsManager, CheckpointSource checkpointSource)
+            throws IOException {
         activeLedgers = new SnapshotMap<Long, Boolean>();
         this.checkpointSource = checkpointSource;
         entryLogger = new EntryLogger(conf, ledgerDirsManager, this);
         ledgerCache = new LedgerCacheImpl(conf, activeLedgers, ledgerDirsManager);
         gcThread = new GarbageCollectorThread(conf, ledgerCache, entryLogger,
-                activeLedgers, safeEntryAdder, ledgerManager);
+                activeLedgers, ledgerManager);
     }
 
     @Override
@@ -207,6 +207,7 @@ class InterleavedLedgerStorage implement
         // current entry logger file isn't flushed yet.
         flushOrCheckpoint(true);
         // after the ledger storage finished checkpointing, try to clear the done checkpoint
+
         checkpointHolder.clearLastCheckpoint(lastCheckpoint);
         return lastCheckpoint;
     }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Tue Oct 22 05:44:13 2013
@@ -36,6 +36,9 @@ public class ServerConfiguration extends
     protected final static String MINOR_COMPACTION_THRESHOLD = "minorCompactionThreshold";
     protected final static String MAJOR_COMPACTION_INTERVAL = "majorCompactionInterval";
     protected final static String MAJOR_COMPACTION_THRESHOLD = "majorCompactionThreshold";
+    protected final static String COMPACTION_MAX_OUTSTANDING_REQUESTS
+        = "compactionMaxOutstandingRequests";
+    protected final static String COMPACTION_RATE = "compactionRate";
 
     // Gc Parameters
     protected final static String GC_WAIT_TIME = "gcWaitTime";
@@ -789,16 +792,6 @@ public class ServerConfiguration extends
     }
 
     /**
-     * Should we remove pages from page cache after force write
-     *
-     * @return remove pages from cache
-     */
-    @Beta
-    public boolean getJournalRemovePagesFromCache() {
-        return getBoolean(JOURNAL_REMOVE_FROM_PAGE_CACHE, false);
-    }
-
-    /**
      * Set whether the bookie is able to go into read-only mode.
      * If this is set to false, the bookie will shutdown on encountering
      * an error condition.
@@ -908,4 +901,77 @@ public class ServerConfiguration extends
         return getBoolean(AUTO_RECOVERY_DAEMON_ENABLED, false);
     }
 
+    /**
+     * Get the maximum number of entries which can be compacted without flushing.
+     * Default is 100,000.
+     *
+     * @return the maximum number of unflushed entries
+     */
+    public int getCompactionMaxOutstandingRequests() {
+        return getInt(COMPACTION_MAX_OUTSTANDING_REQUESTS, 100000);
+    }
+
+    /**
+     * Set the maximum number of entries which can be compacted without flushing.
+     *
+     * When compacting, the entries are written to the entrylog and the new offsets
+     * are cached in memory. Once the entrylog is flushed the index is updated with
+     * the new offsets. This parameter controls the number of entries added to the
+     * entrylog before a flush is forced. A higher value for this parameter means
+     * more memory will be used for offsets. Each offset consists of 3 longs.
+     *
+     * This parameter should _not_ be modified unless you know what you're doing.
+     * The default is 100,000.
+     *
+     * @param maxOutstandingRequests number of entries to compact before flushing
+     *
+     * @return ServerConfiguration
+     */
+    public ServerConfiguration setCompactionMaxOutstandingRequests(int maxOutstandingRequests) {
+        setProperty(COMPACTION_MAX_OUTSTANDING_REQUESTS, maxOutstandingRequests);
+        return this;
+    }
+
+    /**
+     * Get the rate of compaction adds. Default is 1,000.
+     *
+     * @return rate of compaction (adds per second)
+     */
+    public int getCompactionRate() {
+        return getInt(COMPACTION_RATE, 1000);
+    }
+
+    /**
+     * Set the rate of compaction adds.
+     *
+     * @param rate rate of compaction adds (adds per second)
+     *
+     * @return ServerConfiguration
+     */
+    public ServerConfiguration setCompactionRate(int rate) {
+        setProperty(COMPACTION_RATE, rate);
+        return this;
+    }
+
+    /**
+     * Should we remove pages from page cache after force write
+     *
+     * @return remove pages from cache
+     */
+    @Beta
+    public boolean getJournalRemovePagesFromCache() {
+        return getBoolean(JOURNAL_REMOVE_FROM_PAGE_CACHE, false);
+    }
+
+    /**
+     * Set whether we should remove pages from the page cache after a force write.
+     *
+     * @param enabled
+     *            - true if we need to remove pages from page cache. otherwise, false
+     * @return ServerConfiguration
+     */
+    public ServerConfiguration setJournalRemovePagesFromCache(boolean enabled) {
+        setProperty(JOURNAL_REMOVE_FROM_PAGE_CACHE, enabled);
+        return this;
+    }
 }

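Both knobs can also be set programmatically. A minimal usage sketch of the accessors added above (the values are arbitrary examples, not recommendations):

    import org.apache.bookkeeper.conf.ServerConfiguration;

    public class CompactionConfigExample {
        public static void main(String[] args) {
            ServerConfiguration conf = new ServerConfiguration();
            conf.setCompactionMaxOutstandingRequests(50000) // halve the offset-cache bound
                .setCompactionRate(500);                    // throttle to 500 adds per second
            System.out.println(conf.getCompactionMaxOutstandingRequests()); // 50000
            System.out.println(conf.getCompactionRate());                   // 500
        }
    }
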
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java Tue Oct 22 05:44:13 2013
@@ -21,15 +21,30 @@ package org.apache.bookkeeper.bookie;
  *
  */
 import java.io.File;
-import java.util.Arrays;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Collections;
 import java.util.Enumeration;
 
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.TestUtils;
 
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.versioning.Version;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -294,4 +309,157 @@ public class CompactionTest extends Book
         // since those entries has been compacted to new entry log
         verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed());
     }
+
+    /**
+     * Test that compaction doesn't add to the index without having persisted the
+     * entrylog first. This is needed because compaction doesn't go through the journal.
+     * @see https://issues.apache.org/jira/browse/BOOKKEEPER-530
+     * @see https://issues.apache.org/jira/browse/BOOKKEEPER-664
+     */
+    @Test(timeout=60000)
+    public void testCompactionSafety() throws Exception {
+        tearDown(); // I don't want the test infrastructure
+        ServerConfiguration conf = new ServerConfiguration();
+        final Set<Long> ledgers = Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
+        LedgerManager manager = new LedgerManager() {
+                @Override
+                public void createLedger(LedgerMetadata metadata, GenericCallback<Long> cb) {
+                    unsupported();
+                }
+                @Override
+                public void removeLedgerMetadata(long ledgerId, Version version,
+                                                 GenericCallback<Void> vb) {
+                    unsupported();
+                }
+                @Override
+                public void readLedgerMetadata(long ledgerId, GenericCallback<LedgerMetadata> readCb) {
+                    unsupported();
+                }
+                @Override
+                public void writeLedgerMetadata(long ledgerId, LedgerMetadata metadata,
+                        GenericCallback<Void> cb) {
+                    unsupported();
+                }
+                @Override
+                public void asyncProcessLedgers(Processor<Long> processor,
+                                                AsyncCallback.VoidCallback finalCb,
+                        Object context, int successRc, int failureRc) {
+                    unsupported();
+                }
+                @Override
+                public void close() throws IOException {}
+
+                void unsupported() {
+                    LOG.error("Unsupported operation called", new Exception());
+                    throw new RuntimeException("Unsupported op");
+                }
+                @Override
+                public LedgerRangeIterator getLedgerRanges() {
+                    final AtomicBoolean hasnext = new AtomicBoolean(true);
+                    return new LedgerManager.LedgerRangeIterator() {
+                        @Override
+                        public boolean hasNext() throws IOException {
+                            return hasnext.get();
+                        }
+                        @Override
+                        public LedgerManager.LedgerRange next() throws IOException {
+                            hasnext.set(false);
+                            return new LedgerManager.LedgerRange(ledgers);
+                        }
+                    };
+                 }
+            };
+
+        File tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+        conf.setLedgerDirNames(new String[] {tmpDir.toString()});
+
+        conf.setEntryLogSizeLimit(EntryLogger.LOGFILE_HEADER_SIZE + 3 * (4+ENTRY_SIZE));
+        conf.setGcWaitTime(100);
+        conf.setMinorCompactionThreshold(0.7f);
+        conf.setMajorCompactionThreshold(0.0f);
+        conf.setMinorCompactionInterval(1);
+        conf.setMajorCompactionInterval(10);
+        conf.setPageLimit(1);
+
+        CheckpointSource checkpointSource = new CheckpointSource() {
+                AtomicInteger idGen = new AtomicInteger(0);
+                class MyCheckpoint implements CheckpointSource.Checkpoint {
+                    int id = idGen.incrementAndGet();
+                    @Override
+                    public int compareTo(CheckpointSource.Checkpoint o) {
+                        if (o == CheckpointSource.Checkpoint.MAX) {
+                            return -1;
+                        } else if (o == CheckpointSource.Checkpoint.MIN) {
+                            return 1;
+                        }
+                        return id - ((MyCheckpoint)o).id;
+                    }
+                }
+
+                @Override
+                public CheckpointSource.Checkpoint newCheckpoint() {
+                    return new MyCheckpoint();
+                }
+
+                public void checkpointComplete(CheckpointSource.Checkpoint checkpoint, boolean compact)
+                        throws IOException {
+                }
+            };
+        final byte[] KEY = "foobar".getBytes();
+        File log0 = new File(curDir, "0.log");
+        LedgerDirsManager dirs = new LedgerDirsManager(conf);
+        assertFalse("Log shouldnt exist", log0.exists());
+        InterleavedLedgerStorage storage = new InterleavedLedgerStorage(conf, manager,
+                                                                        dirs, checkpointSource);
+        ledgers.add(1L);
+        ledgers.add(2L);
+        ledgers.add(3L);
+        storage.setMasterKey(1, KEY);
+        storage.setMasterKey(2, KEY);
+        storage.setMasterKey(3, KEY);
+        storage.addEntry(genEntry(1, 1, ENTRY_SIZE));
+        storage.addEntry(genEntry(2, 1, ENTRY_SIZE));
+        storage.addEntry(genEntry(2, 2, ENTRY_SIZE));
+        storage.addEntry(genEntry(3, 2, ENTRY_SIZE));
+        storage.flush();
+        storage.shutdown();
+
+        assertTrue("Log should exist", log0.exists());
+        ledgers.remove(2L);
+        ledgers.remove(3L);
+
+        storage = new InterleavedLedgerStorage(conf, manager, dirs, checkpointSource);
+        storage.start();
+        for (int i = 0; i < 10; i++) {
+            if (!log0.exists()) {
+                break;
+            }
+            Thread.sleep(1000);
+            storage.entryLogger.flush(); // simulate sync thread
+        }
+        assertFalse("Log shouldnt exist", log0.exists());
+
+        ledgers.add(4L);
+        storage.setMasterKey(4, KEY);
+        storage.addEntry(genEntry(4, 1, ENTRY_SIZE)); // force ledger 1 page to flush
+
+        storage = new InterleavedLedgerStorage(conf, manager, dirs, checkpointSource);
+        storage.getEntry(1, 1); // entry should exist
+    }
+
+    private ByteBuffer genEntry(long ledger, long entry, int size) {
+        byte[] data = new byte[size];
+        ByteBuffer bb = ByteBuffer.wrap(data);
+        bb.putLong(ledger);
+        bb.putLong(entry);
+        while (bb.hasRemaining()) {
+            bb.put((byte)0xFF);
+        }
+        bb.flip();
+        return bb;
+    }
 }