You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2013/05/04 14:22:10 UTC
svn commit: r1479085 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/test/java/org/apache/bookkeeper/test/
Author: ivank
Date: Sat May 4 12:22:10 2013
New Revision: 1479085
URL: http://svn.apache.org/r1479085
Log:
BOOKKEEPER-564: Better checkpoint mechanism (sijie & ivank)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CheckpointSource.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Sat May 4 12:22:10 2013
@@ -10,6 +10,8 @@ Trunk (unreleased changes)
BUGFIXES:
+ BOOKKEEPER-564: Better checkpoint mechanism (sijie & ivank)
+
BOOKKEEPER-596: Ledgers are gc'ed by mistake in MSLedgerManagerFactory. (sijie & ivank)
BOOKKEEPER-595: Crash of inprocess autorecovery daemon should not take down the bookie (ivank)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Sat May 4 12:22:10 2013
@@ -45,6 +45,7 @@ import org.apache.bookkeeper.meta.Ledger
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.GarbageCollectorThread.SafeEntryAdder;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -239,39 +240,70 @@ public class Bookie extends Thread {
}
/**
- * SyncThread is a background thread which flushes ledger index pages periodically.
- * Also it takes responsibility of garbage collecting journal files.
- *
- * <p>
- * Before flushing, SyncThread first records a log marker {journalId, journalPos} in memory,
- * which indicates entries before this log marker would be persisted to ledger files.
- * Then sync thread begins flushing ledger index pages to ledger index files, flush entry
- * logger to ensure all entries persisted to entry loggers for future reads.
- * </p>
+ * SyncThread is a background thread which helps checkpoint the ledger storage
+ * when a checkpoint is requested. After the ledger storage is checkpointed,
+ * the journal files added before the checkpoint will be garbage collected.
* <p>
- * After all data has been persisted to ledger index files and entry loggers, it is safe
- * to persist the log marker to disk. If bookie failed after persist log mark,
- * bookie is able to relay journal entries started from last log mark without losing
- * any entries.
+ * After all data has been persisted to ledger index files and entry
+ * loggers, it is safe to complete a checkpoint by persisting the log marker
+ * to disk. If the bookie fails after persisting the log mark, it is able to replay
+ * journal entries starting from the last log mark without losing any entries.
* </p>
* <p>
- * Those journal files whose id are less than the log id in last log mark, could be
- * removed safely after persisting last log mark. We provide a setting to let user keeping
- * number of old journal files which may be used for manual recovery in critical disaster.
+ * Those journal files whose ids are less than the log id in the last log mark
+ * can be removed safely after persisting the last log mark. We provide a
+ * setting that lets the user keep a number of old journal files, which may be
+ * used for manual recovery in a critical disaster.
* </p>
*/
class SyncThread extends Thread {
volatile boolean running = true;
// flag to ensure sync thread will not be interrupted during flush
final AtomicBoolean flushing = new AtomicBoolean(false);
- // make flush interval as a parameter
final int flushInterval;
+
public SyncThread(ServerConfiguration conf) {
super("SyncThread");
flushInterval = conf.getFlushInterval();
LOG.debug("Flush Interval : {}", flushInterval);
}
+ /**
+ * flush data up to given logMark and roll log if success
+ * @param checkpoint
+ */
+ @VisibleForTesting
+ public void checkpoint(Checkpoint checkpoint) {
+ boolean flushFailed = false;
+ try {
+ if (running) {
+ checkpoint = ledgerStorage.checkpoint(checkpoint);
+ } else {
+ ledgerStorage.flush();
+ }
+ } catch (NoWritableLedgerDirException e) {
+ LOG.error("No writeable ledger directories");
+ flushFailed = true;
+ flushing.set(false);
+ transitionToReadOnlyMode();
+ } catch (IOException e) {
+ LOG.error("Exception flushing Ledger", e);
+ flushFailed = true;
+ }
+
+ // if flush failed, we should not roll last mark, otherwise some ledgers
+ // would not have been flushed and their journal entries would be lost
+ if (!flushFailed) {
+ try {
+ journal.checkpointComplete(checkpoint, running);
+ } catch (IOException e) {
+ flushing.set(false);
+ LOG.error("Marking checkpoint as complete failed", e);
+ transitionToReadOnlyMode();
+ }
+ }
+ }
+
private Object suspensionLock = new Object();
private boolean suspended = false;
@@ -299,61 +331,35 @@ public class Bookie extends Thread {
@Override
public void run() {
try {
- while (running) {
+ while(running) {
synchronized (this) {
try {
wait(flushInterval);
- if (!ledgerStorage.isFlushRequired()) {
- continue;
- }
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
continue;
}
}
+
synchronized (suspensionLock) {
while (suspended) {
- suspensionLock.wait();
+ try {
+ suspensionLock.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ continue;
+ }
}
}
- // try to mark flushing flag to make sure it would not be interrupted
- // by shutdown during flushing. otherwise it will receive
- // ClosedByInterruptException which may cause index file & entry logger
- // closed and corrupted.
+
+ // try to mark flushing flag to check if interrupted
if (!flushing.compareAndSet(false, true)) {
// set flushing flag failed, means flushing is true now
// indicates another thread wants to interrupt sync thread to exit
break;
}
+ checkpoint(journal.newCheckpoint());
- // journal mark log
- journal.markLog();
-
- boolean flushFailed = false;
- try {
- ledgerStorage.flush();
- } catch (NoWritableLedgerDirException e) {
- flushFailed = true;
- flushing.set(false);
- transitionToReadOnlyMode();
- } catch (IOException e) {
- LOG.error("Exception flushing Ledger", e);
- flushFailed = true;
- }
-
- // if flush failed, we should not roll last mark, otherwise we would
- // have some ledgers are not flushed and their journal entries were lost
- if (!flushFailed) {
- try {
- journal.rollLog();
- journal.gcJournals();
- } catch (NoWritableLedgerDirException e) {
- flushing.set(false);
- transitionToReadOnlyMode();
- }
- }
-
- // clear flushing flag
flushing.set(false);
}
} catch (Throwable t) {
@@ -365,9 +371,10 @@ public class Bookie extends Thread {
// shutdown sync thread
void shutdown() throws InterruptedException {
+ // Wake up and finish sync thread
running = false;
+ // make a checkpoint when shutdown
if (flushing.compareAndSet(false, true)) {
- // if setting flushing flag succeed, means syncThread is not flushing now
// it is safe to interrupt itself now
this.interrupt();
}
@@ -532,12 +539,12 @@ public class Bookie extends Thread {
LOG.info("instantiate ledger manager {}", ledgerManagerFactory.getClass().getName());
ledgerManager = ledgerManagerFactory.newLedgerManager();
syncThread = new SyncThread(conf);
+ // instantiate the journal
+ journal = new Journal(conf, ledgerDirsManager);
ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager,
- ledgerDirsManager,
+ ledgerDirsManager, journal,
new BookieSafeEntryAdder());
handles = new HandleFactoryImpl(ledgerStorage);
- // instantiate the journal
- journal = new Journal(conf, ledgerDirsManager);
// ZK ephemeral node for this Bookie.
zkBookieRegPath = this.bookieRegistrationPath + getMyId();
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java Sat May 4 12:22:10 2013
@@ -916,10 +916,10 @@ public class BookieShell implements Tool
* Print last log mark
*/
protected void printLastLogMark() throws IOException {
- LastLogMark lastLogMark = getJournal().getLastLogMark();
- System.out.println("LastLogMark: Journal Id - " + lastLogMark.getTxnLogId() + "("
- + Long.toHexString(lastLogMark.getTxnLogId()) + ".txn), Pos - "
- + lastLogMark.getTxnLogPosition());
+ LogMark lastLogMark = getJournal().getLastLogMark().getCurMark();
+ System.out.println("LastLogMark: Journal Id - " + lastLogMark.getLogFileId() + "("
+ + Long.toHexString(lastLogMark.getLogFileId()) + ".txn), Pos - "
+ + lastLogMark.getLogFileOffset());
}
/**
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CheckpointSource.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CheckpointSource.java?rev=1479085&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CheckpointSource.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CheckpointSource.java Sat May 4 12:22:10 2013
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+
+/**
+ * Interface to communicate checkpoint progress.
+ */
+public interface CheckpointSource {
+
+ /**
+ * A checkpoint represents a point in time. All entries added before this checkpoint are already persisted.
+ */
+ public static interface Checkpoint extends Comparable<Checkpoint> {
+
+ public static final Checkpoint MAX = new Checkpoint() {
+
+ @Override
+ public int compareTo(Checkpoint o) {
+ if (o == MAX) {
+ return 0;
+ }
+ return 1;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return this == o;
+ }
+
+ };
+
+ public static final Checkpoint MIN = new Checkpoint() {
+ @Override
+ public int compareTo(Checkpoint o) {
+ if (o == MIN) {
+ return 0;
+ }
+ return 1;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return this == o;
+ }
+ };
+ }
+
+ /**
+ * Request a new checkpoint.
+ *
+ * @return checkpoint.
+ */
+ public Checkpoint newCheckpoint();
+
+ /**
+ * Tell checkpoint source that the checkpoint is completed.
+ * If <code>compact</code> is true, the implementation could compact
+ * to reduce size of data containing old checkpoints.
+ *
+ * @param checkpoint
+ * The checkpoint that has been completed
+ * @param compact
+ * Flag to compact old checkpoints.
+ */
+ public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IOException;
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java Sat May 4 12:22:10 2013
@@ -21,6 +21,8 @@
package org.apache.bookkeeper.bookie;
+import static com.google.common.base.Charsets.UTF_8;
+
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
@@ -36,19 +38,23 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.google.common.base.Charsets.UTF_8;
-
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This class manages the writing of the bookkeeper entries. All the new
@@ -65,12 +71,16 @@ public class EntryLogger {
private AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
private long logId;
+ private volatile long leastUnflushedLogId;
/**
* The maximum size of a entry logger file.
*/
final long logSizeLimit;
+ private List<BufferedChannel> logChannelsToFlush;
private volatile BufferedChannel logChannel;
+ private final EntryLogListener listener;
+
/**
* The 1K block at the head of the entry logger file
* that contains the fingerprint and (future) meta-data
@@ -110,12 +120,29 @@ public class EntryLogger {
}
/**
+ * Entry Log Listener
+ */
+ static interface EntryLogListener {
+ /**
+ * Rotate a new entry log to write.
+ */
+ public void onRotateEntryLog();
+ }
+
+ /**
* Create an EntryLogger that stores it's log files in the given
* directories
*/
public EntryLogger(ServerConfiguration conf,
LedgerDirsManager ledgerDirsManager) throws IOException {
+ this(conf, ledgerDirsManager, null);
+ }
+
+ public EntryLogger(ServerConfiguration conf,
+ LedgerDirsManager ledgerDirsManager, EntryLogListener listener)
+ throws IOException {
this.ledgerDirsManager = ledgerDirsManager;
+ this.listener = listener;
// log size limit
this.logSizeLimit = conf.getEntryLogSizeLimit();
@@ -138,7 +165,7 @@ public class EntryLogger {
logId = lastLogId;
}
}
-
+ this.leastUnflushedLogId = logId + 1;
initialize();
}
@@ -147,6 +174,16 @@ public class EntryLogger {
*/
private ConcurrentHashMap<Long, BufferedChannel> channels = new ConcurrentHashMap<Long, BufferedChannel>();
+ /**
+ * Get the least unflushed log id. Garbage collector thread should not process
+ * unflushed entry log file.
+ *
+ * @return least unflushed log id.
+ */
+ synchronized long getLeastUnflushedLogId() {
+ return leastUnflushedLogId;
+ }
+
synchronized long getCurrentLogId() {
return logId;
}
@@ -187,14 +224,28 @@ public class EntryLogger {
}
/**
+ * Rolling a new log file to write.
+ */
+ synchronized void rollLog() throws IOException {
+ createNewLog();
+ }
+
+ /**
* Creates a new log file
*/
void createNewLog() throws IOException {
- if (logChannel != null) {
- logChannel.flush(true);
+ if (null != logChannel) {
+ if (null == logChannelsToFlush) {
+ logChannelsToFlush = new LinkedList<BufferedChannel>();
+ }
+ // flush the internal buffer back to filesystem but not sync disk
+ // so the readers could access the data from filesystem.
+ logChannel.flush(false);
+ logChannelsToFlush.add(logChannel);
+ if (null != listener) {
+ listener.onRotateEntryLog();
+ }
}
-
- // It would better not to overwrite existing entry log files
String logFileName = null;
do {
logFileName = Long.toHexString(++logId) + ".log";
@@ -202,7 +253,7 @@ public class EntryLogger {
File newLogFile = new File(dir, logFileName);
if (newLogFile.exists()) {
LOG.warn("Found existed entry log " + newLogFile
- + " when trying to create it as a new log.");
+ + " when trying to create it as a new log.");
logFileName = null;
break;
}
@@ -324,21 +375,57 @@ public class EntryLogger {
}
}
- synchronized void flush() throws IOException {
+ /**
+ * Flushes all rotated log channels. After log channels are flushed,
+ * move leastUnflushedLogId ptr to current logId.
+ */
+ void checkpoint() throws IOException {
+ flushRotatedLogs();
+ }
+
+ void flushRotatedLogs() throws IOException {
+ List<BufferedChannel> tmpChannels = null;
+ long newUnflushedLogId;
+ synchronized (this) {
+ tmpChannels = logChannelsToFlush;
+ logChannelsToFlush = null;
+ newUnflushedLogId = logId;
+ }
+ if (null == tmpChannels) {
+ return;
+ }
+ for (BufferedChannel channel : tmpChannels) {
+ channel.flush(true);
+ }
+ // move the leastUnflushedLogId ptr
+ leastUnflushedLogId = newUnflushedLogId;
+ }
+
+ void flush() throws IOException {
+ flushRotatedLogs();
+ flushCurrentLog();
+ }
+
+ synchronized void flushCurrentLog() throws IOException {
if (logChannel != null) {
logChannel.flush(true);
}
}
- synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
- // Create new log if logSizeLimit reached or current disk is full
- boolean createNewLog = shouldCreateNewEntryLog.get();
- if (createNewLog
- || (logChannel.position() + entry.remaining() + 4 > logSizeLimit)) {
- createNewLog();
-
- // Reset the flag
- if (createNewLog) {
- shouldCreateNewEntryLog.set(false);
+
+ long addEntry(long ledger, ByteBuffer entry) throws IOException {
+ return addEntry(ledger, entry, true);
+ }
+
+ synchronized long addEntry(long ledger, ByteBuffer entry, boolean rollLog) throws IOException {
+ if (rollLog) {
+ // Create new log if logSizeLimit reached or current disk is full
+ boolean createNewLog = shouldCreateNewEntryLog.get();
+ if (createNewLog || reachEntryLogLimit(entry.remaining() + 4)) {
+ createNewLog();
+ // Reset the flag
+ if (createNewLog) {
+ shouldCreateNewEntryLog.set(false);
+ }
}
}
ByteBuffer buff = ByteBuffer.allocate(4);
@@ -347,11 +434,14 @@ public class EntryLogger {
logChannel.write(buff);
long pos = logChannel.position();
logChannel.write(entry);
- //logChannel.flush(false);
return (logId << 32L) | pos;
}
+ synchronized boolean reachEntryLogLimit(long size) {
+ return logChannel.position() + size > logSizeLimit;
+ }
+
byte[] readEntry(long ledgerId, long entryId, long location) throws IOException, Bookie.NoEntryException {
long entryLogId = location >> 32L;
long pos = location & 0xffffffffL;
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java Sat May 4 12:22:10 2013
@@ -226,6 +226,9 @@ class FileInfo {
private int readAbsolute(ByteBuffer bb, long start) throws IOException {
checkOpen(false);
+ if (fc == null) {
+ return 0;
+ }
int total = 0;
while(bb.remaining() > 0) {
int rc = fc.read(bb, start);
@@ -284,47 +287,49 @@ class FileInfo {
*/
public synchronized void moveToNewLocation(File newFile, long size) throws IOException {
checkOpen(false);
- if (size > fc.size()) {
- size = fc.size();
- }
- File rlocFile = new File(newFile.getParentFile(), newFile.getName() + LedgerCacheImpl.RLOC);
- if (!rlocFile.exists()) {
- checkParents(rlocFile);
- if (!rlocFile.createNewFile()) {
- throw new IOException("Creating new cache index file " + rlocFile + " failed ");
+ if (fc != null) {
+ if (size > fc.size()) {
+ size = fc.size();
+ }
+ File rlocFile = new File(newFile.getParentFile(), newFile.getName() + LedgerCacheImpl.RLOC);
+ if (!rlocFile.exists()) {
+ checkParents(rlocFile);
+ if (!rlocFile.createNewFile()) {
+ throw new IOException("Creating new cache index file " + rlocFile + " failed ");
+ }
}
- }
- // copy contents from old.idx to new.idx.rloc
- FileChannel newFc = new RandomAccessFile(rlocFile, "rw").getChannel();
- try {
- long written = 0;
- while (written < size) {
- long count = fc.transferTo(written, size, newFc);
- if (count <= 0) {
+ // copy contents from old.idx to new.idx.rloc
+ FileChannel newFc = new RandomAccessFile(rlocFile, "rw").getChannel();
+ try {
+ long written = 0;
+ while (written < size) {
+ long count = fc.transferTo(written, size, newFc);
+ if (count <= 0) {
+ throw new IOException("Copying to new location " + rlocFile + " failed");
+ }
+ written += count;
+ }
+ if (written <= 0 && size > 0) {
throw new IOException("Copying to new location " + rlocFile + " failed");
}
- written += count;
+ } finally {
+ newFc.force(true);
+ newFc.close();
}
- if (written <= 0 && size > 0) {
- throw new IOException("Copying to new location " + rlocFile + " failed");
+ // delete old.idx
+ fc.close();
+ if (!delete()) {
+ LOG.error("Failed to delete the previous index file " + lf);
+ throw new IOException("Failed to delete the previous index file " + lf);
}
- } finally {
- newFc.force(true);
- newFc.close();
- }
- // delete old.idx
- fc.close();
- if (!delete()) {
- LOG.error("Failed to delete the previous index file " + lf);
- throw new IOException("Failed to delete the previous index file " + lf);
- }
-
- // rename new.idx.rloc to new.idx
- if (!rlocFile.renameTo(newFile)) {
- LOG.error("Failed to rename " + rlocFile + " to " + newFile);
- throw new IOException("Failed to rename " + rlocFile + " to " + newFile);
+
+ // rename new.idx.rloc to new.idx
+ if (!rlocFile.renameTo(newFile)) {
+ LOG.error("Failed to rename " + rlocFile + " to " + newFile);
+ throw new IOException("Failed to rename " + rlocFile + " to " + newFile);
+ }
+ fc = new RandomAccessFile(newFile, mode).getChannel();
}
- fc = new RandomAccessFile(newFile, mode).getChannel();
lf = newFile;
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java Sat May 4 12:22:10 2013
@@ -528,7 +528,7 @@ public class GarbageCollectorThread exte
// Extract it for every entry log except for the current one.
// Entry Log ID's are just a long value that starts at 0 and increments
// by 1 when the log fills up and we roll to a new one.
- long curLogId = entryLogger.getCurrentLogId();
+ long curLogId = entryLogger.getLeastUnflushedLogId();
boolean hasExceptionWhenScan = false;
for (long entryLogId = scannedLogId; entryLogId < curLogId; entryLogId++) {
// Comb the current entry log file if it has not already been extracted.
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java Sat May 4 12:22:10 2013
@@ -21,16 +21,16 @@
package org.apache.bookkeeper.bookie;
-import java.nio.ByteBuffer;
import java.io.IOException;
+import java.nio.ByteBuffer;
-import org.apache.bookkeeper.jmx.BKMBeanInfo;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.jmx.BKMBeanInfo;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.util.SnapshotMap;
-import org.apache.zookeeper.ZooKeeper;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,11 +39,34 @@ import org.slf4j.LoggerFactory;
* This ledger storage implementation stores all entries in a single
* file and maintains an index file for each ledger.
*/
-class InterleavedLedgerStorage implements LedgerStorage {
+class InterleavedLedgerStorage implements LedgerStorage, EntryLogListener {
final static Logger LOG = LoggerFactory.getLogger(InterleavedLedgerStorage.class);
+ // Hold the last checkpoint
+ static class CheckpointHolder {
+ Checkpoint lastCheckpoint = Checkpoint.MAX;
+
+ synchronized void setNextCheckpoint(Checkpoint cp) {
+ if (Checkpoint.MAX.equals(lastCheckpoint) || lastCheckpoint.compareTo(cp) < 0) {
+ lastCheckpoint = cp;
+ }
+ }
+
+ synchronized void clearLastCheckpoint(Checkpoint done) {
+ if (0 == lastCheckpoint.compareTo(done)) {
+ lastCheckpoint = Checkpoint.MAX;
+ }
+ }
+
+ synchronized Checkpoint getLastCheckpoint() {
+ return lastCheckpoint;
+ }
+ }
+
EntryLogger entryLogger;
LedgerCache ledgerCache;
+ private final CheckpointSource checkpointSource;
+ private final CheckpointHolder checkpointHolder = new CheckpointHolder();
// A sorted map to stored all active ledger ids
protected final SnapshotMap<Long, Boolean> activeLedgers;
@@ -56,12 +79,12 @@ class InterleavedLedgerStorage implement
// this indicates that a write has happened since the last flush
private volatile boolean somethingWritten = false;
- InterleavedLedgerStorage(ServerConfiguration conf,
- LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
- GarbageCollectorThread.SafeEntryAdder safeEntryAdder)
- throws IOException {
+ InterleavedLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
+ LedgerDirsManager ledgerDirsManager, CheckpointSource checkpointSource,
+ GarbageCollectorThread.SafeEntryAdder safeEntryAdder) throws IOException {
activeLedgers = new SnapshotMap<Long, Boolean>();
- entryLogger = new EntryLogger(conf, ledgerDirsManager);
+ this.checkpointSource = checkpointSource;
+ entryLogger = new EntryLogger(conf, ledgerDirsManager, this);
ledgerCache = new LedgerCacheImpl(conf, activeLedgers, ledgerDirsManager);
gcThread = new GarbageCollectorThread(conf, ledgerCache, entryLogger,
activeLedgers, safeEntryAdder, ledgerManager);
@@ -115,19 +138,8 @@ class InterleavedLedgerStorage implement
long ledgerId = entry.getLong();
long entryId = entry.getLong();
entry.rewind();
-
- /*
- * Log the entry
- */
- long pos = entryLogger.addEntry(ledgerId, entry);
-
-
- /*
- * Set offset of entry id to be the current ledger position
- */
- ledgerCache.putEntryOffset(ledgerId, entryId, pos);
- somethingWritten = true;
+ processEntry(ledgerId, entryId, entry);
return entryId;
}
@@ -149,29 +161,29 @@ class InterleavedLedgerStorage implement
return ByteBuffer.wrap(entryLogger.readEntry(ledgerId, entryId, offset));
}
- @Override
- public boolean isFlushRequired() {
- return somethingWritten;
- };
-
- @Override
- public void flush() throws IOException {
+ private void flushOrCheckpoint(boolean isCheckpointFlush)
+ throws IOException {
- if (!somethingWritten) {
- return;
- }
- somethingWritten = false;
boolean flushFailed = false;
-
try {
ledgerCache.flushLedger(true);
+ } catch (LedgerDirsManager.NoWritableLedgerDirException e) {
+ throw e;
} catch (IOException ioe) {
LOG.error("Exception flushing Ledger cache", ioe);
flushFailed = true;
}
try {
- entryLogger.flush();
+ // if it is just a checkpoint flush, we just flush rotated entry log files
+ // in entry logger.
+ if (isCheckpointFlush) {
+ entryLogger.checkpoint();
+ } else {
+ entryLogger.flush();
+ }
+ } catch (LedgerDirsManager.NoWritableLedgerDirException e) {
+ throw e;
} catch (IOException ioe) {
LOG.error("Exception flushing Ledger", ioe);
flushFailed = true;
@@ -182,7 +194,66 @@ class InterleavedLedgerStorage implement
}
@Override
+ public Checkpoint checkpoint(Checkpoint checkpoint) throws IOException {
+ Checkpoint lastCheckpoint = checkpointHolder.getLastCheckpoint();
+ // if checkpoint is less than last checkpoint, we don't need to do checkpoint again.
+ if (lastCheckpoint.compareTo(checkpoint) > 0) {
+ return lastCheckpoint;
+ }
+ // we don't need to check somethingwritten since checkpoint
+ // is scheduled when rotate an entry logger file. and we could
+ // not set somethingWritten to false after checkpoint, since
+ // current entry logger file isn't flushed yet.
+ flushOrCheckpoint(true);
+ // after the ledger storage finished checkpointing, try to clear the done checkpoint
+ checkpointHolder.clearLastCheckpoint(lastCheckpoint);
+ return lastCheckpoint;
+ }
+
+ @Override
+ synchronized public void flush() throws IOException {
+ if (!somethingWritten) {
+ return;
+ }
+ somethingWritten = false;
+ flushOrCheckpoint(false);
+ }
+
+ @Override
public BKMBeanInfo getJMXBean() {
return ledgerCache.getJMXBean();
}
+
+ protected void processEntry(long ledgerId, long entryId, ByteBuffer entry) throws IOException {
+ processEntry(ledgerId, entryId, entry, true);
+ }
+
+ synchronized protected void processEntry(long ledgerId, long entryId, ByteBuffer entry, boolean rollLog)
+ throws IOException {
+ /*
+ * Touch dirty flag
+ */
+ somethingWritten = true;
+
+ /*
+ * Log the entry
+ */
+ long pos = entryLogger.addEntry(ledgerId, entry, rollLog);
+
+ /*
+ * Set offset of entry id to be the current ledger position
+ */
+ ledgerCache.putEntryOffset(ledgerId, entryId, pos);
+ }
+
+ @Override
+ public void onRotateEntryLog() {
+ // for interleaved ledger storage, we request a checkpoint when rotating an entry log file.
+ // the checkpoint represents the point at which all the entries added before it are already
+ // in ledger storage and ready to be synced to disk.
+ // TODO: we could consider removing checkpointSource and checkpointSource#newCheckpoint
+ // later if we provide a kind of LSN (Log/Journal Sequence Number)
+ // mechanism when adding an entry.
+ checkpointHolder.setNextCheckpoint(checkpointSource.newCheckpoint());
+ }
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java Sat May 4 12:22:10 2013
@@ -32,6 +32,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -43,7 +44,7 @@ import org.slf4j.LoggerFactory;
/**
* Provide journal related management.
*/
-class Journal extends Thread {
+class Journal extends Thread implements CheckpointSource {
static Logger LOG = LoggerFactory.getLogger(Journal.class);
@@ -84,43 +85,69 @@ class Journal extends Thread {
}
/**
+ * A wrapper over log mark to provide a checkpoint for users of journal
+ * to do checkpointing.
+ */
+ private static class LogMarkCheckpoint implements Checkpoint {
+ final LastLogMark mark;
+
+ public LogMarkCheckpoint(LastLogMark checkpoint) {
+ this.mark = checkpoint;
+ }
+
+ @Override
+ public int compareTo(Checkpoint o) {
+ if (o == Checkpoint.MAX) {
+ return -1;
+ } else if (o == Checkpoint.MIN) {
+ return 1;
+ }
+ return mark.getCurMark().compare(((LogMarkCheckpoint)o).mark.getCurMark());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof LogMarkCheckpoint)) {
+ return false;
+ }
+ return 0 == compareTo((LogMarkCheckpoint)o);
+ }
+
+ @Override
+ public int hashCode() {
+ return mark.hashCode();
+ }
+ }
+
+ /**
* Last Log Mark
*/
class LastLogMark {
- private long txnLogId;
- private long txnLogPosition;
- private LastLogMark lastMark;
+ private LogMark curMark;
LastLogMark(long logId, long logPosition) {
- this.txnLogId = logId;
- this.txnLogPosition = logPosition;
- }
- synchronized void setLastLogMark(long logId, long logPosition) {
- txnLogId = logId;
- txnLogPosition = logPosition;
- }
- synchronized void markLog() {
- lastMark = new LastLogMark(txnLogId, txnLogPosition);
+ this.curMark = new LogMark(logId, logPosition);
}
- synchronized LastLogMark getLastMark() {
- return lastMark;
+ synchronized void setCurLogMark(long logId, long logPosition) {
+ curMark.setLogMark(logId, logPosition);
}
- synchronized long getTxnLogId() {
- return txnLogId;
+
+ synchronized LastLogMark markLog() {
+ return new LastLogMark(curMark.getLogFileId(), curMark.getLogFileOffset());
}
- synchronized long getTxnLogPosition() {
- return txnLogPosition;
+
+ synchronized LogMark getCurMark() {
+ return curMark;
}
- synchronized void rollLog() throws NoWritableLedgerDirException {
+ synchronized void rollLog(LastLogMark lastMark) throws NoWritableLedgerDirException {
byte buff[] = new byte[16];
ByteBuffer bb = ByteBuffer.wrap(buff);
// we should record <logId, logPosition> marked in markLog
// which is safe since records before lastMark have been
// persisted to disk (both index & entry logger)
- bb.putLong(lastMark.getTxnLogId());
- bb.putLong(lastMark.getTxnLogPosition());
- LOG.debug("RollLog to persist last marked log : {}", lastMark);
+ lastMark.getCurMark().writeLogMark(bb);
+ LOG.debug("RollLog to persist last marked log : {}", lastMark.getCurMark());
List<File> writableLedgerDirs = ledgerDirsManager
.getWritableLedgerDirs();
for (File dir : writableLedgerDirs) {
@@ -151,6 +178,7 @@ class Journal extends Thread {
synchronized void readLog() {
byte buff[] = new byte[16];
ByteBuffer bb = ByteBuffer.wrap(buff);
+ LogMark mark = new LogMark();
for(File dir: ledgerDirsManager.getAllLedgerDirs()) {
File file = new File(dir, "lastMark");
try {
@@ -165,38 +193,31 @@ class Journal extends Thread {
fis.close();
}
bb.clear();
- long i = bb.getLong();
- long p = bb.getLong();
- if (i > txnLogId) {
- txnLogId = i;
- if(p > txnLogPosition) {
- txnLogPosition = p;
- }
+ mark.readLogMark(bb);
+ if (curMark.compare(mark) < 0) {
+ curMark.setLogMark(mark.getLogFileId(), mark.logFileOffset);
}
} catch (IOException e) {
LOG.error("Problems reading from " + file + " (this is okay if it is the first time starting this bookie");
}
}
}
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
-
- sb.append("LastMark: logId - ").append(txnLogId)
- .append(" , position - ").append(txnLogPosition);
-
- return sb.toString();
- }
}
/**
* Filter to return list of journals for rolling
*/
- private class JournalRollingFilter implements JournalIdFilter {
+ private static class JournalRollingFilter implements JournalIdFilter {
+
+ final LastLogMark lastMark;
+
+ JournalRollingFilter(LastLogMark lastMark) {
+ this.lastMark = lastMark;
+ }
+
@Override
public boolean accept(long journalId) {
- if (journalId < lastLogMark.getLastMark().getTxnLogId()) {
+ if (journalId < lastMark.getCurMark().getLogFileId()) {
return true;
} else {
return false;
@@ -273,7 +294,7 @@ class Journal extends Thread {
// read last log mark
lastLogMark.readLog();
- LOG.debug("Last Log Mark : {}", lastLogMark);
+ LOG.debug("Last Log Mark : {}", lastLogMark.getCurMark());
}
LastLogMark getLastLogMark() {
@@ -281,64 +302,45 @@ class Journal extends Thread {
}
/**
- * Records a <i>LastLogMark</i> in memory.
- *
- * <p>
- * The <i>LastLogMark</i> contains two parts: first one is <i>txnLogId</i>
- * (file id of a journal) and the second one is <i>txnLogPos</i> (offset in
- * a journal). The <i>LastLogMark</i> indicates that those entries before
- * it have been persisted to both index and entry log files.
- * </p>
- *
- * <p>
- * This method is called before flushing entry log files and ledger cache.
- * </p>
+ * Application tried to schedule a checkpoint. After all the txns added
+ * before checkpoint are persisted, a <i>checkpoint</i> will be returned
+ * to application. Application could use <i>checkpoint</i> to do its logic.
*/
- public void markLog() {
- lastLogMark.markLog();
+ @Override
+ public Checkpoint newCheckpoint() {
+ return new LogMarkCheckpoint(lastLogMark.markLog());
}
/**
- * Persists the <i>LastLogMark</i> marked by #markLog() to disk.
+ * Tells the journal that a checkpoint has finished.
*
- * <p>
- * This action means entries added before <i>LastLogMark</i> whose entry data
- * and index pages were already persisted to disk. It is the time to safely
- * remove journal files created earlier than <i>LastLogMark.txnLogId</i>.
- * </p>
- * <p>
- * If the bookie has crashed before persisting <i>LastLogMark</i> to disk,
- * it still has journal files contains entries for which index pages may not
- * have been persisted. Consequently, when the bookie restarts, it inspects
- * journal files to restore those entries; data isn't lost.
- * </p>
- * <p>
- * This method is called after flushing entry log files and ledger cache successfully, which is to ensure <i>LastLogMark</i> is pesisted.
- * </p>
- * @see #markLog()
- */
- public void rollLog() throws NoWritableLedgerDirException {
- lastLogMark.rollLog();
- }
-
- /**
- * Garbage collect older journals
+ * @throws IOException
*/
- public void gcJournals() {
- // list the journals that have been marked
- List<Long> logs = listJournalIds(journalDirectory, new JournalRollingFilter());
- // keep MAX_BACKUP_JOURNALS journal files before marked journal
- if (logs.size() >= maxBackupJournals) {
- int maxIdx = logs.size() - maxBackupJournals;
- for (int i=0; i<maxIdx; i++) {
- long id = logs.get(i);
- // make sure the journal id is smaller than marked journal id
- if (id < lastLogMark.getLastMark().getTxnLogId()) {
- File journalFile = new File(journalDirectory, Long.toHexString(id) + ".txn");
- if (!journalFile.delete()) {
- LOG.warn("Could not delete old journal file {}", journalFile);
+ @Override
+ public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IOException {
+ if (!(checkpoint instanceof LogMarkCheckpoint)) {
+ return; // we didn't create this checkpoint, so dont do anything with it
+ }
+ LogMarkCheckpoint lmcheckpoint = (LogMarkCheckpoint)checkpoint;
+ LastLogMark mark = lmcheckpoint.mark;
+
+ mark.rollLog(mark);
+ if (compact) {
+ // list the journals that have been marked
+ List<Long> logs = listJournalIds(journalDirectory, new JournalRollingFilter(mark));
+ // keep MAX_BACKUP_JOURNALS journal files before marked journal
+ if (logs.size() >= maxBackupJournals) {
+ int maxIdx = logs.size() - maxBackupJournals;
+ for (int i=0; i<maxIdx; i++) {
+ long id = logs.get(i);
+ // make sure the journal id is smaller than marked journal id
+ if (id < mark.getCurMark().getLogFileId()) {
+ File journalFile = new File(journalDirectory, Long.toHexString(id) + ".txn");
+ if (!journalFile.delete()) {
+ LOG.warn("Could not delete old journal file {}", journalFile);
+ }
+ LOG.info("garbage collected journal " + journalFile.getName());
}
- LOG.info("garbage collected journal " + journalFile.getName());
}
}
}
@@ -407,11 +409,11 @@ class Journal extends Thread {
* @throws IOException
*/
public void replay(JournalScanner scanner) throws IOException {
- final long markedLogId = lastLogMark.getTxnLogId();
+ final LogMark markedLog = lastLogMark.getCurMark();
List<Long> logs = listJournalIds(journalDirectory, new JournalIdFilter() {
@Override
public boolean accept(long journalId) {
- if (journalId < markedLogId) {
+ if (journalId < markedLog.getLogFileId()) {
return false;
}
return true;
@@ -419,9 +421,9 @@ class Journal extends Thread {
});
// last log mark may be missed due to no sync up before
// validate filtered log ids only when we have markedLogId
- if (markedLogId > 0) {
- if (logs.size() == 0 || logs.get(0) != markedLogId) {
- throw new IOException("Recovery log " + markedLogId + " is missing");
+ if (markedLog.getLogFileId() > 0) {
+ if (logs.size() == 0 || logs.get(0) != markedLog.getLogFileId()) {
+ throw new IOException("Recovery log " + markedLog.getLogFileId() + " is missing");
}
}
LOG.debug("Try to relay journal logs : {}", logs);
@@ -430,8 +432,8 @@ class Journal extends Thread {
// system calls done.
for(Long id: logs) {
long logPosition = 0L;
- if(id == markedLogId) {
- logPosition = lastLogMark.getTxnLogPosition();
+ if(id == markedLog.getLogFileId()) {
+ logPosition = markedLog.getLogFileOffset();
}
scanJournal(id, logPosition, scanner);
}
@@ -501,7 +503,7 @@ class Journal extends Thread {
//logFile.force(false);
bc.flush(true);
lastFlushPosition = bc.position();
- lastLogMark.setLastLogMark(logId, lastFlushPosition);
+ lastLogMark.setCurLogMark(logId, lastFlushPosition);
for (QueueEntry e : toFlush) {
e.cb.writeComplete(BookieException.Code.OK,
e.ledgerId, e.entryId, null, e.ctx);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java Sat May 4 12:22:10 2013
@@ -24,6 +24,7 @@ package org.apache.bookkeeper.bookie;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.jmx.BKMBeanInfo;
/**
@@ -91,11 +92,6 @@ interface LedgerStorage {
ByteBuffer getEntry(long ledgerId, long entryId) throws IOException;
/**
- * Whether there is data in the storage which needs to be flushed
- */
- boolean isFlushRequired();
-
- /**
* Flushes all data in the storage. Once this is called,
* add data written to the LedgerStorage up until this point
* has been persisted to perminant storage
@@ -103,6 +99,19 @@ interface LedgerStorage {
void flush() throws IOException;
/**
+ * Ask the ledger storage to sync data until the given <i>checkpoint</i>.
+ * The ledger storage implementation does the checkpoint and returns the real checkpoint
+ * that it finished. The returned checkpoint indicates that all entries added
+ * before that point are already persisted.
+ *
+ * @param checkpoint
+ * Check Point that {@link Checkpointer} proposed.
+ * @throws IOException
+ * @return the checkpoint that the ledger storage finished.
+ */
+ Checkpoint checkpoint(Checkpoint checkpoint) throws IOException;
+
+ /**
* Get the JMX management bean for this LedgerStorage
*/
BKMBeanInfo getJMXBean();
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java?rev=1479085&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java Sat May 4 12:22:10 2013
@@ -0,0 +1,83 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Journal stream position
+ */
+class LogMark {
+ long logFileId;
+ long logFileOffset;
+
+ public LogMark() {
+ setLogMark(0, 0);
+ }
+
+ public LogMark(LogMark other) {
+ setLogMark(other.logFileId, other.logFileOffset);
+ }
+
+ public LogMark(long logFileId, long logFileOffset) {
+ setLogMark(logFileId, logFileOffset);
+ }
+
+ public long getLogFileId() {
+ return logFileId;
+ }
+
+ public long getLogFileOffset() {
+ return logFileOffset;
+ }
+
+ public void readLogMark(ByteBuffer bb) {
+ logFileId = bb.getLong();
+ logFileOffset = bb.getLong();
+ }
+
+ public void writeLogMark(ByteBuffer bb) {
+ bb.putLong(logFileId);
+ bb.putLong(logFileOffset);
+ }
+
+ public void setLogMark(long logFileId, long logFileOffset) {
+ this.logFileId = logFileId;
+ this.logFileOffset = logFileOffset;
+ }
+
+ public int compare(LogMark other) {
+ long ret = this.logFileId - other.logFileId;
+ if (ret == 0) {
+ ret = this.logFileOffset - other.logFileOffset;
+ }
+ return (ret < 0)? -1 : ((ret > 0)? 1 : 0);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("LogMark: logFileId - ").append(logFileId)
+ .append(" , logFileOffset - ").append(logFileOffset);
+
+ return sb.toString();
+ }
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java Sat May 4 12:22:10 2013
@@ -22,6 +22,8 @@ package org.apache.bookkeeper.bookie;
import java.io.IOException;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+
/**
* Accessor class to avoid making Bookie internals public
*/
@@ -30,6 +32,8 @@ public class BookieAccessor {
* Force a bookie to flush its ledger storage
*/
public static void forceFlush(Bookie b) throws IOException {
+ Checkpoint cp = b.journal.newCheckpoint();
b.ledgerStorage.flush();
+ b.journal.checkpointComplete(cp, true);
}
-}
\ No newline at end of file
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java Sat May 4 12:22:10 2013
@@ -276,6 +276,8 @@ public class CompactionTest extends Book
bkc.deleteLedger(lhs[1].getId());
bkc.deleteLedger(lhs[2].getId());
LOG.info("Finished deleting the ledgers contains most entries.");
+ // restart bookies again to roll entry log files.
+ restartBookies();
Thread.sleep(baseConf.getMajorCompactionInterval() * 1000
+ baseConf.getGcWaitTime());
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java Sat May 4 12:22:10 2013
@@ -96,6 +96,8 @@ public class LedgerDeleteTest extends Mu
public void testLedgerDelete() throws Exception {
// Write enough ledger entries so that we roll over the initial entryLog (0.log)
LedgerHandle[] lhs = writeLedgerEntries(3, 1024, 1024);
+ // restart bookies to force rolling entry log files
+ restartBookies();
// Delete all of these ledgers from the BookKeeper client
for (LedgerHandle lh : lhs) {
@@ -136,7 +138,7 @@ public class LedgerDeleteTest extends Mu
bkc.deleteLedger(lh.getId());
}
LOG.info("Finished deleting all ledgers so waiting for the GC thread to clean up the entryLogs");
- Thread.sleep(2000);
+ Thread.sleep(2 * baseConf.getGcWaitTime());
/*
* Verify that the first two entry logs ([0,1].log) have been deleted