You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2013/05/04 14:22:10 UTC

svn commit: r1479085 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/test/java/org/apache/bookkeeper/test/

Author: ivank
Date: Sat May  4 12:22:10 2013
New Revision: 1479085

URL: http://svn.apache.org/r1479085
Log:
BOOKKEEPER-564: Better checkpoint mechanism (sijie & ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CheckpointSource.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Sat May  4 12:22:10 2013
@@ -10,6 +10,8 @@ Trunk (unreleased changes)
 
     BUGFIXES:
 
+      BOOKKEEPER-564: Better checkpoint mechanism (sijie & ivank)
+
       BOOKKEEPER-596: Ledgers are gc'ed by mistake in MSLedgerManagerFactory. (sijie & ivank)
 
       BOOKKEEPER-595: Crash of inprocess autorecovery daemon should not take down the bookie (ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Sat May  4 12:22:10 2013
@@ -45,6 +45,7 @@ import org.apache.bookkeeper.meta.Ledger
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.GarbageCollectorThread.SafeEntryAdder;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -239,39 +240,70 @@ public class Bookie extends Thread {
     }
 
     /**
-     * SyncThread is a background thread which flushes ledger index pages periodically.
-     * Also it takes responsibility of garbage collecting journal files.
-     *
-     * <p>
-     * Before flushing, SyncThread first records a log marker {journalId, journalPos} in memory,
-     * which indicates entries before this log marker would be persisted to ledger files.
-     * Then sync thread begins flushing ledger index pages to ledger index files, flush entry
-     * logger to ensure all entries persisted to entry loggers for future reads.
-     * </p>
+     * SyncThread is a background thread which helps checkpoint ledger storage
+     * when a checkpoint is requested. After the ledger storage is checkpointed,
+     * the journal files added before the checkpoint will be garbage collected.
      * <p>
-     * After all data has been persisted to ledger index files and entry loggers, it is safe
-     * to persist the log marker to disk. If bookie failed after persist log mark,
-     * bookie is able to relay journal entries started from last log mark without losing
-     * any entries.
+     * After all data has been persisted to ledger index files and entry
+     * loggers, it is safe to complete a checkpoint by persisting the log marker
+     * to disk. If the bookie fails after persisting the log mark, it is able to
+     * replay journal entries starting from the last log mark without losing any entries.
      * </p>
      * <p>
-     * Those journal files whose id are less than the log id in last log mark, could be
-     * removed safely after persisting last log mark. We provide a setting to let user keeping
-     * number of old journal files which may be used for manual recovery in critical disaster.
+     * Journal files whose ids are less than the log id in the last log mark
+     * can be removed safely after the last log mark is persisted. We provide a
+     * setting that lets users keep a number of old journal files, which may be
+     * used for manual recovery in a critical disaster.
      * </p>
      */
     class SyncThread extends Thread {
         volatile boolean running = true;
         // flag to ensure sync thread will not be interrupted during flush
         final AtomicBoolean flushing = new AtomicBoolean(false);
-        // make flush interval as a parameter
         final int flushInterval;
+
         public SyncThread(ServerConfiguration conf) {
             super("SyncThread");
             flushInterval = conf.getFlushInterval();
             LOG.debug("Flush Interval : {}", flushInterval);
         }
 
+        /**
+         * Flush ledger storage up to the given checkpoint and, on success, mark the checkpoint complete.
+         * @param checkpoint
+         */
+        @VisibleForTesting
+        public void checkpoint(Checkpoint checkpoint) {
+            boolean flushFailed = false;
+            try {
+                if (running) {
+                    checkpoint = ledgerStorage.checkpoint(checkpoint);
+                } else {
+                    ledgerStorage.flush();
+                }
+            } catch (NoWritableLedgerDirException e) {
+                LOG.error("No writeable ledger directories");
+                flushFailed = true;
+                flushing.set(false);
+                transitionToReadOnlyMode();
+            } catch (IOException e) {
+                LOG.error("Exception flushing Ledger", e);
+                flushFailed = true;
+            }
+
+            // if the flush failed, we must not roll the last mark; otherwise some
+            // ledgers would be left unflushed and their journal entries lost
+            if (!flushFailed) {
+                try {
+                    journal.checkpointComplete(checkpoint, running);
+                } catch (IOException e) {
+                    flushing.set(false);
+                    LOG.error("Marking checkpoint as complete failed", e);
+                    transitionToReadOnlyMode();
+                }
+            }
+        }
+
         private Object suspensionLock = new Object();
         private boolean suspended = false;
 
@@ -299,61 +331,35 @@ public class Bookie extends Thread {
         @Override
         public void run() {
             try {
-                while (running) {
+                while(running) {
                     synchronized (this) {
                         try {
                             wait(flushInterval);
-                            if (!ledgerStorage.isFlushRequired()) {
-                                continue;
-                            }
                         } catch (InterruptedException e) {
                             Thread.currentThread().interrupt();
                             continue;
                         }
                     }
+
                     synchronized (suspensionLock) {
                         while (suspended) {
-                            suspensionLock.wait();
+                            try {
+                                suspensionLock.wait();
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                continue;
+                            }
                         }
                     }
-                    // try to mark flushing flag to make sure it would not be interrupted
-                    // by shutdown during flushing. otherwise it will receive
-                    // ClosedByInterruptException which may cause index file & entry logger
-                    // closed and corrupted.
+
+                    // try to mark flushing flag to check if interrupted
                     if (!flushing.compareAndSet(false, true)) {
                         // set flushing flag failed, means flushing is true now
                         // indicates another thread wants to interrupt sync thread to exit
                         break;
                     }
+                    checkpoint(journal.newCheckpoint());
 
-                    // journal mark log
-                    journal.markLog();
-
-                    boolean flushFailed = false;
-                    try {
-                        ledgerStorage.flush();
-                    } catch (NoWritableLedgerDirException e) {
-                        flushFailed = true;
-                        flushing.set(false);
-                        transitionToReadOnlyMode();
-                    } catch (IOException e) {
-                        LOG.error("Exception flushing Ledger", e);
-                        flushFailed = true;
-                    }
-
-                    // if flush failed, we should not roll last mark, otherwise we would
-                    // have some ledgers are not flushed and their journal entries were lost
-                    if (!flushFailed) {
-                        try {
-                            journal.rollLog();
-                            journal.gcJournals();
-                        } catch (NoWritableLedgerDirException e) {
-                            flushing.set(false);
-                            transitionToReadOnlyMode();
-                        }
-                    }
-
-                    // clear flushing flag
                     flushing.set(false);
                 }
             } catch (Throwable t) {
@@ -365,9 +371,10 @@ public class Bookie extends Thread {
 
         // shutdown sync thread
         void shutdown() throws InterruptedException {
+            // Wake up and finish sync thread
             running = false;
+            // make a checkpoint when shutdown
             if (flushing.compareAndSet(false, true)) {
-                // if setting flushing flag succeed, means syncThread is not flushing now
                 // it is safe to interrupt itself now 
                 this.interrupt();
             }
@@ -532,12 +539,12 @@ public class Bookie extends Thread {
         LOG.info("instantiate ledger manager {}", ledgerManagerFactory.getClass().getName());
         ledgerManager = ledgerManagerFactory.newLedgerManager();
         syncThread = new SyncThread(conf);
+        // instantiate the journal
+        journal = new Journal(conf, ledgerDirsManager);
         ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager,
-                                                     ledgerDirsManager,
+                                                     ledgerDirsManager, journal,
                                                      new BookieSafeEntryAdder());
         handles = new HandleFactoryImpl(ledgerStorage);
-        // instantiate the journal
-        journal = new Journal(conf, ledgerDirsManager);
 
         // ZK ephemeral node for this Bookie.
         zkBookieRegPath = this.bookieRegistrationPath + getMyId();

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java Sat May  4 12:22:10 2013
@@ -916,10 +916,10 @@ public class BookieShell implements Tool
      * Print last log mark
      */
     protected void printLastLogMark() throws IOException {
-        LastLogMark lastLogMark = getJournal().getLastLogMark();
-        System.out.println("LastLogMark: Journal Id - " + lastLogMark.getTxnLogId() + "("
-                + Long.toHexString(lastLogMark.getTxnLogId()) + ".txn), Pos - "
-                + lastLogMark.getTxnLogPosition());
+        LogMark lastLogMark = getJournal().getLastLogMark().getCurMark();
+        System.out.println("LastLogMark: Journal Id - " + lastLogMark.getLogFileId() + "("
+                + Long.toHexString(lastLogMark.getLogFileId()) + ".txn), Pos - "
+                + lastLogMark.getLogFileOffset());
     }
 
     /**

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CheckpointSource.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CheckpointSource.java?rev=1479085&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CheckpointSource.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CheckpointSource.java Sat May  4 12:22:10 2013
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+
+/**
+ * Interface to communicate checkpoint progress.
+ */
+public interface CheckpointSource {
+
+    /**
+     * A checkpoint represents a point in time. All entries added before this checkpoint are already persisted.
+     */
+    public static interface Checkpoint extends Comparable<Checkpoint> {
+
+        /**
+         * Sentinel checkpoint that compares greater than every other checkpoint.
+         */
+        public static final Checkpoint MAX = new Checkpoint() {
+
+            @Override
+            public int compareTo(Checkpoint o) {
+                if (o == MAX) {
+                    return 0;
+                }
+                return 1;
+            }
+
+            @Override
+            public boolean equals(Object o) {
+                return this == o;
+            }
+
+        };
+
+        /**
+         * Sentinel checkpoint that compares less than every other checkpoint.
+         */
+        public static final Checkpoint MIN = new Checkpoint() {
+            @Override
+            public int compareTo(Checkpoint o) {
+                if (o == MIN) {
+                    return 0;
+                }
+                // MIN precedes every other checkpoint, so it must compare as smaller.
+                return -1;
+            }
+
+            @Override
+            public boolean equals(Object o) {
+                return this == o;
+            }
+        };
+    }
+
+    /**
+     * Request a new checkpoint.
+     *
+     * @return the new checkpoint.
+     */
+    public Checkpoint newCheckpoint();
+
+    /**
+     * Tell checkpoint source that the checkpoint is completed.
+     * If <code>compact</code> is true, the implementation could compact
+     * to reduce size of data containing old checkpoints.
+     *
+     * @param checkpoint
+     *          The checkpoint that has been completed
+     * @param compact
+     *          Flag to compact old checkpoints.
+     */
+    public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IOException;
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java Sat May  4 12:22:10 2013
@@ -21,6 +21,8 @@
 
 package org.apache.bookkeeper.bookie;
 
+import static com.google.common.base.Charsets.UTF_8;
+
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
@@ -36,19 +38,23 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.google.common.base.Charsets.UTF_8;
-
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class manages the writing of the bookkeeper entries. All the new
@@ -65,12 +71,16 @@ public class EntryLogger {
     private AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
 
     private long logId;
+    private volatile long leastUnflushedLogId;
 
     /**
      * The maximum size of a entry logger file.
      */
     final long logSizeLimit;
+    private List<BufferedChannel> logChannelsToFlush;
     private volatile BufferedChannel logChannel;
+    private final EntryLogListener listener;
+
     /**
      * The 1K block at the head of the entry logger file
      * that contains the fingerprint and (future) meta-data
@@ -110,12 +120,29 @@ public class EntryLogger {
     }
 
     /**
+     * Entry Log Listener
+     */
+    static interface EntryLogListener {
+        /**
+         * Rotate a new entry log to write.
+         */
+        public void onRotateEntryLog();
+    }
+
+    /**
      * Create an EntryLogger that stores it's log files in the given
      * directories
      */
     public EntryLogger(ServerConfiguration conf,
             LedgerDirsManager ledgerDirsManager) throws IOException {
+        this(conf, ledgerDirsManager, null);
+    }
+
+    public EntryLogger(ServerConfiguration conf,
+            LedgerDirsManager ledgerDirsManager, EntryLogListener listener)
+                    throws IOException {
         this.ledgerDirsManager = ledgerDirsManager;
+        this.listener = listener;
         // log size limit
         this.logSizeLimit = conf.getEntryLogSizeLimit();
 
@@ -138,7 +165,7 @@ public class EntryLogger {
                 logId = lastLogId;
             }
         }
-
+        this.leastUnflushedLogId = logId + 1;
         initialize();
     }
 
@@ -147,6 +174,16 @@ public class EntryLogger {
      */
     private ConcurrentHashMap<Long, BufferedChannel> channels = new ConcurrentHashMap<Long, BufferedChannel>();
 
+    /**
+     * Get the least unflushed log id. The garbage collector thread should not
+     * process unflushed entry log files.
+     *
+     * @return least unflushed log id.
+     */
+    synchronized long getLeastUnflushedLogId() {
+        return leastUnflushedLogId;
+    }
+
     synchronized long getCurrentLogId() {
         return logId;
     }
@@ -187,14 +224,28 @@ public class EntryLogger {
     }
 
     /**
+     * Rolling a new log file to write.
+     */
+    synchronized void rollLog() throws IOException {
+        createNewLog();
+    }
+
+    /**
      * Creates a new log file
      */
     void createNewLog() throws IOException {
-        if (logChannel != null) {
-            logChannel.flush(true);
+        if (null != logChannel) {
+            if (null == logChannelsToFlush) {
+                logChannelsToFlush = new LinkedList<BufferedChannel>();
+            }
+            // flush the internal buffer to the filesystem without syncing to disk,
+            // so readers can access the data from the filesystem.
+            logChannel.flush(false);
+            logChannelsToFlush.add(logChannel);
+            if (null != listener) {
+                listener.onRotateEntryLog();
+            }
         }
-
-        // It would better not to overwrite existing entry log files
         String logFileName = null;
         do {
             logFileName = Long.toHexString(++logId) + ".log";
@@ -202,7 +253,7 @@ public class EntryLogger {
                 File newLogFile = new File(dir, logFileName);
                 if (newLogFile.exists()) {
                     LOG.warn("Found existed entry log " + newLogFile
-                           + " when trying to create it as a new log.");
+                             + " when trying to create it as a new log.");
                     logFileName = null;
                     break;
                 }
@@ -324,21 +375,57 @@ public class EntryLogger {
         }
     }
 
-    synchronized void flush() throws IOException {
+    /**
+     * Flushes all rotated log channels. After the log channels are flushed,
+     * moves the leastUnflushedLogId pointer to the current logId.
+     */
+    void checkpoint() throws IOException {
+        flushRotatedLogs();
+    }
+
+    void flushRotatedLogs() throws IOException {
+        List<BufferedChannel> tmpChannels = null;
+        long newUnflushedLogId;
+        synchronized (this) {
+            tmpChannels = logChannelsToFlush;
+            logChannelsToFlush = null;
+            newUnflushedLogId = logId;
+        }
+        if (null == tmpChannels) {
+            return;
+        }
+        for (BufferedChannel channel : tmpChannels) {
+            channel.flush(true);
+        }
+        // move the leastUnflushedLogId ptr
+        leastUnflushedLogId = newUnflushedLogId;
+    }
+
+    void flush() throws IOException {
+        flushRotatedLogs();
+        flushCurrentLog();
+    }
+
+    synchronized void flushCurrentLog() throws IOException {
         if (logChannel != null) {
             logChannel.flush(true);
         }
     }
-    synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
-        // Create new log if logSizeLimit reached or current disk is full
-        boolean createNewLog = shouldCreateNewEntryLog.get();
-        if (createNewLog
-                || (logChannel.position() + entry.remaining() + 4 > logSizeLimit)) {
-            createNewLog();
-
-            // Reset the flag
-            if (createNewLog) {
-                shouldCreateNewEntryLog.set(false);
+
+    long addEntry(long ledger, ByteBuffer entry) throws IOException {
+        return addEntry(ledger, entry, true);
+    }
+
+    synchronized long addEntry(long ledger, ByteBuffer entry, boolean rollLog) throws IOException {
+        if (rollLog) {
+            // Create new log if logSizeLimit reached or current disk is full
+            boolean createNewLog = shouldCreateNewEntryLog.get();
+            if (createNewLog || reachEntryLogLimit(entry.remaining() + 4)) {
+                createNewLog();
+                // Reset the flag
+                if (createNewLog) {
+                    shouldCreateNewEntryLog.set(false);
+                }
             }
         }
         ByteBuffer buff = ByteBuffer.allocate(4);
@@ -347,11 +434,14 @@ public class EntryLogger {
         logChannel.write(buff);
         long pos = logChannel.position();
         logChannel.write(entry);
-        //logChannel.flush(false);
 
         return (logId << 32L) | pos;
     }
 
+    synchronized boolean reachEntryLogLimit(long size) {
+        return logChannel.position() + size > logSizeLimit;
+    }
+
     byte[] readEntry(long ledgerId, long entryId, long location) throws IOException, Bookie.NoEntryException {
         long entryLogId = location >> 32L;
         long pos = location & 0xffffffffL;

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java Sat May  4 12:22:10 2013
@@ -226,6 +226,9 @@ class FileInfo {
 
     private int readAbsolute(ByteBuffer bb, long start) throws IOException {
         checkOpen(false);
+        if (fc == null) {
+            return 0;
+        }
         int total = 0;
         while(bb.remaining() > 0) {
             int rc = fc.read(bb, start);
@@ -284,47 +287,49 @@ class FileInfo {
      */
     public synchronized void moveToNewLocation(File newFile, long size) throws IOException {
         checkOpen(false);
-        if (size > fc.size()) {
-            size = fc.size();
-        }
-        File rlocFile = new File(newFile.getParentFile(), newFile.getName() + LedgerCacheImpl.RLOC);
-        if (!rlocFile.exists()) {
-            checkParents(rlocFile);
-            if (!rlocFile.createNewFile()) {
-                throw new IOException("Creating new cache index file " + rlocFile + " failed ");
+        if (fc != null) {
+            if (size > fc.size()) {
+                size = fc.size();
+            }
+            File rlocFile = new File(newFile.getParentFile(), newFile.getName() + LedgerCacheImpl.RLOC);
+            if (!rlocFile.exists()) {
+                checkParents(rlocFile);
+                if (!rlocFile.createNewFile()) {
+                    throw new IOException("Creating new cache index file " + rlocFile + " failed ");
+                }
             }
-        }
-        // copy contents from old.idx to new.idx.rloc
-        FileChannel newFc = new RandomAccessFile(rlocFile, "rw").getChannel();
-        try {
-            long written = 0;
-            while (written < size) {
-                long count = fc.transferTo(written, size, newFc);
-                if (count <= 0) {
+            // copy contents from old.idx to new.idx.rloc
+            FileChannel newFc = new RandomAccessFile(rlocFile, "rw").getChannel();
+            try {
+                long written = 0;
+                while (written < size) {
+                    long count = fc.transferTo(written, size, newFc);
+                    if (count <= 0) {
+                        throw new IOException("Copying to new location " + rlocFile + " failed");
+                    }
+                    written += count;
+                }
+                if (written <= 0 && size > 0) {
                     throw new IOException("Copying to new location " + rlocFile + " failed");
                 }
-                written += count;
+            } finally {
+                newFc.force(true);
+                newFc.close();
             }
-            if (written <= 0 && size > 0) {
-                throw new IOException("Copying to new location " + rlocFile + " failed");
+            // delete old.idx
+            fc.close();
+            if (!delete()) {
+                LOG.error("Failed to delete the previous index file " + lf);
+                throw new IOException("Failed to delete the previous index file " + lf);
             }
-        } finally {
-            newFc.force(true);
-            newFc.close();
-        }
-        // delete old.idx
-        fc.close();
-        if (!delete()) {
-            LOG.error("Failed to delete the previous index file " + lf);
-            throw new IOException("Failed to delete the previous index file " + lf);
-        }
-
-        // rename new.idx.rloc to new.idx
-        if (!rlocFile.renameTo(newFile)) {
-            LOG.error("Failed to rename " + rlocFile + " to " + newFile);
-            throw new IOException("Failed to rename " + rlocFile + " to " + newFile);
+
+            // rename new.idx.rloc to new.idx
+            if (!rlocFile.renameTo(newFile)) {
+                LOG.error("Failed to rename " + rlocFile + " to " + newFile);
+                throw new IOException("Failed to rename " + rlocFile + " to " + newFile);
+            }
+            fc = new RandomAccessFile(newFile, mode).getChannel();
         }
-        fc = new RandomAccessFile(newFile, mode).getChannel();
         lf = newFile;
     }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java Sat May  4 12:22:10 2013
@@ -528,7 +528,7 @@ public class GarbageCollectorThread exte
         // Extract it for every entry log except for the current one.
         // Entry Log ID's are just a long value that starts at 0 and increments
         // by 1 when the log fills up and we roll to a new one.
-        long curLogId = entryLogger.getCurrentLogId();
+        long curLogId = entryLogger.getLeastUnflushedLogId();
         boolean hasExceptionWhenScan = false;
         for (long entryLogId = scannedLogId; entryLogId < curLogId; entryLogId++) {
             // Comb the current entry log file if it has not already been extracted.

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java Sat May  4 12:22:10 2013
@@ -21,16 +21,16 @@
 
 package org.apache.bookkeeper.bookie;
 
-import java.nio.ByteBuffer;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
-import org.apache.bookkeeper.jmx.BKMBeanInfo;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.jmx.BKMBeanInfo;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.util.SnapshotMap;
-import org.apache.zookeeper.ZooKeeper;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,11 +39,34 @@ import org.slf4j.LoggerFactory;
  * This ledger storage implementation stores all entries in a single
  * file and maintains an index file for each ledger.
  */
-class InterleavedLedgerStorage implements LedgerStorage {
+class InterleavedLedgerStorage implements LedgerStorage, EntryLogListener {
     final static Logger LOG = LoggerFactory.getLogger(InterleavedLedgerStorage.class);
 
+    // Hold the last checkpoint
+    static class CheckpointHolder {
+        // Checkpoint.MAX is the "nothing pending" value: it is the initial state and
+        // the value clearLastCheckpoint() resets to.
+        Checkpoint lastCheckpoint = Checkpoint.MAX;
+
+        /** Holds {@code cp} when nothing is pending or {@code cp} is later than the held one. */
+        synchronized void setNextCheckpoint(Checkpoint cp) {
+            final boolean nothingPending = Checkpoint.MAX.equals(lastCheckpoint);
+            if (nothingPending || lastCheckpoint.compareTo(cp) < 0) {
+                lastCheckpoint = cp;
+            }
+        }
+
+        /** Resets to Checkpoint.MAX, but only while the held checkpoint still compares equal to {@code done}. */
+        synchronized void clearLastCheckpoint(Checkpoint done) {
+            if (lastCheckpoint.compareTo(done) == 0) {
+                lastCheckpoint = Checkpoint.MAX;
+            }
+        }
+
+        /** Returns the held checkpoint, or Checkpoint.MAX when none is pending. */
+        synchronized Checkpoint getLastCheckpoint() {
+            return lastCheckpoint;
+        }
+    }
+
     EntryLogger entryLogger;
     LedgerCache ledgerCache;
+    private final CheckpointSource checkpointSource;
+    private final CheckpointHolder checkpointHolder = new CheckpointHolder();
 
     // A sorted map to stored all active ledger ids
     protected final SnapshotMap<Long, Boolean> activeLedgers;
@@ -56,12 +79,12 @@ class InterleavedLedgerStorage implement
     // this indicates that a write has happened since the last flush
     private volatile boolean somethingWritten = false;
 
-    InterleavedLedgerStorage(ServerConfiguration conf,
-                             LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
-                             GarbageCollectorThread.SafeEntryAdder safeEntryAdder)
-			throws IOException {
+    InterleavedLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
+            LedgerDirsManager ledgerDirsManager, CheckpointSource checkpointSource,
+            GarbageCollectorThread.SafeEntryAdder safeEntryAdder) throws IOException {
         activeLedgers = new SnapshotMap<Long, Boolean>();
-        entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        this.checkpointSource = checkpointSource;
+        entryLogger = new EntryLogger(conf, ledgerDirsManager, this);
         ledgerCache = new LedgerCacheImpl(conf, activeLedgers, ledgerDirsManager);
         gcThread = new GarbageCollectorThread(conf, ledgerCache, entryLogger,
                 activeLedgers, safeEntryAdder, ledgerManager);
@@ -115,19 +138,8 @@ class InterleavedLedgerStorage implement
         long ledgerId = entry.getLong();
         long entryId = entry.getLong();
         entry.rewind();
-        
-        /*
-         * Log the entry
-         */
-        long pos = entryLogger.addEntry(ledgerId, entry);
-        
-        
-        /*
-         * Set offset of entry id to be the current ledger position
-         */
-        ledgerCache.putEntryOffset(ledgerId, entryId, pos);
 
-        somethingWritten = true;
+        processEntry(ledgerId, entryId, entry);
 
         return entryId;
     }
@@ -149,29 +161,29 @@ class InterleavedLedgerStorage implement
         return ByteBuffer.wrap(entryLogger.readEntry(ledgerId, entryId, offset));
     }
 
-    @Override
-    public boolean isFlushRequired() {
-        return somethingWritten;
-    };
-
-    @Override
-    public void flush() throws IOException {
+    private void flushOrCheckpoint(boolean isCheckpointFlush)
+            throws IOException {
 
-        if (!somethingWritten) {
-            return;
-        }
-        somethingWritten = false;
         boolean flushFailed = false;
-
         try {
             ledgerCache.flushLedger(true);
+        } catch (LedgerDirsManager.NoWritableLedgerDirException e) {
+            throw e;
         } catch (IOException ioe) {
             LOG.error("Exception flushing Ledger cache", ioe);
             flushFailed = true;
         }
 
         try {
-            entryLogger.flush();
+            // if it is just a checkpoint flush, we just flush rotated entry log files
+            // in entry logger.
+            if (isCheckpointFlush) {
+                entryLogger.checkpoint();
+            } else {
+                entryLogger.flush();
+            }
+        } catch (LedgerDirsManager.NoWritableLedgerDirException e) {
+            throw e;
         } catch (IOException ioe) {
             LOG.error("Exception flushing Ledger", ioe);
             flushFailed = true;
@@ -182,7 +194,66 @@ class InterleavedLedgerStorage implement
     }
 
     @Override
+    public Checkpoint checkpoint(Checkpoint checkpoint) throws IOException {
+        // Flushes the ledger cache and the rotated entry log files (flushOrCheckpoint(true)),
+        // then reports the pending checkpoint recorded by onRotateEntryLog() as the point
+        // actually reached; that returned checkpoint may differ from the requested one.
+        // NOTE(review): unlike flush(), this method is not synchronized, so flushOrCheckpoint
+        // can run concurrently from both paths -- confirm the ledger cache and entry logger
+        // tolerate that.
+        Checkpoint lastCheckpoint = checkpointHolder.getLastCheckpoint();
+        // if checkpoint is less than last checkpoint, we don't need to do checkpoint again.
+        // NOTE(review): presumably Checkpoint.MAX compares greater than every real checkpoint,
+        // so with nothing pending this returns MAX without flushing -- confirm in CheckpointSource.
+        if (lastCheckpoint.compareTo(checkpoint) > 0) {
+            return lastCheckpoint;
+        }
+        // we don't need to check somethingwritten since checkpoint
+        // is scheduled when rotate an entry logger file. and we could
+        // not set somethingWritten to false after checkpoint, since
+        // current entry logger file isn't flushed yet.
+        flushOrCheckpoint(true);
+        // after the ledger storage finished checkpointing, try to clear the done checkpoint
+        // (clearLastCheckpoint only resets it if no different checkpoint was recorded meanwhile)
+        checkpointHolder.clearLastCheckpoint(lastCheckpoint);
+        return lastCheckpoint;
+    }
+
+    @Override
+    public synchronized void flush() throws IOException {
+        // Only do the full flush when an entry was added since the previous flush.
+        if (somethingWritten) {
+            // Clear the dirty flag up front; entries added afterwards set it again.
+            somethingWritten = false;
+            flushOrCheckpoint(false);
+        }
+    }
+
+    @Override
     public BKMBeanInfo getJMXBean() {
         return ledgerCache.getJMXBean();
     }
+
+    /** Adds an entry, passing rollLog = true to the entry logger. */
+    protected void processEntry(long ledgerId, long entryId, ByteBuffer entry) throws IOException {
+        processEntry(ledgerId, entryId, entry, /* rollLog= */ true);
+    }
+
+    /**
+     * Appends {@code entry} to the entry log and records the position it was written at
+     * as the offset of ({@code ledgerId}, {@code entryId}) in the ledger cache.
+     */
+    protected synchronized void processEntry(long ledgerId, long entryId, ByteBuffer entry, boolean rollLog)
+            throws IOException {
+        // Mark the storage dirty first so that the next flush() does not skip its work.
+        somethingWritten = true;
+        final long location = entryLogger.addEntry(ledgerId, entry, rollLog);
+        ledgerCache.putEntryOffset(ledgerId, entryId, location);
+    }
+
+    @Override
+    public void onRotateEntryLog() {
+        // Interleaved ledger storage requests a checkpoint whenever an entry log file is
+        // rotated. That checkpoint represents the point before which every added entry is
+        // already in ledger storage and ready to be synced to disk.
+        // TODO: checkpointSource and CheckpointSource#newCheckpoint could be removed later
+        // if adding an entry carried some kind of LSN (Log/Journal Sequence Number).
+        Checkpoint rotationCheckpoint = checkpointSource.newCheckpoint();
+        checkpointHolder.setNextCheckpoint(rotationCheckpoint);
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java Sat May  4 12:22:10 2013
@@ -32,6 +32,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -43,7 +44,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Provide journal related management.
  */
-class Journal extends Thread {
+class Journal extends Thread implements CheckpointSource {
 
     static Logger LOG = LoggerFactory.getLogger(Journal.class);
 
@@ -84,43 +85,69 @@ class Journal extends Thread {
     }
 
     /**
+     * A wrapper over log mark to provide a checkpoint for users of journal
+     * to do checkpointing.
+     */
+    private static class LogMarkCheckpoint implements Checkpoint {
+        // Journal position this checkpoint stands for. newCheckpoint() passes a copy made
+        // by LastLogMark#markLog(), so it does not move as the journal advances.
+        final LastLogMark mark;
+
+        public LogMarkCheckpoint(LastLogMark checkpoint) {
+            this.mark = checkpoint;
+        }
+
+        /**
+         * Orders checkpoints by journal position. {@link Checkpoint#MAX} sorts after and
+         * {@link Checkpoint#MIN} sorts before every LogMarkCheckpoint.
+         *
+         * @throws ClassCastException if {@code o} is any other kind of checkpoint
+         */
+        @Override
+        public int compareTo(Checkpoint o) {
+            if (o == Checkpoint.MAX) {
+                return -1;
+            } else if (o == Checkpoint.MIN) {
+                return 1;
+            }
+            return mark.getCurMark().compare(((LogMarkCheckpoint)o).mark.getCurMark());
+        }
+
+        /** Two LogMarkCheckpoints are equal when they denote the same journal position. */
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof LogMarkCheckpoint)) {
+                return false;
+            }
+            return 0 == compareTo((LogMarkCheckpoint)o);
+        }
+
+        /**
+         * Hashes the journal position, so checkpoints that are equal per
+         * {@link #equals(Object)} always share a hash code.
+         */
+        @Override
+        public int hashCode() {
+            // Derived from the same (file id, offset) pair that equals() compares, rather than
+            // from the LastLogMark wrapper object.
+            LogMark pos = mark.getCurMark();
+            long id = pos.getLogFileId();
+            long offset = pos.getLogFileOffset();
+            return 31 * (int) (id ^ (id >>> 32)) + (int) (offset ^ (offset >>> 32));
+        }
+    }
+
+    /**
      * Last Log Mark
      */
     class LastLogMark {
-        private long txnLogId;
-        private long txnLogPosition;
-        private LastLogMark lastMark;
+        private LogMark curMark;
         LastLogMark(long logId, long logPosition) {
-            this.txnLogId = logId;
-            this.txnLogPosition = logPosition;
-        }
-        synchronized void setLastLogMark(long logId, long logPosition) {
-            txnLogId = logId;
-            txnLogPosition = logPosition;
-        }
-        synchronized void markLog() {
-            lastMark = new LastLogMark(txnLogId, txnLogPosition);
+            this.curMark = new LogMark(logId, logPosition);
         }
 
-        synchronized LastLogMark getLastMark() {
-            return lastMark;
+        synchronized void setCurLogMark(long logId, long logPosition) {
+            curMark.setLogMark(logId, logPosition);
         }
-        synchronized long getTxnLogId() {
-            return txnLogId;
+
+        synchronized LastLogMark markLog() {
+            return new LastLogMark(curMark.getLogFileId(), curMark.getLogFileOffset());
         }
-        synchronized long getTxnLogPosition() {
-            return txnLogPosition;
+
+        synchronized LogMark getCurMark() {
+            return curMark;
         }
 
-        synchronized void rollLog() throws NoWritableLedgerDirException {
+        synchronized void rollLog(LastLogMark lastMark) throws NoWritableLedgerDirException {
             byte buff[] = new byte[16];
             ByteBuffer bb = ByteBuffer.wrap(buff);
             // we should record <logId, logPosition> marked in markLog
             // which is safe since records before lastMark have been
             // persisted to disk (both index & entry logger)
-            bb.putLong(lastMark.getTxnLogId());
-            bb.putLong(lastMark.getTxnLogPosition());
-            LOG.debug("RollLog to persist last marked log : {}", lastMark);
+            lastMark.getCurMark().writeLogMark(bb);
+            LOG.debug("RollLog to persist last marked log : {}", lastMark.getCurMark());
             List<File> writableLedgerDirs = ledgerDirsManager
                     .getWritableLedgerDirs();
             for (File dir : writableLedgerDirs) {
@@ -151,6 +178,7 @@ class Journal extends Thread {
         synchronized void readLog() {
             byte buff[] = new byte[16];
             ByteBuffer bb = ByteBuffer.wrap(buff);
+            LogMark mark = new LogMark();
             for(File dir: ledgerDirsManager.getAllLedgerDirs()) {
                 File file = new File(dir, "lastMark");
                 try {
@@ -165,38 +193,31 @@ class Journal extends Thread {
                         fis.close();
                     }
                     bb.clear();
-                    long i = bb.getLong();
-                    long p = bb.getLong();
-                    if (i > txnLogId) {
-                        txnLogId = i;
-                        if(p > txnLogPosition) {
-                          txnLogPosition = p;
-                        }
+                    mark.readLogMark(bb);
+                    if (curMark.compare(mark) < 0) {
+                        curMark.setLogMark(mark.getLogFileId(), mark.logFileOffset);
                     }
                 } catch (IOException e) {
                     LOG.error("Problems reading from " + file + " (this is okay if it is the first time starting this bookie");
                 }
             }
         }
-
-        @Override
-        public String toString() {
-            StringBuilder sb = new StringBuilder();
-
-            sb.append("LastMark: logId - ").append(txnLogId)
-              .append(" , position - ").append(txnLogPosition);
-
-            return sb.toString();
-        }
     }
 
     /**
      * Filter to return list of journals for rolling
      */
-    private class JournalRollingFilter implements JournalIdFilter {
+    private static class JournalRollingFilter implements JournalIdFilter {
+
+        final LastLogMark lastMark;
+
+        JournalRollingFilter(LastLogMark lastMark) {
+            this.lastMark = lastMark;
+        }
+
         @Override
         public boolean accept(long journalId) {
-            if (journalId < lastLogMark.getLastMark().getTxnLogId()) {
+            if (journalId < lastMark.getCurMark().getLogFileId()) {
                 return true;
             } else {
                 return false;
@@ -273,7 +294,7 @@ class Journal extends Thread {
 
         // read last log mark
         lastLogMark.readLog();
-        LOG.debug("Last Log Mark : {}", lastLogMark);
+        LOG.debug("Last Log Mark : {}", lastLogMark.getCurMark());
     }
 
     LastLogMark getLastLogMark() {
@@ -281,64 +302,45 @@ class Journal extends Thread {
     }
 
     /**
-     * Records a <i>LastLogMark</i> in memory.
-     *
-     * <p>
-     * The <i>LastLogMark</i> contains two parts: first one is <i>txnLogId</i>
-     * (file id of a journal) and the second one is <i>txnLogPos</i> (offset in
-     *  a journal). The <i>LastLogMark</i> indicates that those entries before
-     * it have been persisted to both index and entry log files.
-     * </p>
-     *
-     * <p>
-     * This method is called before flushing entry log files and ledger cache.
-     * </p>
+     * Application tried to schedule a checkpoint. After all the txns added
+     * before checkpoint are persisted, a <i>checkpoint</i> will be returned
+     * to application. Application could use <i>checkpoint</i> to do its logic.
      */
-    public void markLog() {
-        lastLogMark.markLog();
+    @Override
+    public Checkpoint newCheckpoint() {
+        // markLog() returns a copy of the current journal position, so the returned
+        // checkpoint stays fixed while the journal keeps advancing.
+        return new LogMarkCheckpoint(lastLogMark.markLog());
     }
 
     /**
-     * Persists the <i>LastLogMark</i> marked by #markLog() to disk.
+     * Telling journal a checkpoint is finished.
      *
-     * <p>
-     * This action means entries added before <i>LastLogMark</i> whose entry data
-     * and index pages were already persisted to disk. It is the time to safely
-     * remove journal files created earlier than <i>LastLogMark.txnLogId</i>.
-     * </p>
-     * <p>
-     * If the bookie has crashed before persisting <i>LastLogMark</i> to disk,
-     * it still has journal files contains entries for which index pages may not
-     * have been persisted. Consequently, when the bookie restarts, it inspects
-     * journal files to restore those entries; data isn't lost.
-     * </p>
-     * <p>
-     * This method is called after flushing entry log files and ledger cache successfully, which is to ensure <i>LastLogMark</i> is pesisted.
-     * </p>
-     * @see #markLog()
-     */
-    public void rollLog() throws NoWritableLedgerDirException {
-        lastLogMark.rollLog();
-    }
-
-    /**
-     * Garbage collect older journals
+     * @throws IOException
      */
-    public void gcJournals() {
-        // list the journals that have been marked
-        List<Long> logs = listJournalIds(journalDirectory, new JournalRollingFilter());
-        // keep MAX_BACKUP_JOURNALS journal files before marked journal
-        if (logs.size() >= maxBackupJournals) {
-            int maxIdx = logs.size() - maxBackupJournals;
-            for (int i=0; i<maxIdx; i++) {
-                long id = logs.get(i);
-                // make sure the journal id is smaller than marked journal id
-                if (id < lastLogMark.getLastMark().getTxnLogId()) {
-                    File journalFile = new File(journalDirectory, Long.toHexString(id) + ".txn");
-                    if (!journalFile.delete()) {
-                        LOG.warn("Could not delete old journal file {}", journalFile);
+    @Override
+    public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IOException {
+        if (!(checkpoint instanceof LogMarkCheckpoint)) {
+            return; // we didn't create this checkpoint, so dont do anything with it
+        }
+        LogMarkCheckpoint lmcheckpoint = (LogMarkCheckpoint)checkpoint;
+        LastLogMark mark = lmcheckpoint.mark;
+
+        mark.rollLog(mark);
+        if (compact) {
+            // list the journals that have been marked
+            List<Long> logs = listJournalIds(journalDirectory, new JournalRollingFilter(mark));
+            // keep MAX_BACKUP_JOURNALS journal files before marked journal
+            if (logs.size() >= maxBackupJournals) {
+                int maxIdx = logs.size() - maxBackupJournals;
+                for (int i=0; i<maxIdx; i++) {
+                    long id = logs.get(i);
+                    // make sure the journal id is smaller than marked journal id
+                    if (id < mark.getCurMark().getLogFileId()) {
+                        File journalFile = new File(journalDirectory, Long.toHexString(id) + ".txn");
+                        if (!journalFile.delete()) {
+                            LOG.warn("Could not delete old journal file {}", journalFile);
+                        }
+                        LOG.info("garbage collected journal " + journalFile.getName());
                     }
-                    LOG.info("garbage collected journal " + journalFile.getName());
                 }
             }
         }
@@ -407,11 +409,11 @@ class Journal extends Thread {
      * @throws IOException
      */
     public void replay(JournalScanner scanner) throws IOException {
-        final long markedLogId = lastLogMark.getTxnLogId();
+        final LogMark markedLog = lastLogMark.getCurMark();
         List<Long> logs = listJournalIds(journalDirectory, new JournalIdFilter() {
             @Override
             public boolean accept(long journalId) {
-                if (journalId < markedLogId) {
+                if (journalId < markedLog.getLogFileId()) {
                     return false;
                 }
                 return true;
@@ -419,9 +421,9 @@ class Journal extends Thread {
         });
         // last log mark may be missed due to no sync up before
         // validate filtered log ids only when we have markedLogId
-        if (markedLogId > 0) {
-            if (logs.size() == 0 || logs.get(0) != markedLogId) {
-                throw new IOException("Recovery log " + markedLogId + " is missing");
+        if (markedLog.getLogFileId() > 0) {
+            if (logs.size() == 0 || logs.get(0) != markedLog.getLogFileId()) {
+                throw new IOException("Recovery log " + markedLog.getLogFileId() + " is missing");
             }
         }
         LOG.debug("Try to relay journal logs : {}", logs);
@@ -430,8 +432,8 @@ class Journal extends Thread {
         // system calls done.
         for(Long id: logs) {
             long logPosition = 0L;
-            if(id == markedLogId) {
-                logPosition = lastLogMark.getTxnLogPosition();
+            if(id == markedLog.getLogFileId()) {
+                logPosition = markedLog.getLogFileOffset();
             }
             scanJournal(id, logPosition, scanner);
         }
@@ -501,7 +503,7 @@ class Journal extends Thread {
                             //logFile.force(false);
                             bc.flush(true);
                             lastFlushPosition = bc.position();
-                            lastLogMark.setLastLogMark(logId, lastFlushPosition);
+                            lastLogMark.setCurLogMark(logId, lastFlushPosition);
                             for (QueueEntry e : toFlush) {
                                 e.cb.writeComplete(BookieException.Code.OK,
                                                    e.ledgerId, e.entryId, null, e.ctx);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java Sat May  4 12:22:10 2013
@@ -24,6 +24,7 @@ package org.apache.bookkeeper.bookie;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.jmx.BKMBeanInfo;
 
 /**
@@ -91,11 +92,6 @@ interface LedgerStorage {
     ByteBuffer getEntry(long ledgerId, long entryId) throws IOException;
 
     /**
-     * Whether there is data in the storage which needs to be flushed
-     */
-    boolean isFlushRequired();
-
-    /**
      * Flushes all data in the storage. Once this is called,
      * add data written to the LedgerStorage up until this point
      * has been persisted to perminant storage
@@ -103,6 +99,19 @@ interface LedgerStorage {
     void flush() throws IOException;
 
     /**
+     * Ask the ledger storage to sync data until the given <i>checkpoint</i>.
+     * The ledger storage implementation do checkpoint and return the real checkpoint
+     * that it finished. The returned the checkpoint indicates that all entries added
+     * before that point already persist.
+     *
+     * @param checkpoint
+     *          Check Point that {@link Checkpointer} proposed.
+     * @throws IOException
+     * @return the checkpoint that the ledger storage finished.
+     */
+    Checkpoint checkpoint(Checkpoint checkpoint) throws IOException;
+
+    /**
      * Get the JMX management bean for this LedgerStorage
      */
     BKMBeanInfo getJMXBean();

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java?rev=1479085&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java Sat May  4 12:22:10 2013
@@ -0,0 +1,103 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Journal stream position: a journal file id plus an offset within that file.
+ *
+ * <p>Positions are ordered by file id first and then by offset, see
+ * {@link #compare(LogMark)}. This class performs no synchronization of its own.</p>
+ */
+class LogMark {
+    long logFileId;
+    long logFileOffset;
+
+    /** Creates the position (0, 0). */
+    public LogMark() {
+        setLogMark(0, 0);
+    }
+
+    /** Creates a copy of {@code other}. */
+    public LogMark(LogMark other) {
+        setLogMark(other.logFileId, other.logFileOffset);
+    }
+
+    /** Creates the position ({@code logFileId}, {@code logFileOffset}). */
+    public LogMark(long logFileId, long logFileOffset) {
+        setLogMark(logFileId, logFileOffset);
+    }
+
+    public long getLogFileId() {
+        return logFileId;
+    }
+
+    public long getLogFileOffset() {
+        return logFileOffset;
+    }
+
+    /** Reads a position from {@code bb}: the file id, then the offset, each as a long. */
+    public void readLogMark(ByteBuffer bb) {
+        logFileId = bb.getLong();
+        logFileOffset = bb.getLong();
+    }
+
+    /** Writes this position to {@code bb} in the layout {@link #readLogMark} expects. */
+    public void writeLogMark(ByteBuffer bb) {
+        bb.putLong(logFileId);
+        bb.putLong(logFileOffset);
+    }
+
+    public void setLogMark(long logFileId, long logFileOffset) {
+        this.logFileId = logFileId;
+        this.logFileOffset = logFileOffset;
+    }
+
+    /**
+     * Compares this position with {@code other}, by file id first and then by offset.
+     *
+     * @return -1, 0 or 1 as this position is before, equal to, or after {@code other}
+     */
+    public int compare(LogMark other) {
+        // Compare the fields directly instead of subtracting them: the difference of
+        // two longs can overflow and report the wrong sign.
+        int ret = compareLongs(logFileId, other.logFileId);
+        if (ret == 0) {
+            ret = compareLongs(logFileOffset, other.logFileOffset);
+        }
+        return ret;
+    }
+
+    /** Overflow-safe three-way comparison of two longs, returning -1, 0 or 1. */
+    private static int compareLongs(long a, long b) {
+        return (a < b) ? -1 : ((a == b) ? 0 : 1);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append("LogMark: logFileId - ").append(logFileId)
+                .append(" , logFileOffset - ").append(logFileOffset);
+
+        return sb.toString();
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java Sat May  4 12:22:10 2013
@@ -22,6 +22,8 @@ package org.apache.bookkeeper.bookie;
 
 import java.io.IOException;
 
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+
 /**
  * Accessor class to avoid making Bookie internals public
  */
@@ -30,6 +32,8 @@ public class BookieAccessor {
      * Force a bookie to flush its ledger storage
      */
     public static void forceFlush(Bookie b) throws IOException {
+        // Take the checkpoint before flushing, presumably so that the checkpoint never
+        // runs ahead of what the flush below persists.
         Checkpoint cp = b.journal.newCheckpoint();
         b.ledgerStorage.flush();
+        // Mark the checkpoint complete; compact=true also lets the journal delete old
+        // journal files older than the checkpoint (keeping maxBackupJournals of them).
         b.journal.checkpointComplete(cp, true);
     }
-}
\ No newline at end of file
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java Sat May  4 12:22:10 2013
@@ -276,6 +276,8 @@ public class CompactionTest extends Book
         bkc.deleteLedger(lhs[1].getId());
         bkc.deleteLedger(lhs[2].getId());
         LOG.info("Finished deleting the ledgers contains most entries.");
+        // restart bookies again to roll entry log files.
+        restartBookies();
         Thread.sleep(baseConf.getMajorCompactionInterval() * 1000
                    + baseConf.getGcWaitTime());
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java Sat May  4 12:22:10 2013
@@ -96,6 +96,8 @@ public class LedgerDeleteTest extends Mu
     public void testLedgerDelete() throws Exception {
         // Write enough ledger entries so that we roll over the initial entryLog (0.log)
         LedgerHandle[] lhs = writeLedgerEntries(3, 1024, 1024);
+        // restart bookies to force rolling entry log files
+        restartBookies();
 
         // Delete all of these ledgers from the BookKeeper client
         for (LedgerHandle lh : lhs) {
@@ -136,7 +138,7 @@ public class LedgerDeleteTest extends Mu
             bkc.deleteLedger(lh.getId());
         }
         LOG.info("Finished deleting all ledgers so waiting for the GC thread to clean up the entryLogs");
-        Thread.sleep(2000);
+        Thread.sleep(2 * baseConf.getGcWaitTime());
 
         /*
          * Verify that the first two entry logs ([0,1].log) have been deleted