Posted to commits@zookeeper.apache.org by si...@apache.org on 2013/10/11 08:36:27 UTC

svn commit: r1531203 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/

Author: sijie
Date: Fri Oct 11 06:36:27 2013
New Revision: 1531203

URL: http://svn.apache.org/r1531203
Log:
BOOKKEEPER-658: ledger cache refactor (Robin Dhamankar via sijie)
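
For orientation: this refactor splits LedgerCacheImpl into two collaborators, IndexInMemPageMgr (the in-memory LedgerEntryPage cache) and IndexPersistenceMgr (the FileInfo cache and the on-disk index files). A minimal sketch of the resulting wiring, condensed from the diff below (imports, the remaining fields and the flush/fencing methods are elided):

    public class LedgerCacheImpl implements LedgerCache {
        private final IndexPersistenceMgr indexPersistenceManager; // on-disk index files, FileInfo cache
        private final IndexInMemPageMgr indexPageManager;          // in-memory LedgerEntryPage cache

        public LedgerCacheImpl(ServerConfiguration conf, SnapshotMap<Long, Boolean> activeLedgers,
                               LedgerDirsManager ledgerDirsManager) throws IOException {
            int pageSize = conf.getPageSize();
            int entriesPerPage = pageSize / 8;   // each index slot stores an 8-byte entry location
            this.indexPersistenceManager =
                    new IndexPersistenceMgr(pageSize, entriesPerPage, conf, activeLedgers, ledgerDirsManager);
            this.indexPageManager =
                    new IndexInMemPageMgr(pageSize, entriesPerPage, conf, indexPersistenceManager);
        }

        @Override
        public void putEntryOffset(long ledger, long entry, long offset) throws IOException {
            indexPageManager.putEntryOffset(ledger, entry, offset);  // page cache, backed by the persistence mgr
        }

        @Override
        public long getEntryOffset(long ledger, long entry) throws IOException {
            return indexPageManager.getEntryOffset(ledger, entry);
        }
    }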

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1531203&r1=1531202&r2=1531203&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Oct 11 06:36:27 2013
@@ -162,6 +162,8 @@ Trunk (unreleased changes)
 
       BOOKKEEPER-645: Bookkeeper shell command to get a list of readonly bookies (rakesh via sijie)
 
+      BOOKKEEPER-658: ledger cache refactor (Robin Dhamankar via sijie)
+
     NEW FEATURE:
 
       BOOKKEEPER-562: Ability to tell if a ledger is closed or not (fpj)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java?rev=1531203&r1=1531202&r2=1531203&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java Fri Oct 11 06:36:27 2013
@@ -745,7 +745,7 @@ public class BookieShell implements Tool
      * @return file object.
      */
     private File getLedgerFile(long ledgerId) {
-        String ledgerName = LedgerCacheImpl.getLedgerName(ledgerId);
+        String ledgerName = IndexPersistenceMgr.getLedgerName(ledgerId);
         File lf = null;
         for (File d : ledgerDirectories) {
             lf = new File(d, ledgerName);

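The only functional change here: getLedgerName(long) now lives on IndexPersistenceMgr (added further down). For reference, a small standalone sketch of the path it builds; the ledger id is an arbitrary example:

    // Mirrors IndexPersistenceMgr.getLedgerName(long) from this commit; the id is arbitrary.
    long ledgerId = 0x1234L;
    int parent = (int) (ledgerId & 0xff);               // low byte  -> 0x34
    int grandParent = (int) ((ledgerId & 0xff00) >> 8); // next byte -> 0x12
    String ledgerName = Integer.toHexString(grandParent) + '/'
            + Integer.toHexString(parent) + '/'
            + Long.toHexString(ledgerId) + ".idx";      // "12/34/1234.idx"
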
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java?rev=1531203&r1=1531202&r2=1531203&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java Fri Oct 11 06:36:27 2013
@@ -21,19 +21,20 @@
 
 package org.apache.bookkeeper.bookie;
 
+import static com.google.common.base.Charsets.UTF_8;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
 import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
-import static com.google.common.base.Charsets.UTF_8;
-import com.google.common.annotations.VisibleForTesting;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This is the file handle for a ledger's index file that maps entry ids to location.
  * It is used by LedgerCache.
@@ -291,7 +292,7 @@ class FileInfo {
             if (size > fc.size()) {
                 size = fc.size();
             }
-            File rlocFile = new File(newFile.getParentFile(), newFile.getName() + LedgerCacheImpl.RLOC);
+            File rlocFile = new File(newFile.getParentFile(), newFile.getName() + IndexPersistenceMgr.RLOC);
             if (!rlocFile.exists()) {
                 checkParents(rlocFile);
                 if (!rlocFile.createNewFile()) {

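Likewise, the relocation-marker suffix RLOC (".rloc") is now defined on IndexPersistenceMgr. A brief illustration of the naming used above; the paths are hypothetical:

    // Hypothetical paths, showing newFile.getName() + IndexPersistenceMgr.RLOC as used above.
    File newFile  = new File("/ledgers2/12/34/1234.idx");
    File rlocFile = new File(newFile.getParentFile(), newFile.getName() + ".rloc");
    // -> /ledgers2/12/34/1234.idx.rloc
    // On startup, IndexPersistenceMgr.getActiveLedgers() (in the added file below) deletes a
    // leftover .rloc file when the corresponding .idx already exists, or renames it to .idx otherwise.
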
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java?rev=1531203&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java Fri Oct 11 06:36:27 2013
@@ -0,0 +1,404 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class IndexInMemPageMgr {
+    private final static Logger LOG = LoggerFactory.getLogger(IndexInMemPageMgr.class);
+
+    final int pageSize;
+    final int pageLimit;
+    final int entriesPerPage;
+    final HashMap<Long, HashMap<Long, LedgerEntryPage>> pages;
+
+    // The number of pages that have actually been used
+    private int pageCount = 0;
+
+    // The persistence manager that this page manager uses to
+    // flush and read pages
+    private final IndexPersistenceMgr indexPersistenceManager;
+
+    /**
+     * the list of potentially dirty ledgers
+     */
+    LinkedList<Long> dirtyLedgers = new LinkedList<Long>();
+    /**
+     * the list of potentially clean ledgers
+     */
+    LinkedList<Long> cleanLedgers = new LinkedList<Long>();
+
+    public IndexInMemPageMgr(int pageSize,
+                             int entriesPerPage,
+                             ServerConfiguration conf, 
+                             IndexPersistenceMgr indexPersistenceManager) {
+        this.pageSize = pageSize;
+        this.entriesPerPage = entriesPerPage;
+        this.indexPersistenceManager = indexPersistenceManager;
+        this.pages = new HashMap<Long, HashMap<Long, LedgerEntryPage>>();
+
+        if (conf.getPageLimit() <= 0) {
+            // allocate a third of the memory to the page cache
+            this.pageLimit = (int) ((Runtime.getRuntime().maxMemory() / 3) / this.pageSize);
+        } else {
+            this.pageLimit = conf.getPageLimit();
+        }
+        LOG.info("maxMemory = {}, pageSize = {}, pageLimit = {}", new Object[] { Runtime.getRuntime().maxMemory(),
+                        pageSize, pageLimit });
+    }
+
+    /**
+     * @return page size used in ledger cache
+     */
+    public int getPageSize() {
+        return pageSize;
+    }
+
+    /**
+     * @return entries per page used in ledger cache
+     */
+    public int getEntriesPerPage() {
+        return entriesPerPage;
+    }
+
+    /**
+     * @return page limitation in ledger cache
+     */
+    public int getPageLimit() {
+        return pageLimit;
+    }
+
+    /**
+     * @return number of page used in ledger cache
+     */
+    public int getNumUsedPages() {
+        return pageCount;
+    }
+
+    public int getNumCleanLedgers() {
+        return cleanLedgers.size();
+    }
+
+    public int getNumDirtyLedgers() {
+        return dirtyLedgers.size();
+    }
+
+    private void putIntoTable(HashMap<Long, HashMap<Long,LedgerEntryPage>> table, LedgerEntryPage lep) {
+        HashMap<Long, LedgerEntryPage> map = table.get(lep.getLedger());
+        if (map == null) {
+            map = new HashMap<Long, LedgerEntryPage>();
+            table.put(lep.getLedger(), map);
+        }
+        map.put(lep.getFirstEntry(), lep);
+    }
+
+    private static LedgerEntryPage getFromTable(HashMap<Long, HashMap<Long,LedgerEntryPage>> table,
+                                                Long ledger, Long firstEntry) {
+        HashMap<Long, LedgerEntryPage> map = table.get(ledger);
+        if (map != null) {
+            return map.get(firstEntry);
+        }
+        return null;
+    }
+
+    synchronized protected LedgerEntryPage getLedgerEntryPage(Long ledger, Long firstEntry, boolean onlyDirty) {
+        LedgerEntryPage lep = getFromTable(pages, ledger, firstEntry);
+        if (lep == null) {
+            return null;
+        }
+
+        lep.usePage();
+
+        if (onlyDirty && lep.isClean()) {
+            return null;
+        } else {
+            return lep;
+        }
+    }
+
+    /** 
+     * Grab ledger entry page whose first entry is <code>pageEntry</code>.
+     *
+     * If the page didn't exist before, we allocate a memory page.
+     * Otherwise, we grab a clean page and read it from disk.
+     *
+     * @param ledger
+     *          Ledger Id
+     * @param pageEntry
+     *          Start entry of this entry page.
+     */
+    private LedgerEntryPage grabLedgerEntryPage(long ledger, long pageEntry) throws IOException {
+        LedgerEntryPage lep = grabCleanPage(ledger, pageEntry);
+        try {
+            // should update page before we put it into table
+            // otherwise we would put an empty page in it
+            indexPersistenceManager.updatePage(lep);
+            synchronized (this) {
+                putIntoTable(pages, lep);
+            }
+        } catch (IOException ie) {
+            // if we grabbed a clean page but failed to update it, we would leak
+            // a slot in the ledger entry page count. Since this page will never
+            // be used, decrement the page count of the ledger cache.
+            lep.releasePage();
+            synchronized (this) {
+                --pageCount;
+            }
+            throw ie;
+        }
+        return lep;
+    }
+
+    void removePagesForLedger(long ledgerId) {
+        // remove pages first to avoid pages being flushed while the file info is deleted
+        synchronized (this) {
+            Map<Long, LedgerEntryPage> lpages = pages.remove(ledgerId);
+            if (null != lpages) {
+                pageCount -= lpages.size();
+                if (pageCount < 0) {
+                    LOG.error("Page count of ledger cache has been decremented to be less than zero.");
+                }
+            }
+        }
+    }
+
+    long getLastEntryInMem(long ledgerId) {
+        long lastEntry = 0;
+        // Find the last entry in the cache
+        synchronized (this) {
+            Map<Long, LedgerEntryPage> map = pages.get(ledgerId);
+            if (map != null) {
+                for (LedgerEntryPage lep : map.values()) {
+                    if (lep.getFirstEntry() + entriesPerPage < lastEntry) {
+                        continue;
+                    }
+                    lep.usePage();
+                    long highest = lep.getLastEntry();
+                    if (highest > lastEntry) {
+                        lastEntry = highest;
+                    }
+                    lep.releasePage();
+                }
+            }
+        }
+        return lastEntry;
+    }
+
+    private LedgerEntryPage grabCleanPage(long ledger, long entry) throws IOException {
+        if (entry % entriesPerPage != 0) {
+            throw new IllegalArgumentException(entry + " is not a multiple of " + entriesPerPage);
+        }
+        outerLoop: while (true) {
+            synchronized (this) {
+                if (pageCount < pageLimit) {
+                    // let's see if we can allocate something
+                    LedgerEntryPage lep = new LedgerEntryPage(pageSize, entriesPerPage);
+                    lep.setLedger(ledger);
+                    lep.setFirstEntry(entry);
+
+                    // note, this will not block since it is a new page
+                    lep.usePage();
+                    pageCount++;
+                    return lep;
+                }
+            }
+
+            synchronized (cleanLedgers) {
+                if (cleanLedgers.isEmpty()) {
+                    flushOneOrMoreLedgers(false);
+                    synchronized (this) {
+                        for (Long l : pages.keySet()) {
+                            cleanLedgers.add(l);
+                        }
+                    }
+                }
+                synchronized (this) {
+                    // if ledgers were deleted between checking pageCount and populating
+                    // the cleanLedgers list, the list could be empty here. In that case,
+                    // go back and check pageCount again, because deleteLedger decrements
+                    // pageCount to release the pages occupied by the deleted ledgers.
+                    if (cleanLedgers.isEmpty()) {
+                        continue outerLoop;
+                    }
+                    Long cleanLedger = cleanLedgers.getFirst();
+                    Map<Long, LedgerEntryPage> map = pages.get(cleanLedger);
+                    while (map == null || map.isEmpty()) {
+                        cleanLedgers.removeFirst();
+                        if (cleanLedgers.isEmpty()) {
+                            continue outerLoop;
+                        }
+                        cleanLedger = cleanLedgers.getFirst();
+                        map = pages.get(cleanLedger);
+                    }
+                    Iterator<Map.Entry<Long, LedgerEntryPage>> it = map.entrySet().iterator();
+                    LedgerEntryPage lep = it.next().getValue();
+                    while ((lep.inUse() || !lep.isClean())) {
+                        if (!it.hasNext()) {
+                            // no clean page found in this ledger
+                            cleanLedgers.removeFirst();
+                            continue outerLoop;
+                        }
+                        lep = it.next().getValue();
+                    }
+                    it.remove();
+                    if (map.isEmpty()) {
+                        pages.remove(lep.getLedger());
+                    }
+                    lep.usePage();
+                    lep.zeroPage();
+                    lep.setLedger(ledger);
+                    lep.setFirstEntry(entry);
+                    return lep;
+                }
+            }
+        }
+    }
+
+    void flushOneOrMoreLedgers(boolean doAll) throws IOException {
+        synchronized (dirtyLedgers) {
+            if (dirtyLedgers.isEmpty()) {
+                synchronized (this) {
+                    for (Long l : pages.keySet()) {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Adding {} to dirty pages", Long.toHexString(l));
+                        }
+                        dirtyLedgers.add(l);
+                    }
+                }
+            }
+            if (dirtyLedgers.isEmpty()) {
+                return;
+            }
+
+            indexPersistenceManager.relocateIndexFileIfDirFull(dirtyLedgers);
+
+            while (!dirtyLedgers.isEmpty()) {
+                Long l = dirtyLedgers.removeFirst();
+
+                flushSpecificLedger(l);
+
+                if (!doAll) {
+                    break;
+                }
+                // Yield. If we are flushing all the ledgers, we don't want to block
+                // other flushes that need to happen
+                try {
+                    dirtyLedgers.wait(1);
+                } catch (InterruptedException e) {
+                    // just pass it on
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+    }
+
+    /**
+     * Flush a specified ledger
+     *
+     * @param l 
+     *          Ledger Id
+     * @throws IOException
+     */
+    private void flushSpecificLedger(long l) throws IOException {
+        LinkedList<Long> firstEntryList;
+        synchronized(this) {
+            HashMap<Long, LedgerEntryPage> pageMap = pages.get(l);
+            if (pageMap == null || pageMap.isEmpty()) {
+                indexPersistenceManager.flushLedgerHeader(l);
+                return;
+            }
+            firstEntryList = new LinkedList<Long>();
+            for(Map.Entry<Long, LedgerEntryPage> entry: pageMap.entrySet()) {
+                LedgerEntryPage lep = entry.getValue();
+                if (lep.isClean()) {
+                    LOG.trace("Page is clean {}", lep);
+                    continue;
+                }
+                firstEntryList.add(lep.getFirstEntry());
+            }
+        }
+
+        if (firstEntryList.size() == 0) {
+            LOG.debug("Nothing to flush for ledger {}.", l);
+            // nothing to do
+            return;
+        }
+
+        // Now flush all the pages of a ledger
+        List<LedgerEntryPage> entries = new ArrayList<LedgerEntryPage>(firstEntryList.size());
+        try {
+            for(Long firstEntry: firstEntryList) {
+                LedgerEntryPage lep = getLedgerEntryPage(l, firstEntry, true);
+                if (lep != null) {
+                    entries.add(lep);
+                }
+            }
+            indexPersistenceManager.flushLedgerEntries(l, entries);
+        } finally {
+            for(LedgerEntryPage lep: entries) {
+                lep.releasePage();
+            }
+        }
+    }
+
+    void putEntryOffset(long ledger, long entry, long offset) throws IOException {
+        int offsetInPage = (int) (entry % entriesPerPage);
+        // find the id of the first entry of the page that has the entry
+        // we are looking for
+        long pageEntry = entry - offsetInPage;
+        LedgerEntryPage lep = getLedgerEntryPage(ledger, pageEntry, false);
+        if (lep == null) {
+            lep = grabLedgerEntryPage(ledger, pageEntry);
+        }
+        lep.setOffset(offset, offsetInPage * 8);
+        lep.releasePage();
+    }
+
+    long getEntryOffset(long ledger, long entry) throws IOException {
+        int offsetInPage = (int) (entry % entriesPerPage);
+        // find the id of the first entry of the page that has the entry
+        // we are looking for
+        long pageEntry = entry - offsetInPage;
+        LedgerEntryPage lep = getLedgerEntryPage(ledger, pageEntry, false);
+        try {
+            if (lep == null) {
+                lep = grabLedgerEntryPage(ledger, pageEntry);
+            }
+            return lep.getOffset(offsetInPage * 8);
+        } finally {
+            if (lep != null) {
+                lep.releasePage();
+            }
+        }
+    }
+}

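To make the page arithmetic in putEntryOffset/getEntryOffset above concrete, a worked example; the page size and entry id are illustrative only:

    // Illustrative numbers: pageSize = 8192 bytes => entriesPerPage = 8192 / 8 = 1024.
    int entriesPerPage = 1024;
    long entry = 2500;                                  // entry being indexed
    int offsetInPage = (int) (entry % entriesPerPage);  // 2500 % 1024 = 452
    long pageEntry = entry - offsetInPage;              // 2048: first entry of the page (the map key)
    int bytePosInPage = offsetInPage * 8;               // 3616: each slot is an 8-byte entry location
    // The LedgerEntryPage is looked up (or grabbed via grabLedgerEntryPage) under the key
    // (ledger, pageEntry); the entry's location is then read or written at bytePosInPage.
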
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java?rev=1531203&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java Fri Oct 11 06:36:27 2013
@@ -0,0 +1,547 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.util.SnapshotMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class IndexPersistenceMgr {
+    private final static Logger LOG = LoggerFactory.getLogger(IndexPersistenceMgr.class);
+
+    private static final String IDX = ".idx";
+    static final String RLOC = ".rloc";
+
+    @VisibleForTesting
+    public static final String getLedgerName(long ledgerId) {
+        int parent = (int) (ledgerId & 0xff);
+        int grandParent = (int) ((ledgerId & 0xff00) >> 8);
+        StringBuilder sb = new StringBuilder();
+        sb.append(Integer.toHexString(grandParent));
+        sb.append('/');
+        sb.append(Integer.toHexString(parent));
+        sb.append('/');
+        sb.append(Long.toHexString(ledgerId));
+        sb.append(IDX);
+        return sb.toString();
+    }
+
+    final HashMap<Long, FileInfo> fileInfoCache = new HashMap<Long, FileInfo>();
+    final int openFileLimit;
+    final int pageSize;
+    final int entriesPerPage;
+
+    // Manage all active ledgers in LedgerManager
+    // so LedgerManager has knowledge to garbage collect inactive/deleted ledgers
+    final SnapshotMap<Long, Boolean> activeLedgers;
+    private LedgerDirsManager ledgerDirsManager;
+    final LinkedList<Long> openLedgers = new LinkedList<Long>();
+    final private AtomicBoolean shouldRelocateIndexFile = new AtomicBoolean(false);
+
+    public IndexPersistenceMgr(int pageSize,
+                               int entriesPerPage,
+                               ServerConfiguration conf,
+                               SnapshotMap<Long, Boolean> activeLedgers,
+                               LedgerDirsManager ledgerDirsManager) throws IOException {
+        this.openFileLimit = conf.getOpenFileLimit();
+        this.activeLedgers = activeLedgers;
+        this.ledgerDirsManager = ledgerDirsManager;
+        this.pageSize = pageSize;
+        this.entriesPerPage = entriesPerPage;
+        LOG.info("openFileLimit = {}", openFileLimit);
+        // Retrieve all of the active ledgers.
+        getActiveLedgers();
+        ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+    }
+
+    FileInfo getFileInfo(Long ledger, byte masterKey[]) throws IOException {
+        synchronized (fileInfoCache) {
+            FileInfo fi = fileInfoCache.get(ledger);
+            if (fi == null) {
+                File lf = findIndexFile(ledger);
+                if (lf == null) {
+                    if (masterKey == null) {
+                        throw new Bookie.NoLedgerException(ledger);
+                    }
+                    lf = getNewLedgerIndexFile(ledger, null);
+                    // A new ledger index file has been created for this Bookie.
+                    // Add this new ledger to the set of active ledgers.
+                    LOG.debug("New ledger index file created for ledgerId: {}", ledger);
+                    activeLedgers.put(ledger, true);
+                }
+                evictFileInfoIfNecessary();
+                fi = new FileInfo(lf, masterKey);
+                fileInfoCache.put(ledger, fi);
+                openLedgers.add(ledger);
+            }
+            if (fi != null) {
+                fi.use();
+            }
+            return fi;
+        }
+    }
+
+    /**
+     * Get a new index file for ledger excluding directory <code>excludedDir</code>.
+     *
+     * @param ledger
+     *          Ledger id.
+     * @param excludedDir
+     *          The ledger directory to exclude.
+     * @return new index file object.
+     * @throws NoWritableLedgerDirException if there is no writable dir available.
+     */
+    private File getNewLedgerIndexFile(Long ledger, File excludedDir)
+                    throws NoWritableLedgerDirException {
+        File dir = ledgerDirsManager.pickRandomWritableDir(excludedDir);
+        String ledgerName = getLedgerName(ledger);
+        return new File(dir, ledgerName);
+    }
+
+    /**
+     * This method will look within the ledger directories for the ledger index
+     * files. That will comprise the set of active ledgers this particular
+     * BookieServer knows about that have not yet been deleted by the BookKeeper
+     * Client. This is called only once during initialization.
+     */
+    private void getActiveLedgers() throws IOException {
+        // Ledger index files are stored in a file hierarchy with a parent and
+        // grandParent directory. We'll have to go two levels deep into these
+        // directories to find the index files.
+        for (File ledgerDirectory : ledgerDirsManager.getAllLedgerDirs()) {
+            for (File grandParent : ledgerDirectory.listFiles()) {
+                if (grandParent.isDirectory()) {
+                    for (File parent : grandParent.listFiles()) {
+                        if (parent.isDirectory()) {
+                            for (File index : parent.listFiles()) {
+                                if (!index.isFile()
+                                        || (!index.getName().endsWith(IDX) && !index.getName().endsWith(RLOC))) {
+                                    continue;
+                                }
+
+                                // We've found a ledger index file. The file
+                                // name is the HexString representation of the
+                                // ledgerId.
+                                String ledgerIdInHex = index.getName().replace(RLOC, "").replace(IDX, "");
+                                if (index.getName().endsWith(RLOC)) {
+                                    if (findIndexFile(Long.parseLong(ledgerIdInHex)) != null) {
+                                        if (!index.delete()) {
+                                            LOG.warn("Deleting the rloc file " + index + " failed");
+                                        }
+                                        continue;
+                                    } else {
+                                        File dest = new File(index.getParentFile(), ledgerIdInHex + IDX);
+                                        if (!index.renameTo(dest)) {
+                                            throw new IOException("Renaming rloc file " + index
+                                                    + " to index file has failed");
+                                        }
+                                    }
+                                }
+                                activeLedgers.put(Long.parseLong(ledgerIdInHex, 16), true);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * This method is called whenever a ledger is deleted by the BookKeeper Client
+     * and we want to remove all relevant data for it stored in the LedgerCache.
+     */
+    void removeLedger(long ledgerId) throws IOException {
+        // Delete the ledger's index file and close the FileInfo
+        FileInfo fi = null;
+        try {
+            fi = getFileInfo(ledgerId, null);
+            fi.close(false);
+            fi.delete();
+        } finally {
+            // should release use count
+            // otherwise the file channel would not be closed.
+            if (null != fi) {
+                fi.release();
+            }
+        }
+
+        // Remove it from the active ledger manager
+        activeLedgers.remove(ledgerId);
+
+        // Now remove it from all the other lists and maps.
+        // These data structures need to be synchronized first before removing entries.
+        synchronized (fileInfoCache) {
+            fileInfoCache.remove(ledgerId);
+        }
+        synchronized (openLedgers) {
+            openLedgers.remove(ledgerId);
+        }
+    }
+
+    private File findIndexFile(long ledgerId) throws IOException {
+        String ledgerName = getLedgerName(ledgerId);
+        for (File d : ledgerDirsManager.getAllLedgerDirs()) {
+            File lf = new File(d, ledgerName);
+            if (lf.exists()) {
+                return lf;
+            }
+        }
+        return null;
+    }
+
+    boolean ledgerExists(long ledgerId) throws IOException {
+        synchronized (fileInfoCache) {
+            FileInfo fi = fileInfoCache.get(ledgerId);
+            if (fi == null) {
+                File lf = findIndexFile(ledgerId);
+                if (lf == null) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    int getNumOpenLedgers() {
+        return openLedgers.size();
+    }
+
+    // evict file info if necessary
+    private void evictFileInfoIfNecessary() throws IOException {
+        synchronized (fileInfoCache) {
+            if (openLedgers.size() > openFileLimit) {
+                long ledgerToRemove = openLedgers.removeFirst();
+                // TODO Add a statistic here; we don't really care which ledger is
+                // evicted, but rather the rate at which ledgers get evicted
+                fileInfoCache.remove(ledgerToRemove).close(true);
+            }
+        }
+    }
+
+    void close() throws IOException {
+        synchronized (fileInfoCache) {
+            for (Entry<Long, FileInfo> fileInfo : fileInfoCache.entrySet()) {
+                FileInfo value = fileInfo.getValue();
+                if (value != null) {
+                    value.close(true);
+                }
+            }
+            fileInfoCache.clear();
+        }
+    }
+
+    byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
+        synchronized (fileInfoCache) {
+            FileInfo fi = fileInfoCache.get(ledgerId);
+            if (fi == null) {
+                File lf = findIndexFile(ledgerId);
+                if (lf == null) {
+                    throw new Bookie.NoLedgerException(ledgerId);
+                }
+                evictFileInfoIfNecessary();
+                fi = new FileInfo(lf, null);
+                byte[] key = fi.getMasterKey();
+                fileInfoCache.put(ledgerId, fi);
+                openLedgers.add(ledgerId);
+                return key;
+            }
+            return fi.getMasterKey();
+        }
+    }
+
+    void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
+        FileInfo fi = null;
+        try {
+            fi = getFileInfo(ledgerId, masterKey);
+        } finally {
+            if (null != fi) {
+                fi.release();
+            }
+        }
+    }
+
+    boolean setFenced(long ledgerId) throws IOException {
+        FileInfo fi = null;
+        try {
+            fi = getFileInfo(ledgerId, null);
+            return fi.setFenced();
+        } finally {
+            if (null != fi) {
+                fi.release();
+            }
+        }
+    }
+
+    boolean isFenced(long ledgerId) throws IOException {
+        FileInfo fi = null;
+        try {
+            fi = getFileInfo(ledgerId, null);
+            return fi.isFenced();
+        } finally {
+            if (null != fi) {
+                fi.release();
+            }
+        }
+    }
+
+    int getOpenFileLimit() {
+        return openFileLimit;
+    }
+
+    private LedgerDirsListener getLedgerDirsListener() {
+        return new LedgerDirsListener() {
+            @Override
+            public void diskFull(File disk) {
+                // If a ledger disk is detected as full, mark that open index files
+                // should be relocated to a writable directory.
+                shouldRelocateIndexFile.set(true);
+            }
+
+            @Override
+            public void diskFailed(File disk) {
+                // Nothing to handle here. Will be handled in Bookie
+            }
+
+            @Override
+            public void allDisksFull() {
+                // Nothing to handle here. Will be handled in Bookie
+            }
+
+            @Override
+            public void fatalError() {
+                // Nothing to handle here. Will be handled in Bookie
+            }
+        };
+    }
+
+    void relocateIndexFileIfDirFull(Collection<Long> dirtyLedgers) throws IOException {
+        if (shouldRelocateIndexFile.get()) {
+            // if some dir was detected as full, move all corresponding
+            // open index files to a new location
+            for (Long l : dirtyLedgers) {
+                FileInfo fi = null;
+                try {
+                    fi = getFileInfo(l, null);
+                    File currentDir = getLedgerDirForLedger(fi);
+                    if (ledgerDirsManager.isDirFull(currentDir)) {
+                        moveLedgerIndexFile(l, fi);
+                    }
+                } finally {
+                    if (null != fi) {
+                        fi.release();
+                    }
+                }
+            }
+            shouldRelocateIndexFile.set(false);
+        }
+    }
+
+    /**
+     * Get the ledger directory that the ledger index belongs to.
+     *
+     * @param fi File info of a ledger
+     * @return ledger directory that the ledger belongs to.
+     */
+    private File getLedgerDirForLedger(FileInfo fi) {
+        return fi.getLf().getParentFile().getParentFile().getParentFile();
+    }
+
+    private void moveLedgerIndexFile(Long l, FileInfo fi) throws NoWritableLedgerDirException, IOException {
+        File newLedgerIndexFile = getNewLedgerIndexFile(l, getLedgerDirForLedger(fi));
+        fi.moveToNewLocation(newLedgerIndexFile, fi.getSizeSinceLastwrite());
+    }
+
+    void flushLedgerHeader(long ledger) throws IOException {
+        FileInfo fi = null;
+        try {
+            fi = getFileInfo(ledger, null);
+            fi.flushHeader();
+        } catch (Bookie.NoLedgerException nle) {
+            // ledger has been deleted
+            return;
+        } finally {
+            if (null != fi) {
+                fi.release();
+            }
+        }
+        return;
+    }
+
+    void flushLedgerEntries(long l, List<LedgerEntryPage> entries) throws IOException {
+        FileInfo fi = null;
+        try {
+            Collections.sort(entries, new Comparator<LedgerEntryPage>() {
+                @Override
+                public int compare(LedgerEntryPage o1, LedgerEntryPage o2) {
+                    return (int) (o1.getFirstEntry() - o2.getFirstEntry());
+                }
+            });
+            ArrayList<Integer> versions = new ArrayList<Integer>(entries.size());
+            try {
+                fi = getFileInfo(l, null);
+            } catch (Bookie.NoLedgerException nle) {
+                // ledger has been deleted
+                return;
+            }
+
+            // flush the header if necessary
+            fi.flushHeader();
+            int start = 0;
+            long lastOffset = -1;
+            for (int i = 0; i < entries.size(); i++) {
+                versions.add(i, entries.get(i).getVersion());
+                if (lastOffset != -1 && (entries.get(i).getFirstEntry() - lastOffset) != entriesPerPage) {
+                    // send up a sequential list
+                    int count = i - start;
+                    if (count == 0) {
+                        LOG.warn("Count cannot possibly be zero!");
+                    }
+                    writeBuffers(l, entries, fi, start, count);
+                    start = i;
+                }
+                lastOffset = entries.get(i).getFirstEntry();
+            }
+            if (entries.size() - start == 0 && entries.size() != 0) {
+                LOG.warn("Nothing to write, but there were entries!");
+            }
+            writeBuffers(l, entries, fi, start, entries.size() - start);
+            synchronized (this) {
+                for (int i = 0; i < entries.size(); i++) {
+                    LedgerEntryPage lep = entries.get(i);
+                    lep.setClean(versions.get(i));
+                }
+            }
+        } finally {
+            if (fi != null) {
+                fi.release();
+            }
+        }
+    }
+
+    private void writeBuffers(Long ledger,
+                              List<LedgerEntryPage> entries, FileInfo fi,
+                              int start, int count) throws IOException {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Writing {} buffers of {}", count, Long.toHexString(ledger));
+        }
+        if (count == 0) {
+            return;
+        }
+        ByteBuffer buffs[] = new ByteBuffer[count];
+        for (int j = 0; j < count; j++) {
+            buffs[j] = entries.get(start + j).getPageToWrite();
+            if (entries.get(start + j).getLedger() != ledger) {
+                throw new IOException("Writing to " + ledger + " but page belongs to "
+                                + entries.get(start + j).getLedger());
+            }
+        }
+        long totalWritten = 0;
+        while (buffs[buffs.length - 1].remaining() > 0) {
+            long rc = fi.write(buffs, entries.get(start + 0).getFirstEntry() * 8);
+            if (rc <= 0) {
+                throw new IOException("Short write to ledger " + ledger + " rc = " + rc);
+            }
+            totalWritten += rc;
+        }
+        if (totalWritten != (long) count * (long) pageSize) {
+            throw new IOException("Short write to ledger " + ledger + " wrote " + totalWritten
+                            + " expected " + count * pageSize);
+        }
+    }
+
+    void updatePage(LedgerEntryPage lep) throws IOException {
+        if (!lep.isClean()) {
+            throw new IOException("Trying to update a dirty page");
+        }
+        FileInfo fi = null;
+        try {
+            fi = getFileInfo(lep.getLedger(), null);
+            long pos = lep.getFirstEntry() * 8;
+            if (pos >= fi.size()) {
+                lep.zeroPage();
+            } else {
+                lep.readPage(fi);
+            }
+        } finally {
+            if (fi != null) {
+                fi.release();
+            }
+        }
+    }
+
+    long getPersistEntryBeyondInMem(long ledgerId, long lastEntryInMem) throws IOException {
+        FileInfo fi = null;
+        long lastEntry = lastEntryInMem;
+        try {
+            fi = getFileInfo(ledgerId, null);
+            long size = fi.size();
+            // make sure the file size is aligned with the index entry size,
+            // otherwise we may read incorrect data
+            if (0 != size % 8) {
+                LOG.warn("Index file of ledger {} is not aligned with index entry size.", ledgerId);
+                size = size - size % 8;
+            }
+            // we may not have the last entry in the cache
+            if (size > lastEntry * 8) {
+                ByteBuffer bb = ByteBuffer.allocate(pageSize);
+                long position = size - pageSize;
+                if (position < 0) {
+                    position = 0;
+                }
+                fi.read(bb, position);
+                bb.flip();
+                long startingEntryId = position / 8;
+                for (int i = entriesPerPage - 1; i >= 0; i--) {
+                    if (bb.getLong(i * 8) != 0) {
+                        if (lastEntry < startingEntryId + i) {
+                            lastEntry = startingEntryId + i;
+                        }
+                        break;
+                    }
+                }
+            }
+        } finally {
+            if (fi != null) {
+                fi.release();
+            }
+        }
+        return lastEntry;
+    }
+
+}

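One detail worth calling out in flushLedgerEntries/writeBuffers above: the dirty pages of a ledger are sorted by first entry, and each contiguous run is written with a single gathering write starting at byte offset firstEntry * 8. A runnable sketch of that grouping; the page boundaries are hypothetical and entriesPerPage is assumed to be 1024:

    import java.util.Arrays;

    // Sketch of the run-splitting done in flushLedgerEntries/writeBuffers.
    public class FlushGroupingSketch {
        public static void main(String[] args) {
            long entriesPerPage = 1024;
            long[] firstEntries = {0, 1024, 2048, 5120, 6144}; // sorted dirty pages of one ledger
            int start = 0;
            for (int i = 1; i <= firstEntries.length; i++) {
                boolean endOfRun = i == firstEntries.length
                        || firstEntries[i] - firstEntries[i - 1] != entriesPerPage;
                if (endOfRun) {
                    // one gathering write per contiguous run, at byte offset firstEntry * 8
                    System.out.println("write " + (i - start) + " page(s) at byte offset "
                            + firstEntries[start] * 8 + ": "
                            + Arrays.toString(Arrays.copyOfRange(firstEntries, start, i)));
                    start = i;
                }
            }
        }
    }
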
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java?rev=1531203&r1=1531202&r2=1531203&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java Fri Oct 11 06:36:27 2013
@@ -21,26 +21,10 @@
 
 package org.apache.bookkeeper.bookie;
 
-import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.bookkeeper.util.SnapshotMap;
-import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
-import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.util.SnapshotMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,55 +34,28 @@ import org.slf4j.LoggerFactory;
  */
 public class LedgerCacheImpl implements LedgerCache {
     private final static Logger LOG = LoggerFactory.getLogger(LedgerCacheImpl.class);
-    private static final String IDX = ".idx";
-    static final String RLOC = ".rloc";
 
-    private LedgerDirsManager ledgerDirsManager;
-    final private AtomicBoolean shouldRelocateIndexFile = new AtomicBoolean(false);
+    private final IndexInMemPageMgr indexPageManager;
+    private final IndexPersistenceMgr indexPersistenceManager;
+    private final int pageSize;
+    private final int entriesPerPage;
 
     public LedgerCacheImpl(ServerConfiguration conf, SnapshotMap<Long, Boolean> activeLedgers,
-            LedgerDirsManager ledgerDirsManager)
-            throws IOException {
-        this.ledgerDirsManager = ledgerDirsManager;
-        this.openFileLimit = conf.getOpenFileLimit();
+                    LedgerDirsManager ledgerDirsManager) throws IOException {
         this.pageSize = conf.getPageSize();
         this.entriesPerPage = pageSize / 8;
-
-        if (conf.getPageLimit() <= 0) {
-            // allocate half of the memory to the page cache
-            this.pageLimit = (int)((Runtime.getRuntime().maxMemory() / 3) / this.pageSize);
-        } else {
-            this.pageLimit = conf.getPageLimit();
-        }
-        LOG.info("maxMemory = " + Runtime.getRuntime().maxMemory());
-        LOG.info("openFileLimit is " + openFileLimit + ", pageSize is " + pageSize + ", pageLimit is " + pageLimit);
-        this.activeLedgers = activeLedgers;
-        // Retrieve all of the active ledgers.
-        getActiveLedgers();
-        ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+        this.indexPersistenceManager = new IndexPersistenceMgr(pageSize, entriesPerPage, conf, activeLedgers,
+                        ledgerDirsManager);
+        this.indexPageManager = new IndexInMemPageMgr(pageSize, entriesPerPage, conf, indexPersistenceManager);
     }
-    /**
-     * the list of potentially clean ledgers
-     */
-    LinkedList<Long> cleanLedgers = new LinkedList<Long>();
 
-    /**
-     * the list of potentially dirty ledgers
-     */
-    LinkedList<Long> dirtyLedgers = new LinkedList<Long>();
-
-    HashMap<Long, FileInfo> fileInfoCache = new HashMap<Long, FileInfo>();
-
-    LinkedList<Long> openLedgers = new LinkedList<Long>();
-
-    // Manage all active ledgers in LedgerManager
-    // so LedgerManager has knowledge to garbage collect inactive/deleted ledgers
-    final SnapshotMap<Long, Boolean> activeLedgers;
+    IndexPersistenceMgr getIndexPersistenceManager() {
+        return indexPersistenceManager;
+    }
 
-    final int openFileLimit;
-    final int pageSize;
-    final int pageLimit;
-    final int entriesPerPage;
+    IndexInMemPageMgr getIndexPageManager() {
+        return indexPageManager;
+    }
 
     /**
      * @return page size used in ledger cache
@@ -107,613 +64,34 @@ public class LedgerCacheImpl implements 
         return pageSize;
     }
 
-    /**
-     * @return entries per page used in ledger cache
-     */
-    public int getEntriesPerPage() {
-        return entriesPerPage;
-    }
-
-    /**
-     * @return page limitation in ledger cache
-     */
-    public int getPageLimit() {
-        return pageLimit;
-    }
-
-    // The number of pages that have actually been used
-    private int pageCount = 0;
-    HashMap<Long, HashMap<Long,LedgerEntryPage>> pages = new HashMap<Long, HashMap<Long,LedgerEntryPage>>();
-
-    /**
-     * @return number of page used in ledger cache
-     */
-    public int getNumUsedPages() {
-        return pageCount;
-    }
-
-    private void putIntoTable(HashMap<Long, HashMap<Long,LedgerEntryPage>> table, LedgerEntryPage lep) {
-        HashMap<Long, LedgerEntryPage> map = table.get(lep.getLedger());
-        if (map == null) {
-            map = new HashMap<Long, LedgerEntryPage>();
-            table.put(lep.getLedger(), map);
-        }
-        map.put(lep.getFirstEntry(), lep);
-    }
-
-    private static LedgerEntryPage getFromTable(HashMap<Long, HashMap<Long,LedgerEntryPage>> table,
-                                                Long ledger, Long firstEntry) {
-        HashMap<Long, LedgerEntryPage> map = table.get(ledger);
-        if (map != null) {
-            return map.get(firstEntry);
-        }
-        return null;
-    }
-
-    synchronized protected LedgerEntryPage getLedgerEntryPage(Long ledger, Long firstEntry, boolean onlyDirty) {
-        LedgerEntryPage lep = getFromTable(pages, ledger, firstEntry);
-        if (lep == null) {
-            return null;
-        }
-
-        lep.usePage();
-
-        if (onlyDirty && lep.isClean()) {
-            return null;
-        } else {
-            return lep;
-        }
-    }
-
-    /** 
-     * Grab ledger entry page whose first entry is <code>pageEntry</code>.
-     *
-     * If the page doesn't existed before, we allocate a memory page.
-     * Otherwise, we grab a clean page and read it from disk.
-     *
-     * @param ledger
-     *          Ledger Id
-     * @param pageEntry
-     *          Start entry of this entry page.
-     */
-    private LedgerEntryPage grabLedgerEntryPage(long ledger, long pageEntry) throws IOException {
-        LedgerEntryPage lep = grabCleanPage(ledger, pageEntry);
-        try {
-            // should update page before we put it into table
-            // otherwise we would put an empty page in it
-            updatePage(lep);
-            synchronized(this) {
-                putIntoTable(pages, lep);
-            }   
-        } catch (IOException ie) {
-            // if we grab a clean page, but failed to update the page
-            // we are exhausting the count of ledger entry pages.
-            // since this page will be never used, so we need to decrement
-            // page count of ledger cache.
-            lep.releasePage();
-            synchronized (this) {
-                --pageCount;
-            }
-            throw ie; 
-        }   
-        return lep;
-    }
-
     @Override
     public void putEntryOffset(long ledger, long entry, long offset) throws IOException {
-        int offsetInPage = (int) (entry % entriesPerPage);
-        // find the id of the first entry of the page that has the entry
-        // we are looking for
-        long pageEntry = entry-offsetInPage;
-        LedgerEntryPage lep = getLedgerEntryPage(ledger, pageEntry, false);
-        if (lep == null) {
-            lep = grabLedgerEntryPage(ledger, pageEntry); 
-        }
-        lep.setOffset(offset, offsetInPage*8);
-        lep.releasePage();
+        indexPageManager.putEntryOffset(ledger, entry, offset);
     }
 
     @Override
     public long getEntryOffset(long ledger, long entry) throws IOException {
-        int offsetInPage = (int) (entry%entriesPerPage);
-        // find the id of the first entry of the page that has the entry
-        // we are looking for
-        long pageEntry = entry-offsetInPage;
-        LedgerEntryPage lep = getLedgerEntryPage(ledger, pageEntry, false);
-        try {
-            if (lep == null) {
-                lep = grabLedgerEntryPage(ledger, pageEntry);
-            }
-            return lep.getOffset(offsetInPage*8);
-        } finally {
-            if (lep != null) {
-                lep.releasePage();
-            }
-        }
-    }
-
-    @VisibleForTesting
-    public static final String getLedgerName(long ledgerId) {
-        int parent = (int) (ledgerId & 0xff);
-        int grandParent = (int) ((ledgerId & 0xff00) >> 8);
-        StringBuilder sb = new StringBuilder();
-        sb.append(Integer.toHexString(grandParent));
-        sb.append('/');
-        sb.append(Integer.toHexString(parent));
-        sb.append('/');
-        sb.append(Long.toHexString(ledgerId));
-        sb.append(IDX);
-        return sb.toString();
-    }
-
-    FileInfo getFileInfo(Long ledger, byte masterKey[]) throws IOException {
-        synchronized(fileInfoCache) {
-            FileInfo fi = fileInfoCache.get(ledger);
-            if (fi == null) {
-                File lf = findIndexFile(ledger);
-                if (lf == null) {
-                    if (masterKey == null) {
-                        throw new Bookie.NoLedgerException(ledger);
-                    }
-                    lf = getNewLedgerIndexFile(ledger, null);
-                    // A new ledger index file has been created for this Bookie.
-                    // Add this new ledger to the set of active ledgers.
-                    LOG.debug("New ledger index file created for ledgerId: {}", ledger);
-                    activeLedgers.put(ledger, true);
-                }
-                evictFileInfoIfNecessary();
-                fi = new FileInfo(lf, masterKey);
-                fileInfoCache.put(ledger, fi);
-                openLedgers.add(ledger);
-            }
-            if (fi != null) {
-                fi.use();
-            }
-            return fi;
-        }
-    }
-
-    /**
-     * Get a new index file for ledger excluding directory <code>excludedDir</code>.
-     *
-     * @param ledger
-     *          Ledger id.
-     * @param excludedDir
-     *          The ledger directory to exclude.
-     * @return new index file object.
-     * @throws NoWritableLedgerDirException if there is no writable dir available.
-     */
-    private File getNewLedgerIndexFile(Long ledger, File excludedDir)
-    throws NoWritableLedgerDirException {
-        File dir = ledgerDirsManager.pickRandomWritableDir(excludedDir);
-        String ledgerName = getLedgerName(ledger);
-        return new File(dir, ledgerName);
-    }
-
-    private void updatePage(LedgerEntryPage lep) throws IOException {
-        if (!lep.isClean()) {
-            throw new IOException("Trying to update a dirty page");
-        }
-        FileInfo fi = null;
-        try {
-            fi = getFileInfo(lep.getLedger(), null);
-            long pos = lep.getFirstEntry()*8;
-            if (pos >= fi.size()) {
-                lep.zeroPage();
-            } else {
-                lep.readPage(fi);
-            }
-        } finally {
-            if (fi != null) {
-                fi.release();
-            }
-        }
-    }
-
-    private LedgerDirsListener getLedgerDirsListener() {
-        return new LedgerDirsListener() {
-            @Override
-            public void diskFull(File disk) {
-                // If the current entry log disk is full, then create new entry
-                // log.
-                shouldRelocateIndexFile.set(true);
-            }
-
-            @Override
-            public void diskFailed(File disk) {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void allDisksFull() {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void fatalError() {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-        };
+        return indexPageManager.getEntryOffset(ledger, entry);
     }
 
     @Override
     public void flushLedger(boolean doAll) throws IOException {
-        synchronized(dirtyLedgers) {
-            if (dirtyLedgers.isEmpty()) {
-                synchronized(this) {
-                    for(Long l: pages.keySet()) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Adding {} to dirty pages", Long.toHexString(l));
-                        }
-                        dirtyLedgers.add(l);
-                    }
-                }
-            }
-            if (dirtyLedgers.isEmpty()) {
-                return;
-            }
-
-            if (shouldRelocateIndexFile.get()) {
-                // if some new dir detected as full, then move all corresponding
-                // open index files to new location
-                for (Long l : dirtyLedgers) {
-                    FileInfo fi = null;
-                    try {
-                        fi = getFileInfo(l, null);
-                        File currentDir = getLedgerDirForLedger(fi);
-                        if (ledgerDirsManager.isDirFull(currentDir)) {
-                            moveLedgerIndexFile(l, fi);
-                        }
-                    } finally {
-                        if (null != fi) {
-                            fi.release();
-                        }
-                    }
-                }
-                shouldRelocateIndexFile.set(false);
-            }
-
-            while(!dirtyLedgers.isEmpty()) {
-                Long l = dirtyLedgers.removeFirst();
-
-                flushLedger(l);
-
-                if (!doAll) {
-                    break;
-                }
-                // Yield. if we are doing all the ledgers we don't want to block other flushes that
-                // need to happen
-                try {
-                    dirtyLedgers.wait(1);
-                } catch (InterruptedException e) {
-                    // just pass it on
-                    Thread.currentThread().interrupt();
-                }
-            }
-        }
-    }
-
-    /**
-     * Get the ledger directory that the ledger index belongs to.
-     *
-     * @param fi File info of a ledger
-     * @return ledger directory that the ledger belongs to.
-     */
-    private File getLedgerDirForLedger(FileInfo fi) {
-        return fi.getLf().getParentFile().getParentFile().getParentFile();
-    }
-
-    private void moveLedgerIndexFile(Long l, FileInfo fi) throws NoWritableLedgerDirException, IOException {
-        File newLedgerIndexFile = getNewLedgerIndexFile(l, getLedgerDirForLedger(fi));
-        fi.moveToNewLocation(newLedgerIndexFile, fi.getSizeSinceLastwrite());
-    }
-
-    /**
-     * Flush a specified ledger
-     *
-     * @param l
-     *          Ledger Id
-     * @throws IOException
-     */
-    private void flushLedger(long l) throws IOException {
-        FileInfo fi = null;
-        try {
-            fi = getFileInfo(l, null);
-            flushLedger(l, fi);
-        } catch (Bookie.NoLedgerException nle) {
-            // ledger has been deleted
-        } finally {
-            if (null != fi) {
-                fi.release();
-            }
-        }
-    }
-
-    private void flushLedger(long l, FileInfo fi) throws IOException {
-        LinkedList<Long> firstEntryList;
-        synchronized(this) {
-            HashMap<Long, LedgerEntryPage> pageMap = pages.get(l);
-            if (pageMap == null || pageMap.isEmpty()) {
-                fi.flushHeader();
-                return;
-            }
-            firstEntryList = new LinkedList<Long>();
-            for(Map.Entry<Long, LedgerEntryPage> entry: pageMap.entrySet()) {
-                LedgerEntryPage lep = entry.getValue();
-                if (lep.isClean()) {
-                    LOG.trace("Page is clean {}", lep);
-                    continue;
-                }
-                firstEntryList.add(lep.getFirstEntry());
-            }
-        }
-
-        if (firstEntryList.size() == 0) {
-            LOG.debug("Nothing to flush for ledger {}.", l);
-            // nothing to do
-            return;
-        }
-
-        // Now flush all the pages of a ledger
-        List<LedgerEntryPage> entries = new ArrayList<LedgerEntryPage>(firstEntryList.size());
-        try {
-            for(Long firstEntry: firstEntryList) {
-                LedgerEntryPage lep = getLedgerEntryPage(l, firstEntry, true);
-                if (lep != null) {
-                    entries.add(lep);
-                }
-            }
-            Collections.sort(entries, new Comparator<LedgerEntryPage>() {
-                    @Override
-                    public int compare(LedgerEntryPage o1, LedgerEntryPage o2) {
-                    return (int)(o1.getFirstEntry()-o2.getFirstEntry());
-                    }
-                    });
-            ArrayList<Integer> versions = new ArrayList<Integer>(entries.size());
-            // flush the header if necessary
-            fi.flushHeader();
-            int start = 0;
-            long lastOffset = -1;
-            for(int i = 0; i < entries.size(); i++) {
-                versions.add(i, entries.get(i).getVersion());
-                if (lastOffset != -1 && (entries.get(i).getFirstEntry() - lastOffset) != entriesPerPage) {
-                    // send up a sequential list
-                    int count = i - start;
-                    if (count == 0) {
-                        LOG.warn("Count cannot possibly be zero!");
-                    }
-                    writeBuffers(l, entries, fi, start, count);
-                    start = i;
-                }
-                lastOffset = entries.get(i).getFirstEntry();
-            }
-            if (entries.size()-start == 0 && entries.size() != 0) {
-                LOG.warn("Nothing to write, but there were entries!");
-            }
-            writeBuffers(l, entries, fi, start, entries.size()-start);
-            synchronized(this) {
-                for(int i = 0; i < entries.size(); i++) {
-                    LedgerEntryPage lep = entries.get(i);
-                    lep.setClean(versions.get(i));
-                }
-            }
-        } finally {
-            for(LedgerEntryPage lep: entries) {
-                lep.releasePage();
-            }
-        }
-    }
-
-    private void writeBuffers(Long ledger,
-                              List<LedgerEntryPage> entries, FileInfo fi,
-                              int start, int count) throws IOException {
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Writing {} buffers of {}", count, Long.toHexString(ledger));
-        }
-        if (count == 0) {
-            return;
-        }
-        ByteBuffer buffs[] = new ByteBuffer[count];
-        for(int j = 0; j < count; j++) {
-            buffs[j] = entries.get(start+j).getPageToWrite();
-            if (entries.get(start+j).getLedger() != ledger) {
-                throw new IOException("Writing to " + ledger + " but page belongs to "
-                                      + entries.get(start+j).getLedger());
-            }
-        }
-        long totalWritten = 0;
-        while(buffs[buffs.length-1].remaining() > 0) {
-            long rc = fi.write(buffs, entries.get(start+0).getFirstEntry()*8);
-            if (rc <= 0) {
-                throw new IOException("Short write to ledger " + ledger + " rc = " + rc);
-            }
-            totalWritten += rc;
-        }
-        if (totalWritten != (long)count * (long)pageSize) {
-            throw new IOException("Short write to ledger " + ledger + " wrote " + totalWritten
-                                  + " expected " + count * pageSize);
-        }
-    }
-    private LedgerEntryPage grabCleanPage(long ledger, long entry) throws IOException {
-        if (entry % entriesPerPage != 0) {
-            throw new IllegalArgumentException(entry + " is not a multiple of " + entriesPerPage);
-        }
-        outerLoop:
-        while(true) {
-            synchronized(this) {
-                if (pageCount  < pageLimit) {
-                    // let's see if we can allocate something
-                    LedgerEntryPage lep = new LedgerEntryPage(pageSize, entriesPerPage);
-                    lep.setLedger(ledger);
-                    lep.setFirstEntry(entry);
-
-                    // note, this will not block since it is a new page
-                    lep.usePage();
-                    pageCount++;
-                    return lep;
-                }
-            }
-
-            synchronized(cleanLedgers) {
-                if (cleanLedgers.isEmpty()) {
-                    flushLedger(false);
-                    synchronized(this) {
-                        for(Long l: pages.keySet()) {
-                            cleanLedgers.add(l);
-                        }
-                    }
-                }
-                synchronized(this) {
-                    // if ledgers deleted between checking pageCount and putting
-                    // ledgers into cleanLedgers list, the cleanLedgers list would be empty.
-                    // so give it a chance to go back to check pageCount again because
-                    // deleteLedger would decrement pageCount to return the number of pages
-                    // occupied by deleted ledgers.
-                    if (cleanLedgers.isEmpty()) {
-                        continue outerLoop;
-                    }
-                    Long cleanLedger = cleanLedgers.getFirst();
-                    Map<Long, LedgerEntryPage> map = pages.get(cleanLedger);
-                    while (map == null || map.isEmpty()) {
-                        cleanLedgers.removeFirst();
-                        if (cleanLedgers.isEmpty()) {
-                            continue outerLoop; 
-                        }
-                        cleanLedger = cleanLedgers.getFirst();
-                        map = pages.get(cleanLedger);
-                    }
-                    Iterator<Map.Entry<Long, LedgerEntryPage>> it = map.entrySet().iterator();
-                    LedgerEntryPage lep = it.next().getValue();
-                    while((lep.inUse() || !lep.isClean())) {
-                        if (!it.hasNext()) {
-                            // no clean page found in this ledger
-                            cleanLedgers.removeFirst();
-                            continue outerLoop;
-                        }
-                        lep = it.next().getValue();
-                    }
-                    it.remove();
-                    if (map.isEmpty()) {
-                        pages.remove(lep.getLedger());
-                    }
-                    lep.usePage();
-                    lep.zeroPage();
-                    lep.setLedger(ledger);
-                    lep.setFirstEntry(entry);
-                    return lep;
-                }
-            }
-        }
+        indexPageManager.flushOneOrMoreLedgers(doAll);
     }
 
     @Override
     public long getLastEntry(long ledgerId) throws IOException {
-        long lastEntry = 0;
-        // Find the last entry in the cache
-        synchronized(this) {
-            Map<Long, LedgerEntryPage> map = pages.get(ledgerId);
-            if (map != null) {
-                for(LedgerEntryPage lep: map.values()) {
-                    if (lep.getFirstEntry() + entriesPerPage < lastEntry) {
-                        continue;
-                    }
-                    lep.usePage();
-                    long highest = lep.getLastEntry();
-                    if (highest > lastEntry) {
-                        lastEntry = highest;
-                    }
-                    lep.releasePage();
-                }
-            }
-        }
-
-        FileInfo fi = null;
-        try {
-            fi = getFileInfo(ledgerId, null);
-            long size = fi.size();
-            // make sure the file size is aligned with index entry size
-            // otherwise we may read incorrect data
-            if (0 != size % 8) {
-                LOG.warn("Index file of ledger {} is not aligned with index entry size.", ledgerId);
-                size = size - size % 8;
-            }
-            // we may not have the last entry in the cache
-            if (size > lastEntry*8) {
-                ByteBuffer bb = ByteBuffer.allocate(getPageSize());
-                long position = size - getPageSize();
-                if (position < 0) {
-                    position = 0;
-                }
-                fi.read(bb, position);
-                bb.flip();
-                long startingEntryId = position/8;
-                for(int i = getEntriesPerPage()-1; i >= 0; i--) {
-                    if (bb.getLong(i*8) != 0) {
-                        if (lastEntry < startingEntryId+i) {
-                            lastEntry = startingEntryId+i;
-                        }
-                        break;
-                    }
-                }
-            }
-        } finally {
-            if (fi != null) {
-                fi.release();
-            }
-        }
-
+        // Get the highest entry from the pages that are in memory
+        long lastEntryInMem = indexPageManager.getLastEntryInMem(ledgerId);
+        // Some index pages may have been evicted from memory, so also consult the
+        // persistent store. We read the index file only when it could contain an
+        // entry beyond the last in-memory entry; skipping the read otherwise is
+        // just an optimization.
+        long lastEntry = indexPersistenceManager.getPersistEntryBeyondInMem(ledgerId, lastEntryInMem);
         return lastEntry;
     }
 
     /**
-     * This method will look within the ledger directories for the ledger index
-     * files. That will comprise the set of active ledgers this particular
-     * BookieServer knows about that have not yet been deleted by the BookKeeper
-     * Client. This is called only once during initialization.
-     */
-    private void getActiveLedgers() throws IOException {
-        // Ledger index files are stored in a file hierarchy with a parent and
-        // grandParent directory. We'll have to go two levels deep into these
-        // directories to find the index files.
-        for (File ledgerDirectory : ledgerDirsManager.getAllLedgerDirs()) {
-            for (File grandParent : ledgerDirectory.listFiles()) {
-                if (grandParent.isDirectory()) {
-                    for (File parent : grandParent.listFiles()) {
-                        if (parent.isDirectory()) {
-                            for (File index : parent.listFiles()) {
-                                if (!index.isFile()
-                                        || (!index.getName().endsWith(IDX) && !index.getName().endsWith(RLOC))) {
-                                    continue;
-                                }
-
-                                // We've found a ledger index file. The file
-                                // name is the HexString representation of the
-                                // ledgerId.
-                                String ledgerIdInHex = index.getName().replace(RLOC, "").replace(IDX, "");
-                                if (index.getName().endsWith(RLOC)) {
-                                    if (findIndexFile(Long.parseLong(ledgerIdInHex)) != null) {
-                                        if (!index.delete()) {
-                                            LOG.warn("Deleting the rloc file " + index + " failed");
-                                        }
-                                        continue;
-                                    } else {
-                                        File dest = new File(index.getParentFile(), ledgerIdInHex + IDX);
-                                        if (!index.renameTo(dest)) {
-                                            throw new IOException("Renaming rloc file " + index
-                                                    + " to index file has failed");
-                                        }
-                                    }
-                                }
-                                activeLedgers.put(Long.parseLong(ledgerIdInHex, 16), true);
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    /**
      * This method is called whenever a ledger is deleted by the BookKeeper Client
      * and we want to remove all relevant data for it stored in the LedgerCache.
      */
@@ -721,144 +99,33 @@ public class LedgerCacheImpl implements 
     public void deleteLedger(long ledgerId) throws IOException {
         LOG.debug("Deleting ledgerId: {}", ledgerId);
 
-        // remove pages first to avoid page flushed when deleting file info
-        synchronized(this) {
-            Map<Long, LedgerEntryPage> lpages = pages.remove(ledgerId);
-            if (null != lpages) {
-                pageCount -= lpages.size();
-                if (pageCount < 0) {
-                    LOG.error("Page count of ledger cache has been decremented to be less than zero.");
-                }
-            }
-        }
-        // Delete the ledger's index file and close the FileInfo
-        FileInfo fi = null;
-        try {
-            fi = getFileInfo(ledgerId, null);
-            fi.close(false);
-            fi.delete();
-        } finally {
-            // should release use count
-            // otherwise the file channel would not be closed.
-            if (null != fi) {
-                fi.release();
-            }
-        }
-
-        // Remove it from the active ledger manager
-        activeLedgers.remove(ledgerId);
-
-        // Now remove it from all the other lists and maps.
-        // These data structures need to be synchronized first before removing entries.
-        synchronized(fileInfoCache) {
-            fileInfoCache.remove(ledgerId);
-        }
-        synchronized(cleanLedgers) {
-            cleanLedgers.remove(ledgerId);
-        }
-        synchronized(dirtyLedgers) {
-            dirtyLedgers.remove(ledgerId);
-        }
-        synchronized(openLedgers) {
-            openLedgers.remove(ledgerId);
-        }
-    }
-
-    private File findIndexFile(long ledgerId) throws IOException {
-        String ledgerName = getLedgerName(ledgerId);
-        for (File d : ledgerDirsManager.getAllLedgerDirs()) {
-            File lf = new File(d, ledgerName);
-            if (lf.exists()) {
-                return lf;
-            }
-        }
-        return null;
+        indexPageManager.removePagesForLedger(ledgerId);
+        indexPersistenceManager.removeLedger(ledgerId);
     }
 
     @Override
     public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
-        synchronized(fileInfoCache) {
-            FileInfo fi = fileInfoCache.get(ledgerId);
-            if (fi == null) {
-                File lf = findIndexFile(ledgerId);
-                if (lf == null) {
-                    throw new Bookie.NoLedgerException(ledgerId);
-                }
-                evictFileInfoIfNecessary();        
-                fi = new FileInfo(lf, null);
-                byte[] key = fi.getMasterKey();
-                fileInfoCache.put(ledgerId, fi);
-                openLedgers.add(ledgerId);
-                return key;
-            }
-            return fi.getMasterKey();
-        }
-    }
-
-    // evict file info if necessary
-    private void evictFileInfoIfNecessary() throws IOException {
-        synchronized (fileInfoCache) {
-            if (openLedgers.size() > openFileLimit) {
-                long ledgerToRemove = openLedgers.removeFirst();
-                // TODO Add a statistic here, we don't care really which
-                // ledger is evicted, but the rate at which they get evicted
-                LOG.debug("Ledger {} is evicted from file info cache.",
-                          ledgerToRemove);
-                fileInfoCache.remove(ledgerToRemove).close(true);
-            }
-        }
+        return indexPersistenceManager.readMasterKey(ledgerId);
     }
 
     @Override
     public boolean setFenced(long ledgerId) throws IOException {
-        FileInfo fi = null;
-        try {
-            fi = getFileInfo(ledgerId, null);
-            return fi.setFenced();
-        } finally {
-            if (null != fi) {
-                fi.release();
-            }
-        }
+        return indexPersistenceManager.setFenced(ledgerId);
     }
 
     @Override
     public boolean isFenced(long ledgerId) throws IOException {
-        FileInfo fi = null;
-        try {
-            fi = getFileInfo(ledgerId, null);
-            return fi.isFenced();
-        } finally {
-            if (null != fi) {
-                fi.release();
-            }
-        }
+        return indexPersistenceManager.isFenced(ledgerId);
     }
 
     @Override
     public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
-        FileInfo fi = null;
-        try {
-            fi = getFileInfo(ledgerId, masterKey);
-        } finally {
-            if (null != fi) {
-                fi.release();
-            }
-        }
+        indexPersistenceManager.setMasterKey(ledgerId, masterKey);
     }
 
     @Override
     public boolean ledgerExists(long ledgerId) throws IOException {
-        synchronized(fileInfoCache) {
-            FileInfo fi = fileInfoCache.get(ledgerId);
-            if (fi == null) {
-                File lf = findIndexFile(ledgerId);
-                if (lf == null) {
-                    return false;
-                }
-            }
-        }
-        return true;
+        return indexPersistenceManager.ledgerExists(ledgerId);
     }
 
     @Override
@@ -876,7 +143,7 @@ public class LedgerCacheImpl implements 
 
             @Override
             public int getPageCount() {
-                return LedgerCacheImpl.this.getNumUsedPages();
+                return LedgerCacheImpl.this.indexPageManager.getNumUsedPages();
             }
 
             @Override
@@ -886,41 +153,33 @@ public class LedgerCacheImpl implements 
 
             @Override
             public int getOpenFileLimit() {
-                return openFileLimit;
+                return LedgerCacheImpl.this.indexPersistenceManager.getOpenFileLimit();
             }
 
             @Override
             public int getPageLimit() {
-                return LedgerCacheImpl.this.getPageLimit();
+                return LedgerCacheImpl.this.indexPageManager.getPageLimit();
             }
 
             @Override
             public int getNumCleanLedgers() {
-                return cleanLedgers.size();
+                return LedgerCacheImpl.this.indexPageManager.getNumCleanLedgers();
             }
 
             @Override
             public int getNumDirtyLedgers() {
-                return dirtyLedgers.size();
+                return LedgerCacheImpl.this.indexPageManager.getNumDirtyLedgers();
             }
 
             @Override
             public int getNumOpenLedgers() {
-                return openLedgers.size();
+                return LedgerCacheImpl.this.indexPersistenceManager.getNumOpenLedgers();
             }
         };
     }
 
     @Override
     public void close() throws IOException {
-        synchronized (fileInfoCache) {
-            for (Entry<Long, FileInfo> fileInfo : fileInfoCache.entrySet()) {
-                FileInfo value = fileInfo.getValue();
-                if (value != null) {
-                    value.close(true);
-                }
-            }
-            fileInfoCache.clear();
-        }
+        indexPersistenceManager.close();
     }
 }
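
The rewritten getLastEntry() above now works in two steps: the in-memory page manager (indexPageManager) reports the highest entry held in cached pages, and the persistence manager consults the on-disk index only when the file could hold something newer. The manager implementations are not shown in this excerpt, so the following is a rough standalone sketch of the on-disk half of that lookup, assuming getPersistEntryBeyondInMem() carries over the scan from the removed getLastEntry() body above. It reads a plain index file through a FileChannel instead of the bookie's FileInfo (so header handling is omitted), and the class and method names are illustrative only.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative sketch only; not part of this commit.
public class LastEntryScanSketch {
    // Returns the highest entry id recorded either in memory (lastEntryInMem)
    // or in the on-disk index file.
    public static long lastPersistedEntry(Path indexFile, long lastEntryInMem,
                                          int pageSize, int entriesPerPage) throws IOException {
        long lastEntry = lastEntryInMem;
        try (FileChannel ch = FileChannel.open(indexFile, StandardOpenOption.READ)) {
            // align to the 8-byte index entry size so a torn tail is not misread
            long size = ch.size() - (ch.size() % 8);
            if (size <= lastEntry * 8) {
                // the file cannot hold anything newer than the in-memory pages
                return lastEntry;
            }
            long position = Math.max(0, size - pageSize);
            ByteBuffer bb = ByteBuffer.allocate(pageSize);
            while (bb.hasRemaining() && ch.read(bb, position + bb.position()) > 0) {
                // keep reading until the tail page is buffered or EOF is hit
            }
            bb.flip();
            long startingEntryId = position / 8;
            // walk the tail page backwards; the first non-zero offset marks the
            // highest entry ever written to this index file
            for (int i = entriesPerPage - 1; i >= 0; i--) {
                if ((i + 1) * 8 <= bb.limit() && bb.getLong(i * 8) != 0) {
                    lastEntry = Math.max(lastEntry, startingEntryId + i);
                    break;
                }
            }
        }
        return lastEntry;
    }
}

A caller would pass the bookie's configured page size and entries-per-page (pageSize / 8); the real persistence manager presumably reuses the FileInfo it already caches (the test hunks below reach it through getFileInfo()) rather than reopening the file as this sketch does.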

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java?rev=1531203&r1=1531202&r2=1531203&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java Fri Oct 11 06:36:27 2013
@@ -21,35 +21,24 @@ package org.apache.bookkeeper.bookie;
  *
  */
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
 import java.io.File;
-import java.io.RandomAccessFile;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.Random;
-import java.util.Set;
 import java.util.Arrays;
+import java.util.Random;
 
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeperTestClient;
-import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.ClientUtil;
 import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.junit.Before;
-import org.junit.Test;
-import static org.junit.Assert.*;
 
 public class BookieJournalTest {
     static Logger LOG = LoggerFactory.getLogger(BookieJournalTest.class);
@@ -59,7 +48,7 @@ public class BookieJournalTest {
     private void writeIndexFileForLedger(File indexDir, long ledgerId,
                                          byte[] masterKey)
             throws Exception {
-        File fn = new File(indexDir, LedgerCacheImpl.getLedgerName(ledgerId));
+        File fn = new File(indexDir, IndexPersistenceMgr.getLedgerName(ledgerId));
         fn.getParentFile().mkdirs();
         FileInfo fi = new FileInfo(fn, masterKey);
         // force creation of index file
@@ -70,7 +59,7 @@ public class BookieJournalTest {
     private void writePartialIndexFileForLedger(File indexDir, long ledgerId,
                                                 byte[] masterKey, boolean truncateToMasterKey)
             throws Exception {
-        File fn = new File(indexDir, LedgerCacheImpl.getLedgerName(ledgerId));
+        File fn = new File(indexDir, IndexPersistenceMgr.getLedgerName(ledgerId));
         fn.getParentFile().mkdirs();
         FileInfo fi = new FileInfo(fn, masterKey);
         // force creation of index file
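
The only functional change in the test hunk above is that the index-file path is now derived through the static helper on IndexPersistenceMgr rather than LedgerCacheImpl. A minimal sketch of that derivation follows; the IndexPathSketch/indexFileFor names are hypothetical, and the layout description comes from the removed getActiveLedgers() scan earlier in this diff.

import java.io.File;

import org.apache.bookkeeper.bookie.IndexPersistenceMgr;

final class IndexPathSketch {
    // Hypothetical helper mirroring writeIndexFileForLedger() above.
    static File indexFileFor(File indexDir, long ledgerId) {
        // getLedgerName() yields the hex ledger id plus the index suffix under
        // its two-level (grandparent/parent) sub-directory
        File fn = new File(indexDir, IndexPersistenceMgr.getLedgerName(ledgerId));
        fn.getParentFile().mkdirs();   // the sub-directories may not exist yet
        return fn;
    }
}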

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java?rev=1531203&r1=1531202&r2=1531203&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java Fri Oct 11 06:36:27 2013
@@ -25,6 +25,8 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import junit.framework.TestCase;
+
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
@@ -38,12 +40,9 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import junit.framework.TestCase;
-
 /**
  * LedgerCache related test cases
  */
@@ -266,11 +265,11 @@ public class LedgerCacheTest extends Tes
         // Create ledger index file
         ledgerStorage.setMasterKey(1, "key".getBytes());
 
-        FileInfo fileInfo = ledgerCache.getFileInfo(Long.valueOf(1), null);
+        FileInfo fileInfo = ledgerCache.getIndexPersistenceManager().getFileInfo(Long.valueOf(1), null);
 
         // Simulate the flush failure
         FileInfo newFileInfo = new FileInfo(fileInfo.getLf(), fileInfo.getMasterKey());
-        ledgerCache.fileInfoCache.put(Long.valueOf(1), newFileInfo);
+        ledgerCache.getIndexPersistenceManager().fileInfoCache.put(Long.valueOf(1), newFileInfo);
         // Add entries
         ledgerStorage.addEntry(generateEntry(1, 1));
         ledgerStorage.addEntry(generateEntry(1, 2));
@@ -364,7 +363,7 @@ public class LedgerCacheTest extends Tes
     public void testSyncThreadNPE() throws IOException {
         newLedgerCache();
         try {
-            ((LedgerCacheImpl) ledgerCache).getLedgerEntryPage(0L, 0L, true);
+            ((LedgerCacheImpl) ledgerCache).getIndexPageManager().getLedgerEntryPage(0L, 0L, true);
         } catch (Exception e) {
             LOG.error("Exception when trying to get a ledger entry page", e);
             fail("Shouldn't have thrown an exception");
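
The LedgerCacheTest hunks above show the same delegation from the caller's side: whitebox access now goes through the accessors LedgerCacheImpl exposes for its two delegates instead of touching fields on the cache itself. Roughly, as a fragment from inside such a test (variable names illustrative, return types assumed from the calls in the diff):

LedgerCacheImpl cache = (LedgerCacheImpl) ledgerCache;
// file info and master keys live behind the persistence manager ...
FileInfo fi = cache.getIndexPersistenceManager().getFileInfo(Long.valueOf(1), null);
// ... while ledger entry pages are served by the in-memory page manager
cache.getIndexPageManager().getLedgerEntryPage(0L, 0L, true);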

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java?rev=1531203&r1=1531202&r2=1531203&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java Fri Oct 11 06:36:27 2013
@@ -21,33 +21,28 @@
 
 package org.apache.bookkeeper.bookie;
 
-import java.util.Arrays;
-
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
+import java.io.BufferedWriter;
 import java.io.File;
-import java.io.IOException;
-
 import java.io.FileOutputStream;
 import java.io.OutputStreamWriter;
-import java.io.BufferedWriter;
 import java.io.PrintStream;
 import java.io.RandomAccessFile;
-
-import org.junit.Before;
-import org.junit.After;
-import org.junit.Test;
-import static org.junit.Assert.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
 
 import org.apache.bookkeeper.client.ClientUtil;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.bookkeeper.test.ZooKeeperUtil;
 import org.apache.bookkeeper.test.PortManager;
-
+import org.apache.bookkeeper.test.ZooKeeperUtil;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,7 +70,7 @@ public class UpgradeTest {
             throws Exception {
         long ledgerId = 1;
 
-        File fn = new File(dir, LedgerCacheImpl.getLedgerName(ledgerId));
+        File fn = new File(dir, IndexPersistenceMgr.getLedgerName(ledgerId));
         fn.getParentFile().mkdirs();
         FileInfo fi = new FileInfo(fn, masterKey);
         // force creation of index file

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java?rev=1531203&r1=1531202&r2=1531203&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java Fri Oct 11 06:36:27 2013
@@ -20,41 +20,37 @@
  */
 package org.apache.bookkeeper.replication;
 
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.CountDownLatch;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.LinkedList;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieAccessor;
-import org.apache.bookkeeper.util.StringUtils;
-import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
+import org.apache.bookkeeper.bookie.IndexPersistenceMgr;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
-
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.StringUtils;
 import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.bookkeeper.bookie.Bookie;
-import org.apache.bookkeeper.bookie.LedgerCacheImpl;
+import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
 import org.apache.zookeeper.ZooKeeper;
-import org.junit.Before;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -191,7 +187,7 @@ public class AuditorPeriodicCheckTest ex
         ledgerDir = Bookie.getCurrentDirectory(ledgerDir);
 
         // corrupt of entryLogs
-        File index = new File(ledgerDir, LedgerCacheImpl.getLedgerName(ledgerToCorrupt));
+        File index = new File(ledgerDir, IndexPersistenceMgr.getLedgerName(ledgerToCorrupt));
         LOG.info("file to corrupt{}" , index);
         ByteBuffer junk = ByteBuffer.allocate(1024*1024);
         FileOutputStream out = new FileOutputStream(index);