You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/03/20 12:51:04 UTC
svn commit: r1302851 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/
Author: ivank
Date: Tue Mar 20 11:51:03 2012
New Revision: 1302851
URL: http://svn.apache.org/viewvc?rev=1302851&view=rev
Log:
BOOKKEEPER-187: Create well defined interface for LedgerCache (ivank)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
- copied, changed from r1302458, zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheBean.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1302851&r1=1302850&r2=1302851&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Mar 20 11:51:03 2012
@@ -90,6 +90,8 @@ Trunk (unreleased changes)
BOOKKEEPER-160: bookie server needs to do compaction over entry log files to reclaim disk space (sijie via ivank)
+ BOOKKEEPER-187: Create well defined interface for LedgerCache (ivank)
+
hedwig-server/
BOOKKEEPER-77: Add a console client for hedwig (Sijie Guo via ivank)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1302851&r1=1302850&r2=1302851&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Tue Mar 20 11:51:03 2012
@@ -400,8 +400,9 @@ public class Bookie extends Thread {
syncThread = new SyncThread(conf);
entryLogger = new EntryLogger(conf);
- ledgerCache = new LedgerCache(conf, ledgerManager);
+ ledgerCache = new LedgerCacheImpl(conf, ledgerManager);
gcThread = new GarbageCollectorThread(conf, this.zk, ledgerCache, entryLogger,
+ ledgerManager,
new EntryLogCompactionScanner());
// replay journals
readJournal();
@@ -565,7 +566,7 @@ public class Bookie extends Thread {
BKMBeanRegistry.getInstance().register(jmxBookieBean, parent);
try {
- jmxLedgerCacheBean = new LedgerCacheBean(this.ledgerCache);
+ jmxLedgerCacheBean = this.ledgerCache.getJMXBean();
BKMBeanRegistry.getInstance().register(jmxLedgerCacheBean, jmxBookieBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX for ledger cache", e);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java?rev=1302851&r1=1302850&r2=1302851&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java Tue Mar 20 11:51:03 2012
@@ -74,6 +74,8 @@ public class GarbageCollectorThread exte
// Ledger Cache Handle
final LedgerCache ledgerCache;
+ final LedgerManager ledgerManager;
+
// ZooKeeper Client
final ZooKeeper zk;
@@ -116,6 +118,7 @@ public class GarbageCollectorThread exte
ZooKeeper zookeeper,
LedgerCache ledgerCache,
EntryLogger entryLogger,
+ LedgerManager ledgerManager,
EntryLogScanner scanner)
throws IOException {
super("GarbageCollectorThread");
@@ -123,6 +126,7 @@ public class GarbageCollectorThread exte
this.zk = zookeeper;
this.ledgerCache = ledgerCache;
this.entryLogger = entryLogger;
+ this.ledgerManager = ledgerManager;
this.scanner = scanner;
this.gcWaitTime = conf.getGcWaitTime();
@@ -231,7 +235,7 @@ public class GarbageCollectorThread exte
* Do garbage collection ledger index files
*/
private void doGcLedgers() {
- ledgerCache.activeLedgerManager.garbageCollectLedgers(
+ ledgerManager.garbageCollectLedgers(
new LedgerManager.GarbageCollector() {
@Override
public void gc(long ledgerId) {
@@ -253,7 +257,7 @@ public class GarbageCollectorThread exte
EntryLogMetadata meta = entryLogMetaMap.get(entryLogId);
for (Long entryLogLedger : meta.ledgersMap.keySet()) {
// Remove the entry log ledger from the set if it isn't active.
- if (!ledgerCache.activeLedgerManager.containsActiveLedger(entryLogLedger)) {
+ if (!ledgerManager.containsActiveLedger(entryLogLedger)) {
meta.removeLedger(entryLogLedger);
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java?rev=1302851&r1=1302850&r2=1302851&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java Tue Mar 20 11:51:03 2012
@@ -21,556 +21,25 @@
package org.apache.bookkeeper.bookie;
-import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.bookkeeper.meta.LedgerManager;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This class maps a ledger entry number into a location (entrylogid, offset) in
* an entry log file. It does user level caching to more efficiently manage disk
* head scheduling.
*/
-public class LedgerCache {
- private final static Logger LOG = LoggerFactory.getLogger(LedgerDescriptor.class);
-
- final File ledgerDirectories[];
-
- public LedgerCache(ServerConfiguration conf, LedgerManager alm) {
- this.ledgerDirectories = Bookie.getCurrentDirectories(conf.getLedgerDirs());
- this.openFileLimit = conf.getOpenFileLimit();
- this.pageSize = conf.getPageSize();
- this.entriesPerPage = pageSize / 8;
-
- if (conf.getPageLimit() <= 0) {
- // allocate half of the memory to the page cache
- this.pageLimit = (int)((Runtime.getRuntime().maxMemory() / 3) / this.pageSize);
- } else {
- this.pageLimit = conf.getPageLimit();
- }
- LOG.info("maxMemory = " + Runtime.getRuntime().maxMemory());
- LOG.info("openFileLimit is " + openFileLimit + ", pageSize is " + pageSize + ", pageLimit is " + pageLimit);
- activeLedgerManager = alm;
- // Retrieve all of the active ledgers.
- getActiveLedgers();
- }
- /**
- * the list of potentially clean ledgers
- */
- LinkedList<Long> cleanLedgers = new LinkedList<Long>();
-
- /**
- * the list of potentially dirty ledgers
- */
- LinkedList<Long> dirtyLedgers = new LinkedList<Long>();
-
- HashMap<Long, FileInfo> fileInfoCache = new HashMap<Long, FileInfo>();
-
- LinkedList<Long> openLedgers = new LinkedList<Long>();
-
- // Manage all active ledgers in LedgerManager
- // so LedgerManager has knowledge to garbage collect inactive/deleted ledgers
- final LedgerManager activeLedgerManager;
-
- final int openFileLimit;
- final int pageSize;
- final int pageLimit;
- final int entriesPerPage;
-
- /**
- * @return page size used in ledger cache
- */
- public int getPageSize() {
- return pageSize;
- }
-
- /**
- * @return entries per page used in ledger cache
- */
- public int getEntriesPerPage() {
- return entriesPerPage;
- }
-
- /**
- * @return page limitation in ledger cache
- */
- public int getPageLimit() {
- return pageLimit;
- }
-
- // The number of pages that have actually been used
- private int pageCount = 0;
- HashMap<Long, HashMap<Long,LedgerEntryPage>> pages = new HashMap<Long, HashMap<Long,LedgerEntryPage>>();
-
- /**
- * @return number of page used in ledger cache
- */
- public int getNumUsedPages() {
- return pageCount;
- }
-
- private void putIntoTable(HashMap<Long, HashMap<Long,LedgerEntryPage>> table, LedgerEntryPage lep) {
- HashMap<Long, LedgerEntryPage> map = table.get(lep.getLedger());
- if (map == null) {
- map = new HashMap<Long, LedgerEntryPage>();
- table.put(lep.getLedger(), map);
- }
- map.put(lep.getFirstEntry(), lep);
- }
-
- private static LedgerEntryPage getFromTable(HashMap<Long, HashMap<Long,LedgerEntryPage>> table, Long ledger, Long firstEntry) {
- HashMap<Long, LedgerEntryPage> map = table.get(ledger);
- if (map != null) {
- return map.get(firstEntry);
- }
- return null;
- }
-
- synchronized private LedgerEntryPage getLedgerEntryPage(Long ledger, Long firstEntry, boolean onlyDirty) {
- LedgerEntryPage lep = getFromTable(pages, ledger, firstEntry);
- try {
- if (onlyDirty && lep.isClean()) {
- return null;
- }
- return lep;
- } finally {
- if (lep != null) {
- lep.usePage();
- }
- }
- }
-
- public void putEntryOffset(long ledger, long entry, long offset) throws IOException {
- int offsetInPage = (int) (entry % entriesPerPage);
- // find the id of the first entry of the page that has the entry
- // we are looking for
- long pageEntry = entry-offsetInPage;
- LedgerEntryPage lep = getLedgerEntryPage(ledger, pageEntry, false);
- if (lep == null) {
- // find a free page
- lep = grabCleanPage(ledger, pageEntry);
- updatePage(lep);
- synchronized(this) {
- putIntoTable(pages, lep);
- }
- }
- if (lep != null) {
- lep.setOffset(offset, offsetInPage*8);
- lep.releasePage();
- return;
- }
- }
-
- public long getEntryOffset(long ledger, long entry) throws IOException {
- int offsetInPage = (int) (entry%entriesPerPage);
- // find the id of the first entry of the page that has the entry
- // we are looking for
- long pageEntry = entry-offsetInPage;
- LedgerEntryPage lep = getLedgerEntryPage(ledger, pageEntry, false);
- try {
- if (lep == null) {
- lep = grabCleanPage(ledger, pageEntry);
- // should update page before we put it into table
- // otherwise we would put an empty page in it
- updatePage(lep);
- synchronized(this) {
- putIntoTable(pages, lep);
- }
- }
- return lep.getOffset(offsetInPage*8);
- } finally {
- if (lep != null) {
- lep.releasePage();
- }
- }
- }
-
- static final String getLedgerName(long ledgerId) {
- int parent = (int) (ledgerId & 0xff);
- int grandParent = (int) ((ledgerId & 0xff00) >> 8);
- StringBuilder sb = new StringBuilder();
- sb.append(Integer.toHexString(grandParent));
- sb.append('/');
- sb.append(Integer.toHexString(parent));
- sb.append('/');
- sb.append(Long.toHexString(ledgerId));
- sb.append(".idx");
- return sb.toString();
- }
-
- static final private Random rand = new Random();
-
- static final private File pickDirs(File dirs[]) {
- return dirs[rand.nextInt(dirs.length)];
- }
-
- FileInfo getFileInfo(Long ledger, byte masterKey[]) throws IOException {
- synchronized(fileInfoCache) {
- FileInfo fi = fileInfoCache.get(ledger);
- if (fi == null) {
- String ledgerName = getLedgerName(ledger);
- File lf = null;
- for(File d: ledgerDirectories) {
- lf = new File(d, ledgerName);
- if (lf.exists()) {
- break;
- }
- lf = null;
- }
- if (lf == null) {
- if (masterKey == null) {
- throw new Bookie.NoLedgerException(ledger);
- }
- File dir = pickDirs(ledgerDirectories);
- lf = new File(dir, ledgerName);
- // A new ledger index file has been created for this Bookie.
- // Add this new ledger to the set of active ledgers.
- if (LOG.isDebugEnabled()) {
- LOG.debug("New ledger index file created for ledgerId: " + ledger);
- }
- activeLedgerManager.addActiveLedger(ledger, true);
- }
- if (openLedgers.size() > openFileLimit) {
- fileInfoCache.remove(openLedgers.removeFirst()).close();
- }
- fi = new FileInfo(lf, masterKey);
- fileInfoCache.put(ledger, fi);
- openLedgers.add(ledger);
- }
- if (fi != null) {
- fi.use();
- }
- return fi;
- }
- }
- private void updatePage(LedgerEntryPage lep) throws IOException {
- if (!lep.isClean()) {
- throw new IOException("Trying to update a dirty page");
- }
- FileInfo fi = null;
- try {
- fi = getFileInfo(lep.getLedger(), null);
- long pos = lep.getFirstEntry()*8;
- if (pos >= fi.size()) {
- lep.zeroPage();
- } else {
- lep.readPage(fi);
- }
- } finally {
- if (fi != null) {
- fi.release();
- }
- }
- }
-
- void flushLedger(boolean doAll) throws IOException {
- synchronized(dirtyLedgers) {
- if (dirtyLedgers.isEmpty()) {
- synchronized(this) {
- for(Long l: pages.keySet()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Adding " + Long.toHexString(l) + " to dirty pages");
- }
- dirtyLedgers.add(l);
- }
- }
- }
- if (dirtyLedgers.isEmpty()) {
- return;
- }
- while(!dirtyLedgers.isEmpty()) {
- Long l = dirtyLedgers.removeFirst();
-
- flushLedger(l);
-
- if (!doAll) {
- break;
- }
- // Yeild. if we are doing all the ledgers we don't want to block other flushes that
- // need to happen
- try {
- dirtyLedgers.wait(1);
- } catch (InterruptedException e) {
- // just pass it on
- Thread.currentThread().interrupt();
- }
- }
- }
- }
-
- /**
- * Flush a specified ledger
- *
- * @param l
- * Ledger Id
- * @throws IOException
- */
- private void flushLedger(long l) throws IOException {
- LinkedList<Long> firstEntryList;
- synchronized(this) {
- HashMap<Long, LedgerEntryPage> pageMap = pages.get(l);
- if (pageMap == null || pageMap.isEmpty()) {
- return;
- }
- firstEntryList = new LinkedList<Long>();
- for(Map.Entry<Long, LedgerEntryPage> entry: pageMap.entrySet()) {
- LedgerEntryPage lep = entry.getValue();
- if (lep.isClean()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Page is clean " + lep);
- }
- continue;
- }
- firstEntryList.add(lep.getFirstEntry());
- }
- }
-
- if (firstEntryList.size() == 0) {
- LOG.debug("Nothing to flush for ledger {}.", l);
- // nothing to do
- return;
- }
-
- // Now flush all the pages of a ledger
- List<LedgerEntryPage> entries = new ArrayList<LedgerEntryPage>(firstEntryList.size());
- FileInfo fi = null;
- try {
- for(Long firstEntry: firstEntryList) {
- LedgerEntryPage lep = getLedgerEntryPage(l, firstEntry, true);
- if (lep != null) {
- entries.add(lep);
- }
- }
- Collections.sort(entries, new Comparator<LedgerEntryPage>() {
- @Override
- public int compare(LedgerEntryPage o1, LedgerEntryPage o2) {
- return (int)(o1.getFirstEntry()-o2.getFirstEntry());
- }
- });
- ArrayList<Integer> versions = new ArrayList<Integer>(entries.size());
- fi = getFileInfo(l, null);
- int start = 0;
- long lastOffset = -1;
- for(int i = 0; i < entries.size(); i++) {
- versions.add(i, entries.get(i).getVersion());
- if (lastOffset != -1 && (entries.get(i).getFirstEntry() - lastOffset) != entriesPerPage) {
- // send up a sequential list
- int count = i - start;
- if (count == 0) {
- System.out.println("Count cannot possibly be zero!");
- }
- writeBuffers(l, entries, fi, start, count);
- start = i;
- }
- lastOffset = entries.get(i).getFirstEntry();
- }
- if (entries.size()-start == 0 && entries.size() != 0) {
- System.out.println("Nothing to write, but there were entries!");
- }
- writeBuffers(l, entries, fi, start, entries.size()-start);
- synchronized(this) {
- for(int i = 0; i < entries.size(); i++) {
- LedgerEntryPage lep = entries.get(i);
- lep.setClean(versions.get(i));
- }
- }
- } finally {
- for(LedgerEntryPage lep: entries) {
- lep.releasePage();
- }
- if (fi != null) {
- fi.release();
- }
- }
- }
-
- private void writeBuffers(Long ledger,
- List<LedgerEntryPage> entries, FileInfo fi,
- int start, int count) throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Writing " + count + " buffers of " + Long.toHexString(ledger));
- }
- if (count == 0) {
- //System.out.println("Count is zero!");
- return;
- }
- ByteBuffer buffs[] = new ByteBuffer[count];
- for(int j = 0; j < count; j++) {
- buffs[j] = entries.get(start+j).getPageToWrite();
- if (entries.get(start+j).getLedger() != ledger) {
- throw new IOException("Writing to " + ledger + " but page belongs to " + entries.get(start+j).getLedger());
- }
- }
- long totalWritten = 0;
- while(buffs[buffs.length-1].remaining() > 0) {
- long rc = fi.write(buffs, entries.get(start+0).getFirstEntry()*8);
- if (rc <= 0) {
- throw new IOException("Short write to ledger " + ledger + " rc = " + rc);
- }
- //System.out.println("Wrote " + rc + " to " + ledger);
- totalWritten += rc;
- }
- if (totalWritten != count * pageSize) {
- throw new IOException("Short write to ledger " + ledger + " wrote " + totalWritten + " expected " + count * pageSize);
- }
- }
- private LedgerEntryPage grabCleanPage(long ledger, long entry) throws IOException {
- if (entry % entriesPerPage != 0) {
- throw new IllegalArgumentException(entry + " is not a multiple of " + entriesPerPage);
- }
- synchronized(this) {
- if (pageCount < pageLimit) {
- // let's see if we can allocate something
- LedgerEntryPage lep = new LedgerEntryPage(pageSize, entriesPerPage);
- lep.setLedger(ledger);
- lep.setFirstEntry(entry);
-
- // note, this will not block since it is a new page
- lep.usePage();
- pageCount++;
- return lep;
- }
- }
-
- outerLoop:
- while(true) {
- synchronized(cleanLedgers) {
- if (cleanLedgers.isEmpty()) {
- flushLedger(false);
- synchronized(this) {
- for(Long l: pages.keySet()) {
- cleanLedgers.add(l);
- }
- }
- }
- synchronized(this) {
- Long cleanLedger = cleanLedgers.getFirst();
- Map<Long, LedgerEntryPage> map = pages.get(cleanLedger);
- if (map == null || map.isEmpty()) {
- cleanLedgers.removeFirst();
- continue;
- }
- Iterator<Map.Entry<Long, LedgerEntryPage>> it = map.entrySet().iterator();
- LedgerEntryPage lep = it.next().getValue();
- while((lep.inUse() || !lep.isClean())) {
- if (!it.hasNext()) {
- continue outerLoop;
- }
- lep = it.next().getValue();
- }
- it.remove();
- if (map.isEmpty()) {
- pages.remove(lep.getLedger());
- }
- lep.usePage();
- lep.zeroPage();
- lep.setLedger(ledger);
- lep.setFirstEntry(entry);
- return lep;
- }
- }
- }
- }
-
- public long getLastEntry(long ledgerId) {
- long lastEntry = 0;
- // Find the last entry in the cache
- synchronized(this) {
- Map<Long, LedgerEntryPage> map = pages.get(ledgerId);
- if (map != null) {
- for(LedgerEntryPage lep: map.values()) {
- if (lep.getFirstEntry() + entriesPerPage < lastEntry) {
- continue;
- }
- lep.usePage();
- long highest = lep.getLastEntry();
- if (highest > lastEntry) {
- lastEntry = highest;
- }
- lep.releasePage();
- }
- }
- }
-
- return lastEntry;
- }
-
- /**
- * This method will look within the ledger directories for the ledger index
- * files. That will comprise the set of active ledgers this particular
- * BookieServer knows about that have not yet been deleted by the BookKeeper
- * Client. This is called only once during initialization.
- */
- private void getActiveLedgers() {
- // Ledger index files are stored in a file hierarchy with a parent and
- // grandParent directory. We'll have to go two levels deep into these
- // directories to find the index files.
- for (File ledgerDirectory : ledgerDirectories) {
- for (File grandParent : ledgerDirectory.listFiles()) {
- if (grandParent.isDirectory()) {
- for (File parent : grandParent.listFiles()) {
- if (parent.isDirectory()) {
- for (File index : parent.listFiles()) {
- if (!index.isFile() || !index.getName().endsWith(".idx")) {
- continue;
- }
- // We've found a ledger index file. The file name is the
- // HexString representation of the ledgerId.
- String ledgerIdInHex = index.getName().substring(0, index.getName().length() - 4);
- activeLedgerManager.addActiveLedger(Long.parseLong(ledgerIdInHex, 16), true);
- }
- }
- }
- }
- }
- }
- }
+interface LedgerCache {
+ void setMasterKey(long ledgerId, byte[] masterKey) throws IOException;
+ byte[] readMasterKey(long ledgerId) throws IOException, BookieException;
+ boolean ledgerExists(long ledgerId) throws IOException;
- /**
- * This method is called whenever a ledger is deleted by the BookKeeper Client
- * and we want to remove all relevant data for it stored in the LedgerCache.
- */
- void deleteLedger(long ledgerId) throws IOException {
- if (LOG.isDebugEnabled())
- LOG.debug("Deleting ledgerId: " + ledgerId);
- // Delete the ledger's index file and close the FileInfo
- FileInfo fi = getFileInfo(ledgerId, null);
- fi.delete();
- fi.close();
+ void putEntryOffset(long ledger, long entry, long offset) throws IOException;
+ long getEntryOffset(long ledger, long entry) throws IOException;
- // Remove it from the active ledger manager
- activeLedgerManager.removeActiveLedger(ledgerId);
+ void flushLedger(boolean doAll) throws IOException;
+ long getLastEntry(long ledgerId) throws IOException;
- // Now remove it from all the other lists and maps.
- // These data structures need to be synchronized first before removing entries.
- synchronized(this) {
- pages.remove(ledgerId);
- }
- synchronized(fileInfoCache) {
- fileInfoCache.remove(ledgerId);
- }
- synchronized(cleanLedgers) {
- cleanLedgers.remove(ledgerId);
- }
- synchronized(dirtyLedgers) {
- dirtyLedgers.remove(ledgerId);
- }
- synchronized(openLedgers) {
- openLedgers.remove(ledgerId);
- }
- }
+ void deleteLedger(long ledgerId) throws IOException;
+ LedgerCacheBean getJMXBean();
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheBean.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheBean.java?rev=1302851&r1=1302850&r2=1302851&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheBean.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheBean.java Tue Mar 20 11:51:03 2012
@@ -23,57 +23,5 @@ import org.apache.bookkeeper.jmx.BKMBean
/**
* Ledger Cache Bean
*/
-public class LedgerCacheBean implements LedgerCacheMXBean, BKMBeanInfo {
-
- final LedgerCache lc;
-
- public LedgerCacheBean(LedgerCache lc) {
- this.lc = lc;
- }
-
- @Override
- public String getName() {
- return "LedgerCache";
- }
-
- @Override
- public boolean isHidden() {
- return false;
- }
-
- @Override
- public int getPageCount() {
- return lc.getNumUsedPages();
- }
-
- @Override
- public int getPageSize() {
- return lc.getPageSize();
- }
-
- @Override
- public int getOpenFileLimit() {
- return lc.openFileLimit;
- }
-
- @Override
- public int getPageLimit() {
- return lc.getPageLimit();
- }
-
- @Override
- public int getNumCleanLedgers() {
- return lc.cleanLedgers.size();
- }
-
- @Override
- public int getNumDirtyLedgers() {
- return lc.dirtyLedgers.size();
- }
-
- @Override
- public int getNumOpenLedgers() {
- return lc.openLedgers.size();
- }
-
+public interface LedgerCacheBean extends LedgerCacheMXBean, BKMBeanInfo {
}
Copied: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java (from r1302458, zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java)
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java?p2=zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java&p1=zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java&r1=1302458&r2=1302851&rev=1302851&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java Tue Mar 20 11:51:03 2012
@@ -40,21 +40,20 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This class maps a ledger entry number into a location (entrylogid, offset) in
- * an entry log file. It does user level caching to more efficiently manage disk
- * head scheduling.
+ * Implementation of LedgerCache interface.
+ * This class serves two purposes.
*/
-public class LedgerCache {
+public class LedgerCacheImpl implements LedgerCache {
private final static Logger LOG = LoggerFactory.getLogger(LedgerDescriptor.class);
final File ledgerDirectories[];
- public LedgerCache(ServerConfiguration conf, LedgerManager alm) {
+ public LedgerCacheImpl(ServerConfiguration conf, LedgerManager alm) {
this.ledgerDirectories = Bookie.getCurrentDirectories(conf.getLedgerDirs());
this.openFileLimit = conf.getOpenFileLimit();
this.pageSize = conf.getPageSize();
this.entriesPerPage = pageSize / 8;
-
+
if (conf.getPageLimit() <= 0) {
// allocate half of the memory to the page cache
this.pageLimit = (int)((Runtime.getRuntime().maxMemory() / 3) / this.pageSize);
@@ -131,7 +130,8 @@ public class LedgerCache {
map.put(lep.getFirstEntry(), lep);
}
- private static LedgerEntryPage getFromTable(HashMap<Long, HashMap<Long,LedgerEntryPage>> table, Long ledger, Long firstEntry) {
+ private static LedgerEntryPage getFromTable(HashMap<Long, HashMap<Long,LedgerEntryPage>> table,
+ Long ledger, Long firstEntry) {
HashMap<Long, LedgerEntryPage> map = table.get(ledger);
if (map != null) {
return map.get(firstEntry);
@@ -153,6 +153,7 @@ public class LedgerCache {
}
}
+ @Override
public void putEntryOffset(long ledger, long entry, long offset) throws IOException {
int offsetInPage = (int) (entry % entriesPerPage);
// find the id of the first entry of the page that has the entry
@@ -174,6 +175,7 @@ public class LedgerCache {
}
}
+ @Override
public long getEntryOffset(long ledger, long entry) throws IOException {
int offsetInPage = (int) (entry%entriesPerPage);
// find the id of the first entry of the page that has the entry
@@ -221,20 +223,13 @@ public class LedgerCache {
synchronized(fileInfoCache) {
FileInfo fi = fileInfoCache.get(ledger);
if (fi == null) {
- String ledgerName = getLedgerName(ledger);
- File lf = null;
- for(File d: ledgerDirectories) {
- lf = new File(d, ledgerName);
- if (lf.exists()) {
- break;
- }
- lf = null;
- }
+ File lf = findIndexFile(ledger);
if (lf == null) {
if (masterKey == null) {
throw new Bookie.NoLedgerException(ledger);
}
File dir = pickDirs(ledgerDirectories);
+ String ledgerName = getLedgerName(ledger);
lf = new File(dir, ledgerName);
// A new ledger index file has been created for this Bookie.
// Add this new ledger to the set of active ledgers.
@@ -276,7 +271,8 @@ public class LedgerCache {
}
}
- void flushLedger(boolean doAll) throws IOException {
+ @Override
+ public void flushLedger(boolean doAll) throws IOException {
synchronized(dirtyLedgers) {
if (dirtyLedgers.isEmpty()) {
synchronized(this) {
@@ -411,7 +407,8 @@ public class LedgerCache {
for(int j = 0; j < count; j++) {
buffs[j] = entries.get(start+j).getPageToWrite();
if (entries.get(start+j).getLedger() != ledger) {
- throw new IOException("Writing to " + ledger + " but page belongs to " + entries.get(start+j).getLedger());
+ throw new IOException("Writing to " + ledger + " but page belongs to "
+ + entries.get(start+j).getLedger());
}
}
long totalWritten = 0;
@@ -424,7 +421,8 @@ public class LedgerCache {
totalWritten += rc;
}
if (totalWritten != count * pageSize) {
- throw new IOException("Short write to ledger " + ledger + " wrote " + totalWritten + " expected " + count * pageSize);
+ throw new IOException("Short write to ledger " + ledger + " wrote " + totalWritten
+ + " expected " + count * pageSize);
}
}
private LedgerEntryPage grabCleanPage(long ledger, long entry) throws IOException {
@@ -485,7 +483,8 @@ public class LedgerCache {
}
}
- public long getLastEntry(long ledgerId) {
+ @Override
+ public long getLastEntry(long ledgerId) throws IOException {
long lastEntry = 0;
// Find the last entry in the cache
synchronized(this) {
@@ -505,6 +504,41 @@ public class LedgerCache {
}
}
+ FileInfo fi = null;
+ try {
+ fi = getFileInfo(ledgerId, null);
+ long size = fi.size();
+ // make sure the file size is aligned with index entry size
+ // otherwise we may read incorret data
+ if (0 != size % 8) {
+ LOG.warn("Index file of ledger {} is not aligned with index entry size.", ledgerId);
+ size = size - size % 8;
+ }
+ // we may not have the last entry in the cache
+ if (size > lastEntry*8) {
+ ByteBuffer bb = ByteBuffer.allocate(getPageSize());
+ long position = size - getPageSize();
+ if (position < 0) {
+ position = 0;
+ }
+ fi.read(bb, position);
+ bb.flip();
+ long startingEntryId = position/8;
+ for(int i = getEntriesPerPage()-1; i >= 0; i--) {
+ if (bb.getLong(i*8) != 0) {
+ if (lastEntry < startingEntryId+i) {
+ lastEntry = startingEntryId+i;
+ }
+ break;
+ }
+ }
+ }
+ } finally {
+ if (fi != null) {
+ fi.release();
+ }
+ }
+
return lastEntry;
}
@@ -543,7 +577,8 @@ public class LedgerCache {
* This method is called whenever a ledger is deleted by the BookKeeper Client
* and we want to remove all relevant data for it stored in the LedgerCache.
*/
- void deleteLedger(long ledgerId) throws IOException {
+ @Override
+ public void deleteLedger(long ledgerId) throws IOException {
if (LOG.isDebugEnabled())
LOG.debug("Deleting ledgerId: " + ledgerId);
// Delete the ledger's index file and close the FileInfo
@@ -573,4 +608,105 @@ public class LedgerCache {
}
}
+ private File findIndexFile(long ledgerId) throws IOException {
+ String ledgerName = getLedgerName(ledgerId);
+ for(File d: ledgerDirectories) {
+ File lf = new File(d, ledgerName);
+ if (lf.exists()) {
+ return lf;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
+ synchronized(fileInfoCache) {
+ FileInfo fi = fileInfoCache.get(ledgerId);
+ if (fi == null) {
+ File lf = findIndexFile(ledgerId);
+ if (lf == null) {
+ throw new Bookie.NoLedgerException(ledgerId);
+ }
+ if (openLedgers.size() > openFileLimit) {
+ fileInfoCache.remove(openLedgers.removeFirst()).close();
+ }
+ fi = new FileInfo(lf, null);
+ byte[] key = fi.getMasterKey();
+ fileInfoCache.put(ledgerId, fi);
+ openLedgers.add(ledgerId);
+ return key;
+ }
+ return fi.getMasterKey();
+ }
+ }
+
+ @Override
+ public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
+ getFileInfo(ledgerId, masterKey);
+ }
+
+ @Override
+ public boolean ledgerExists(long ledgerId) throws IOException {
+ synchronized(fileInfoCache) {
+ FileInfo fi = fileInfoCache.get(ledgerId);
+ if (fi == null) {
+ File lf = findIndexFile(ledgerId);
+ if (lf == null) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public LedgerCacheBean getJMXBean() {
+ return new LedgerCacheBean() {
+ @Override
+ public String getName() {
+ return "LedgerCache";
+ }
+
+ @Override
+ public boolean isHidden() {
+ return false;
+ }
+
+ @Override
+ public int getPageCount() {
+ return getNumUsedPages();
+ }
+
+ @Override
+ public int getPageSize() {
+ return getPageSize();
+ }
+
+ @Override
+ public int getOpenFileLimit() {
+ return openFileLimit;
+ }
+
+ @Override
+ public int getPageLimit() {
+ return getPageLimit();
+ }
+
+ @Override
+ public int getNumCleanLedgers() {
+ return cleanLedgers.size();
+ }
+
+ @Override
+ public int getNumDirtyLedgers() {
+ return dirtyLedgers.size();
+ }
+
+ @Override
+ public int getNumOpenLedgers() {
+ return openLedgers.size();
+ }
+ };
+ }
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java?rev=1302851&r1=1302850&r2=1302851&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java Tue Mar 20 11:51:03 2012
@@ -37,11 +37,13 @@ import org.slf4j.LoggerFactory;
*/
public class LedgerDescriptor {
final static Logger LOG = LoggerFactory.getLogger(LedgerDescriptor.class);
- LedgerCache ledgerCache;
+ LedgerCacheImpl ledgerCache;
LedgerDescriptor(long ledgerId, EntryLogger entryLogger, LedgerCache ledgerCache) {
this.ledgerId = ledgerId;
this.entryLogger = entryLogger;
- this.ledgerCache = ledgerCache;
+ // This cast is only here until ledgerDescriptor changes go in to make it
+ // unnecessary
+ this.ledgerCache = (LedgerCacheImpl)ledgerCache;
}
private byte[] masterKey = null;
@@ -136,42 +138,7 @@ public class LedgerDescriptor {
* If entryId is -1, then return the last written.
*/
if (entryId == -1) {
- long lastEntry = ledgerCache.getLastEntry(ledgerId);
- FileInfo fi = null;
- try {
- fi = ledgerCache.getFileInfo(ledgerId, null);
- long size = fi.size();
- // make sure the file size is aligned with index entry size
- // otherwise we may read incorret data
- if (0 != size % 8) {
- LOG.warn("Index file of ledger {} is not aligned with index entry size.", ledgerId);
- size = size - size % 8;
- }
- // we may not have the last entry in the cache
- if (size > lastEntry*8) {
- ByteBuffer bb = ByteBuffer.allocate(ledgerCache.getPageSize());
- long position = size - ledgerCache.getPageSize();
- if (position < 0) {
- position = 0;
- }
- fi.read(bb, position);
- bb.flip();
- long startingEntryId = position/8;
- for(int i = ledgerCache.getEntriesPerPage()-1; i >= 0; i--) {
- if (bb.getLong(i*8) != 0) {
- if (lastEntry < startingEntryId+i) {
- lastEntry = startingEntryId+i;
- }
- break;
- }
- }
- }
- } finally {
- if (fi != null) {
- fi.release();
- }
- }
- entryId = lastEntry;
+ entryId = ledgerCache.getLastEntry(ledgerId);
}
offset = ledgerCache.getEntryOffset(ledgerId, entryId);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java?rev=1302851&r1=1302850&r2=1302851&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java Tue Mar 20 11:51:03 2012
@@ -59,7 +59,7 @@ public class BookieJournalTest {
private void writeIndexFileForLedger(File indexDir, long ledgerId,
byte[] masterKey)
throws Exception {
- File fn = new File(indexDir, LedgerCache.getLedgerName(ledgerId));
+ File fn = new File(indexDir, LedgerCacheImpl.getLedgerName(ledgerId));
fn.getParentFile().mkdirs();
FileInfo fi = new FileInfo(fn, masterKey);
// force creation of index file
@@ -70,7 +70,7 @@ public class BookieJournalTest {
private void writePartialIndexFileForLedger(File indexDir, long ledgerId,
byte[] masterKey, boolean truncateToMasterKey)
throws Exception {
- File fn = new File(indexDir, LedgerCache.getLedgerName(ledgerId));
+ File fn = new File(indexDir, LedgerCacheImpl.getLedgerName(ledgerId));
fn.getParentFile().mkdirs();
FileInfo fi = new FileInfo(fn, masterKey);
// force creation of index file
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java?rev=1302851&r1=1302850&r2=1302851&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java Tue Mar 20 11:51:03 2012
@@ -72,7 +72,7 @@ public class UpgradeTest {
throws Exception {
long ledgerId = 1;
- File fn = new File(dir, LedgerCache.getLedgerName(ledgerId));
+ File fn = new File(dir, LedgerCacheImpl.getLedgerName(ledgerId));
fn.getParentFile().mkdirs();
FileInfo fi = new FileInfo(fn, masterKey);
// force creation of index file