You are viewing a plain text version of this content. The canonical link for it (originally a hyperlink on the word "here") was not preserved in this plain-text rendering.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2012/04/17 13:16:27 UTC
svn commit: r1327045 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/test/java/org/apache/bookkeeper/test/
Author: sijie
Date: Tue Apr 17 11:16:26 2012
New Revision: 1327045
URL: http://svn.apache.org/viewvc?rev=1327045&view=rev
Log:
BOOKKEEPER-218: Provide journal manager to manage journal related operations (sijie)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java (with props)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieBean.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieMXBean.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1327045&r1=1327044&r2=1327045&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Apr 17 11:16:26 2012
@@ -130,6 +130,8 @@ Trunk (unreleased changes)
BOOKKEEPER-188: Garbage collection code is in the wrong place (ivank via sijie)
+ BOOKKEEPER-218: Provide journal manager to manage journal related operations (sijie)
+
hedwig-server/
BOOKKEEPER-77: Add a console client for hedwig (Sijie Guo via ivank)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1327045&r1=1327044&r2=1327045&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Tue Apr 17 11:16:26 2012
@@ -21,34 +21,25 @@
package org.apache.bookkeeper.bookie;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
import java.io.File;
-import java.io.FileInputStream;
import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.RandomAccessFile;
import java.io.FilenameFilter;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.Journal.JournalScanner;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.jmx.BKMBeanInfo;
import org.apache.bookkeeper.jmx.BKMBeanRegistry;
@@ -70,21 +61,14 @@ import org.apache.zookeeper.ZooDefs.Ids;
public class Bookie extends Thread {
static Logger LOG = LoggerFactory.getLogger(Bookie.class);
- final static long MB = 1024 * 1024L;
- // max journal file size
- final long maxJournalSize;
- // number journal files kept before marked journal
- final int maxBackupJournals;
-
final File journalDirectory;
-
final File ledgerDirectories[];
-
final ServerConfiguration conf;
final SyncThread syncThread;
final LedgerManager ledgerManager;
final LedgerStorage ledgerStorage;
+ final Journal journal;
final HandleFactory handles;
static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
@@ -138,6 +122,15 @@ public class Bookie extends Thread {
}
}
+    // Write callback that only logs the completion at debug level; static because it needs no enclosing Bookie
+    static class NopWriteCallback implements WriteCallback {
+        @Override
+        public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Finished writing entry {} @ ledger {} for {} : {}", new Object[] { entryId, ledgerId, addr, rc });
+            }
+        }
+    }
/**
* SyncThread is a background thread which flushes ledger index pages periodically.
@@ -199,7 +192,8 @@ public class Bookie extends Thread {
break;
}
- lastLogMark.markLog();
+ // journal mark log
+ journal.markLog();
boolean flushFailed = false;
try {
@@ -212,35 +206,8 @@ public class Bookie extends Thread {
// if flush failed, we should not roll last mark, otherwise we would
// have some ledgers are not flushed and their journal entries were lost
if (!flushFailed) {
-
- lastLogMark.rollLog();
-
- // list the journals that have been marked
- List<Long> logs = listJournalIds(journalDirectory, new JournalIdFilter() {
- @Override
- public boolean accept(long journalId) {
- if (journalId < lastLogMark.lastMark.txnLogId) {
- return true;
- } else {
- return false;
- }
- }
- });
-
- // keep MAX_BACKUP_JOURNALS journal files before marked journal
- if (logs.size() >= maxBackupJournals) {
- int maxIdx = logs.size() - maxBackupJournals;
- for (int i=0; i<maxIdx; i++) {
- long id = logs.get(i);
- // make sure the journal id is smaller than marked journal id
- if (id < lastLogMark.lastMark.txnLogId) {
- File journalFile = new File(journalDirectory, Long.toHexString(id) + ".txn");
- journalFile.delete();
- LOG.info("garbage collected journal " + journalFile.getName());
- }
- }
- }
-
+ journal.rollLog();
+ journal.gcJournals();
}
// clear flushing flag
@@ -374,8 +341,6 @@ public class Bookie extends Thread {
this.conf = conf;
this.journalDirectory = getCurrentDirectory(conf.getJournalDir());
this.ledgerDirectories = getCurrentDirectories(conf.getLedgerDirs());
- this.maxJournalSize = conf.getMaxJournalSize() * MB;
- this.maxBackupJournals = conf.getMaxBackupJournals();
// instantiate zookeeper client to initialize ledger manager
this.zk = instantiateZookeeperClient(conf);
@@ -385,80 +350,24 @@ public class Bookie extends Thread {
syncThread = new SyncThread(conf);
ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager);
-
handles = new HandleFactoryImpl(ledgerStorage);
+ // instantiate the journal
+ journal = new Journal(conf);
// replay journals
readJournal();
}
private void readJournal() throws IOException, BookieException {
- lastLogMark.readLog();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Last Log Mark : " + lastLogMark);
- }
- final long markedLogId = lastLogMark.txnLogId;
- List<Long> logs = listJournalIds(journalDirectory, new JournalIdFilter() {
+ journal.replay(new JournalScanner() {
@Override
- public boolean accept(long journalId) {
- if (journalId < markedLogId) {
- return false;
- }
- return true;
- }
- });
- // last log mark may be missed due to no sync up before
- // validate filtered log ids only when we have markedLogId
- if (markedLogId > 0) {
- if (logs.size() == 0 || logs.get(0) != markedLogId) {
- throw new IOException("Recovery log " + markedLogId + " is missing");
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Try to relay journal logs : " + logs);
- }
- // TODO: When reading in the journal logs that need to be synced, we
- // should use BufferedChannels instead to minimize the amount of
- // system calls done.
- ByteBuffer lenBuff = ByteBuffer.allocate(4);
- ByteBuffer recBuff = ByteBuffer.allocate(64*1024);
- for(Long id: logs) {
- JournalChannel recLog;
- if(id == markedLogId) {
- long markedLogPosition = lastLogMark.txnLogPosition;
- recLog = new JournalChannel(journalDirectory, id, markedLogPosition);
- } else {
- recLog = new JournalChannel(journalDirectory, id);
- }
-
- while(true) {
- lenBuff.clear();
- fullRead(recLog, lenBuff);
- if (lenBuff.remaining() != 0) {
- break;
- }
- lenBuff.flip();
- int len = lenBuff.getInt();
- if (len == 0) {
- break;
- }
- recBuff.clear();
- if (recBuff.remaining() < len) {
- recBuff = ByteBuffer.allocate(len);
- }
- recBuff.limit(len);
- if (fullRead(recLog, recBuff) != len) {
- // This seems scary, but it just means that this is where we
- // left off writing
- break;
- }
- recBuff.flip();
+ public void process(int journalVersion, long offset, ByteBuffer recBuff) throws IOException {
long ledgerId = recBuff.getLong();
long entryId = recBuff.getLong();
try {
LOG.debug("Replay journal - ledger id : {}, entry id : {}.", ledgerId, entryId);
if (entryId == METAENTRY_ID_LEDGER_KEY) {
- if (recLog.getFormatVersion() >= 3) {
+ if (journalVersion >= 3) {
int masterKeyLen = recBuff.getInt();
byte[] masterKey = new byte[masterKeyLen];
@@ -466,7 +375,7 @@ public class Bookie extends Thread {
masterKeyCache.put(ledgerId, masterKey);
} else {
throw new IOException("Invalid journal. Contains journalKey "
- + " but layout version (" + recLog.getFormatVersion()
+ + " but layout version (" + journalVersion
+ ") is too old to hold this");
}
} else {
@@ -481,16 +390,17 @@ public class Bookie extends Thread {
}
} catch (NoLedgerException nsle) {
LOG.debug("Skip replaying entries of ledger {} since it was deleted.", ledgerId);
- continue;
+ } catch (BookieException be) {
+ throw new IOException(be);
}
}
- recLog.close();
- }
+ });
}
synchronized public void start() {
setDaemon(true);
LOG.debug("I'm starting a bookie with journal directory " + journalDirectory.getName());
+ // start bookie thread
super.start();
syncThread.start();
@@ -507,39 +417,6 @@ public class Bookie extends Thread {
}
}
- public static interface JournalIdFilter {
- public boolean accept(long journalId);
- }
-
- /**
- * List all journal ids by a specified journal id filer
- *
- * @param journalDir journal dir
- * @param filter journal id filter
- * @return list of filtered ids
- */
- public static List<Long> listJournalIds(File journalDir, JournalIdFilter filter) {
- File logFiles[] = journalDir.listFiles();
- List<Long> logs = new ArrayList<Long>();
- for(File f: logFiles) {
- String name = f.getName();
- if (!name.endsWith(".txn")) {
- continue;
- }
- String idString = name.split("\\.")[0];
- long id = Long.parseLong(idString, 16);
- if (filter != null) {
- if (filter.accept(id)) {
- logs.add(id);
- }
- } else {
- logs.add(id);
- }
- }
- Collections.sort(logs);
- return logs;
- }
-
/**
* Register jmx with parent
*
@@ -670,224 +547,27 @@ public class Bookie extends Thread {
return newZk;
}
- private static int fullRead(JournalChannel fc, ByteBuffer bb) throws IOException {
- int total = 0;
- while(bb.remaining() > 0) {
- int rc = fc.read(bb);
- if (rc <= 0) {
- return total;
- }
- total += rc;
- }
- return total;
- }
-
- static class QueueEntry {
- QueueEntry(ByteBuffer entry, long ledgerId, long entryId,
- WriteCallback cb, Object ctx) {
- this.entry = entry.duplicate();
- this.cb = cb;
- this.ctx = ctx;
- this.ledgerId = ledgerId;
- this.entryId = entryId;
- }
-
- ByteBuffer entry;
-
- long ledgerId;
-
- long entryId;
-
- WriteCallback cb;
-
- Object ctx;
- }
-
- LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<QueueEntry>();
-
- class LastLogMark {
- long txnLogId;
- long txnLogPosition;
- LastLogMark lastMark;
- LastLogMark(long logId, long logPosition) {
- this.txnLogId = logId;
- this.txnLogPosition = logPosition;
- }
- synchronized void setLastLogMark(long logId, long logPosition) {
- txnLogId = logId;
- txnLogPosition = logPosition;
- }
- synchronized void markLog() {
- lastMark = new LastLogMark(txnLogId, txnLogPosition);
- }
- synchronized void rollLog() {
- byte buff[] = new byte[16];
- ByteBuffer bb = ByteBuffer.wrap(buff);
- // we should record <logId, logPosition> marked in markLog
- // which is safe since records before lastMark have been
- // persisted to disk (both index & entry logger)
- bb.putLong(lastMark.txnLogId);
- bb.putLong(lastMark.txnLogPosition);
- if (LOG.isDebugEnabled()) {
- LOG.debug("RollLog to persist last marked log : " + lastMark);
- }
- for(File dir: ledgerDirectories) {
- File file = new File(dir, "lastMark");
- try {
- FileOutputStream fos = new FileOutputStream(file);
- fos.write(buff);
- fos.getChannel().force(true);
- fos.close();
- } catch (IOException e) {
- LOG.error("Problems writing to " + file, e);
- }
- }
- }
-
- /**
- * Read last mark from lastMark file.
- * The last mark should first be max journal log id,
- * and then max log position in max journal log.
- */
- synchronized void readLog() {
- byte buff[] = new byte[16];
- ByteBuffer bb = ByteBuffer.wrap(buff);
- for(File dir: ledgerDirectories) {
- File file = new File(dir, "lastMark");
- try {
- FileInputStream fis = new FileInputStream(file);
- fis.read(buff);
- fis.close();
- bb.clear();
- long i = bb.getLong();
- long p = bb.getLong();
- if (i > txnLogId) {
- txnLogId = i;
- if(p > txnLogPosition) {
- txnLogPosition = p;
- }
- }
- } catch (IOException e) {
- LOG.error("Problems reading from " + file + " (this is okay if it is the first time starting this bookie");
- }
- }
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
-
- sb.append("LastMark: logId - ").append(txnLogId)
- .append(" , position - ").append(txnLogPosition);
-
- return sb.toString();
- }
- }
-
- private LastLogMark lastLogMark = new LastLogMark(0, 0);
-
- LastLogMark getLastLogMark() {
- return lastLogMark;
- }
-
public boolean isRunning() {
return running;
}
- /**
- * A thread used for persisting journal entries to journal files.
- *
- * <p>
- * Besides persisting journal entries, it also takes responsibility of
- * rolling journal files when a journal file reaches journal file size
- * limitation.
- * </p>
- * <p>
- * During journal rolling, it first closes the writing journal, generates
- * new journal file using current timestamp, and continue persistence logic.
- * Those journals will be garbage collected in SyncThread.
- * </p>
- */
@Override
public void run() {
- LinkedList<QueueEntry> toFlush = new LinkedList<QueueEntry>();
- ByteBuffer lenBuff = ByteBuffer.allocate(4);
+ // bookie thread wait for journal thread
try {
- long logId = 0;
- JournalChannel logFile = null;
- BufferedChannel bc = null;
- long nextPrealloc = 0;
- long lastFlushPosition = 0;
-
- QueueEntry qe = null;
- while (true) {
- // new journal file to write
- if (null == logFile) {
- logId = System.currentTimeMillis();
- logFile = new JournalChannel(journalDirectory, logId);
- bc = logFile.getBufferedChannel();
-
- lastFlushPosition = 0;
- }
-
- if (qe == null) {
- if (toFlush.isEmpty()) {
- qe = queue.take();
- } else {
- qe = queue.poll();
- if (qe == null || bc.position() > lastFlushPosition + 512*1024) {
- //logFile.force(false);
- bc.flush(true);
- lastFlushPosition = bc.position();
- lastLogMark.setLastLogMark(logId, lastFlushPosition);
- for (QueueEntry e : toFlush) {
- e.cb.writeComplete(0, e.ledgerId, e.entryId, null, e.ctx);
- }
- toFlush.clear();
-
- // check whether journal file is over file limit
- if (bc.position() > maxJournalSize) {
- logFile.close();
- logFile = null;
- continue;
- }
- }
- }
- }
-
- if (isZkExpired) {
- LOG.warn("Exiting... zk client has expired.");
- break;
- }
- if (qe == null) { // no more queue entry
- continue;
- }
- lenBuff.clear();
- lenBuff.putInt(qe.entry.remaining());
- lenBuff.flip();
- //
- // we should be doing the following, but then we run out of
- // direct byte buffers
- // logFile.write(new ByteBuffer[] { lenBuff, qe.entry });
- bc.write(lenBuff);
- bc.write(qe.entry);
-
- logFile.preAllocIfNeeded();
-
- toFlush.add(qe);
- qe = null;
- }
- } catch (Exception e) {
- // if the bookie thread quits due to shutting down, it is ok
- if (shuttingdown) {
- LOG.warn("Bookie thread exits when shutting down", e);
- } else {
- // some error found in bookie thread and it quits
- // following add operations to it would hang unit client timeout
- // so we should let bookie server exists
- LOG.error("Exception occurred in bookie thread and it quits : ", e);
- shutdown(ExitCode.BOOKIE_EXCEPTION);
- }
+ // start journal
+ journal.start();
+ // wait until journal quits
+ journal.join();
+ } catch (InterruptedException ie) {
+ }
+ // if the journal thread quits due to shutting down, it is ok
+ if (!shuttingdown) {
+ // some error found in journal thread and it quits
+ // following add operations to it would hang unit client timeout
+ // so we should let bookie server exists
+ LOG.error("Journal manager quits unexpectedly.");
+ shutdown(ExitCode.BOOKIE_EXCEPTION);
}
}
@@ -912,7 +592,8 @@ public class Bookie extends Thread {
// Shutdown the ZK client
if(zk != null) zk.close();
- this.interrupt();
+ // Shutdown journal
+ journal.shutdown();
this.join();
syncThread.shutdown();
@@ -947,15 +628,7 @@ public class Bookie extends Thread {
bb.put(masterKey);
bb.flip();
- queue.add(new QueueEntry(bb,
- ledgerId, METAENTRY_ID_LEDGER_KEY,
- new WriteCallback() {
- public void writeComplete(int rc, long ledgerId,
- long entryId, InetSocketAddress addr,
- Object ctx) {
- // do nothing
- }
- }, null));
+ journal.logAddEntry(bb, new NopWriteCallback(), null);
masterKeyCache.put(ledgerId, masterKey);
}
return l;
@@ -981,7 +654,7 @@ public class Bookie extends Thread {
if (LOG.isTraceEnabled()) {
LOG.trace("Adding " + entryId + "@" + ledgerId);
}
- queue.add(new QueueEntry(entry, ledgerId, entryId, cb, ctx));
+ journal.logAddEntry(entry, cb, ctx);
}
/**
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieBean.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieBean.java?rev=1327045&r1=1327044&r2=1327045&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieBean.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieBean.java Tue Apr 17 11:16:26 2012
@@ -20,7 +20,6 @@ package org.apache.bookkeeper.bookie;
import java.io.File;
-import org.apache.bookkeeper.bookie.Bookie.LastLogMark;
import org.apache.bookkeeper.jmx.BKMBeanInfo;
/**
@@ -46,7 +45,7 @@ public class BookieBean implements Booki
@Override
public int getQueueLength() {
- return bk.queue.size();
+ return bk.journal.getJournalQueueLength();
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieMXBean.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieMXBean.java?rev=1327045&r1=1327044&r2=1327045&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieMXBean.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieMXBean.java Tue Apr 17 11:16:26 2012
@@ -20,8 +20,6 @@ package org.apache.bookkeeper.bookie;
import java.io.File;
-import org.apache.bookkeeper.bookie.Bookie.LastLogMark;
-
/**
* Bookie MBean
*/
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java?rev=1327045&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java Tue Apr 17 11:16:26 2012
@@ -0,0 +1,691 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provide journal related management.
+ *
+ * <p>
+ * Entries handed to {@link #logAddEntry} are queued and appended to the
+ * current journal file by the journal thread ({@link #run()}), which
+ * acknowledges them once the journal file has been flushed. {@link #replay}
+ * re-reads the journals when the bookie starts, and {@link #markLog()},
+ * {@link #rollLog()} and {@link #gcJournals()} maintain the last log mark
+ * and remove journals that are no longer needed.
+ * </p>
+ */
+class Journal extends Thread {
+
+    static final Logger LOG = LoggerFactory.getLogger(Journal.class);
+
+    /**
+     * Filter to pick up journals.
+     */
+    private static interface JournalIdFilter {
+        public boolean accept(long journalId);
+    }
+
+    /**
+     * List all journal ids accepted by a specified journal id filter.
+     *
+     * <p>
+     * Journal files are named after the journal id in hexadecimal followed by
+     * {@code .txn}; other files are ignored, and {@code .txn} files whose name
+     * is not a hex number are skipped with a warning instead of aborting.
+     * </p>
+     *
+     * @param journalDir journal dir
+     * @param filter journal id filter, or null to accept every journal
+     * @return filtered ids in ascending order; empty if the directory cannot be listed
+     */
+    private static List<Long> listJournalIds(File journalDir, JournalIdFilter filter) {
+        List<Long> logs = new ArrayList<Long>();
+        File logFiles[] = journalDir.listFiles();
+        // File.listFiles() returns null (not an empty array) if the directory
+        // does not exist or cannot be read
+        if (logFiles == null) {
+            LOG.warn("Could not list journal directory " + journalDir);
+            return logs;
+        }
+        for (File f : logFiles) {
+            String name = f.getName();
+            if (!name.endsWith(".txn")) {
+                continue;
+            }
+            String idString = name.split("\\.")[0];
+            long id;
+            try {
+                id = Long.parseLong(idString, 16);
+            } catch (NumberFormatException nfe) {
+                LOG.warn("Skipping journal file with unparsable name " + f);
+                continue;
+            }
+            if (filter == null || filter.accept(id)) {
+                logs.add(id);
+            }
+        }
+        Collections.sort(logs);
+        return logs;
+    }
+
+    /**
+     * Last Log Mark.
+     *
+     * <p>
+     * {@code txnLogId}/{@code txnLogPosition} hold the latest journal position
+     * flushed by the journal thread. {@link #markLog()} snapshots them into
+     * {@code lastMark}, which {@link #rollLog()} later writes to the
+     * {@code lastMark} file of every ledger directory.
+     * </p>
+     */
+    class LastLogMark {
+        long txnLogId;
+        long txnLogPosition;
+        LastLogMark lastMark;
+
+        LastLogMark(long logId, long logPosition) {
+            this.txnLogId = logId;
+            this.txnLogPosition = logPosition;
+        }
+
+        synchronized void setLastLogMark(long logId, long logPosition) {
+            txnLogId = logId;
+            txnLogPosition = logPosition;
+        }
+
+        synchronized void markLog() {
+            lastMark = new LastLogMark(txnLogId, txnLogPosition);
+        }
+
+        /**
+         * @return the snapshot taken by the latest {@link #markLog()}, or null
+         *         if markLog() has not been called yet
+         */
+        synchronized LastLogMark getMarkedLog() {
+            return lastMark;
+        }
+
+        /**
+         * Persist {@code lastMark} to the lastMark file of every ledger
+         * directory. Failures are logged per directory and do not stop the
+         * remaining directories from being written.
+         */
+        synchronized void rollLog() {
+            if (lastMark == null) {
+                // nothing has been marked yet, so there is no safe mark to persist
+                LOG.warn("RollLog called before markLog, skip persisting last mark");
+                return;
+            }
+            byte buff[] = new byte[16];
+            ByteBuffer bb = ByteBuffer.wrap(buff);
+            // we should record <logId, logPosition> marked in markLog
+            // which is safe since records before lastMark have been
+            // persisted to disk (both index & entry logger)
+            bb.putLong(lastMark.txnLogId);
+            bb.putLong(lastMark.txnLogPosition);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("RollLog to persist last marked log : " + lastMark);
+            }
+            for (File dir : ledgerDirectories) {
+                File file = new File(dir, "lastMark");
+                FileOutputStream fos = null;
+                try {
+                    fos = new FileOutputStream(file);
+                    fos.write(buff);
+                    fos.getChannel().force(true);
+                } catch (IOException e) {
+                    LOG.error("Problems writing to " + file, e);
+                } finally {
+                    // close even when write/force failed so the descriptor is not leaked
+                    closeQuietly(fos, file);
+                }
+            }
+        }
+
+        /**
+         * Read last mark from lastMark file.
+         * The last mark should first be max journal log id,
+         * and then max log position in max journal log.
+         *
+         * <p>
+         * Each ledger directory holds its own copy of the mark. The largest
+         * &lt;logId, logPosition&gt; pair wins, comparing log ids first and
+         * positions only within the same log id. Directories whose file is
+         * missing or shorter than 16 bytes are logged and skipped.
+         * </p>
+         */
+        synchronized void readLog() {
+            byte buff[] = new byte[16];
+            ByteBuffer bb = ByteBuffer.wrap(buff);
+            for (File dir : ledgerDirectories) {
+                File file = new File(dir, "lastMark");
+                DataInputStream in = null;
+                try {
+                    in = new DataInputStream(new FileInputStream(file));
+                    // readFully fails on a short file instead of leaving stale
+                    // bytes from a previously read directory in buff
+                    in.readFully(buff);
+                    bb.clear();
+                    long i = bb.getLong();
+                    long p = bb.getLong();
+                    // a position only has meaning within its own log, so the id
+                    // and the position must always be taken from the same mark
+                    if (i > txnLogId || (i == txnLogId && p > txnLogPosition)) {
+                        txnLogId = i;
+                        txnLogPosition = p;
+                    }
+                } catch (IOException e) {
+                    LOG.error("Problems reading from " + file + " (this is okay if it is the first time starting this bookie");
+                } finally {
+                    closeQuietly(in, file);
+                }
+            }
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+
+            sb.append("LastMark: logId - ").append(txnLogId)
+              .append(" , position - ").append(txnLogPosition);
+
+            return sb.toString();
+        }
+    }
+
+    /**
+     * Filter to return list of journals for rolling: those whose id is
+     * smaller than the id of the marked journal.
+     */
+    private static class JournalRollingFilter implements JournalIdFilter {
+        private final long markedLogId;
+
+        JournalRollingFilter(long markedLogId) {
+            this.markedLogId = markedLogId;
+        }
+
+        @Override
+        public boolean accept(long journalId) {
+            return journalId < markedLogId;
+        }
+    }
+
+    /**
+     * Scanner used to scan a journal
+     */
+    public static interface JournalScanner {
+        /**
+         * Process a journal entry.
+         *
+         * @param journalVersion
+         *          Journal Version
+         * @param offset
+         *          File offset of the journal entry
+         * @param entry
+         *          Journal Entry
+         * @throws IOException
+         */
+        public void process(int journalVersion, long offset, ByteBuffer entry) throws IOException;
+    }
+
+    /**
+     * Journal Entry to Record
+     */
+    private static class QueueEntry {
+        QueueEntry(ByteBuffer entry, long ledgerId, long entryId,
+                   WriteCallback cb, Object ctx) {
+            this.entry = entry.duplicate();
+            this.cb = cb;
+            this.ctx = ctx;
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+        }
+
+        ByteBuffer entry;
+
+        long ledgerId;
+
+        long entryId;
+
+        WriteCallback cb;
+
+        Object ctx;
+    }
+
+    final static long MB = 1024 * 1024L;
+    // max journal file size
+    final long maxJournalSize;
+    // number journal files kept before marked journal
+    final int maxBackupJournals;
+
+    final File journalDirectory;
+    final File ledgerDirectories[];
+    final ServerConfiguration conf;
+
+    private final LastLogMark lastLogMark = new LastLogMark(0, 0);
+
+    // journal entry queue to commit
+    final LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<QueueEntry>();
+
+    volatile boolean running = true;
+
+    /**
+     * Create the journal and load the last log mark from the ledger directories.
+     *
+     * @param conf server configuration providing journal/ledger dirs and journal limits
+     */
+    public Journal(ServerConfiguration conf) {
+        super("BookieJournal-" + conf.getBookiePort());
+        this.conf = conf;
+        this.journalDirectory = Bookie.getCurrentDirectory(conf.getJournalDir());
+        this.ledgerDirectories = Bookie.getCurrentDirectories(conf.getLedgerDirs());
+        this.maxJournalSize = conf.getMaxJournalSize() * MB;
+        this.maxBackupJournals = conf.getMaxBackupJournals();
+
+        // read last log mark
+        lastLogMark.readLog();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Last Log Mark : " + lastLogMark);
+        }
+    }
+
+    LastLogMark getLastLogMark() {
+        return lastLogMark;
+    }
+
+    /**
+     * Records a <i>LastLogMark</i> in memory.
+     *
+     * <p>
+     * The <i>LastLogMark</i> contains two parts: first one is <i>txnLogId</i>
+     * (file id of a journal) and the second one is <i>txnLogPos</i> (offset in
+     * a journal). The <i>LastLogMark</i> indicates that those entries before
+     * it have been persisted to both index and entry log files.
+     * </p>
+     *
+     * <p>
+     * This method is called before flushing entry log files and ledger cache.
+     * </p>
+     */
+    public void markLog() {
+        lastLogMark.markLog();
+    }
+
+    /**
+     * Persists the <i>LastLogMark</i> recorded by {@link #markLog()} to disk.
+     *
+     * <p>
+     * Once this is done, entries added before the <i>LastLogMark</i> have had
+     * their entry data and index pages persisted, so journal files created
+     * earlier than <i>LastLogMark.txnLogId</i> can be removed safely.
+     * </p>
+     * <p>
+     * If the bookie crashes before persisting the <i>LastLogMark</i>, its
+     * journal files still contain entries whose index pages may not have been
+     * persisted. When the bookie restarts it replays those journal files to
+     * restore the entries, so no data is lost.
+     * </p>
+     * <p>
+     * This method should only be called after the entry log files and ledger
+     * cache have been flushed successfully, which ensures the persisted
+     * <i>LastLogMark</i> is safe.
+     * </p>
+     * @see #markLog()
+     */
+    public void rollLog() {
+        lastLogMark.rollLog();
+    }
+
+    /**
+     * Garbage collect older journals.
+     *
+     * <p>
+     * Journals with an id smaller than the journal recorded by the latest
+     * {@link #markLog()} are candidates; all but the newest
+     * {@code maxBackupJournals} of them are deleted. Nothing is deleted if no
+     * mark has been recorded yet.
+     * </p>
+     */
+    public void gcJournals() {
+        // take one consistent snapshot of the mark instead of re-reading it
+        LastLogMark marked = lastLogMark.getMarkedLog();
+        if (marked == null) {
+            return;
+        }
+        // list the journals that have been marked
+        List<Long> logs = listJournalIds(journalDirectory, new JournalRollingFilter(marked.txnLogId));
+        // keep MAX_BACKUP_JOURNALS journal files before marked journal
+        int numToDelete = logs.size() - maxBackupJournals;
+        for (int i = 0; i < numToDelete; i++) {
+            File journalFile = new File(journalDirectory, Long.toHexString(logs.get(i)) + ".txn");
+            if (journalFile.delete()) {
+                LOG.info("garbage collected journal " + journalFile.getName());
+            } else {
+                LOG.warn("Failed to garbage collect journal " + journalFile.getName());
+            }
+        }
+    }
+
+    /**
+     * Scan the journal
+     *
+     * @param journalId
+     *          Journal Log Id
+     * @param journalPos
+     *          Offset to start scanning; if &lt;= 0 no start offset is passed to the channel
+     * @param scanner
+     *          Scanner to handle entries
+     * @throws IOException
+     */
+    public void scanJournal(long journalId, long journalPos, JournalScanner scanner)
+        throws IOException {
+        JournalChannel recLog;
+        if (journalPos <= 0) {
+            recLog = new JournalChannel(journalDirectory, journalId);
+        } else {
+            recLog = new JournalChannel(journalDirectory, journalId, journalPos);
+        }
+        try {
+            int journalVersion = recLog.getFormatVersion();
+            ByteBuffer lenBuff = ByteBuffer.allocate(4);
+            ByteBuffer recBuff = ByteBuffer.allocate(64 * 1024);
+            while (true) {
+                // entry start offset
+                long offset = recLog.fc.position();
+                // start reading entry
+                lenBuff.clear();
+                fullRead(recLog, lenBuff);
+                if (lenBuff.remaining() != 0) {
+                    break;
+                }
+                lenBuff.flip();
+                int len = lenBuff.getInt();
+                if (len == 0) {
+                    break;
+                }
+                recBuff.clear();
+                if (recBuff.remaining() < len) {
+                    recBuff = ByteBuffer.allocate(len);
+                }
+                recBuff.limit(len);
+                if (fullRead(recLog, recBuff) != len) {
+                    // This seems scary, but it just means that this is where we
+                    // left off writing
+                    break;
+                }
+                recBuff.flip();
+                scanner.process(journalVersion, offset, recBuff);
+            }
+        } finally {
+            recLog.close();
+        }
+    }
+
+    /**
+     * Replay journal files
+     *
+     * <p>
+     * Scans every journal whose id is not smaller than the id in the last log
+     * mark, starting the marked journal at the marked position.
+     * </p>
+     *
+     * @param scanner
+     *          Scanner to process replayed entries.
+     * @throws IOException if the marked journal is missing or a journal cannot be read
+     */
+    public void replay(JournalScanner scanner) throws IOException {
+        final long markedLogId = lastLogMark.txnLogId;
+        final long markedLogPosition = lastLogMark.txnLogPosition;
+        List<Long> logs = listJournalIds(journalDirectory, new JournalIdFilter() {
+            @Override
+            public boolean accept(long journalId) {
+                return journalId >= markedLogId;
+            }
+        });
+        // last log mark may be missed due to no sync up before
+        // validate filtered log ids only when we have markedLogId
+        if (markedLogId > 0) {
+            if (logs.size() == 0 || logs.get(0) != markedLogId) {
+                throw new IOException("Recovery log " + markedLogId + " is missing");
+            }
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Try to relay journal logs : " + logs);
+        }
+        // TODO: When reading in the journal logs that need to be synced, we
+        // should use BufferedChannels instead to minimize the amount of
+        // system calls done.
+        for (long id : logs) {
+            long logPosition = (id == markedLogId) ? markedLogPosition : 0L;
+            scanJournal(id, logPosition, scanner);
+        }
+    }
+
+    /**
+     * Record an add entry operation in the journal.
+     *
+     * <p>
+     * The entry is only queued here; the journal thread writes it and invokes
+     * {@code cb} once the journal file has been flushed.
+     * </p>
+     *
+     * @param entry entry data, starting with the ledger id and entry id at the
+     *              buffer's current position
+     * @param cb callback invoked once the entry has been flushed to the journal
+     * @param ctx context passed back to {@code cb}
+     */
+    public void logAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx) {
+        // absolute reads leave the caller's buffer position untouched, so the
+        // journaled record always starts where the entry itself starts
+        int pos = entry.position();
+        long ledgerId = entry.getLong(pos);
+        long entryId = entry.getLong(pos + 8);
+        queue.add(new QueueEntry(entry, ledgerId, entryId, cb, ctx));
+    }
+
+    /**
+     * Get the length of journal entries queue.
+     *
+     * @return length of journal entry queue.
+     */
+    public int getJournalQueueLength() {
+        return queue.size();
+    }
+
+    /**
+     * A thread used for persisting journal entries to journal files.
+     *
+     * <p>
+     * Besides persisting journal entries, it also takes responsibility of
+     * rolling journal files when a journal file reaches journal file size
+     * limitation.
+     * </p>
+     * <p>
+     * During journal rolling, it first closes the writing journal, generates
+     * new journal file using current timestamp, and continue persistence logic.
+     * Those journals will be garbage collected in SyncThread.
+     * </p>
+     * @see Bookie.SyncThread
+     */
+    @Override
+    public void run() {
+        LinkedList<QueueEntry> toFlush = new LinkedList<QueueEntry>();
+        ByteBuffer lenBuff = ByteBuffer.allocate(4);
+        JournalChannel logFile = null;
+        try {
+            long logId = 0;
+            BufferedChannel bc = null;
+            long lastFlushPosition = 0;
+
+            QueueEntry qe = null;
+            while (true) {
+                // new journal file to write
+                if (null == logFile) {
+                    logId = System.currentTimeMillis();
+                    logFile = new JournalChannel(journalDirectory, logId);
+                    bc = logFile.getBufferedChannel();
+
+                    lastFlushPosition = 0;
+                }
+
+                if (qe == null) {
+                    if (toFlush.isEmpty()) {
+                        // nothing waiting for a flush: block until an entry arrives
+                        qe = queue.take();
+                    } else {
+                        qe = queue.poll();
+                        // flush once the queue is drained or more than 512KB
+                        // have been written since the last flush
+                        if (qe == null || bc.position() > lastFlushPosition + 512 * 1024) {
+                            bc.flush(true);
+                            lastFlushPosition = bc.position();
+                            lastLogMark.setLastLogMark(logId, lastFlushPosition);
+                            // the flushed entries can now be acknowledged
+                            for (QueueEntry e : toFlush) {
+                                e.cb.writeComplete(0, e.ledgerId, e.entryId, null, e.ctx);
+                            }
+                            toFlush.clear();
+
+                            // check whether journal file is over file limit
+                            if (bc.position() > maxJournalSize) {
+                                logFile.close();
+                                logFile = null;
+                                continue;
+                            }
+                        }
+                    }
+                }
+
+                if (!running) {
+                    LOG.info("Journal Manager is asked to shut down, quit.");
+                    break;
+                }
+
+                if (qe == null) { // no more queue entry
+                    continue;
+                }
+                lenBuff.clear();
+                lenBuff.putInt(qe.entry.remaining());
+                lenBuff.flip();
+                //
+                // we should be doing the following, but then we run out of
+                // direct byte buffers
+                // logFile.write(new ByteBuffer[] { lenBuff, qe.entry });
+                bc.write(lenBuff);
+                bc.write(qe.entry);
+
+                logFile.preAllocIfNeeded();
+
+                toFlush.add(qe);
+                qe = null;
+            }
+        } catch (Exception e) {
+            // shutdown() clears running before interrupting this thread, so an
+            // exception while running is still true is a genuine failure
+            if (running) {
+                LOG.error("Journal exits unexpectedly", e);
+            } else {
+                LOG.warn("Journal exits when shutting down", e);
+            }
+        } finally {
+            // release the current journal file; entries still in toFlush were
+            // never acknowledged through their callbacks
+            if (logFile != null) {
+                try {
+                    logFile.close();
+                } catch (Exception ce) {
+                    LOG.warn("Failed to close journal file on exit", ce);
+                }
+            }
+        }
+    }
+
+    /**
+     * Shuts down the journal: stops the journal thread and waits for it to exit.
+     * Calling it again after the first call is a no-op.
+     */
+    public synchronized void shutdown() {
+        try {
+            if (!running) {
+                return;
+            }
+            running = false;
+            this.interrupt();
+            this.join();
+        } catch (InterruptedException ie) {
+            // the interrupt status is deliberately not restored: the bookie
+            // goes on to join its own thread and shut down other components
+            // after shutting down the journal
+            LOG.warn("Interrupted during shutting down journal : ", ie);
+        }
+    }
+
+    /**
+     * Read from the journal channel until {@code bb} is full or the channel
+     * returns no more data.
+     *
+     * @return number of bytes read
+     */
+    private static int fullRead(JournalChannel fc, ByteBuffer bb) throws IOException {
+        int total = 0;
+        while (bb.remaining() > 0) {
+            int rc = fc.read(bb);
+            if (rc <= 0) {
+                return total;
+            }
+            total += rc;
+        }
+        return total;
+    }
+
+    /**
+     * Close {@code c} if it is not null, logging (not propagating) a failure.
+     */
+    private static void closeQuietly(Closeable c, File file) {
+        if (c == null) {
+            return;
+        }
+        try {
+            c.close();
+        } catch (IOException e) {
+            LOG.warn("Failed to close " + file, e);
+        }
+    }
+}
Propchange: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java?rev=1327045&r1=1327044&r2=1327045&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java Tue Apr 17 11:16:26 2012
@@ -224,7 +224,7 @@ public abstract class BookKeeperClusterT
public void sleepBookie(InetSocketAddress addr, final int seconds,
final CountDownLatch l)
throws InterruptedException, IOException {
- final String name = "Bookie-" + addr.getPort();
+ final String name = "BookieJournal-" + addr.getPort();
Thread[] allthreads = new Thread[Thread.activeCount()];
Thread.enumerate(allthreads);
for (final Thread t : allthreads) {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java?rev=1327045&r1=1327044&r2=1327045&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java Tue Apr 17 11:16:26 2012
@@ -26,7 +26,6 @@ import java.util.List;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.bookie.Bookie;
-import org.apache.bookkeeper.bookie.Bookie.JournalIdFilter;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -171,13 +170,15 @@ public class BookieJournalRollingTest ex
// verify that we only keep at most journal files
for (File journalDir : tmpDirs) {
- List<Long> logs = Bookie.listJournalIds(journalDir, new JournalIdFilter() {
- @Override
- public boolean accept(long journalId) {
- return true;
+ File[] journals = journalDir.listFiles();
+ int numJournals = 0;
+ for (File f : journals) {
+ if (!f.getName().endsWith(".txn")) {
+ continue;
}
- });
- assertTrue(logs.size() <= 2);
+ ++numJournals;
+ }
+ assertTrue(numJournals <= 2);
}
// restart bookies