You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2012/04/17 13:16:27 UTC

svn commit: r1327045 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/test/java/org/apache/bookkeeper/test/

Author: sijie
Date: Tue Apr 17 11:16:26 2012
New Revision: 1327045

URL: http://svn.apache.org/viewvc?rev=1327045&view=rev
Log:
BOOKKEEPER-218: Provide journal manager to manage journal related operations (sijie)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java   (with props)
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieBean.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieMXBean.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1327045&r1=1327044&r2=1327045&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Apr 17 11:16:26 2012
@@ -130,6 +130,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-188: Garbage collection code is in the wrong place (ivank via sijie)
 
+        BOOKKEEPER-218: Provide journal manager to manage journal related operations (sijie)
+
       hedwig-server/
 
         BOOKKEEPER-77: Add a console client for hedwig (Sijie Guo via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1327045&r1=1327044&r2=1327045&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Tue Apr 17 11:16:26 2012
@@ -21,34 +21,25 @@
 
 package org.apache.bookkeeper.bookie;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.io.FilenameFilter;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Map;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.Journal.JournalScanner;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.jmx.BKMBeanInfo;
 import org.apache.bookkeeper.jmx.BKMBeanRegistry;
@@ -70,21 +61,14 @@ import org.apache.zookeeper.ZooDefs.Ids;
 public class Bookie extends Thread {
     static Logger LOG = LoggerFactory.getLogger(Bookie.class);
 
-    final static long MB = 1024 * 1024L;
-    // max journal file size
-    final long maxJournalSize;
-    // number journal files kept before marked journal
-    final int maxBackupJournals;
-
     final File journalDirectory;
-
     final File ledgerDirectories[];
-
     final ServerConfiguration conf;
 
     final SyncThread syncThread;
     final LedgerManager ledgerManager;
     final LedgerStorage ledgerStorage;
+    final Journal journal;
     final HandleFactory handles;
 
     static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
@@ -138,6 +122,15 @@ public class Bookie extends Thread {
         }
     }
 
+    // Write Callback do nothing
+    class NopWriteCallback implements WriteCallback {
+        @Override
+        public void writeComplete(int rc, long ledgerId, long entryId,
+                                  InetSocketAddress addr, Object ctx) {
+            LOG.debug("Finished writing entry {} @ ledger {} for {} : {}",
+                      new Object[] { entryId, ledgerId, addr, rc });
+        }
+    }
 
     /**
      * SyncThread is a background thread which flushes ledger index pages periodically.
@@ -199,7 +192,8 @@ public class Bookie extends Thread {
                     break;
                 }
 
-                lastLogMark.markLog();
+                // journal mark log
+                journal.markLog();
 
                 boolean flushFailed = false;
                 try {
@@ -212,35 +206,8 @@ public class Bookie extends Thread {
                 // if flush failed, we should not roll last mark, otherwise we would
                 // have some ledgers are not flushed and their journal entries were lost
                 if (!flushFailed) {
-
-                    lastLogMark.rollLog();
-
-                    // list the journals that have been marked
-                    List<Long> logs = listJournalIds(journalDirectory, new JournalIdFilter() {
-                        @Override
-                        public boolean accept(long journalId) {
-                            if (journalId < lastLogMark.lastMark.txnLogId) {
-                                return true;
-                            } else {
-                                return false;
-                            }
-                        }
-                    });
-
-                    // keep MAX_BACKUP_JOURNALS journal files before marked journal
-                    if (logs.size() >= maxBackupJournals) {
-                        int maxIdx = logs.size() - maxBackupJournals;
-                        for (int i=0; i<maxIdx; i++) {
-                            long id = logs.get(i);
-                            // make sure the journal id is smaller than marked journal id
-                            if (id < lastLogMark.lastMark.txnLogId) {
-                                File journalFile = new File(journalDirectory, Long.toHexString(id) + ".txn");
-                                journalFile.delete();
-                                LOG.info("garbage collected journal " + journalFile.getName());
-                            }
-                        }
-                    }
-
+                    journal.rollLog();
+                    journal.gcJournals();
                 }
 
                 // clear flushing flag
@@ -374,8 +341,6 @@ public class Bookie extends Thread {
         this.conf = conf;
         this.journalDirectory = getCurrentDirectory(conf.getJournalDir());
         this.ledgerDirectories = getCurrentDirectories(conf.getLedgerDirs());
-        this.maxJournalSize = conf.getMaxJournalSize() * MB;
-        this.maxBackupJournals = conf.getMaxBackupJournals();
 
         // instantiate zookeeper client to initialize ledger manager
         this.zk = instantiateZookeeperClient(conf);
@@ -385,80 +350,24 @@ public class Bookie extends Thread {
 
         syncThread = new SyncThread(conf);
         ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager);
-
         handles = new HandleFactoryImpl(ledgerStorage);
+        // instantiate the journal
+        journal = new Journal(conf);
 
         // replay journals
         readJournal();
     }
 
     private void readJournal() throws IOException, BookieException {
-        lastLogMark.readLog();
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Last Log Mark : " + lastLogMark);
-        }
-        final long markedLogId = lastLogMark.txnLogId;
-        List<Long> logs = listJournalIds(journalDirectory, new JournalIdFilter() {
+        journal.replay(new JournalScanner() {
             @Override
-            public boolean accept(long journalId) {
-                if (journalId < markedLogId) {
-                    return false;
-                }
-                return true;
-            }
-        });
-        // last log mark may be missed due to no sync up before
-        // validate filtered log ids only when we have markedLogId
-        if (markedLogId > 0) {
-            if (logs.size() == 0 || logs.get(0) != markedLogId) {
-                throw new IOException("Recovery log " + markedLogId + " is missing");
-            }
-        }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Try to relay journal logs : " + logs);
-        }
-        // TODO: When reading in the journal logs that need to be synced, we
-        // should use BufferedChannels instead to minimize the amount of
-        // system calls done.
-        ByteBuffer lenBuff = ByteBuffer.allocate(4);
-        ByteBuffer recBuff = ByteBuffer.allocate(64*1024);
-        for(Long id: logs) {
-            JournalChannel recLog;
-            if(id == markedLogId) {
-                long markedLogPosition = lastLogMark.txnLogPosition;
-                recLog = new JournalChannel(journalDirectory, id, markedLogPosition);
-            } else {
-                recLog = new JournalChannel(journalDirectory, id);
-            }
-
-            while(true) {
-                lenBuff.clear();
-                fullRead(recLog, lenBuff);
-                if (lenBuff.remaining() != 0) {
-                    break;
-                }
-                lenBuff.flip();
-                int len = lenBuff.getInt();
-                if (len == 0) {
-                    break;
-                }
-                recBuff.clear();
-                if (recBuff.remaining() < len) {
-                    recBuff = ByteBuffer.allocate(len);
-                }
-                recBuff.limit(len);
-                if (fullRead(recLog, recBuff) != len) {
-                    // This seems scary, but it just means that this is where we
-                    // left off writing
-                    break;
-                }
-                recBuff.flip();
+            public void process(int journalVersion, long offset, ByteBuffer recBuff) throws IOException {
                 long ledgerId = recBuff.getLong();
                 long entryId = recBuff.getLong();
                 try {
                     LOG.debug("Replay journal - ledger id : {}, entry id : {}.", ledgerId, entryId);
                     if (entryId == METAENTRY_ID_LEDGER_KEY) {
-                        if (recLog.getFormatVersion() >= 3) {
+                        if (journalVersion >= 3) {
                             int masterKeyLen = recBuff.getInt();
                             byte[] masterKey = new byte[masterKeyLen];
 
@@ -466,7 +375,7 @@ public class Bookie extends Thread {
                             masterKeyCache.put(ledgerId, masterKey);
                         } else {
                             throw new IOException("Invalid journal. Contains journalKey "
-                                    + " but layout version (" + recLog.getFormatVersion()
+                                    + " but layout version (" + journalVersion
                                     + ") is too old to hold this");
                         }
                     } else {
@@ -481,16 +390,17 @@ public class Bookie extends Thread {
                     }
                 } catch (NoLedgerException nsle) {
                     LOG.debug("Skip replaying entries of ledger {} since it was deleted.", ledgerId);
-                    continue;
+                } catch (BookieException be) {
+                    throw new IOException(be);
                 }
             }
-            recLog.close();
-        }
+        });
     }
 
     synchronized public void start() {
         setDaemon(true);
         LOG.debug("I'm starting a bookie with journal directory " + journalDirectory.getName());
+        // start bookie thread
         super.start();
         syncThread.start();
 
@@ -507,39 +417,6 @@ public class Bookie extends Thread {
         }
     }
 
-    public static interface JournalIdFilter {
-        public boolean accept(long journalId);
-    }
-
-    /**
-     * List all journal ids by a specified journal id filer
-     *
-     * @param journalDir journal dir
-     * @param filter journal id filter
-     * @return list of filtered ids
-     */
-    public static List<Long> listJournalIds(File journalDir, JournalIdFilter filter) {
-        File logFiles[] = journalDir.listFiles();
-        List<Long> logs = new ArrayList<Long>();
-        for(File f: logFiles) {
-            String name = f.getName();
-            if (!name.endsWith(".txn")) {
-                continue;
-            }
-            String idString = name.split("\\.")[0];
-            long id = Long.parseLong(idString, 16);
-            if (filter != null) {
-                if (filter.accept(id)) {
-                    logs.add(id);
-                }
-            } else {
-                logs.add(id);
-            }
-        }
-        Collections.sort(logs);
-        return logs;
-    }
-
     /**
      * Register jmx with parent
      *
@@ -670,224 +547,27 @@ public class Bookie extends Thread {
         return newZk;
     }
 
-    private static int fullRead(JournalChannel fc, ByteBuffer bb) throws IOException {
-        int total = 0;
-        while(bb.remaining() > 0) {
-            int rc = fc.read(bb);
-            if (rc <= 0) {
-                return total;
-            }
-            total += rc;
-        }
-        return total;
-    }
-
-    static class QueueEntry {
-        QueueEntry(ByteBuffer entry, long ledgerId, long entryId,
-                   WriteCallback cb, Object ctx) {
-            this.entry = entry.duplicate();
-            this.cb = cb;
-            this.ctx = ctx;
-            this.ledgerId = ledgerId;
-            this.entryId = entryId;
-        }
-
-        ByteBuffer entry;
-
-        long ledgerId;
-
-        long entryId;
-
-        WriteCallback cb;
-
-        Object ctx;
-    }
-
-    LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<QueueEntry>();
-
-    class LastLogMark {
-        long txnLogId;
-        long txnLogPosition;
-        LastLogMark lastMark;
-        LastLogMark(long logId, long logPosition) {
-            this.txnLogId = logId;
-            this.txnLogPosition = logPosition;
-        }
-        synchronized void setLastLogMark(long logId, long logPosition) {
-            txnLogId = logId;
-            txnLogPosition = logPosition;
-        }
-        synchronized void markLog() {
-            lastMark = new LastLogMark(txnLogId, txnLogPosition);
-        }
-        synchronized void rollLog() {
-            byte buff[] = new byte[16];
-            ByteBuffer bb = ByteBuffer.wrap(buff);
-            // we should record <logId, logPosition> marked in markLog
-            // which is safe since records before lastMark have been
-            // persisted to disk (both index & entry logger)
-            bb.putLong(lastMark.txnLogId);
-            bb.putLong(lastMark.txnLogPosition);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("RollLog to persist last marked log : " + lastMark);
-            }
-            for(File dir: ledgerDirectories) {
-                File file = new File(dir, "lastMark");
-                try {
-                    FileOutputStream fos = new FileOutputStream(file);
-                    fos.write(buff);
-                    fos.getChannel().force(true);
-                    fos.close();
-                } catch (IOException e) {
-                    LOG.error("Problems writing to " + file, e);
-                }
-            }
-        }
-
-        /**
-         * Read last mark from lastMark file.
-         * The last mark should first be max journal log id,
-         * and then max log position in max journal log.
-         */
-        synchronized void readLog() {
-            byte buff[] = new byte[16];
-            ByteBuffer bb = ByteBuffer.wrap(buff);
-            for(File dir: ledgerDirectories) {
-                File file = new File(dir, "lastMark");
-                try {
-                    FileInputStream fis = new FileInputStream(file);
-                    fis.read(buff);
-                    fis.close();
-                    bb.clear();
-                    long i = bb.getLong();
-                    long p = bb.getLong();
-                    if (i > txnLogId) {
-                        txnLogId = i;
-                        if(p > txnLogPosition) {
-                          txnLogPosition = p;
-                        }
-                    }
-                } catch (IOException e) {
-                    LOG.error("Problems reading from " + file + " (this is okay if it is the first time starting this bookie");
-                }
-            }
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder sb = new StringBuilder();
-            
-            sb.append("LastMark: logId - ").append(txnLogId)
-              .append(" , position - ").append(txnLogPosition);
-            
-            return sb.toString();
-        }
-    }
-
-    private LastLogMark lastLogMark = new LastLogMark(0, 0);
-
-    LastLogMark getLastLogMark() {
-        return lastLogMark;
-    }
-
     public boolean isRunning() {
         return running;
     }
 
-    /**
-     * A thread used for persisting journal entries to journal files.
-     * 
-     * <p>
-     * Besides persisting journal entries, it also takes responsibility of
-     * rolling journal files when a journal file reaches journal file size
-     * limitation.
-     * </p>
-     * <p>
-     * During journal rolling, it first closes the writing journal, generates
-     * new journal file using current timestamp, and continue persistence logic.
-     * Those journals will be garbage collected in SyncThread.
-     * </p>
-     */
     @Override
     public void run() {
-        LinkedList<QueueEntry> toFlush = new LinkedList<QueueEntry>();
-        ByteBuffer lenBuff = ByteBuffer.allocate(4);
+        // bookie thread wait for journal thread
         try {
-            long logId = 0;
-            JournalChannel logFile = null;
-            BufferedChannel bc = null;
-            long nextPrealloc = 0;
-            long lastFlushPosition = 0;
-
-            QueueEntry qe = null;
-            while (true) {
-                // new journal file to write
-                if (null == logFile) {
-                    logId = System.currentTimeMillis();
-                    logFile = new JournalChannel(journalDirectory, logId);
-                    bc = logFile.getBufferedChannel();
-
-                    lastFlushPosition = 0;
-                }
-
-                if (qe == null) {
-                    if (toFlush.isEmpty()) {
-                        qe = queue.take();
-                    } else {
-                        qe = queue.poll();
-                        if (qe == null || bc.position() > lastFlushPosition + 512*1024) {
-                            //logFile.force(false);
-                            bc.flush(true);
-                            lastFlushPosition = bc.position();
-                            lastLogMark.setLastLogMark(logId, lastFlushPosition);
-                            for (QueueEntry e : toFlush) {
-                                e.cb.writeComplete(0, e.ledgerId, e.entryId, null, e.ctx);
-                            }
-                            toFlush.clear();
-
-                            // check whether journal file is over file limit
-                            if (bc.position() > maxJournalSize) {
-                                logFile.close();
-                                logFile = null;
-                                continue;
-                            }
-                        }
-                    }
-                }
-
-                if (isZkExpired) {
-                    LOG.warn("Exiting... zk client has expired.");
-                    break;
-                }
-                if (qe == null) { // no more queue entry
-                    continue;
-                }
-                lenBuff.clear();
-                lenBuff.putInt(qe.entry.remaining());
-                lenBuff.flip();
-                //
-                // we should be doing the following, but then we run out of
-                // direct byte buffers
-                // logFile.write(new ByteBuffer[] { lenBuff, qe.entry });
-                bc.write(lenBuff);
-                bc.write(qe.entry);
-
-                logFile.preAllocIfNeeded();
-
-                toFlush.add(qe);
-                qe = null;
-            }
-        } catch (Exception e) {
-            // if the bookie thread quits due to shutting down, it is ok
-            if (shuttingdown) {
-                LOG.warn("Bookie thread exits when shutting down", e);
-            } else {
-                // some error found in bookie thread and it quits
-                // following add operations to it would hang unit client timeout
-                // so we should let bookie server exists
-                LOG.error("Exception occurred in bookie thread and it quits : ", e);
-                shutdown(ExitCode.BOOKIE_EXCEPTION);
-            }
+            // start journal
+            journal.start();
+            // wait until journal quits
+            journal.join();
+        } catch (InterruptedException ie) {
+        }
+        // if the journal thread quits due to shutting down, it is ok
+        if (!shuttingdown) {
+            // some error found in journal thread and it quits
+            // following add operations to it would hang unit client timeout
+            // so we should let bookie server exists
+            LOG.error("Journal manager quits unexpectedly.");
+            shutdown(ExitCode.BOOKIE_EXCEPTION);
         }
     }
 
@@ -912,7 +592,8 @@ public class Bookie extends Thread {
 
                 // Shutdown the ZK client
                 if(zk != null) zk.close();
-                this.interrupt();
+                // Shutdown journal
+                journal.shutdown();
                 this.join();
                 syncThread.shutdown();
 
@@ -947,15 +628,7 @@ public class Bookie extends Thread {
             bb.put(masterKey);
             bb.flip();
 
-            queue.add(new QueueEntry(bb,
-                                     ledgerId, METAENTRY_ID_LEDGER_KEY,
-                                     new WriteCallback() {
-                                         public void writeComplete(int rc, long ledgerId, 
-                                                 long entryId, InetSocketAddress addr,
-                                                 Object ctx) {
-                                             // do nothing
-                                         }
-                                     }, null));
+            journal.logAddEntry(bb, new NopWriteCallback(), null);
             masterKeyCache.put(ledgerId, masterKey);
         }
         return l;
@@ -981,7 +654,7 @@ public class Bookie extends Thread {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Adding " + entryId + "@" + ledgerId);
         }
-        queue.add(new QueueEntry(entry, ledgerId, entryId, cb, ctx));
+        journal.logAddEntry(entry, cb, ctx);
     }
 
     /**

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieBean.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieBean.java?rev=1327045&r1=1327044&r2=1327045&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieBean.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieBean.java Tue Apr 17 11:16:26 2012
@@ -20,7 +20,6 @@ package org.apache.bookkeeper.bookie;
 
 import java.io.File;
 
-import org.apache.bookkeeper.bookie.Bookie.LastLogMark;
 import org.apache.bookkeeper.jmx.BKMBeanInfo;
 
 /**
@@ -46,7 +45,7 @@ public class BookieBean implements Booki
 
     @Override
     public int getQueueLength() {
-        return bk.queue.size();
+        return bk.journal.getJournalQueueLength();
     }
 
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieMXBean.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieMXBean.java?rev=1327045&r1=1327044&r2=1327045&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieMXBean.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieMXBean.java Tue Apr 17 11:16:26 2012
@@ -20,8 +20,6 @@ package org.apache.bookkeeper.bookie;
 
 import java.io.File;
 
-import org.apache.bookkeeper.bookie.Bookie.LastLogMark;
-
 /**
  * Bookie MBean
  */

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java?rev=1327045&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java Tue Apr 17 11:16:26 2012
@@ -0,0 +1,550 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provide journal related management.
+ */
+class Journal extends Thread {
+
+    static Logger LOG = LoggerFactory.getLogger(Journal.class);
+
+    /**
+     * Filter to pick up journals.
+     */
+    private static interface JournalIdFilter {
+        public boolean accept(long journalId);
+    }
+
+    /**
+     * List all journal ids accepted by a specified journal id filter.
+     *
+     * <p>
+     * Journal files are named <i>&lt;hex journal id&gt;.txn</i>. Files not ending
+     * with ".txn", or whose name prefix is not a hexadecimal number, are ignored.
+     * </p>
+     *
+     * @param journalDir journal dir
+     * @param filter journal id filter; null accepts every journal
+     * @return sorted list of filtered ids; empty if journalDir cannot be listed
+     */
+    private static List<Long> listJournalIds(File journalDir, JournalIdFilter filter) {
+        File logFiles[] = journalDir.listFiles();
+        List<Long> logs = new ArrayList<Long>();
+        if (logFiles == null) {
+            // listFiles() returns null when journalDir is not a directory or
+            // cannot be read; report "no journals" instead of throwing an NPE.
+            LOG.warn("Could not list journal directory " + journalDir);
+            return logs;
+        }
+        for(File f: logFiles) {
+            String name = f.getName();
+            if (!name.endsWith(".txn")) {
+                continue;
+            }
+            String idString = name.split("\\.")[0];
+            long id;
+            try {
+                id = Long.parseLong(idString, 16);
+            } catch (NumberFormatException nfe) {
+                // journals created by this class are named with Long.toHexString,
+                // so an unparsable name is not one of ours
+                LOG.warn("Ignoring file with malformed journal name " + f);
+                continue;
+            }
+            if (filter == null || filter.accept(id)) {
+                logs.add(id);
+            }
+        }
+        Collections.sort(logs);
+        return logs;
+    }
+
+    /**
+     * Last Log Mark.
+     *
+     * <p>
+     * Tracks the journal position (<i>txnLogId</i>, <i>txnLogPosition</i>) up to
+     * which entries have been flushed to the journal, plus a snapshot of it
+     * (<i>lastMark</i>) taken by {@link #markLog()} and written to the ledger
+     * directories by {@link #rollLog()}.
+     * </p>
+     */
+    class LastLogMark {
+        long txnLogId;
+        long txnLogPosition;
+        // snapshot taken by markLog(); null until markLog() is first called
+        LastLogMark lastMark;
+        LastLogMark(long logId, long logPosition) {
+            this.txnLogId = logId;
+            this.txnLogPosition = logPosition;
+        }
+        synchronized void setLastLogMark(long logId, long logPosition) {
+            txnLogId = logId;
+            txnLogPosition = logPosition;
+        }
+        synchronized void markLog() {
+            lastMark = new LastLogMark(txnLogId, txnLogPosition);
+        }
+
+        /**
+         * Returns the journal id of the mark taken by {@link #markLog()}.
+         *
+         * @return marked journal id, or 0 if no mark has been taken yet. Journal
+         *         ids are created from System.currentTimeMillis(), so 0 makes
+         *         every "id &lt; marked id" check false and nothing is collected.
+         */
+        synchronized long getMarkedLogId() {
+            return lastMark == null ? 0L : lastMark.txnLogId;
+        }
+
+        /**
+         * Persist the mark taken by {@link #markLog()} to a "lastMark" file in
+         * every ledger directory. Failures on a directory are logged and the
+         * remaining directories are still written.
+         */
+        synchronized void rollLog() {
+            if (lastMark == null) {
+                // markLog() has not been called yet, so there is no mark that is
+                // known to be safe to persist.
+                LOG.warn("No log mark has been taken yet, skip persisting last log mark.");
+                return;
+            }
+            byte buff[] = new byte[16];
+            ByteBuffer bb = ByteBuffer.wrap(buff);
+            // we should record <logId, logPosition> marked in markLog
+            // which is safe since records before lastMark have been
+            // persisted to disk (both index & entry logger)
+            bb.putLong(lastMark.txnLogId);
+            bb.putLong(lastMark.txnLogPosition);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("RollLog to persist last marked log : " + lastMark);
+            }
+            for(File dir: ledgerDirectories) {
+                File file = new File(dir, "lastMark");
+                FileOutputStream fos = null;
+                try {
+                    fos = new FileOutputStream(file);
+                    fos.write(buff);
+                    fos.getChannel().force(true);
+                } catch (IOException e) {
+                    LOG.error("Problems writing to " + file, e);
+                } finally {
+                    // always release the descriptor, even if write/force failed
+                    if (fos != null) {
+                        try {
+                            fos.close();
+                        } catch (IOException e) {
+                            LOG.error("Problems writing to " + file, e);
+                        }
+                    }
+                }
+            }
+        }
+
+        /**
+         * Read last mark from lastMark file.
+         * The last mark should first be max journal log id,
+         * and then max log position in max journal log.
+         *
+         * <p>
+         * Every ledger directory is inspected; the greatest mark, ordered by
+         * journal id and then by position, wins. Missing, unreadable or
+         * truncated files are skipped.
+         * </p>
+         */
+        synchronized void readLog() {
+            byte buff[] = new byte[16];
+            ByteBuffer bb = ByteBuffer.wrap(buff);
+            for(File dir: ledgerDirectories) {
+                File file = new File(dir, "lastMark");
+                FileInputStream fis = null;
+                try {
+                    fis = new FileInputStream(file);
+                    // a single read() may return fewer bytes than requested
+                    int total = 0;
+                    while (total < buff.length) {
+                        int rc = fis.read(buff, total, buff.length - total);
+                        if (rc < 0) {
+                            break;
+                        }
+                        total += rc;
+                    }
+                    if (total < buff.length) {
+                        // short file (e.g. an interrupted write); don't parse
+                        // stale bytes left in the buffer from another directory
+                        LOG.warn("Ignoring truncated lastMark file " + file
+                                 + " (" + total + " bytes)");
+                        continue;
+                    }
+                    bb.clear();
+                    long i = bb.getLong();
+                    long p = bb.getLong();
+                    // A position only has meaning within its own journal, so a
+                    // newer journal id brings its position along; positions are
+                    // compared only when the journal ids are equal.
+                    if (i > txnLogId || (i == txnLogId && p > txnLogPosition)) {
+                        txnLogId = i;
+                        txnLogPosition = p;
+                    }
+                } catch (IOException e) {
+                    LOG.error("Problems reading from " + file + " (this is okay if it is the first time starting this bookie)");
+                } finally {
+                    if (fis != null) {
+                        try {
+                            fis.close();
+                        } catch (IOException e) {
+                            LOG.warn("Problems closing " + file, e);
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+
+            sb.append("LastMark: logId - ").append(txnLogId)
+              .append(" , position - ").append(txnLogPosition);
+
+            return sb.toString();
+        }
+    }
+
+    /**
+     * Filter to return list of journals for rolling: accepts journals strictly
+     * older than the currently marked journal (none before the first mark).
+     */
+    private class JournalRollingFilter implements JournalIdFilter {
+        @Override
+        public boolean accept(long journalId) {
+            return journalId < lastLogMark.getMarkedLogId();
+        }
+    }
+
+    /**
+     * Scanner used to scan a journal
+     */
+    public static interface JournalScanner {
+        /**
+         * Process a journal entry.
+         *
+         * @param journalVersion
+         *          Journal Version
+         * @param offset
+         *          File offset of the journal entry
+         * @param entry
+         *          Journal Entry
+         * @throws IOException
+         */
+        public void process(int journalVersion, long offset, ByteBuffer entry) throws IOException;
+    }
+
+    /**
+     * Journal Entry to Record
+     */
+    private static class QueueEntry {
+        QueueEntry(ByteBuffer entry, long ledgerId, long entryId,
+                   WriteCallback cb, Object ctx) {
+            this.entry = entry.duplicate();
+            this.cb = cb;
+            this.ctx = ctx;
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+        }
+
+        ByteBuffer entry;
+
+        long ledgerId;
+
+        long entryId;
+
+        WriteCallback cb;
+
+        Object ctx;
+    }
+
+    final static long MB = 1024 * 1024L;
+    // max journal file size
+    final long maxJournalSize;
+    // number journal files kept before marked journal
+    final int maxBackupJournals;
+
+    final File journalDirectory;
+    final File ledgerDirectories[];
+    final ServerConfiguration conf;
+
+    private LastLogMark lastLogMark = new LastLogMark(0, 0);
+
+    // journal entry queue to commit
+    LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<QueueEntry>();
+
+    volatile boolean running = true;
+
+    public Journal(ServerConfiguration conf) {
+        super("BookieJournal-" + conf.getBookiePort());
+        this.conf = conf;
+        this.journalDirectory = Bookie.getCurrentDirectory(conf.getJournalDir());
+        this.ledgerDirectories = Bookie.getCurrentDirectories(conf.getLedgerDirs());
+        this.maxJournalSize = conf.getMaxJournalSize() * MB;
+        this.maxBackupJournals = conf.getMaxBackupJournals();
+
+        // read last log mark
+        lastLogMark.readLog();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Last Log Mark : " + lastLogMark);
+        }
+    }
+
+    LastLogMark getLastLogMark() {
+        return lastLogMark;
+    }
+
+    /**
+     * Records a <i>LastLogMark</i> in memory.
+     *
+     * <p>
+     * The <i>LastLogMark</i> contains two parts: first one is <i>txnLogId</i>
+     * (file id of a journal) and the second one is <i>txnLogPos</i> (offset in
+     *  a journal). The <i>LastLogMark</i> indicates that those entries before
+     * it have been persisted to both index and entry log files.
+     * </p>
+     *
+     * <p>
+     * This method is called before flushing entry log files and ledger cache.
+     * </p>
+     */
+    public void markLog() {
+        lastLogMark.markLog();
+    }
+
+    /**
+     * Persists the <i>LastLogMark</i> marked by #markLog() to disk.
+     *
+     * <p>
+     * This action means entries added before <i>LastLogMark</i> whose entry data
+     * and index pages were already persisted to disk. It is the time to safely
+     * remove journal files created earlier than <i>LastLogMark.txnLogId</i>.
+     * </p>
+     * <p>
+     * If the bookie has crashed before persisting <i>LastLogMark</i> to disk,
+     * it still has journal files contains entries for which index pages may not
+     * have been persisted. Consequently, when the bookie restarts, it inspects
+     * journal files to restore those entries; data isn't lost.
+     * </p>
+     * <p>
+     * This method is called after flushing entry log files and ledger cache successfully, which is to ensure <i>LastLogMark</i> is persisted.
+     * </p>
+     * @see #markLog()
+     */
+    public void rollLog() {
+        lastLogMark.rollLog();
+    }
+
+    /**
+     * Garbage collect older journals, keeping <i>maxBackupJournals</i> journal
+     * files before the marked journal. Nothing is collected before the first
+     * call to {@link #markLog()}.
+     */
+    public void gcJournals() {
+        // list the journals that have been marked
+        List<Long> logs = listJournalIds(journalDirectory, new JournalRollingFilter());
+        // keep maxBackupJournals journal files before marked journal
+        if (logs.size() >= maxBackupJournals) {
+            int maxIdx = logs.size() - maxBackupJournals;
+            long markedLogId = lastLogMark.getMarkedLogId();
+            for (int i=0; i<maxIdx; i++) {
+                long id = logs.get(i);
+                // make sure the journal id is smaller than marked journal id
+                if (id < markedLogId) {
+                    File journalFile = new File(journalDirectory, Long.toHexString(id) + ".txn");
+                    if (journalFile.delete()) {
+                        LOG.info("garbage collected journal " + journalFile.getName());
+                    } else {
+                        LOG.warn("Could not delete old journal " + journalFile);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Scan the journal
+     *
+     * @param journalId
+     *          Journal Log Id
+     * @param journalPos
+     *          Offset to start scanning
+     * @param scanner
+     *          Scanner to handle entries
+     * @throws IOException if the journal cannot be read, contains a record with
+     *          a negative length, or the scanner fails
+     */
+    public void scanJournal(long journalId, long journalPos, JournalScanner scanner)
+        throws IOException {
+        JournalChannel recLog;
+        if (journalPos <= 0) {
+            recLog = new JournalChannel(journalDirectory, journalId);
+        } else {
+            recLog = new JournalChannel(journalDirectory, journalId, journalPos);
+        }
+        try {
+            // inside the try so the channel is closed even if this fails
+            int journalVersion = recLog.getFormatVersion();
+            ByteBuffer lenBuff = ByteBuffer.allocate(4);
+            ByteBuffer recBuff = ByteBuffer.allocate(64*1024);
+            while(true) {
+                // entry start offset
+                long offset = recLog.fc.position();
+                // start reading entry
+                lenBuff.clear();
+                fullRead(recLog, lenBuff);
+                if (lenBuff.remaining() != 0) {
+                    break;
+                }
+                lenBuff.flip();
+                int len = lenBuff.getInt();
+                if (len == 0) {
+                    break;
+                }
+                if (len < 0) {
+                    // would otherwise fail later inside ByteBuffer.limit()
+                    throw new IOException("Invalid record length " + len + " at offset "
+                                          + offset + " in journal " + Long.toHexString(journalId));
+                }
+                recBuff.clear();
+                if (recBuff.remaining() < len) {
+                    recBuff = ByteBuffer.allocate(len);
+                }
+                recBuff.limit(len);
+                if (fullRead(recLog, recBuff) != len) {
+                    // This seems scary, but it just means that this is where we
+                    // left off writing
+                    break;
+                }
+                recBuff.flip();
+                scanner.process(journalVersion, offset, recBuff);
+            }
+        } finally {
+            recLog.close();
+        }
+    }
+
+    /**
+     * Replay journal files
+     *
+     * @param scanner
+     *          Scanner to process replayed entries.
+     * @throws IOException
+     */
+    public void replay(JournalScanner scanner) throws IOException {
+        final long markedLogId = lastLogMark.txnLogId;
+        List<Long> logs = listJournalIds(journalDirectory, new JournalIdFilter() {
+            @Override
+            public boolean accept(long journalId) {
+                if (journalId < markedLogId) {
+                    return false;
+                }
+                return true;
+            }
+        });
+        // last log mark may be missed due to no sync up before
+        // validate filtered log ids only when we have markedLogId
+        if (markedLogId > 0) {
+            if (logs.size() == 0 || logs.get(0) != markedLogId) {
+                throw new IOException("Recovery log " + markedLogId + " is missing");
+            }
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Try to replay journal logs : " + logs);
+        }
+        // TODO: When reading in the journal logs that need to be synced, we
+        // should use BufferedChannels instead to minimize the amount of
+        // system calls done.
+        for(Long id: logs) {
+            long logPosition = 0L;
+            if(id == markedLogId) {
+                logPosition = lastLogMark.txnLogPosition;
+            }
+            scanJournal(id, logPosition, scanner);
+        }
+    }
+
+    /**
+     * record an add entry operation in journal
+     */
+    public void logAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx) {
+        long ledgerId = entry.getLong();
+        long entryId = entry.getLong();
+        entry.rewind();
+        queue.add(new QueueEntry(entry, ledgerId, entryId, cb, ctx));
+    }
+
+    /**
+     * Get the length of journal entries queue.
+     *
+     * @return length of journal entry queue.
+     */
+    public int getJournalQueueLength() {
+        return queue.size();
+    }
+
+    /**
+     * A thread used for persisting journal entries to journal files.
+     *
+     * <p>
+     * Besides persisting journal entries, it also takes responsibility of
+     * rolling journal files when a journal file reaches journal file size
+     * limitation.
+     * </p>
+     * <p>
+     * During journal rolling, it first closes the writing journal, generates
+     * new journal file using current timestamp, and continue persistence logic.
+     * Those journals will be garbage collected in SyncThread.
+     * </p>
+     * <p>
+     * The journal file being written is closed when the thread exits.
+     * </p>
+     * @see Bookie#SyncThread
+     */
+    @Override
+    public void run() {
+        LinkedList<QueueEntry> toFlush = new LinkedList<QueueEntry>();
+        ByteBuffer lenBuff = ByteBuffer.allocate(4);
+        // declared outside the try so it can be closed in the finally block
+        JournalChannel logFile = null;
+        try {
+            long logId = 0;
+            BufferedChannel bc = null;
+            long lastFlushPosition = 0;
+
+            QueueEntry qe = null;
+            while (true) {
+                // new journal file to write
+                if (null == logFile) {
+                    logId = System.currentTimeMillis();
+                    logFile = new JournalChannel(journalDirectory, logId);
+                    bc = logFile.getBufferedChannel();
+
+                    lastFlushPosition = 0;
+                }
+
+                if (qe == null) {
+                    if (toFlush.isEmpty()) {
+                        qe = queue.take();
+                    } else {
+                        qe = queue.poll();
+                        if (qe == null || bc.position() > lastFlushPosition + 512*1024) {
+                            //logFile.force(false);
+                            bc.flush(true);
+                            lastFlushPosition = bc.position();
+                            lastLogMark.setLastLogMark(logId, lastFlushPosition);
+                            for (QueueEntry e : toFlush) {
+                                e.cb.writeComplete(0, e.ledgerId, e.entryId, null, e.ctx);
+                            }
+                            toFlush.clear();
+
+                            // check whether journal file is over file limit
+                            if (bc.position() > maxJournalSize) {
+                                logFile.close();
+                                logFile = null;
+                                continue;
+                            }
+                        }
+                    }
+                }
+
+                if (!running) {
+                    LOG.info("Journal Manager is asked to shut down, quit.");
+                    break;
+                }
+
+                if (qe == null) { // no more queue entry
+                    continue;
+                }
+                lenBuff.clear();
+                lenBuff.putInt(qe.entry.remaining());
+                lenBuff.flip();
+                //
+                // we should be doing the following, but then we run out of
+                // direct byte buffers
+                // logFile.write(new ByteBuffer[] { lenBuff, qe.entry });
+                bc.write(lenBuff);
+                bc.write(qe.entry);
+
+                logFile.preAllocIfNeeded();
+
+                toFlush.add(qe);
+                qe = null;
+            }
+        } catch (Exception e) {
+            LOG.warn("Journal exits when shutting down", e);
+        } finally {
+            // release the journal file that was being written, if any
+            if (logFile != null) {
+                try {
+                    logFile.close();
+                } catch (Exception e) {
+                    LOG.warn("Failed to close journal file on exit", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Shuts down the journal.
+     */
+    public synchronized void shutdown() {
+        try {
+            if (!running) {
+                return;
+            }
+            running = false;
+            this.interrupt();
+            this.join();
+        } catch (InterruptedException ie) {
+            // preserve the caller's interrupt status instead of swallowing it
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted during shutting down journal : ", ie);
+        }
+    }
+
+    private static int fullRead(JournalChannel fc, ByteBuffer bb) throws IOException {
+        int total = 0;
+        while(bb.remaining() > 0) {
+            int rc = fc.read(bb);
+            if (rc <= 0) {
+                return total;
+            }
+            total += rc;
+        }
+        return total;
+    }
+}

Propchange: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java?rev=1327045&r1=1327044&r2=1327045&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java Tue Apr 17 11:16:26 2012
@@ -224,7 +224,7 @@ public abstract class BookKeeperClusterT
     public void sleepBookie(InetSocketAddress addr, final int seconds,
                             final CountDownLatch l)
             throws InterruptedException, IOException {
-        final String name = "Bookie-" + addr.getPort();
+        final String name = "BookieJournal-" + addr.getPort();
         Thread[] allthreads = new Thread[Thread.activeCount()];
         Thread.enumerate(allthreads);
         for (final Thread t : allthreads) {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java?rev=1327045&r1=1327044&r2=1327045&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java Tue Apr 17 11:16:26 2012
@@ -26,7 +26,6 @@ import java.util.List;
 
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.bookie.Bookie;
-import org.apache.bookkeeper.bookie.Bookie.JournalIdFilter;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -171,13 +170,15 @@ public class BookieJournalRollingTest ex
 
         // verify that we only keep at most journal files 
         for (File journalDir : tmpDirs) {
-            List<Long> logs = Bookie.listJournalIds(journalDir, new JournalIdFilter() {
-                @Override
-                public boolean accept(long journalId) {
-                    return true;
+            File[] journals = journalDir.listFiles();
+            int numJournals = 0;
+            for (File f : journals) {
+                if (!f.getName().endsWith(".txn")) {
+                    continue;
                 }
-            });
-            assertTrue(logs.size() <= 2);
+                ++numJournals;
+            }
+            assertTrue(numJournals <= 2);
         }
 
         // restart bookies