You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2013/10/22 07:44:14 UTC
svn commit: r1534503 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/conf/
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/
bookkeeper-server/src/test/java/org/apache/bookk...
Author: sijie
Date: Tue Oct 22 05:44:13 2013
New Revision: 1534503
URL: http://svn.apache.org/r1534503
Log:
BOOKKEEPER-664: Compaction increases latency on journal writes (ivank via sijie)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Oct 22 05:44:13 2013
@@ -176,6 +176,8 @@ Trunk (unreleased changes)
BOOKKEEPER-657: Journal Improvement (Robin Dhamankar via sijie)
+ BOOKKEEPER-664: Compaction increases latency on journal writes (ivank via sijie)
+
NEW FEATURE:
BOOKKEEPER-562: Ability to tell if a ledger is closed or not (fpj)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf Tue Oct 22 05:44:13 2013
@@ -91,6 +91,19 @@ ledgerDirectories=/tmp/bk-data
# If it is set to less than zero, the major compaction is disabled.
# majorCompactionInterval=86400
+# Set the maximum number of entries which can be compacted without flushing.
+# When compacting, the entries are written to the entrylog and the new offsets
+# are cached in memory. Once the entrylog is flushed the index is updated with
+# the new offsets. This parameter controls the number of entries added to the
+# entrylog before a flush is forced. A higher value for this parameter means
+# more memory will be used for offsets. Each offset consists of 3 longs.
+# This parameter should _not_ be modified unless you know what you're doing.
+# The default is 100,000.
+#compactionMaxOutstandingRequests=100000
+
+# Set the rate at which compaction will re-add entries. The unit is adds per second.
+#compactionRate=1000
+
# Max file size of journal file, in mega bytes
# A new journal file will be created when the old one reaches the file size limitation
#
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Tue Oct 22 05:44:13 2013
@@ -41,7 +41,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.bookkeeper.bookie.GarbageCollectorThread.SafeEntryAdder;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
@@ -420,8 +419,7 @@ public class Bookie extends BookieThread
// instantiate the journal
journal = new Journal(conf, ledgerDirsManager);
ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager,
- ledgerDirsManager, journal,
- new BookieSafeEntryAdder());
+ ledgerDirsManager, journal);
syncThread = new SyncThread(conf, getLedgerDirsListener(),
ledgerStorage, journal);
@@ -1087,37 +1085,6 @@ public class Bookie extends BookieThread
return true;
}
- private class BookieSafeEntryAdder implements SafeEntryAdder {
- @Override
- public void safeAddEntry(final long ledgerId, final ByteBuffer buffer,
- final GenericCallback<Void> cb) {
- journal.logAddEntry(buffer, new WriteCallback() {
- @Override
- public void writeComplete(int rc, long ledgerId2, long entryId,
- InetSocketAddress addr, Object ctx) {
- if (rc != BookieException.Code.OK) {
- LOG.error("Error rewriting to journal (ledger {}, entry {})", ledgerId2, entryId);
- cb.operationComplete(rc, null);
- return;
- }
- try {
- addEntryByLedgerId(ledgerId, buffer);
- cb.operationComplete(rc, null);
- } catch (IOException ioe) {
- LOG.error("Error adding to ledger storage (ledger " + ledgerId2
- + ", entry " + entryId + ")", ioe);
- // couldn't add to ledger storage
- cb.operationComplete(BookieException.Code.IllegalOpException, null);
- } catch (BookieException bke) {
- LOG.error("Bookie error adding to ledger storage (ledger " + ledgerId2
- + ", entry " + entryId + ")", bke);
- // couldn't add to ledger storage
- cb.operationComplete(bke.getCode(), null);
- }
- }
- }, null);
- }
- }
/**
* @param args
* @throws IOException
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java Tue Oct 22 05:44:13 2013
@@ -43,6 +43,7 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -73,7 +74,8 @@ public class EntryLogger {
final long logSizeLimit;
private List<BufferedChannel> logChannelsToFlush;
private volatile BufferedChannel logChannel;
- private final EntryLogListener listener;
+ private final CopyOnWriteArrayList<EntryLogListener> listeners
+ = new CopyOnWriteArrayList<EntryLogListener>();
/**
* The 1K block at the head of the entry logger file
@@ -136,7 +138,9 @@ public class EntryLogger {
LedgerDirsManager ledgerDirsManager, EntryLogListener listener)
throws IOException {
this.ledgerDirsManager = ledgerDirsManager;
- this.listener = listener;
+ if (listener != null) {
+ addListener(listener);
+ }
// log size limit
this.logSizeLimit = conf.getEntryLogSizeLimit();
@@ -163,6 +167,12 @@ public class EntryLogger {
initialize();
}
+ void addListener(EntryLogListener listener) {
+ if (null != listener) {
+ listeners.add(listener);
+ }
+ }
+
/**
* Maps entry log files to open channels.
*/
@@ -236,7 +246,7 @@ public class EntryLogger {
// so the readers could access the data from filesystem.
logChannel.flush(false);
logChannelsToFlush.add(logChannel);
- if (null != listener) {
+ for (EntryLogListener listener : listeners) {
listener.onRotateEntryLog();
}
}
@@ -432,12 +442,16 @@ public class EntryLogger {
return (logId << 32L) | pos;
}
+ static long logIdForOffset(long offset) {
+ return offset >> 32L;
+ }
+
synchronized boolean reachEntryLogLimit(long size) {
return logChannel.position() + size > logSizeLimit;
}
byte[] readEntry(long ledgerId, long entryId, long location) throws IOException, Bookie.NoEntryException {
- long entryLogId = location >> 32L;
+ long entryLogId = logIdForOffset(location);
long pos = location & 0xffffffffL;
ByteBuffer sizeBuff = ByteBuffer.allocate(4);
pos -= 4; // we want to get the ledgerId and length to check
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java Tue Oct 22 05:44:13 2013
@@ -30,9 +30,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import com.google.common.util.concurrent.RateLimiter;
+
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -48,7 +48,6 @@ import org.slf4j.LoggerFactory;
*/
public class GarbageCollectorThread extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(GarbageCollectorThread.class);
- private static final int COMPACTION_MAX_OUTSTANDING_REQUESTS = 1000;
private static final int SECOND = 1000;
// Maps entry log files to the set of ledgers that comprise the file and the size usage per ledger
@@ -69,9 +68,12 @@ public class GarbageCollectorThread exte
long lastMinorCompactionTime;
long lastMajorCompactionTime;
+ final int maxOutstandingRequests;
+ final int compactionRate;
+ final CompactionScannerFactory scannerFactory;
+
// Entry Logger Handle
final EntryLogger entryLogger;
- final SafeEntryAdder safeEntryAdder;
// Ledger Cache Handle
final LedgerCache ledgerCache;
@@ -89,77 +91,90 @@ public class GarbageCollectorThread exte
final GarbageCollector garbageCollector;
final GarbageCleaner garbageCleaner;
-
- /**
- * Interface for adding entries. When the write callback is triggered, the
- * entry must be guaranteed to be persisted.
- */
- interface SafeEntryAdder {
- public void safeAddEntry(long ledgerId, ByteBuffer buffer, GenericCallback<Void> cb);
+ private static class Offset {
+ final long ledger;
+ final long entry;
+ final long offset;
+
+ Offset(long ledger, long entry, long offset) {
+ this.ledger = ledger;
+ this.entry = entry;
+ this.offset = offset;
+ }
}
/**
* A scanner wrapper to check whether a ledger is alive in an entry log file
*/
- class CompactionScanner implements EntryLogScanner {
- EntryLogMetadata meta;
- Object completionLock = new Object();
- AtomicInteger outstandingRequests = new AtomicInteger(0);
- AtomicBoolean allSuccessful = new AtomicBoolean(true);
+ class CompactionScannerFactory implements EntryLogger.EntryLogListener {
+ List<Offset> offsets = new ArrayList<Offset>();
- public CompactionScanner(EntryLogMetadata meta) {
- this.meta = meta;
- }
+ EntryLogScanner newScanner(final EntryLogMetadata meta) {
+ final RateLimiter rateLimiter = RateLimiter.create(compactionRate);
+ return new EntryLogScanner() {
+ @Override
+ public boolean accept(long ledgerId) {
+ return meta.containsLedger(ledgerId);
+ }
- @Override
- public boolean accept(long ledgerId) {
- return meta.containsLedger(ledgerId);
+ @Override
+ public void process(final long ledgerId, long offset, ByteBuffer entry)
+ throws IOException {
+ rateLimiter.acquire();
+ synchronized (CompactionScannerFactory.this) {
+ if (offsets.size() > maxOutstandingRequests) {
+ waitEntrylogFlushed();
+ }
+ entry.getLong(); // discard ledger id, we already have it
+ long entryId = entry.getLong();
+ entry.rewind();
+
+ long newoffset = entryLogger.addEntry(ledgerId, entry);
+ offsets.add(new Offset(ledgerId, entryId, newoffset));
+ }
+ }
+ };
}
+ Object flushLock = new Object();
+
@Override
- public void process(final long ledgerId, long offset, ByteBuffer entry)
- throws IOException {
- if (!allSuccessful.get()) {
- return;
+ public void onRotateEntryLog() {
+ synchronized (flushLock) {
+ flushLock.notifyAll();
}
+ }
- outstandingRequests.incrementAndGet();
- synchronized (completionLock) {
- while (outstandingRequests.get() >= COMPACTION_MAX_OUTSTANDING_REQUESTS) {
- try {
- completionLock.wait();
- } catch (InterruptedException ie) {
- LOG.error("Interrupted while waiting to re-add entry", ie);
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted while waiting to re-add entry", ie);
+ synchronized private void waitEntrylogFlushed() throws IOException {
+ try {
+ synchronized (flushLock) {
+ Offset lastOffset = offsets.get(offsets.size()-1);
+ long lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.offset);
+ while (lastOffsetLogId < entryLogger.getLeastUnflushedLogId() && running) {
+ flushLock.wait(1000);
+
+ lastOffset = offsets.get(offsets.size()-1);
+ lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.offset);
+ }
+ if (lastOffsetLogId >= entryLogger.getLeastUnflushedLogId() && !running) {
+ throw new IOException("Shutdown before flushed");
}
}
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted waiting for flush", ie);
}
- safeEntryAdder.safeAddEntry(ledgerId, entry, new GenericCallback<Void>() {
- @Override
- public void operationComplete(int rc, Void result) {
- if (rc != BookieException.Code.OK) {
- LOG.error("Error {} re-adding entry for ledger {})",
- rc, ledgerId);
- allSuccessful.set(false);
- }
- synchronized(completionLock) {
- outstandingRequests.decrementAndGet();
- completionLock.notifyAll();
- }
- }
- });
- }
- void awaitComplete() throws InterruptedException, IOException {
- synchronized(completionLock) {
- while (outstandingRequests.get() > 0) {
- completionLock.wait();
- }
- if (allSuccessful.get() == false) {
- throw new IOException("Couldn't re-add all entries");
- }
+ for (Offset o : offsets) {
+ ledgerCache.putEntryOffset(o.ledger, o.entry, o.offset);
}
+ offsets.clear();
+ }
+
+ synchronized void flush() throws IOException {
+ waitEntrylogFlushed();
+
+ ledgerCache.flushLedger(true);
}
}
@@ -175,7 +190,6 @@ public class GarbageCollectorThread exte
final LedgerCache ledgerCache,
EntryLogger entryLogger,
SnapshotMap<Long, Boolean> activeLedgers,
- SafeEntryAdder safeEntryAdder,
LedgerManager ledgerManager)
throws IOException {
super("GarbageCollectorThread");
@@ -183,9 +197,12 @@ public class GarbageCollectorThread exte
this.ledgerCache = ledgerCache;
this.entryLogger = entryLogger;
this.activeLedgers = activeLedgers;
- this.safeEntryAdder = safeEntryAdder;
this.gcWaitTime = conf.getGcWaitTime();
+ this.maxOutstandingRequests = conf.getCompactionMaxOutstandingRequests();
+ this.compactionRate = conf.getCompactionRate();
+ this.scannerFactory = new CompactionScannerFactory();
+ entryLogger.addListener(this.scannerFactory);
this.garbageCleaner = new GarbageCollector.GarbageCleaner() {
@Override
@@ -354,16 +371,42 @@ public class GarbageCollectorThread exte
List<EntryLogMetadata> logsToCompact = new ArrayList<EntryLogMetadata>();
logsToCompact.addAll(entryLogMetaMap.values());
Collections.sort(logsToCompact, sizeComparator);
+ List<Long> toRemove = new ArrayList<Long>();
+
for (EntryLogMetadata meta : logsToCompact) {
if (meta.getUsage() >= threshold) {
break;
}
LOG.debug("Compacting entry log {} below threshold {}.", meta.entryLogId, threshold);
- compactEntryLog(meta.entryLogId);
+ try {
+ compactEntryLog(scannerFactory, meta);
+ toRemove.add(meta.entryLogId);
+ } catch (LedgerDirsManager.NoWritableLedgerDirException nwlde) {
+ LOG.warn("No writable ledger directory available, aborting compaction", nwlde);
+ break;
+ } catch (IOException ioe) {
+ // if compact entry log throws IOException, we don't want to remove that
+ // entry log. however, if some entries from that log have been readded
+ // to the entry log, and the offset updated, it's ok to flush that
+ LOG.error("Error compacting entry log. Log won't be deleted", ioe);
+ }
+
if (!running) { // if gc thread is not running, stop compaction
return;
}
}
+ try {
+ // compaction finished, flush any outstanding offsets
+ scannerFactory.flush();
+ } catch (IOException ioe) {
+ LOG.error("Cannot flush compacted entries, skip removal", ioe);
+ return;
+ }
+
+ // offsets have been flushed, it's now safe to remove the old entrylogs
+ for (Long l : toRemove) {
+ removeEntryLog(l);
+ }
}
/**
@@ -401,13 +444,8 @@ public class GarbageCollectorThread exte
* @param entryLogId
* Entry Log File Id
*/
- protected void compactEntryLog(long entryLogId) {
- EntryLogMetadata entryLogMeta = entryLogMetaMap.get(entryLogId);
- if (null == entryLogMeta) {
- LOG.warn("Can't get entry log meta when compacting entry log " + entryLogId + ".");
- return;
- }
-
+ protected void compactEntryLog(CompactionScannerFactory scannerFactory,
+ EntryLogMetadata entryLogMeta) throws IOException {
// Similar with Sync Thread
// try to mark compacting flag to make sure it would not be interrupted
// by shutdown during compaction. otherwise it will receive
@@ -419,19 +457,11 @@ public class GarbageCollectorThread exte
return;
}
- LOG.info("Compacting entry log : " + entryLogId);
+ LOG.info("Compacting entry log : {}", entryLogMeta.entryLogId);
try {
- CompactionScanner scanner = new CompactionScanner(entryLogMeta);
- entryLogger.scanEntryLog(entryLogId, scanner);
- scanner.awaitComplete();
- // after moving entries to new entry log, remove this old one
- removeEntryLog(entryLogId);
- } catch (IOException e) {
- LOG.info("Premature exception when compacting " + entryLogId, e);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- LOG.warn("Interrupted while compacting", ie);
+ entryLogger.scanEntryLog(entryLogMeta.entryLogId,
+ scannerFactory.newScanner(entryLogMeta));
} finally {
// clear compacting flag
compacting.set(false);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java Tue Oct 22 05:44:13 2013
@@ -80,14 +80,14 @@ class InterleavedLedgerStorage implement
private volatile boolean somethingWritten = false;
InterleavedLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
- LedgerDirsManager ledgerDirsManager, CheckpointSource checkpointSource,
- GarbageCollectorThread.SafeEntryAdder safeEntryAdder) throws IOException {
+ LedgerDirsManager ledgerDirsManager, CheckpointSource checkpointSource)
+ throws IOException {
activeLedgers = new SnapshotMap<Long, Boolean>();
this.checkpointSource = checkpointSource;
entryLogger = new EntryLogger(conf, ledgerDirsManager, this);
ledgerCache = new LedgerCacheImpl(conf, activeLedgers, ledgerDirsManager);
gcThread = new GarbageCollectorThread(conf, ledgerCache, entryLogger,
- activeLedgers, safeEntryAdder, ledgerManager);
+ activeLedgers, ledgerManager);
}
@Override
@@ -207,6 +207,7 @@ class InterleavedLedgerStorage implement
// current entry logger file isn't flushed yet.
flushOrCheckpoint(true);
// after the ledger storage finished checkpointing, try to clear the done checkpoint
+
checkpointHolder.clearLastCheckpoint(lastCheckpoint);
return lastCheckpoint;
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Tue Oct 22 05:44:13 2013
@@ -36,6 +36,9 @@ public class ServerConfiguration extends
protected final static String MINOR_COMPACTION_THRESHOLD = "minorCompactionThreshold";
protected final static String MAJOR_COMPACTION_INTERVAL = "majorCompactionInterval";
protected final static String MAJOR_COMPACTION_THRESHOLD = "majorCompactionThreshold";
+ protected final static String COMPACTION_MAX_OUTSTANDING_REQUESTS
+ = "compactionMaxOutstandingRequests";
+ protected final static String COMPACTION_RATE = "compactionRate";
// Gc Parameters
protected final static String GC_WAIT_TIME = "gcWaitTime";
@@ -789,16 +792,6 @@ public class ServerConfiguration extends
}
/**
- * Should we remove pages from page cache after force write
- *
- * @return remove pages from cache
- */
- @Beta
- public boolean getJournalRemovePagesFromCache() {
- return getBoolean(JOURNAL_REMOVE_FROM_PAGE_CACHE, false);
- }
-
- /**
* Set whether the bookie is able to go into read-only mode.
* If this is set to false, the bookie will shutdown on encountering
* an error condition.
@@ -908,4 +901,77 @@ public class ServerConfiguration extends
return getBoolean(AUTO_RECOVERY_DAEMON_ENABLED, false);
}
+ /**
+ * Get the maximum number of entries which can be compacted without flushing.
+ * Default is 100,000.
+ *
+ * @return the maximum number of unflushed entries
+ */
+ public int getCompactionMaxOutstandingRequests() {
+ return getInt(COMPACTION_MAX_OUTSTANDING_REQUESTS, 100000);
+ }
+
+ /**
+ * Set the maximum number of entries which can be compacted without flushing.
+ *
+ * When compacting, the entries are written to the entrylog and the new offsets
+ * are cached in memory. Once the entrylog is flushed the index is updated with
+ * the new offsets. This parameter controls the number of entries added to the
+ * entrylog before a flush is forced. A higher value for this parameter means
+ * more memory will be used for offsets. Each offset consists of 3 longs.
+ *
+ * This parameter should _not_ be modified unless you know what you're doing.
+ * The default is 100,000.
+ *
+ * @param maxOutstandingRequests number of entries to compact before flushing
+ *
+ * @return ServerConfiguration
+ */
+ public ServerConfiguration setCompactionMaxOutstandingRequests(int maxOutstandingRequests) {
+ setProperty(COMPACTION_MAX_OUTSTANDING_REQUESTS, maxOutstandingRequests);
+ return this;
+ }
+
+ /**
+ * Get the rate of compaction adds. Default is 1,000.
+ *
+ * @return rate of compaction (adds per second)
+ */
+ public int getCompactionRate() {
+ return getInt(COMPACTION_RATE, 1000);
+ }
+
+ /**
+ * Set the rate of compaction adds.
+ *
+ * @param rate rate of compaction adds (adds per second)
+ *
+ * @return ServerConfiguration
+ */
+ public ServerConfiguration setCompactionRate(int rate) {
+ setProperty(COMPACTION_RATE, rate);
+ return this;
+ }
+
+ /**
+ * Should we remove pages from page cache after force write
+ *
+ * @return remove pages from cache
+ */
+ @Beta
+ public boolean getJournalRemovePagesFromCache() {
+ return getBoolean(JOURNAL_REMOVE_FROM_PAGE_CACHE, false);
+ }
+
+ /**
+ * Sets that whether should we remove pages from page cache after force write.
+ *
+ * @param enabled
+ * - true if we need to remove pages from page cache. otherwise, false
+ * @return ServerConfiguration
+ */
+ public ServerConfiguration setJournalRemovePagesFromCache(boolean enabled) {
+ setProperty(JOURNAL_REMOVE_FROM_PAGE_CACHE, enabled);
+ return this;
+ }
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java Tue Oct 22 05:44:13 2013
@@ -21,15 +21,30 @@ package org.apache.bookkeeper.bookie;
*
*/
import java.io.File;
-import java.util.Arrays;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Collections;
import java.util.Enumeration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.TestUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.versioning.Version;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -294,4 +309,157 @@ public class CompactionTest extends Book
// since those entries has been compacted to new entry log
verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed());
}
+
+ /**
+ * Test that compaction doesn't add to index without having persisted
+ * entrylog first. This is needed because compaction doesn't go through the journal.
+ * {@see https://issues.apache.org/jira/browse/BOOKKEEPER-530}
+ * {@see https://issues.apache.org/jira/browse/BOOKKEEPER-664}
+ */
+ @Test(timeout=60000)
+ public void testCompactionSafety() throws Exception {
+ tearDown(); // I don't want the test infrastructure
+ ServerConfiguration conf = new ServerConfiguration();
+ final Set<Long> ledgers = Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
+ LedgerManager manager = new LedgerManager() {
+ @Override
+ public void createLedger(LedgerMetadata metadata, GenericCallback<Long> cb) {
+ unsupported();
+ }
+ @Override
+ public void removeLedgerMetadata(long ledgerId, Version version,
+ GenericCallback<Void> vb) {
+ unsupported();
+ }
+ @Override
+ public void readLedgerMetadata(long ledgerId, GenericCallback<LedgerMetadata> readCb) {
+ unsupported();
+ }
+ @Override
+ public void writeLedgerMetadata(long ledgerId, LedgerMetadata metadata,
+ GenericCallback<Void> cb) {
+ unsupported();
+ }
+ @Override
+ public void asyncProcessLedgers(Processor<Long> processor,
+ AsyncCallback.VoidCallback finalCb,
+ Object context, int successRc, int failureRc) {
+ unsupported();
+ }
+ @Override
+ public void close() throws IOException {}
+
+ void unsupported() {
+ LOG.error("Unsupported operation called", new Exception());
+ throw new RuntimeException("Unsupported op");
+ }
+ @Override
+ public LedgerRangeIterator getLedgerRanges() {
+ final AtomicBoolean hasnext = new AtomicBoolean(true);
+ return new LedgerManager.LedgerRangeIterator() {
+ @Override
+ public boolean hasNext() throws IOException {
+ return hasnext.get();
+ }
+ @Override
+ public LedgerManager.LedgerRange next() throws IOException {
+ hasnext.set(false);
+ return new LedgerManager.LedgerRange(ledgers);
+ }
+ };
+ }
+ };
+
+ File tmpDir = File.createTempFile("bkTest", ".dir");
+ tmpDir.delete();
+ tmpDir.mkdir();
+ File curDir = Bookie.getCurrentDirectory(tmpDir);
+ Bookie.checkDirectoryStructure(curDir);
+ conf.setLedgerDirNames(new String[] {tmpDir.toString()});
+
+ conf.setEntryLogSizeLimit(EntryLogger.LOGFILE_HEADER_SIZE + 3 * (4+ENTRY_SIZE));
+ conf.setGcWaitTime(100);
+ conf.setMinorCompactionThreshold(0.7f);
+ conf.setMajorCompactionThreshold(0.0f);
+ conf.setMinorCompactionInterval(1);
+ conf.setMajorCompactionInterval(10);
+ conf.setPageLimit(1);
+
+ CheckpointSource checkpointSource = new CheckpointSource() {
+ AtomicInteger idGen = new AtomicInteger(0);
+ class MyCheckpoint implements CheckpointSource.Checkpoint {
+ int id = idGen.incrementAndGet();
+ @Override
+ public int compareTo(CheckpointSource.Checkpoint o) {
+ if (o == CheckpointSource.Checkpoint.MAX) {
+ return -1;
+ } else if (o == CheckpointSource.Checkpoint.MIN) {
+ return 1;
+ }
+ return id - ((MyCheckpoint)o).id;
+ }
+ }
+
+ @Override
+ public CheckpointSource.Checkpoint newCheckpoint() {
+ return new MyCheckpoint();
+ }
+
+ public void checkpointComplete(CheckpointSource.Checkpoint checkpoint, boolean compact)
+ throws IOException {
+ }
+ };
+ final byte[] KEY = "foobar".getBytes();
+ File log0 = new File(curDir, "0.log");
+ LedgerDirsManager dirs = new LedgerDirsManager(conf);
+ assertFalse("Log shouldnt exist", log0.exists());
+ InterleavedLedgerStorage storage = new InterleavedLedgerStorage(conf, manager,
+ dirs, checkpointSource);
+ ledgers.add(1l);
+ ledgers.add(2l);
+ ledgers.add(3l);
+ storage.setMasterKey(1, KEY);
+ storage.setMasterKey(2, KEY);
+ storage.setMasterKey(3, KEY);
+ storage.addEntry(genEntry(1, 1, ENTRY_SIZE));
+ storage.addEntry(genEntry(2, 1, ENTRY_SIZE));
+ storage.addEntry(genEntry(2, 2, ENTRY_SIZE));
+ storage.addEntry(genEntry(3, 2, ENTRY_SIZE));
+ storage.flush();
+ storage.shutdown();
+
+ assertTrue("Log should exist", log0.exists());
+ ledgers.remove(2l);
+ ledgers.remove(3l);
+
+ storage = new InterleavedLedgerStorage(conf, manager, dirs, checkpointSource);
+ storage.start();
+ for (int i = 0; i < 10; i++) {
+ if (!log0.exists()) {
+ break;
+ }
+ Thread.sleep(1000);
+ storage.entryLogger.flush(); // simulate sync thread
+ }
+ assertFalse("Log shouldnt exist", log0.exists());
+
+ ledgers.add(4l);
+ storage.setMasterKey(4, KEY);
+ storage.addEntry(genEntry(4, 1, ENTRY_SIZE)); // force ledger 1 page to flush
+
+ storage = new InterleavedLedgerStorage(conf, manager, dirs, checkpointSource);
+ storage.getEntry(1, 1); // entry should exist
+ }
+
+ private ByteBuffer genEntry(long ledger, long entry, int size) {
+ byte[] data = new byte[size];
+ ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
+ bb.putLong(ledger);
+ bb.putLong(entry);
+ while (bb.hasRemaining()) {
+ bb.put((byte)0xFF);
+ }
+ bb.flip();
+ return bb;
+ }
}