You are viewing a plain-text version of this content. The link to the canonical version was lost in this plain-text rendering.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/04/18 17:30:24 UTC
[bookkeeper] branch master updated: Issue #570: Introducing EntryLogManager.
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new a71da1d Issue #570: Introducing EntryLogManager.
a71da1d is described below
commit a71da1d1bae1790c913b773cdb14f61374687625
Author: cguttapalem <cg...@salesforce.com>
AuthorDate: Wed Apr 18 10:30:16 2018 -0700
Issue #570: Introducing EntryLogManager.
Descriptions of the changes in this PR:
Introduces the EntryLogManager interface, which abstracts out the current activeLogChannel, the rotatedLogChannels, and the corresponding lock for activeLogChannel. The existing log-handling logic is moved into the EntryLogManagerForSingleEntryLog class; EntryLogManagerForEntryLogPerLedger will be introduced in the next sub-task. There are also minor changes to the createNewLog and leastUnflushedLogId logic.
This is sub-task 5 of Issue #570.
Master Issue: #570
Author: cguttapalem <cg...@salesforce.com>
Reviewers: Sijie Guo <si...@apache.org>
This closes #1281 from reddycharan/entrylogmanager, closes #570
---
.../apache/bookkeeper/bookie/BufferedChannel.java | 19 +-
.../org/apache/bookkeeper/bookie/EntryLogger.java | 916 ++++++++++++++-------
.../bookkeeper/bookie/ReadOnlyEntryLogger.java | 12 +-
.../bookkeeper/bookie/SortedLedgerStorage.java | 21 +-
.../bookkeeper/conf/ServerConfiguration.java | 4 +
.../apache/bookkeeper/bookie/CreateNewLogTest.java | 150 +++-
.../org/apache/bookkeeper/bookie/EntryLogTest.java | 205 ++++-
.../bookie/LedgerStorageCheckpointTest.java | 34 +-
.../bookie/SortedLedgerStorageCheckpointTest.java | 11 +-
.../bookie/storage/ldb/DbLedgerStorageTest.java | 2 +-
.../apache/bookkeeper/test/ReadOnlyBookieTest.java | 2 +-
11 files changed, 1011 insertions(+), 365 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
index 05a20e5..53628cf 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
@@ -53,6 +53,7 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
* calling fileChannel.force
*/
protected final long unpersistedBytesBound;
+ private final boolean doRegularFlushes;
/*
* it tracks the number of bytes which are not persisted yet by force
@@ -81,6 +82,7 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
this.writeBuffer = ByteBufAllocator.DEFAULT.directBuffer(writeCapacity);
this.unpersistedBytes = new AtomicLong(0);
this.unpersistedBytesBound = unpersistedBytesBound;
+ this.doRegularFlushes = unpersistedBytesBound > 0;
}
@Override
@@ -114,7 +116,7 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
}
position.addAndGet(copied);
unpersistedBytes.addAndGet(copied);
- if (unpersistedBytesBound > 0) {
+ if (doRegularFlushes) {
if (unpersistedBytes.get() >= unpersistedBytesBound) {
flush();
shouldForceWrite = true;
@@ -157,6 +159,21 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
}
/**
+ * calls both flush and forceWrite methods if regular flush is enabled.
+ *
+ * @param forceMetadata
+ * - If true then this method is required to force changes to
+ * both the file's content and metadata to be written to storage;
+ * otherwise, it need only force content changes to be written
+ * @throws IOException
+ */
+ public void flushAndForceWriteIfRegularFlush(boolean forceMetadata) throws IOException {
+ if (doRegularFlushes) {
+ flushAndForceWrite(forceMetadata);
+ }
+ }
+
+ /**
* Write any data in the buffer to the file and advance the writeBufferPosition.
* Callers are expected to synchronize appropriately
*
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 99b0f49..158525d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -51,12 +51,16 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -84,6 +88,9 @@ import org.slf4j.LoggerFactory;
*/
public class EntryLogger {
private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class);
+ static final long UNASSIGNED_LEDGERID = -1L;
+ // log file suffix
+ private static final String LOG_FILE_SUFFIX = ".log";
@VisibleForTesting
static final int UNINITIALIZED_LOG_ID = -0xDEAD;
@@ -92,13 +99,10 @@ public class EntryLogger {
private final long logId;
private final EntryLogMetadata entryLogMetadata;
private final File logFile;
+ private long ledgerIdAssigned = UNASSIGNED_LEDGERID;
- public BufferedLogChannel(FileChannel fc,
- int writeCapacity,
- int readCapacity,
- long logId,
- File logFile,
- long unpersistedBytesBound) throws IOException {
+ public BufferedLogChannel(FileChannel fc, int writeCapacity, int readCapacity, long logId, File logFile,
+ long unpersistedBytesBound) throws IOException {
super(fc, writeCapacity, readCapacity, unpersistedBytesBound);
this.logId = logId;
this.entryLogMetadata = new EntryLogMetadata(logId);
@@ -120,21 +124,104 @@ public class EntryLogger {
return entryLogMetadata.getLedgersMap();
}
+ public Long getLedgerIdAssigned() {
+ return ledgerIdAssigned;
+ }
+
+ public void setLedgerIdAssigned(Long ledgerId) {
+ this.ledgerIdAssigned = ledgerId;
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(BufferedChannel.class)
.add("logId", logId)
.add("logFile", logFile)
+ .add("ledgerIdAssigned", ledgerIdAssigned)
.toString();
}
+
+ /**
+ * Append the ledger map at the end of the entry log.
+ * Updates the entry log file header with the offset and size of the map.
+ */
+ private void appendLedgersMap() throws IOException {
+
+ long ledgerMapOffset = this.position();
+
+ ConcurrentLongLongHashMap ledgersMap = this.getLedgersMap();
+ int numberOfLedgers = (int) ledgersMap.size();
+
+ // Write the ledgers map into several batches
+
+ final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
+ final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize);
+
+ try {
+ ledgersMap.forEach(new BiConsumerLong() {
+ int remainingLedgers = numberOfLedgers;
+ boolean startNewBatch = true;
+ int remainingInBatch = 0;
+
+ @Override
+ public void accept(long ledgerId, long size) {
+ if (startNewBatch) {
+ int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE);
+ int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize;
+
+ serializedMap.clear();
+ serializedMap.writeInt(ledgerMapSize - 4);
+ serializedMap.writeLong(INVALID_LID);
+ serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
+ serializedMap.writeInt(batchSize);
+
+ startNewBatch = false;
+ remainingInBatch = batchSize;
+ }
+ // Dump the ledger in the current batch
+ serializedMap.writeLong(ledgerId);
+ serializedMap.writeLong(size);
+ --remainingLedgers;
+
+ if (--remainingInBatch == 0) {
+ // Close current batch
+ try {
+ write(serializedMap);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ startNewBatch = true;
+ }
+ }
+ });
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ } else {
+ throw e;
+ }
+ } finally {
+ serializedMap.release();
+ }
+ // Flush the ledger's map out before we write the header.
+ // Otherwise the header might point to something that is not fully
+ // written
+ super.flush();
+
+ // Update the headers with the map offset and count of ledgers
+ ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
+ mapInfo.putLong(ledgerMapOffset);
+ mapInfo.putInt(numberOfLedgers);
+ mapInfo.flip();
+ this.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION);
+ }
}
- volatile File currentDir;
private final LedgerDirsManager ledgerDirsManager;
private final boolean entryLogPerLedgerEnabled;
- private final AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
- private volatile long leastUnflushedLogId;
+ final RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
/**
* locks for compaction log.
@@ -145,11 +232,11 @@ public class EntryLogger {
* The maximum size of a entry logger file.
*/
final long logSizeLimit;
- List<BufferedLogChannel> logChannelsToFlush;
- volatile BufferedLogChannel logChannel;
private volatile BufferedLogChannel compactionLogChannel;
- private final EntryLoggerAllocator entryLoggerAllocator;
+ final EntryLoggerAllocator entryLoggerAllocator;
+ private final EntryLogManager entryLogManager;
+
private final boolean entryLogPreAllocationEnabled;
private final CopyOnWriteArrayList<EntryLogListener> listeners = new CopyOnWriteArrayList<EntryLogListener>();
@@ -268,6 +355,8 @@ public class EntryLogger {
// but the protocol varies so an exact value is difficult to determine
this.maxSaneEntrySize = conf.getNettyMaxFrameSizeBytes() - 500;
this.ledgerDirsManager = ledgerDirsManager;
+ this.conf = conf;
+ entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
if (listener != null) {
addListener(listener);
}
@@ -296,12 +385,72 @@ public class EntryLogger {
logId = lastLogId;
}
}
- this.leastUnflushedLogId = logId + 1;
+ this.recentlyCreatedEntryLogsStatus = new RecentEntryLogsStatus(logId + 1);
this.entryLoggerAllocator = new EntryLoggerAllocator(logId);
- this.conf = conf;
- this.entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
+ if (entryLogPerLedgerEnabled) {
+ this.entryLogManager = new EntryLogManagerForSingleEntryLog(ledgerDirsManager) {
+ @Override
+ public void checkpoint() throws IOException {
+ /*
+ * In the case of entryLogPerLedgerEnabled we need to flush
+ * both rotatedlogs and currentlogs. This is needed because
+ * syncThread periodically does checkpoint and at this time
+ * all the logs should be flushed.
+ *
+ */
+ super.flush();
+ }
+
+ @Override
+ public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException {
+ // do nothing
+ /*
+ * prepareSortedLedgerStorageCheckpoint is required for
+ * singleentrylog scenario, but it is not needed for
+ * entrylogperledger scenario, since entries of a ledger go
+ * to a entrylog (even during compaction) and SyncThread
+ * drives periodic checkpoint logic.
+ */
+
+ }
+
+ @Override
+ public void prepareEntryMemTableFlush() {
+ // do nothing
+ }
+
+ @Override
+ public boolean commitEntryMemTableFlush() throws IOException {
+ // lock it only if there is new data
+ // so that cache accesstime is not changed
+ Set<BufferedLogChannel> copyOfCurrentLogs = new HashSet<BufferedLogChannel>(
+ Arrays.asList(super.getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID)));
+ for (BufferedLogChannel currentLog : copyOfCurrentLogs) {
+ if (reachEntryLogLimit(currentLog, 0L)) {
+ synchronized (this) {
+ if (reachEntryLogLimit(currentLog, 0L)) {
+ LOG.info("Rolling entry logger since it reached size limitation");
+ createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
+ }
+ }
+ }
+ }
+ /*
+ * in the case of entrylogperledger, SyncThread drives
+ * checkpoint logic for every flushInterval. So
+ * EntryMemtable doesn't need to call checkpoint in the case
+ * of entrylogperledger.
+ */
+ return false;
+ }
+ };
+ } else {
+ this.entryLogManager = new EntryLogManagerForSingleEntryLog(ledgerDirsManager);
+ }
+ }
- initialize();
+ EntryLogManager getEntryLogManager() {
+ return entryLogManager;
}
void addListener(EntryLogListener listener) {
@@ -324,13 +473,11 @@ public class EntryLogger {
*/
private int readFromLogChannel(long entryLogId, BufferedReadChannel channel, ByteBuf buff, long pos)
throws IOException {
- BufferedLogChannel bc = logChannel;
+ BufferedLogChannel bc = entryLogManager.getCurrentLogIfPresent(entryLogId);
if (null != bc) {
- if (entryLogId == bc.getLogId()) {
- synchronized (bc) {
- if (pos + buff.writableBytes() >= bc.getFileChannelPosition()) {
- return bc.read(buff, pos);
- }
+ synchronized (bc) {
+ if (pos + buff.writableBytes() >= bc.getFileChannelPosition()) {
+ return bc.read(buff, pos);
}
}
}
@@ -397,17 +544,12 @@ public class EntryLogger {
*
* @return least unflushed log id.
*/
- synchronized long getLeastUnflushedLogId() {
- return leastUnflushedLogId;
+ long getLeastUnflushedLogId() {
+ return recentlyCreatedEntryLogsStatus.getLeastUnflushedLogId();
}
- synchronized long getCurrentLogId() {
- BufferedLogChannel channel = logChannel;
- if (null == channel) {
- return UNINITIALIZED_LOG_ID;
- } else {
- return channel.getLogId();
- }
+ long getPreviousAllocatedEntryLogId() {
+ return entryLoggerAllocator.getPreallocatedLogId();
}
/**
@@ -422,75 +564,16 @@ public class EntryLogger {
}
}
- protected void initialize() throws IOException {
- // Register listener for disk full notifications.
- ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
-
- if (ledgerDirsManager.hasWritableLedgerDirs()) {
- createNewLog();
- }
+ void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException {
+ entryLogManager.prepareSortedLedgerStorageCheckpoint(numBytesFlushed);
}
- private LedgerDirsListener getLedgerDirsListener() {
- return new LedgerDirsListener() {
- @Override
- public void diskFull(File disk) {
- // If the current entry log disk is full, then create new entry
- // log.
- if (currentDir != null && currentDir.equals(disk)) {
- shouldCreateNewEntryLog.set(true);
- }
- }
-
- @Override
- public void diskAlmostFull(File disk) {
- // If the current entry log disk is almost full, then create new entry
- // log.
- if (currentDir != null && currentDir.equals(disk)) {
- shouldCreateNewEntryLog.set(true);
- }
- }
- };
+ void prepareEntryMemTableFlush() {
+ entryLogManager.prepareEntryMemTableFlush();
}
- /**
- * Rolling a new log file to write.
- */
- synchronized void rollLog() throws IOException {
- createNewLog();
- }
-
- /**
- * Creates a new log file.
- */
- void createNewLog() throws IOException {
- // first tried to create a new log channel. add current log channel to ToFlush list only when
- // there is a new log channel. it would prevent that a log channel is referenced by both
- // *logChannel* and *ToFlush* list.
- if (null != logChannel) {
- if (null == logChannelsToFlush) {
- logChannelsToFlush = new LinkedList<BufferedLogChannel>();
- }
-
- // flush the internal buffer back to filesystem but not sync disk
- // so the readers could access the data from filesystem.
- logChannel.flush();
-
- // Append ledgers map at the end of entry log
- appendLedgersMap(logChannel);
-
- BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog();
- logChannelsToFlush.add(logChannel);
- LOG.info("Flushing entry logger {} back to filesystem, pending for syncing entry loggers : {}.",
- logChannel.getLogId(), logChannelsToFlush);
- for (EntryLogListener listener : listeners) {
- listener.onRotateEntryLog();
- }
- logChannel = newLogChannel;
- } else {
- logChannel = entryLoggerAllocator.createNewLog();
- }
- currentDir = logChannel.getLogFile().getParentFile();
+ boolean commitEntryMemTableFlush() throws IOException {
+ return entryLogManager.commitEntryMemTableFlush();
}
/**
@@ -499,79 +582,6 @@ public class EntryLogger {
EntryLoggerAllocator getEntryLoggerAllocator() {
return entryLoggerAllocator;
}
- /**
- * Append the ledger map at the end of the entry log.
- * Updates the entry log file header with the offset and size of the map.
- */
- private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws IOException {
- long ledgerMapOffset = entryLogChannel.position();
-
- ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap();
- int numberOfLedgers = (int) ledgersMap.size();
-
- // Write the ledgers map into several batches
-
- final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
- final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize);
-
- try {
- ledgersMap.forEach(new BiConsumerLong() {
- int remainingLedgers = numberOfLedgers;
- boolean startNewBatch = true;
- int remainingInBatch = 0;
-
- @Override
- public void accept(long ledgerId, long size) {
- if (startNewBatch) {
- int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE);
- int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize;
-
- serializedMap.clear();
- serializedMap.writeInt(ledgerMapSize - 4);
- serializedMap.writeLong(INVALID_LID);
- serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
- serializedMap.writeInt(batchSize);
-
- startNewBatch = false;
- remainingInBatch = batchSize;
- }
- // Dump the ledger in the current batch
- serializedMap.writeLong(ledgerId);
- serializedMap.writeLong(size);
- --remainingLedgers;
-
- if (--remainingInBatch == 0) {
- // Close current batch
- try {
- entryLogChannel.write(serializedMap);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- startNewBatch = true;
- }
- }
- });
- } catch (RuntimeException e) {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- } else {
- throw e;
- }
- } finally {
- serializedMap.release();
- }
- // Flush the ledger's map out before we write the header.
- // Otherwise the header might point to something that is not fully written
- entryLogChannel.flush();
-
- // Update the headers with the map offset and count of ledgers
- ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
- mapInfo.putLong(ledgerMapOffset);
- mapInfo.putInt(numberOfLedgers);
- mapInfo.flip();
- entryLogChannel.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION);
- }
/**
* An allocator pre-allocates entry log files.
@@ -589,6 +599,10 @@ public class EntryLogger {
allocatorExecutor = Executors.newSingleThreadExecutor();
}
+ synchronized long getPreallocatedLogId() {
+ return preallocatedLogId;
+ }
+
BufferedLogChannel createNewLog() throws IOException {
synchronized (createEntryLogLock) {
BufferedLogChannel bc;
@@ -630,37 +644,42 @@ public class EntryLogger {
}
}
- private BufferedLogChannel allocateNewLog() throws IOException {
+ private synchronized BufferedLogChannel allocateNewLog() throws IOException {
return allocateNewLog(".log");
}
/**
* Allocate a new log file.
*/
- private BufferedLogChannel allocateNewLog(String suffix) throws IOException {
+ private synchronized BufferedLogChannel allocateNewLog(String suffix) throws IOException {
List<File> list = ledgerDirsManager.getWritableLedgerDirsForNewLog();
- Collections.shuffle(list);
+ File dirForNextEntryLog = entryLogManager.getDirForNextEntryLog(list);
+
+ List<File> ledgersDirs = ledgerDirsManager.getAllLedgerDirs();
+ String logFileName;
// It would better not to overwrite existing entry log files
- File newLogFile = null;
+ File testLogFile = null;
do {
if (preallocatedLogId >= Integer.MAX_VALUE) {
preallocatedLogId = 0;
} else {
++preallocatedLogId;
}
- String logFileName = Long.toHexString(preallocatedLogId) + suffix;
- for (File dir : list) {
- newLogFile = new File(dir, logFileName);
- if (newLogFile.exists()) {
- LOG.warn("Found existed entry log " + newLogFile
+ logFileName = Long.toHexString(preallocatedLogId) + suffix;
+ for (File dir : ledgersDirs) {
+ testLogFile = new File(dir, logFileName);
+ if (testLogFile.exists()) {
+ LOG.warn("Found existed entry log " + testLogFile
+ " when trying to create it as a new log.");
- newLogFile = null;
+ testLogFile = null;
break;
}
}
- } while (newLogFile == null);
+ } while (testLogFile == null);
+ File newLogFile = new File(dirForNextEntryLog, logFileName);
FileChannel channel = new RandomAccessFile(newLogFile, "rw").getChannel();
+
BufferedLogChannel logChannel = new BufferedLogChannel(channel, conf.getWriteBufferBytes(),
conf.getReadBufferBytes(), preallocatedLogId, newLogFile, conf.getFlushIntervalInBytes());
logfileHeader.readerIndex(0);
@@ -669,6 +688,11 @@ public class EntryLogger {
for (File f : list) {
setLastLogId(f, preallocatedLogId);
}
+
+ if (suffix.equals(LOG_FILE_SUFFIX)) {
+ recentlyCreatedEntryLogsStatus.createdEntryLog(preallocatedLogId);
+ }
+
LOG.info("Created new entry log file {} for logId {}.", newLogFile, preallocatedLogId);
return logChannel;
}
@@ -788,89 +812,418 @@ public class EntryLogger {
}
}
- /**
- * Flushes all rotated log channels. After log channels are flushed,
- * move leastUnflushedLogId ptr to current logId.
- */
- void checkpoint() throws IOException {
- flushRotatedLogs();
+ interface EntryLogManager {
+
+ /*
+ * add entry to the corresponding entrylog and return the position of
+ * the entry in the entrylog
+ */
+ long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException;
+
+ /*
+ * gets the active logChannel with the given entryLogId. null if it is
+ * not existing.
+ */
+ BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
/*
- * In the case of entryLogPerLedgerEnabled we need to flush both
- * rotatedlogs and currentlogs. This is needed because syncThread
- * periodically does checkpoint and at this time all the logs should
- * be flushed.
+ * Returns eligible writable ledger dir for the creation next entrylog
+ */
+ File getDirForNextEntryLog(List<File> writableLedgerDirs);
+
+ /*
+ * Do the operations required for checkpoint.
+ */
+ void checkpoint() throws IOException;
+
+ /*
+ * flush both current and rotated logs.
+ */
+ void flush() throws IOException;
+
+ /*
+ * close current logs.
+ */
+ void close() throws IOException;
+
+ /*
+ * force close current logs.
+ */
+ void forceClose();
+
+ /*
+ *
+ */
+ void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException;
+
+ /*
+ * this method should be called before doing entrymemtable flush, it
+ * would save the state of the entrylogger before entrymemtable flush
+ * and commitEntryMemTableFlush would take appropriate action after
+ * entrymemtable flush.
+ */
+ void prepareEntryMemTableFlush();
+
+ /*
+ * this method should be called after doing entrymemtable flush,it would
+ * take appropriate action after entrymemtable flush depending on the
+ * current state of the entrylogger and the state of the entrylogger
+ * during prepareEntryMemTableFlush.
*
- * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of
- * this Issue, I will move this logic to individual implamentations of
- * EntryLogManager and it would be free of this booalen flag based logic.
+ * It is assumed that there would be corresponding
+ * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both
+ * would be called from the same thread.
*
+ * returns boolean value indicating whether EntryMemTable should do checkpoint
+ * after this commit method.
*/
- if (entryLogPerLedgerEnabled) {
- flushCurrentLog();
+ boolean commitEntryMemTableFlush() throws IOException;
+ }
+
+ abstract class EntryLogManagerBase implements EntryLogManager {
+ volatile List<BufferedLogChannel> rotatedLogChannels;
+
+ private final FastThreadLocal<ByteBuf> sizeBufferForAdd = new FastThreadLocal<ByteBuf>() {
+ @Override
+ protected ByteBuf initialValue() throws Exception {
+ return Unpooled.buffer(4);
+ }
+ };
+
+ /*
+ * This method should be guarded by a lock, so callers of this method
+ * should be in the right scope of the lock.
+ */
+ @Override
+ public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+ int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size
+ BufferedLogChannel logChannel = getCurrentLogForLedgerForAddEntry(ledger, entrySize, rollLog);
+ ByteBuf sizeBuffer = sizeBufferForAdd.get();
+ sizeBuffer.clear();
+ sizeBuffer.writeInt(entry.readableBytes());
+ logChannel.write(sizeBuffer);
+
+ long pos = logChannel.position();
+ logChannel.write(entry);
+ logChannel.registerWrittenEntry(ledger, entrySize);
+
+ return (logChannel.getLogId() << 32L) | pos;
+ }
+
+ boolean reachEntryLogLimit(BufferedLogChannel logChannel, long size) {
+ if (logChannel == null) {
+ return false;
+ }
+ return logChannel.position() + size > logSizeLimit;
+ }
+
+ boolean readEntryLogHardLimit(BufferedLogChannel logChannel, long size) {
+ if (logChannel == null) {
+ return false;
+ }
+ return logChannel.position() + size > Integer.MAX_VALUE;
+ }
+
+ abstract BufferedLogChannel getCurrentLogForLedger(long ledgerId);
+
+ abstract BufferedLogChannel getCurrentLogForLedgerForAddEntry(long ledgerId, int entrySize, boolean rollLog)
+ throws IOException;
+
+ abstract void setCurrentLogForLedgerAndAddToRotate(long ledgerId, BufferedLogChannel logChannel);
+
+ /*
+ * flush current logs.
+ */
+ abstract void flushCurrentLogs() throws IOException;
+
+ /*
+ * flush rotated logs.
+ */
+ abstract void flushRotatedLogs() throws IOException;
+
+ List<BufferedLogChannel> getRotatedLogChannels() {
+ return rotatedLogChannels;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ flushRotatedLogs();
+ flushCurrentLogs();
+ }
+
+ void flushLogChannel(BufferedLogChannel logChannel, boolean forceMetadata) throws IOException {
+ if (logChannel != null) {
+ logChannel.flushAndForceWrite(forceMetadata);
+ LOG.debug("Flush and sync current entry logger {}", logChannel.getLogId());
+ }
+ }
+
+ /*
+ * Creates a new log file. This method should be guarded by a lock,
+ * so callers of this method should be in right scope of the lock.
+ */
+ void createNewLog(long ledgerId) throws IOException {
+ BufferedLogChannel logChannel = getCurrentLogForLedger(ledgerId);
+ // first tried to create a new log channel. add current log channel to ToFlush list only when
+ // there is a new log channel. it would prevent that a log channel is referenced by both
+ // *logChannel* and *ToFlush* list.
+ if (null != logChannel) {
+
+ // flush the internal buffer back to filesystem but not sync disk
+ logChannel.flush();
+
+ // Append ledgers map at the end of entry log
+ logChannel.appendLedgersMap();
+
+ BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog();
+ setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
+ LOG.info("Flushing entry logger {} back to filesystem, pending for syncing entry loggers : {}.",
+ logChannel.getLogId(), rotatedLogChannels);
+ for (EntryLogListener listener : listeners) {
+ listener.onRotateEntryLog();
+ }
+ } else {
+ setCurrentLogForLedgerAndAddToRotate(ledgerId, entryLoggerAllocator.createNewLog());
+ }
}
}
- void flushRotatedLogs() throws IOException {
- List<BufferedLogChannel> channels = null;
- long flushedLogId = INVALID_LID;
- synchronized (this) {
- channels = logChannelsToFlush;
- logChannelsToFlush = null;
+ class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase {
+
+ private volatile BufferedLogChannel activeLogChannel;
+ private long logIdBeforeFlush = INVALID_LID;
+ private final AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
+
+ EntryLogManagerForSingleEntryLog(LedgerDirsManager ledgerDirsManager) {
+ this.rotatedLogChannels = new LinkedList<BufferedLogChannel>();
+ // Register listener for disk full notifications.
+ ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
}
- if (null == channels) {
- return;
+
+ private LedgerDirsListener getLedgerDirsListener() {
+ return new LedgerDirsListener() {
+ @Override
+ public void diskFull(File disk) {
+ // If the current entry log disk is full, then create new
+ // entry log.
+ BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+ if (currentActiveLogChannel != null
+ && currentActiveLogChannel.getLogFile().getParentFile().equals(disk)) {
+ shouldCreateNewEntryLog.set(true);
+ }
+ }
+
+ @Override
+ public void diskAlmostFull(File disk) {
+ // If the current entry log disk is almost full, then create new entry
+ // log.
+ BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+ if (currentActiveLogChannel != null
+ && currentActiveLogChannel.getLogFile().getParentFile().equals(disk)) {
+ shouldCreateNewEntryLog.set(true);
+ }
+ }
+ };
}
- Iterator<BufferedLogChannel> chIter = channels.iterator();
- while (chIter.hasNext()) {
- BufferedLogChannel channel = chIter.next();
- try {
- channel.flushAndForceWrite(false);
- } catch (IOException ioe) {
- // rescue from flush exception, add unflushed channels back
- synchronized (this) {
- if (null == logChannelsToFlush) {
- logChannelsToFlush = channels;
- } else {
- logChannelsToFlush.addAll(0, channels);
+
+ @Override
+ public synchronized long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+ return super.addEntry(ledger, entry, rollLog);
+ }
+
+ @Override
+ synchronized BufferedLogChannel getCurrentLogForLedgerForAddEntry(long ledgerId, int entrySize,
+ boolean rollLog) throws IOException {
+ if (null == activeLogChannel) {
+ // log channel can be null because the file is deferred to be created
+ createNewLog(UNASSIGNED_LEDGERID);
+ }
+
+ boolean reachEntryLogLimit = rollLog ? reachEntryLogLimit(activeLogChannel, entrySize)
+ : readEntryLogHardLimit(activeLogChannel, entrySize);
+ // Create new log if logSizeLimit reached or current disk is full
+ boolean createNewLog = shouldCreateNewEntryLog.get();
+ if (createNewLog || reachEntryLogLimit) {
+ if (activeLogChannel != null) {
+ activeLogChannel.flushAndForceWriteIfRegularFlush(false);
+ }
+ createNewLog(UNASSIGNED_LEDGERID);
+ // Reset the flag
+ if (createNewLog) {
+ shouldCreateNewEntryLog.set(false);
+ }
+ }
+ return activeLogChannel;
+ }
+
+ @Override
+ synchronized void createNewLog(long ledgerId) throws IOException {
+ super.createNewLog(ledgerId);
+ }
+
+ @Override
+ public synchronized void setCurrentLogForLedgerAndAddToRotate(long ledgerId, BufferedLogChannel logChannel) {
+ BufferedLogChannel hasToRotateLogChannel = activeLogChannel;
+ activeLogChannel = logChannel;
+ if (hasToRotateLogChannel != null) {
+ rotatedLogChannels.add(hasToRotateLogChannel);
+ }
+ }
+
+ @Override
+ public BufferedLogChannel getCurrentLogForLedger(long ledgerId) {
+ return activeLogChannel;
+ }
+
+ @Override
+ public BufferedLogChannel getCurrentLogIfPresent(long entryLogId) {
+ BufferedLogChannel activeLogChannelTemp = activeLogChannel;
+ if ((activeLogChannelTemp != null) && (activeLogChannelTemp.getLogId() == entryLogId)) {
+ return activeLogChannelTemp;
+ }
+ return null;
+ }
+
+ @Override
+ public File getDirForNextEntryLog(List<File> writableLedgerDirs) {
+ Collections.shuffle(writableLedgerDirs);
+ return writableLedgerDirs.get(0);
+ }
+
+ @Override
+ public void checkpoint() throws IOException {
+ flushRotatedLogs();
+ }
+
+ public long getCurrentLogId() {
+ BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+ if (currentActiveLogChannel != null) {
+ return currentActiveLogChannel.getLogId();
+ } else {
+ return EntryLogger.UNINITIALIZED_LOG_ID;
+ }
+ }
+
+ @Override
+ public void flushCurrentLogs() throws IOException {
+ BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+ if (currentActiveLogChannel != null) {
+ /**
+ * flushCurrentLogs method is called during checkpoint, so
+ * metadata of the file also should be force written.
+ */
+ flushLogChannel(currentActiveLogChannel, true);
+ }
+ }
+
+ /**
+ * Flushes, force-writes and closes every rotated log channel.
+ *
+ * <p>The pending list is swapped for a fresh one under this manager's
+ * monitor, so the flushing itself happens without holding the lock. If a
+ * flush fails, the channels that were not yet flushed (the failing one and
+ * every channel after it) are put back at the head of the rotated list, and
+ * the IOException is rethrown.
+ */
+ @Override
+ void flushRotatedLogs() throws IOException {
+ List<BufferedLogChannel> channels = null;
+ synchronized (this) {
+ channels = rotatedLogChannels;
+ rotatedLogChannels = new LinkedList<BufferedLogChannel>();
+ }
+ if (null == channels) {
+ return;
+ }
+ Iterator<BufferedLogChannel> chIter = channels.iterator();
+ while (chIter.hasNext()) {
+ BufferedLogChannel channel = chIter.next();
+ try {
+ channel.flushAndForceWrite(true);
+ } catch (IOException ioe) {
+ // rescue from flush exception, add unflushed channels back
+ synchronized (this) {
+ if (null == rotatedLogChannels) {
+ rotatedLogChannels = channels;
+ } else {
+ rotatedLogChannels.addAll(0, channels);
+ }
}
+ throw ioe;
}
- throw ioe;
+ // remove the channel from the list after it is successfully flushed
+ chIter.remove();
+ // This channel is only used for writing, so close its file channel once it
+ // has been flushed. Otherwise file descriptors could leak and disk space
+ // might never be reclaimed.
+ closeFileChannel(channel);
+ // Mark this log as flushed so leastUnflushedLogId can move past it.
+ recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(channel.getLogId());
+ LOG.info("Synced entry logger {} to disk.", channel.getLogId());
}
- // remove the channel from the list after it is successfully flushed
- chIter.remove();
- // since this channel is only used for writing, after flushing the channel,
- // we had to close the underlying file channel. Otherwise, we might end up
- // leaking fds which cause the disk spaces could not be reclaimed.
- closeFileChannel(channel);
- if (channel.getLogId() > flushedLogId) {
- flushedLogId = channel.getLogId();
+ }
+
+ /**
+ * Closes the file channel of the active log, if there is one. Rotated
+ * channels are not touched by this method.
+ */
+ @Override
+ public void close() throws IOException {
+ if (activeLogChannel != null) {
+ closeFileChannel(activeLogChannel);
}
- LOG.info("Synced entry logger {} to disk.", channel.getLogId());
}
- // move the leastUnflushedLogId ptr
- leastUnflushedLogId = flushedLogId + 1;
- }
- public void flush() throws IOException {
- flushRotatedLogs();
- flushCurrentLog();
- }
+ /**
+ * Force-closes the file channel of the active log, if there is one.
+ */
+ @Override
+ public void forceClose() {
+ if (activeLogChannel != null) {
+ forceCloseFileChannel(activeLogChannel);
+ }
+ }
+
+ /**
+ * Records the current log id in {@code logIdBeforeFlush}, so that
+ * commitEntryMemTableFlush() can tell whether a memtable flush spanned
+ * more than one entry log.
+ */
+ @Override
+ public void prepareEntryMemTableFlush() {
+ logIdBeforeFlush = getCurrentLogId();
+ }
- synchronized void flushCurrentLog() throws IOException {
- if (logChannel != null) {
- logChannel.flushAndForceWrite(false);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Flush and sync current entry logger {}.", logChannel.getLogId());
+ /**
+ * Finishes a memtable flush started by {@link #prepareEntryMemTableFlush()}.
+ *
+ * <p>The active entry log is rolled in either of two cases:
+ * <ul>
+ * <li>it has reached its size limit, or</li>
+ * <li>the flush spanned more than one entry log, i.e. the current log id
+ * differs from the id captured before the flush.</li>
+ * </ul>
+ * Rolling is a performance choice: it avoids checkpointing a log file that
+ * ledger storage is still writing to. Note that the log message mentions
+ * only the size limit, even when the roll is caused by the flush spanning
+ * two logs.
+ *
+ * @return {@code true} if a new entry log was created and the caller should
+ * start a checkpoint, {@code false} otherwise
+ */
+ @Override
+ public boolean commitEntryMemTableFlush() throws IOException {
+ long logIdAfterFlush = getCurrentLogId();
+ boolean limitReached = reachEntryLogLimit(activeLogChannel, 0L);
+ boolean spannedLogs = logIdAfterFlush != logIdBeforeFlush;
+ if (!limitReached && !spannedLogs) {
+ return false;
+ }
+ LOG.info("Rolling entry logger since it reached size limitation");
+ createNewLog(UNASSIGNED_LEDGERID);
+ return true;
+ }
+
+ /**
+ * Prepares for a SortedLedgerStorage checkpoint. If any bytes were flushed
+ * from the memtable since the previous flush, a new entry log is created,
+ * so those bytes are no longer in the active log when the checkpoint runs.
+ *
+ * @param numBytesFlushed bytes flushed from the memtable for this checkpoint
+ */
+ @Override
+ public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException{
+ if (numBytesFlushed > 0) {
+ // Bytes added since the previous flush may still be in the current
+ // active entry log, so roll it before the underlying interleaved
+ // ledger storage runs its checkpoint.
+ createNewLog(UNASSIGNED_LEDGERID);
}
}
}
+ /**
+ * Delegates checkpointing to the entry log manager. For the single-entry-log
+ * manager this flushes and closes every rotated log channel. Each flushed log
+ * is reported to recentlyCreatedEntryLogsStatus, so leastUnflushedLogId moves
+ * past the contiguous run of flushed logs, not to the current log id.
+ */
+ void checkpoint() throws IOException {
+ entryLogManager.checkpoint();
+ }
+
+ /**
+ * Flushes through the entry log manager.
+ * NOTE(review): presumably this covers both rotated and current logs, as the
+ * old in-class flush() did; confirm against EntryLogManager#flush.
+ */
+ public void flush() throws IOException {
+ entryLogManager.flush();
+ }
+
+ /**
+ * Appends an entry to the entry log of {@code ledger}, using {@code rollLog = true}.
+ * The NIO buffer is wrapped, not copied.
+ */
long addEntry(long ledger, ByteBuffer entry) throws IOException {
- return addEntry(ledger, Unpooled.wrappedBuffer(entry), true);
+ return entryLogManager.addEntry(ledger, Unpooled.wrappedBuffer(entry), true);
}
+ /**
+ * Appends an entry to the entry log of {@code ledger}, using {@code rollLog = true}.
+ */
long addEntry(long ledger, ByteBuf entry) throws IOException {
- return addEntry(ledger, entry, true);
+ return entryLogManager.addEntry(ledger, entry, true);
+ }
+
+ /**
+ * Appends an entry to the entry log of {@code ledger} through the entry log manager.
+ *
+ * @param ledger id of the ledger the entry belongs to
+ * @param entry entry payload
+ * @param rollLog whether reaching the configured log size limit may roll to a
+ * new entry log. It is passed to the entry log manager unchanged.
+ * NOTE(review): confirm that {@code false} still means only the
+ * hard limit applies.
+ * @return the entry location, as returned by the entry log manager
+ */
+ public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+ return entryLogManager.addEntry(ledger, entry, rollLog);
}
private final FastThreadLocal<ByteBuf> sizeBuffer = new FastThreadLocal<ByteBuf>() {
@@ -880,39 +1233,6 @@ public class EntryLogger {
}
};
- public synchronized long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
- if (null == logChannel) {
- // log channel can be null because the file is deferred to be created when no writable ledger directory
- // is available.
- createNewLog();
- }
-
- int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size
- boolean reachEntryLogLimit =
- rollLog ? reachEntryLogLimit(entrySize) : readEntryLogHardLimit(entrySize);
- // Create new log if logSizeLimit reached or current disk is full
- boolean createNewLog = shouldCreateNewEntryLog.get();
- if (createNewLog || reachEntryLogLimit) {
- createNewLog();
- // Reset the flag
- if (createNewLog) {
- shouldCreateNewEntryLog.set(false);
- }
- }
-
- // Get a buffer from thread local to store the size
- ByteBuf sizeBuffer = this.sizeBuffer.get();
- sizeBuffer.clear();
- sizeBuffer.writeInt(entry.readableBytes());
- logChannel.write(sizeBuffer);
-
- long pos = logChannel.position();
- logChannel.write(entry);
- logChannel.registerWrittenEntry(ledger, entrySize);
-
- return (logChannel.getLogId() << 32L) | pos;
- }
-
long addEntryForCompaction(long ledgerId, ByteBuf entry) throws IOException {
synchronized (compactionLogLock) {
int entrySize = entry.readableBytes() + 4;
@@ -935,7 +1255,7 @@ public class EntryLogger {
void flushCompactionLog() throws IOException {
synchronized (compactionLogLock) {
if (compactionLogChannel != null) {
- appendLedgersMap(compactionLogChannel);
+ compactionLogChannel.appendLedgersMap();
compactionLogChannel.flushAndForceWrite(false);
LOG.info("Flushed compaction log file {} with logId.",
compactionLogChannel.getLogFile(),
@@ -982,13 +1302,7 @@ public class EntryLogger {
return offset >> 32L;
}
- synchronized boolean reachEntryLogLimit(long size) {
- return logChannel.position() + size > logSizeLimit;
- }
- synchronized boolean readEntryLogHardLimit(long size) {
- return logChannel.position() + size > Integer.MAX_VALUE;
- }
public ByteBuf internalReadEntry(long ledgerId, long entryId, long location)
throws IOException, Bookie.NoEntryException {
@@ -1366,8 +1680,7 @@ public class EntryLogger {
}
// clear the mapping, so we don't need to go through the channels again in finally block in normal case.
logid2FileChannel.clear();
- // close current writing log file
- closeFileChannel(logChannel);
+ entryLogManager.close();
synchronized (compactionLogLock) {
closeFileChannel(compactionLogChannel);
compactionLogChannel = null;
@@ -1379,7 +1692,8 @@ public class EntryLogger {
for (FileChannel fc : logid2FileChannel.values()) {
IOUtils.close(LOG, fc);
}
- forceCloseFileChannel(logChannel);
+
+ entryLogManager.forceClose();
synchronized (compactionLogLock) {
forceCloseFileChannel(compactionLogChannel);
}
@@ -1434,4 +1748,40 @@ public class EntryLogger {
+ /** Formats an entry log id as a hexadecimal string via {@link Long#toHexString(long)}. */
static String logId2HexString(long logId) {
return Long.toHexString(logId);
}
-}
+
+ /**
+ * Tracks the flush status of recently created entry logs, in order to
+ * compute the least unflushed entry log id.
+ *
+ * <p>Creating a log records {@code <entryLogId, false>}. Once a rotated log
+ * has been flushed, its record becomes {@code <entryLogId, true>}. Every
+ * flushed record at the low end of the map is then dropped, and
+ * {@link #getLeastUnflushedLogId()} advances to one past the highest id
+ * removed. A flushed log stays in the map while any lower-id log is still
+ * unflushed.
+ */
+ static class RecentEntryLogsStatus {
+ private final SortedMap<Long, Boolean> entryLogsStatusMap;
+ private long leastUnflushedLogId;
+
+ RecentEntryLogsStatus(long leastUnflushedLogId) {
+ this.entryLogsStatusMap = new TreeMap<Long, Boolean>();
+ this.leastUnflushedLogId = leastUnflushedLogId;
+ }
+
+ /** Records a newly created entry log that has not been flushed yet. */
+ synchronized void createdEntryLog(Long entryLogId) {
+ entryLogsStatusMap.put(entryLogId, false);
+ }
+
+ /**
+ * Marks a rotated entry log as flushed, then drops the contiguous run of
+ * flushed logs at the head of the map. An id that was never recorded via
+ * {@link #createdEntryLog(Long)} is not added.
+ */
+ synchronized void flushRotatedEntryLog(Long entryLogId) {
+ entryLogsStatusMap.replace(entryLogId, true);
+ while (!entryLogsStatusMap.isEmpty()) {
+ Long oldestLogId = entryLogsStatusMap.firstKey();
+ if (!entryLogsStatusMap.get(oldestLogId)) {
+ break;
+ }
+ entryLogsStatusMap.remove(oldestLogId);
+ leastUnflushedLogId = oldestLogId + 1;
+ }
+ }
+
+ /** Returns the lowest entry log id that is not known to be flushed. */
+ synchronized long getLeastUnflushedLogId() {
+ return leastUnflushedLogId;
+ }
+ }
+}
\ No newline at end of file
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
index d661412..3a07ec4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
@@ -38,23 +38,13 @@ public class ReadOnlyEntryLogger extends EntryLogger {
}
+ /**
+ * Entry logs cannot be removed through a read-only logger, so this always
+ * returns {@code false}.
+ */
@Override
- protected void initialize() throws IOException {
- // do nothing for read only entry logger
- }
-
- @Override
- void createNewLog() throws IOException {
- throw new IOException("Can't create new entry log using a readonly entry logger.");
- }
-
- @Override
protected boolean removeEntryLog(long entryLogId) {
// can't remove entry log in readonly mode
return false;
}
@Override
- public synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
+ public synchronized long addEntry(long ledgerId, ByteBuffer entry) throws IOException {
throw new IOException("Can't add entry to a readonly entry logger.");
}
+
+ // The ByteBuf overloads must be rejected as well. Otherwise they reach the
+ // inherited entry log manager, which can create new entry logs on write.
+ @Override
+ long addEntry(long ledgerId, io.netty.buffer.ByteBuf entry) throws IOException {
+ throw new IOException("Can't add entry to a readonly entry logger.");
+ }
+
+ @Override
+ public long addEntry(long ledgerId, io.netty.buffer.ByteBuf entry, boolean rollLog) throws IOException {
+ throw new IOException("Can't add entry to a readonly entry logger.");
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 0e3e3b9..f2efa55 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -168,13 +168,7 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
+ /**
+ * Flushes the memtable for {@code checkpoint}, lets the entry logger prepare
+ * for the checkpoint based on the number of bytes flushed, and then runs the
+ * InterleavedLedgerStorage checkpoint.
+ */
@Override
public void checkpoint(final Checkpoint checkpoint) throws IOException {
long numBytesFlushed = memTable.flush(this, checkpoint);
- if (numBytesFlushed > 0) {
- // if bytes are added between previous flush and this checkpoint,
- // it means bytes might live at current active entry log, we need
- // roll current entry log and then issue checkpoint to underlying
- // interleaved ledger storage.
- entryLogger.rollLog();
- }
+ // The entry logger decides whether the active entry log must be rolled first.
+ entryLogger.prepareSortedLedgerStorageCheckpoint(numBytesFlushed);
super.checkpoint(checkpoint);
}
@@ -209,19 +203,12 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
public void run() {
try {
LOG.info("Started flushing mem table.");
- long logIdBeforeFlush = entryLogger.getCurrentLogId();
+ entryLogger.prepareEntryMemTableFlush();
memTable.flush(SortedLedgerStorage.this);
- long logIdAfterFlush = entryLogger.getCurrentLogId();
- // in any case that an entry log reaches the limit, we roll the log and start checkpointing.
- // if a memory table is flushed spanning over two entry log files, we also roll log. this is
- // for performance consideration: since we don't wanna checkpoint a new log file that ledger
- // storage is writing to.
- if (entryLogger.reachEntryLogLimit(0) || logIdAfterFlush != logIdBeforeFlush) {
- LOG.info("Rolling entry logger since it reached size limitation");
- entryLogger.rollLog();
+ if (entryLogger.commitEntryMemTableFlush()) {
checkpointer.startCheckpoint(cp);
}
- } catch (IOException e) {
+ } catch (Exception e) {
stateManager.transitionToReadOnlyMode();
LOG.error("Exception thrown while flushing skip list cache.", e);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 904ca3f..785b35a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -2262,6 +2262,10 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
if (0 == getBookiePort() && !getAllowEphemeralPorts()) {
throw new ConfigurationException("Invalid port specified, using ephemeral ports accidentally?");
}
+ // Entry-log-per-ledger and transactional compaction are rejected together.
+ if (isEntryLogPerLedgerEnabled() && getUseTransactionalCompaction()) {
+ throw new ConfigurationException(
+ "When entryLogPerLedger is enabled, it is unnecessary to use transactional compaction");
+ }
}
/**
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
index 33642c2..4257ccc 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
@@ -21,12 +21,18 @@ package org.apache.bookkeeper.bookie;
import static org.junit.Assert.assertTrue;
import java.io.File;
+import java.io.IOException;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.IntStream;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerBase;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerForSingleEntryLog;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.util.DiskChecker;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -93,16 +99,17 @@ public class CreateNewLogTest {
// Extracted from createNewLog()
String logFileName = Long.toHexString(1) + ".log";
File dir = ledgerDirsManager.pickRandomWritableDir();
- LOG.info("Picked this directory: " + dir);
+ LOG.info("Picked this directory: {}", dir);
File newLogFile = new File(dir, logFileName);
newLogFile.createNewFile();
EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
// Calls createNewLog, and with the number of directories we
// are using, if it picks one at random it will fail.
- el.createNewLog();
- LOG.info("This is the current log id: " + el.getCurrentLogId());
- assertTrue("Wrong log id", el.getCurrentLogId() > 1);
+ EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) el.getEntryLogManager();
+ entryLogManager.createNewLog(0L);
+ LOG.info("This is the current log id: {}", entryLogManager.getCurrentLogId());
+ assertTrue("Wrong log id", entryLogManager.getCurrentLogId() > 1);
}
@Test
@@ -118,7 +125,7 @@ public class CreateNewLogTest {
// Extracted from createNewLog()
String logFileName = Long.toHexString(1) + ".log";
File dir = ledgerDirsManager.pickRandomWritableDir();
- LOG.info("Picked this directory: " + dir);
+ LOG.info("Picked this directory: {}", dir);
File newLogFile = new File(dir, logFileName);
newLogFile.createNewFile();
@@ -131,9 +138,136 @@ public class CreateNewLogTest {
EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
// Calls createNewLog, and with the number of directories we
// are using, if it picks one at random it will fail.
- el.createNewLog();
- LOG.info("This is the current log id: " + el.getCurrentLogId());
- assertTrue("Wrong log id", el.getCurrentLogId() > 1);
+ EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) el.getEntryLogManager();
+ entryLogManager.createNewLog(0L);
+ LOG.info("This is the current log id: {}", entryLogManager.getCurrentLogId());
+ assertTrue("Wrong log id", entryLogManager.getCurrentLogId() > 1);
}
+ @Test
+ public void testConcurrentCreateNewLogWithEntryLogFilePreAllocationEnabled() throws Exception {
+ testConcurrentCreateNewLog(true);
+ }
+
+ @Test
+ public void testConcurrentCreateNewLogWithEntryLogFilePreAllocationDisabled() throws Exception {
+ testConcurrentCreateNewLog(false);
+ }
+
+ /**
+ * Calls createNewLog concurrently from a parallel stream, then checks that:
+ * no call failed; the last allocated entry log id accounts for every call,
+ * plus one pre-allocated log when pre-allocation is enabled; and all logs
+ * except the active one were rotated.
+ */
+ public void testConcurrentCreateNewLog(boolean entryLogFilePreAllocationEnabled) throws Exception {
+ final int numCreateNewLogCalls = 10;
+
+ // Configure several ledger directories.
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+ conf.setLedgerDirNames(ledgerDirs);
+ conf.setEntryLogFilePreAllocationEnabled(entryLogFilePreAllocationEnabled);
+ DiskChecker diskChecker = new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold());
+ LedgerDirsManager dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), diskChecker);
+
+ EntryLogger logger = new EntryLogger(conf, dirsManager);
+ EntryLogManagerBase manager = (EntryLogManagerBase) logger.getEntryLogManager();
+ Assert.assertEquals("previousAllocatedEntryLogId after initialization", -1,
+ logger.getPreviousAllocatedEntryLogId());
+ Assert.assertEquals("leastUnflushedLogId after initialization", 0, logger.getLeastUnflushedLogId());
+
+ AtomicBoolean failed = new AtomicBoolean(false);
+ IntStream.range(0, numCreateNewLogCalls).parallel().forEach(i -> {
+ try {
+ manager.createNewLog((long) i);
+ } catch (IOException e) {
+ LOG.error("Received exception while creating newLog", e);
+ failed.set(true);
+ }
+ });
+ // Give the pre-allocation time to finish.
+ // NOTE(review): a fixed sleep may make this test flaky on slow machines.
+ Thread.sleep(1000);
+
+ Assert.assertFalse("There shouldn't be any exceptions while creating newlog", failed.get());
+ int expectedPreviousAllocatedEntryLogId =
+ entryLogFilePreAllocationEnabled ? numCreateNewLogCalls : numCreateNewLogCalls - 1;
+ Assert.assertEquals(
+ "previousAllocatedEntryLogId after " + numCreateNewLogCalls
+ + " number of times createNewLog is called",
+ expectedPreviousAllocatedEntryLogId, logger.getPreviousAllocatedEntryLogId());
+ Assert.assertEquals("Number of RotatedLogChannels", numCreateNewLogCalls - 1,
+ manager.getRotatedLogChannels().size());
+ }
+
+ @Test
+ public void testCreateNewLogWithGaps() throws Exception {
+ // Configure several ledger directories, with pre-allocation disabled.
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+ conf.setLedgerDirNames(ledgerDirs);
+ conf.setEntryLogFilePreAllocationEnabled(false);
+ LedgerDirsManager dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+ new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+ EntryLogger logger = new EntryLogger(conf, dirsManager);
+ EntryLogManagerBase manager = (EntryLogManagerBase) logger.getEntryLogManager();
+ manager.createNewLog(0L);
+
+ Assert.assertEquals("previousAllocatedEntryLogId after initialization", 0, logger.getPreviousAllocatedEntryLogId());
+
+ // Take id 1 with an existing file: the next allocation must skip it.
+ createLogFileInRandomWritableDir(dirsManager, 1L);
+ manager.createNewLog(0L);
+ Assert.assertEquals("previousAllocatedEntryLogId since entrylogid 1 is already taken", 2,
+ logger.getPreviousAllocatedEntryLogId());
+
+ // Take id 3 the same way.
+ createLogFileInRandomWritableDir(dirsManager, 3L);
+ manager.createNewLog(0L);
+ Assert.assertEquals("previousAllocatedEntryLogId since entrylogid 3 is already taken", 4,
+ logger.getPreviousAllocatedEntryLogId());
+ }
+
+ /**
+ * Creates an empty {@code <hex logId>.log} file in a randomly picked writable
+ * ledger directory, using the naming scheme extracted from createNewLog().
+ */
+ private void createLogFileInRandomWritableDir(LedgerDirsManager dirsManager, long logId) throws Exception {
+ String logFileName = Long.toHexString(logId) + ".log";
+ File dir = dirsManager.pickRandomWritableDir();
+ LOG.info("Picked this directory: {}", dir);
+ File newLogFile = new File(dir, logFileName);
+ newLogFile.createNewFile();
+ }
+
+ @Test
+ public void testCreateNewLogAndCompactionLog() throws Exception {
+ // Configure several ledger directories, with pre-allocation enabled.
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+ conf.setLedgerDirNames(ledgerDirs);
+ conf.setEntryLogFilePreAllocationEnabled(true);
+ LedgerDirsManager dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+ new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+ EntryLogger logger = new EntryLogger(conf, dirsManager);
+ AtomicBoolean failed = new AtomicBoolean(false);
+
+ // Run one regular log creation (even index) and one compaction log creation
+ // (odd index) from a parallel stream, so the two may race.
+ IntStream.range(0, 2).parallel().forEach(i -> {
+ try {
+ boolean createRegularLog = i % 2 == 0;
+ if (createRegularLog) {
+ ((EntryLogManagerBase) logger.getEntryLogManager()).createNewLog((long) i);
+ } else {
+ logger.createNewCompactionLog();
+ }
+ } catch (IOException e) {
+ LOG.error("Received exception while creating newLog", e);
+ failed.set(true);
+ }
+ });
+ // Give the pre-allocation time to finish.
+ Thread.sleep(1000);
+
+ Assert.assertFalse("There shouldn't be any exceptions while creating newlog", failed.get());
+ Assert.assertEquals(
+ "previousAllocatedEntryLogId after 2 times createNewLog is called", 2,
+ logger.getPreviousAllocatedEntryLogId());
+ }
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index b7f286c..21da951 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -44,6 +44,9 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManager;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerBase;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerForSingleEntryLog;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
@@ -64,6 +67,7 @@ public class EntryLogTest {
private static final Logger LOG = LoggerFactory.getLogger(EntryLogTest.class);
final List<File> tempDirs = new ArrayList<File>();
+ // Source of the random filler bytes written by generateEntry(long, long, int).
+ final Random rand = new Random();
File createTempDir(String prefix, String suffix) throws IOException {
File dir = IOUtils.createTempDir(prefix, suffix);
@@ -119,11 +123,13 @@ public class EntryLogTest {
this.dirsMgr.addToFilledDirs(curDir);
entryLogger = new EntryLogger(conf, dirsMgr);
- assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogger.getCurrentLogId());
+ EntryLogManagerForSingleEntryLog entryLogManager =
+ (EntryLogManagerForSingleEntryLog) entryLogger.getEntryLogManager();
+ assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId());
// add the first entry will trigger file creation
- entryLogger.addEntry(1, generateEntry(1, 1).nioBuffer());
- assertEquals(2L, entryLogger.getCurrentLogId());
+ entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer());
+ assertEquals(0L, entryLogManager.getCurrentLogId());
}
@Test
@@ -141,23 +147,25 @@ public class EntryLogTest {
this.dirsMgr.addToFilledDirs(curDir);
entryLogger = new EntryLogger(conf, dirsMgr);
- assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogger.getCurrentLogId());
+ EntryLogManagerForSingleEntryLog entryLogManager =
+ (EntryLogManagerForSingleEntryLog) entryLogger.getEntryLogManager();
+ assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId());
// add the first entry will trigger file creation
try {
- entryLogger.addEntry(1, generateEntry(1, 1).nioBuffer());
+ entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer());
fail("Should fail to append entry if there is no enough reserved space left");
} catch (NoWritableLedgerDirException e) {
- assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogger.getCurrentLogId());
+ assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId());
}
}
@Test
public void testCorruptEntryLog() throws Exception {
// create some entries
- entryLogger.addEntry(1, generateEntry(1, 1).nioBuffer());
- entryLogger.addEntry(3, generateEntry(3, 1).nioBuffer());
- entryLogger.addEntry(2, generateEntry(2, 1).nioBuffer());
+ entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer());
+ entryLogger.addEntry(3L, generateEntry(3, 1).nioBuffer());
+ entryLogger.addEntry(2L, generateEntry(2, 1).nioBuffer());
entryLogger.flush();
entryLogger.shutdown();
// now lets truncate the file to corrupt the last entry, which simulates a partial write
@@ -184,6 +192,16 @@ public class EntryLogTest {
return bb;
}
+ /**
+ * Builds an entry of exactly {@code length} bytes: the ledger id and the entry
+ * id (8 bytes each), followed by random filler bytes.
+ *
+ * @throws IllegalArgumentException if {@code length} is too small for both ids
+ */
+ private ByteBuf generateEntry(long ledger, long entry, int length) {
+ final int idsSize = 2 * Long.BYTES;
+ if (length < idsSize) {
+ throw new IllegalArgumentException("length must be at least " + idsSize + " but was " + length);
+ }
+ ByteBuf bb = Unpooled.buffer(length);
+ bb.writeLong(ledger);
+ bb.writeLong(entry);
+ byte[] filler = new byte[length - idsSize];
+ rand.nextBytes(filler);
+ bb.writeBytes(filler);
+ return bb;
+ }
+
+ /** Returns {@code "ledger-" + ledger + "-" + entry}. */
private static String generateDataString(long ledger, long entry) {
return ("ledger-" + ledger + "-" + entry);
}
@@ -199,7 +217,7 @@ public class EntryLogTest {
EntryLogger logger = new EntryLogger(conf, dirsMgr);
for (int j = 0; j < numEntries; j++) {
- positions[i][j] = logger.addEntry(i, generateEntry(i, j).nioBuffer());
+ positions[i][j] = logger.addEntry((long) i, generateEntry(i, j).nioBuffer());
}
logger.flush();
logger.shutdown();
@@ -214,7 +232,7 @@ public class EntryLogTest {
EntryLogger logger = new EntryLogger(conf, dirsMgr);
for (int j = 0; j < numEntries; j++) {
- positions[i][j] = logger.addEntry(i, generateEntry(i, j).nioBuffer());
+ positions[i][j] = logger.addEntry((long) i, generateEntry(i, j).nioBuffer());
}
logger.flush();
logger.shutdown();
@@ -271,6 +289,7 @@ public class EntryLogTest {
File ledgerDir1 = createTempDir("bkTest", ".dir");
File ledgerDir2 = createTempDir("bkTest", ".dir");
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+ conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
conf.setJournalDirName(ledgerDir1.toString());
conf.setLedgerDirNames(new String[] { ledgerDir1.getAbsolutePath(),
ledgerDir2.getAbsolutePath() });
@@ -287,7 +306,8 @@ public class EntryLogTest {
ledgerStorage.addEntry(generateEntry(1, 1));
ledgerStorage.addEntry(generateEntry(2, 1));
// Add entry with disk full failure simulation
- bookie.getLedgerDirsManager().addToFilledDirs(entryLogger.currentDir);
+ bookie.getLedgerDirsManager().addToFilledDirs(((EntryLogManagerBase) entryLogger.getEntryLogManager())
+ .getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID).getLogFile().getParentFile());
ledgerStorage.addEntry(generateEntry(3, 1));
// Verify written entries
Assert.assertTrue(0 == generateEntry(1, 1).compareTo(ledgerStorage.getEntry(1, 1)));
@@ -301,12 +321,14 @@ public class EntryLogTest {
@Test
public void testRecoverFromLedgersMap() throws Exception {
// create some entries
- entryLogger.addEntry(1, generateEntry(1, 1).nioBuffer());
- entryLogger.addEntry(3, generateEntry(3, 1).nioBuffer());
- entryLogger.addEntry(2, generateEntry(2, 1).nioBuffer());
- entryLogger.addEntry(1, generateEntry(1, 2).nioBuffer());
- entryLogger.rollLog();
- entryLogger.flushRotatedLogs();
+ entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer());
+ entryLogger.addEntry(3L, generateEntry(3, 1).nioBuffer());
+ entryLogger.addEntry(2L, generateEntry(2, 1).nioBuffer());
+ entryLogger.addEntry(1L, generateEntry(1, 2).nioBuffer());
+
+ EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager();
+ entryLogManager.createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
+ entryLogManager.flushRotatedLogs();
EntryLogMetadata meta = entryLogger.extractEntryLogMetadataFromIndex(0L);
LOG.info("Extracted Meta From Entry Log {}", meta);
@@ -324,11 +346,11 @@ public class EntryLogTest {
@Test
public void testRecoverFromLedgersMapOnV0EntryLog() throws Exception {
// create some entries
- entryLogger.addEntry(1, generateEntry(1, 1).nioBuffer());
- entryLogger.addEntry(3, generateEntry(3, 1).nioBuffer());
- entryLogger.addEntry(2, generateEntry(2, 1).nioBuffer());
- entryLogger.addEntry(1, generateEntry(1, 2).nioBuffer());
- entryLogger.rollLog();
+ entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer());
+ entryLogger.addEntry(3L, generateEntry(3, 1).nioBuffer());
+ entryLogger.addEntry(2L, generateEntry(2, 1).nioBuffer());
+ entryLogger.addEntry(1L, generateEntry(1, 2).nioBuffer());
+ ((EntryLogManagerBase) entryLogger.getEntryLogManager()).createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
entryLogger.shutdown();
// Rewrite the entry log header to be on V0 format
@@ -373,9 +395,10 @@ public class EntryLogTest {
entryLogger = new EntryLogger(conf, dirsMgr);
// create a logger whose initialization phase allocating a new entry log
+ ((EntryLogManagerBase) entryLogger.getEntryLogManager()).createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
assertNotNull(entryLogger.getEntryLoggerAllocator().getPreallocationFuture());
- entryLogger.addEntry(1, generateEntry(1, 1).nioBuffer());
+ entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer());
// the Future<BufferedLogChannel> is not null all the time
assertNotNull(entryLogger.getEntryLoggerAllocator().getPreallocationFuture());
entryLogger.shutdown();
@@ -386,7 +409,7 @@ public class EntryLogTest {
entryLogger = new EntryLogger(conf, dirsMgr);
assertNull(entryLogger.getEntryLoggerAllocator().getPreallocationFuture());
- entryLogger.addEntry(2, generateEntry(1, 1).nioBuffer());
+ entryLogger.addEntry(2L, generateEntry(1, 1).nioBuffer());
// the Future<BufferedLogChannel> is null all the time
assertNull(entryLogger.getEntryLoggerAllocator().getPreallocationFuture());
@@ -398,17 +421,18 @@ public class EntryLogTest {
@Test
public void testGetEntryLogsSet() throws Exception {
// create some entries
- assertEquals(Sets.newHashSet(0L, 1L), entryLogger.getEntryLogsSet());
+ EntryLogManagerBase entryLogManagerBase = ((EntryLogManagerBase) entryLogger.getEntryLogManager());
+ // No entry log ids are reported before the first createNewLog call.
+ assertEquals(Sets.newHashSet(), entryLogger.getEntryLogsSet());
- entryLogger.rollLog();
- entryLogger.flushRotatedLogs();
+ // After the first createNewLog/flushRotatedLogs round, ids 0 and 1 are reported.
+ // NOTE(review): presumably id 1 comes from pre-allocation; confirm.
+ entryLogManagerBase.createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
+ entryLogManagerBase.flushRotatedLogs();
- assertEquals(Sets.newHashSet(0L, 1L, 2L), entryLogger.getEntryLogsSet());
+ assertEquals(Sets.newHashSet(0L, 1L), entryLogger.getEntryLogsSet());
- entryLogger.rollLog();
- entryLogger.flushRotatedLogs();
+ // Each further round adds exactly one new id.
+ entryLogManagerBase.createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
+ entryLogManagerBase.flushRotatedLogs();
- assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), entryLogger.getEntryLogsSet());
+ assertEquals(Sets.newHashSet(0L, 1L, 2L), entryLogger.getEntryLogsSet());
}
static class LedgerStorageWriteTask implements Callable<Boolean> {
@@ -566,4 +590,121 @@ public class EntryLogTest {
executor.shutdownNow();
}
+
+    /**
+     * Test to verify the leastUnflushedLogId logic in {@code EntryLogger.RecentEntryLogsStatus}.
+     *
+     * <p>Entry log ids are registered with {@code createdEntryLog} and marked as flushed with
+     * {@code flushRotatedEntryLog}. {@code getLeastUnflushedLogId} must not move past a created
+     * log that is still unflushed, even when later logs have already been flushed.
+     */
+    @Test
+    public void testEntryLoggersRecentEntryLogsStatus() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        try {
+            EntryLogger.RecentEntryLogsStatus recentlyCreatedLogsStatus =
+                    entryLogger.recentlyCreatedEntryLogsStatus;
+
+            recentlyCreatedLogsStatus.createdEntryLog(0L);
+            Assert.assertEquals("entryLogger's leastUnflushedLogId ", 0L, entryLogger.getLeastUnflushedLogId());
+            recentlyCreatedLogsStatus.flushRotatedEntryLog(0L);
+            // entrylog-0 is flushed, so leastUnflushedLogId moves to 0 + 1
+            Assert.assertEquals("entryLogger's leastUnflushedLogId ", 1L, entryLogger.getLeastUnflushedLogId());
+            recentlyCreatedLogsStatus.createdEntryLog(1L);
+            Assert.assertEquals("entryLogger's leastUnflushedLogId ", 1L, entryLogger.getLeastUnflushedLogId());
+            recentlyCreatedLogsStatus.createdEntryLog(2L);
+            recentlyCreatedLogsStatus.createdEntryLog(3L);
+            recentlyCreatedLogsStatus.createdEntryLog(4L);
+            Assert.assertEquals("entryLogger's leastUnflushedLogId ", 1L, entryLogger.getLeastUnflushedLogId());
+            recentlyCreatedLogsStatus.flushRotatedEntryLog(1L);
+            Assert.assertEquals("entryLogger's leastUnflushedLogId ", 2L, entryLogger.getLeastUnflushedLogId());
+            recentlyCreatedLogsStatus.flushRotatedEntryLog(3L);
+            // entrylog-3 is flushed but entrylog-2 is not, so leastUnflushedLogId stays at 2
+            Assert.assertEquals("entryLogger's leastUnflushedLogId ", 2L, entryLogger.getLeastUnflushedLogId());
+            recentlyCreatedLogsStatus.flushRotatedEntryLog(2L);
+            // entrylog-3 was already flushed, so flushing entrylog-2 moves leastUnflushedLogId to 4
+            Assert.assertEquals("entryLogger's leastUnflushedLogId ", 4L, entryLogger.getLeastUnflushedLogId());
+            recentlyCreatedLogsStatus.flushRotatedEntryLog(4L);
+            Assert.assertEquals("entryLogger's leastUnflushedLogId ", 5L, entryLogger.getLeastUnflushedLogId());
+            recentlyCreatedLogsStatus.createdEntryLog(5L);
+            recentlyCreatedLogsStatus.createdEntryLog(7L);
+            recentlyCreatedLogsStatus.createdEntryLog(9L);
+            Assert.assertEquals("entryLogger's leastUnflushedLogId ", 5L, entryLogger.getLeastUnflushedLogId());
+            recentlyCreatedLogsStatus.flushRotatedEntryLog(5L);
+            // entrylog-5 is flushed, so leastUnflushedLogId moves to 5 + 1 (entrylog-6 is never created)
+            Assert.assertEquals("entryLogger's leastUnflushedLogId ", 6L, entryLogger.getLeastUnflushedLogId());
+            recentlyCreatedLogsStatus.flushRotatedEntryLog(7L);
+            // entrylog-6 was never created, so flushing entrylog-7 moves leastUnflushedLogId to 8
+            Assert.assertEquals("entryLogger's leastUnflushedLogId ", 8L, entryLogger.getLeastUnflushedLogId());
+        } finally {
+            // shut down the locally created logger so it does not outlive this test
+            entryLogger.shutdown();
+        }
+    }
+
+    /**
+     * Creates {@code numOfLedgerDirs} temporary ledger directories and prepares, for each one,
+     * the directory returned by {@code Bookie.getCurrentDirectory} via
+     * {@code Bookie.checkDirectoryStructure}.
+     *
+     * @param numOfLedgerDirs number of ledger directories to create
+     * @return the absolute paths of the created ledger directories, in creation order
+     * @throws IOException if a directory cannot be created or prepared
+     */
+    String[] createAndGetLedgerDirs(int numOfLedgerDirs) throws IOException {
+        String[] ledgerDirsPath = new String[numOfLedgerDirs];
+        for (int dirIdx = 0; dirIdx < ledgerDirsPath.length; dirIdx++) {
+            File tmpLedgerDir = createTempDir("bkTest", ".dir");
+            Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(tmpLedgerDir));
+            ledgerDirsPath[dirIdx] = tmpLedgerDir.getAbsolutePath();
+        }
+        return ledgerDirsPath;
+    }
+
+    /**
+     * Verifies that an entry log's BufferedChannel is flushed and force-written once the bytes
+     * written to it exceed {@code flushIntervalInBytes}. After that, a second EntryLogger opened
+     * on the same ledger directories can read the entries without any explicit flush.
+     */
+    @Test(timeout = 60000)
+    public void testFlushIntervalInBytes() throws Exception {
+        long flushIntervalInBytes = 5000;
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setFlushIntervalInBytes(flushIntervalInBytes);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogger newEntryLogger = null;
+        try {
+            EntryLogManagerBase entryLogManagerBase = ((EntryLogManagerBase) entryLogger.getEntryLogManager());
+
+            /*
+             * the unpersisted byte count of the current entry log also includes its header of
+             * EntryLogger.LOGFILE_HEADER_SIZE bytes
+             */
+            long ledgerId = 0L;
+            int firstEntrySize = 1000;
+            long entry0Position = entryLogger.addEntry(ledgerId, generateEntry(ledgerId, 0L, firstEntrySize));
+            // entrylogger writes length of the entry (4 bytes) before writing entry
+            long expectedUnpersistedBytes = EntryLogger.LOGFILE_HEADER_SIZE + firstEntrySize + 4;
+            Assert.assertEquals("Unpersisted Bytes of entrylog", expectedUnpersistedBytes,
+                    entryLogManagerBase.getCurrentLogForLedger(ledgerId).getUnpersistedBytes());
+
+            /*
+             * the second entry (plus its 4-byte length prefix) pushes the bytes written past
+             * 'flushIntervalInBytes', so the BufferedChannel should be flushed and force-written
+             */
+            int secondEntrySize = (int) (flushIntervalInBytes - expectedUnpersistedBytes);
+            long entry1Position = entryLogger.addEntry(ledgerId, generateEntry(ledgerId, 1L, secondEntrySize));
+            Assert.assertEquals("Unpersisted Bytes of entrylog", 0,
+                    entryLogManagerBase.getCurrentLogForLedger(ledgerId).getUnpersistedBytes());
+
+            /*
+             * since entrylog/Bufferedchannel is persisted (forcewritten), we should be able to read the entrylog using
+             * newEntryLogger
+             */
+            conf.setEntryLogPerLedgerEnabled(false);
+            newEntryLogger = new EntryLogger(conf, ledgerDirsManager);
+            EntryLogManager newEntryLogManager = newEntryLogger.getEntryLogManager();
+            Assert.assertEquals("EntryLogManager class type", EntryLogger.EntryLogManagerForSingleEntryLog.class,
+                    newEntryLogManager.getClass());
+
+            ByteBuf buf = newEntryLogger.readEntry(ledgerId, 0L, entry0Position);
+            long readLedgerId = buf.readLong();
+            long readEntryId = buf.readLong();
+            // the read buffer is owned by this test; release it once the header fields are read
+            buf.release();
+            Assert.assertEquals("LedgerId", ledgerId, readLedgerId);
+            Assert.assertEquals("EntryId", 0L, readEntryId);
+
+            buf = newEntryLogger.readEntry(ledgerId, 1L, entry1Position);
+            readLedgerId = buf.readLong();
+            readEntryId = buf.readLong();
+            buf.release();
+            Assert.assertEquals("LedgerId", ledgerId, readLedgerId);
+            Assert.assertEquals("EntryId", 1L, readEntryId);
+        } finally {
+            // both loggers are created locally, so shut them down here so they do not outlive the test;
+            // this happens only after the reads, so shutdown cannot mask a missing force-write
+            if (newEntryLogger != null) {
+                newEntryLogger.shutdown();
+            }
+            entryLogger.shutdown();
+        }
+    }
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
index f55249c..6cbdcde 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
@@ -28,10 +28,13 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
+import java.util.Arrays;
import java.util.Enumeration;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -39,6 +42,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerBase;
import org.apache.bookkeeper.bookie.Journal.LastLogMark;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -359,7 +364,7 @@ public class LedgerStorageCheckpointTest {
}
handle.close();
// simulate rolling entrylog
- ledgerStorage.entryLogger.rollLog();
+ ((EntryLogManagerBase) ledgerStorage.getEntryLogger().getEntryLogManager()).createNewLog(ledgerId);
// sleep for a bit for checkpoint to do its task
executorController.advance(Duration.ofMillis(500));
@@ -490,6 +495,7 @@ public class LedgerStorageCheckpointTest {
BookKeeper bkClient = new BookKeeper(clientConf);
InterleavedLedgerStorage ledgerStorage = (InterleavedLedgerStorage) server.getBookie().ledgerStorage;
EntryLogger entryLogger = ledgerStorage.entryLogger;
+ EntryLogManagerBase entryLogManagerBase = (EntryLogManagerBase) entryLogger.getEntryLogManager();
int numOfEntries = 5;
byte[] dataBytes = "data".getBytes();
@@ -501,7 +507,7 @@ public class LedgerStorageCheckpointTest {
}
handle.close();
// simulate rolling entrylog
- ledgerStorage.entryLogger.rollLog();
+ entryLogManagerBase.createNewLog(ledgerId);
ledgerId = 20;
handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(), null);
@@ -510,7 +516,7 @@ public class LedgerStorageCheckpointTest {
}
handle.close();
// simulate rolling entrylog
- ledgerStorage.entryLogger.rollLog();
+ entryLogManagerBase.createNewLog(ledgerId);
ledgerId = 30;
handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(), null);
@@ -519,9 +525,14 @@ public class LedgerStorageCheckpointTest {
}
handle.close();
- Assert.assertNotEquals("bytesWrittenSinceLastFlush shouldn't be zero", 0,
- entryLogger.logChannel.getUnpersistedBytes());
- Assert.assertNotEquals("There should be logChannelsToFlush", 0, entryLogger.logChannelsToFlush.size());
+ Set<BufferedLogChannel> copyOfCurrentLogs = new HashSet<BufferedLogChannel>(
+ Arrays.asList(entryLogManagerBase.getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID)));
+ for (BufferedLogChannel currentLog : copyOfCurrentLogs) {
+ Assert.assertNotEquals("bytesWrittenSinceLastFlush shouldn't be zero", 0,
+ currentLog.getUnpersistedBytes());
+ }
+ Assert.assertNotEquals("There should be logChannelsToFlush", 0,
+ entryLogManagerBase.getRotatedLogChannels().size());
/*
* wait for atleast flushInterval period, so that checkpoint can happen.
@@ -532,11 +543,16 @@ public class LedgerStorageCheckpointTest {
* since checkpoint happenend, there shouldn't be any logChannelsToFlush
* and bytesWrittenSinceLastFlush should be zero.
*/
+ List<BufferedLogChannel> copyOfRotatedLogChannels = entryLogManagerBase.getRotatedLogChannels();
Assert.assertTrue("There shouldn't be logChannelsToFlush",
- ((entryLogger.logChannelsToFlush == null) || (entryLogger.logChannelsToFlush.size() == 0)));
+ ((copyOfRotatedLogChannels == null) || (copyOfRotatedLogChannels.size() == 0)));
- Assert.assertEquals("bytesWrittenSinceLastFlush should be zero", 0,
- entryLogger.logChannel.getUnpersistedBytes());
+ copyOfCurrentLogs = new HashSet<BufferedLogChannel>(
+ Arrays.asList(entryLogManagerBase.getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID)));
+ for (BufferedLogChannel currentLog : copyOfCurrentLogs) {
+ Assert.assertEquals("bytesWrittenSinceLastFlush should be zero", 0,
+ currentLog.getUnpersistedBytes());
+ }
}
static class MockInterleavedLedgerStorage extends InterleavedLedgerStorage {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
index c183fbf..9642c18 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
@@ -36,9 +36,11 @@ import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerForSingleEntryLog;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -99,6 +101,7 @@ public class SortedLedgerStorageCheckpointTest extends LedgerStorageTestBase {
public SortedLedgerStorageCheckpointTest() {
super();
conf.setEntryLogSizeLimit(1);
+ conf.setEntryLogFilePreAllocationEnabled(false);
this.checkpoints = new LinkedBlockingQueue<>();
}
@@ -182,6 +185,7 @@ public class SortedLedgerStorageCheckpointTest extends LedgerStorageTestBase {
assertEquals(new TestCheckpoint(0), memtableCp);
// trigger a memtable flush
+ Assert.assertNotNull("snapshot shouldn't have returned null", storage.memTable.snapshot());
storage.onSizeLimitReached(checkpointSrc.newCheckpoint());
// wait for checkpoint to complete
checkpoints.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
@@ -219,9 +223,11 @@ public class SortedLedgerStorageCheckpointTest extends LedgerStorageTestBase {
});
// simulate entry log is rotated (due to compaction)
- storage.entryLogger.rollLog();
+ EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) storage.getEntryLogger()
+ .getEntryLogManager();
+ entryLogManager.createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
long leastUnflushedLogId = storage.entryLogger.getLeastUnflushedLogId();
- long currentLogId = storage.entryLogger.getCurrentLogId();
+ long currentLogId = entryLogManager.getCurrentLogId();
log.info("Least unflushed entry log : current = {}, leastUnflushed = {}", currentLogId, leastUnflushedLogId);
readyLatch.countDown();
@@ -230,6 +236,7 @@ public class SortedLedgerStorageCheckpointTest extends LedgerStorageTestBase {
assertEquals(20, storage.memTable.kvmap.size());
// trigger a memtable flush
+ Assert.assertNotNull("snapshot shouldn't have returned null", storage.memTable.snapshot());
storage.onSizeLimitReached(checkpointSrc.newCheckpoint());
assertEquals(new TestCheckpoint(100), checkpoints.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
index 67c69fe..7b44c94 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
@@ -227,7 +227,7 @@ public class DbLedgerStorageTest {
newEntry3.writeLong(4); // ledger id
newEntry3.writeLong(3); // entry id
newEntry3.writeBytes("new-entry-3".getBytes());
- long location = entryLogger.addEntry(4, newEntry3, false);
+ long location = entryLogger.addEntry(4L, newEntry3, false);
List<EntryLocation> locations = Lists.newArrayList(new EntryLocation(4, 3, location));
singleDirStorage.updateEntriesLocations(locations);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
index bdc8901..35b2fd3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
@@ -306,4 +306,4 @@ public class ReadOnlyBookieTest extends BookKeeperClusterTestCase {
assertEquals("Entry should contain correct data", "data", new String(entry.getEntry()));
}
}
-}
+}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.