You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2013/10/11 08:36:27 UTC
svn commit: r1531203 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/
Author: sijie
Date: Fri Oct 11 06:36:27 2013
New Revision: 1531203
URL: http://svn.apache.org/r1531203
Log:
BOOKKEEPER-658: ledger cache refactor (Robin Dhamankar via sijie)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1531203&r1=1531202&r2=1531203&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Oct 11 06:36:27 2013
@@ -162,6 +162,8 @@ Trunk (unreleased changes)
BOOKKEEPER-645: Bookkeeper shell command to get a list of readonly bookies (rakesh via sijie)
+ BOOKKEEPER-658: ledger cache refactor (Robin Dhamankar via sijie)
+
NEW FEATURE:
BOOKKEEPER-562: Ability to tell if a ledger is closed or not (fpj)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java?rev=1531203&r1=1531202&r2=1531203&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java Fri Oct 11 06:36:27 2013
@@ -745,7 +745,7 @@ public class BookieShell implements Tool
* @return file object.
*/
private File getLedgerFile(long ledgerId) {
- String ledgerName = LedgerCacheImpl.getLedgerName(ledgerId);
+ String ledgerName = IndexPersistenceMgr.getLedgerName(ledgerId);
File lf = null;
for (File d : ledgerDirectories) {
lf = new File(d, ledgerName);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java?rev=1531203&r1=1531202&r2=1531203&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java Fri Oct 11 06:36:27 2013
@@ -21,19 +21,20 @@
package org.apache.bookkeeper.bookie;
+import static com.google.common.base.Charsets.UTF_8;
+
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import static com.google.common.base.Charsets.UTF_8;
-import com.google.common.annotations.VisibleForTesting;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* This is the file handle for a ledger's index file that maps entry ids to location.
* It is used by LedgerCache.
@@ -291,7 +292,7 @@ class FileInfo {
if (size > fc.size()) {
size = fc.size();
}
- File rlocFile = new File(newFile.getParentFile(), newFile.getName() + LedgerCacheImpl.RLOC);
+ File rlocFile = new File(newFile.getParentFile(), newFile.getName() + IndexPersistenceMgr.RLOC);
if (!rlocFile.exists()) {
checkParents(rlocFile);
if (!rlocFile.createNewFile()) {
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java?rev=1531203&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java Fri Oct 11 06:36:27 2013
@@ -0,0 +1,404 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class IndexInMemPageMgr {
+ private final static Logger LOG = LoggerFactory.getLogger(IndexInMemPageMgr.class);
+
+ final int pageSize;
+ final int pageLimit;
+ final int entriesPerPage;
+ final HashMap<Long, HashMap<Long, LedgerEntryPage>> pages;
+
+ // The number of pages that have actually been used
+ private int pageCount = 0;
+
+ // The persistence manager that this page manager uses to
+ // flush and read pages
+ private final IndexPersistenceMgr indexPersistenceManager;
+
+ /**
+ * the list of potentially dirty ledgers
+ */
+ LinkedList<Long> dirtyLedgers = new LinkedList<Long>();
+ /**
+ * the list of potentially clean ledgers
+ */
+ LinkedList<Long> cleanLedgers = new LinkedList<Long>();
+
+ public IndexInMemPageMgr(int pageSize,
+ int entriesPerPage,
+ ServerConfiguration conf,
+ IndexPersistenceMgr indexPersistenceManager) {
+ this.pageSize = pageSize;
+ this.entriesPerPage = entriesPerPage;
+ this.indexPersistenceManager = indexPersistenceManager;
+ this.pages = new HashMap<Long, HashMap<Long, LedgerEntryPage>>();
+
+ if (conf.getPageLimit() <= 0) {
+ // allocate a third of the memory to the page cache
+ this.pageLimit = (int) ((Runtime.getRuntime().maxMemory() / 3) / this.pageSize);
+ } else {
+ this.pageLimit = conf.getPageLimit();
+ }
+ LOG.info("maxMemory = {}, pageSize = {}, pageLimit = {}", new Object[] { Runtime.getRuntime().maxMemory(),
+ pageSize, pageLimit });
+ }
+
+ /**
+ * @return page size used in ledger cache
+ */
+ public int getPageSize() {
+ return pageSize;
+ }
+
+ /**
+ * @return entries per page used in ledger cache
+ */
+ public int getEntriesPerPage() {
+ return entriesPerPage;
+ }
+
+ /**
+ * @return page limitation in ledger cache
+ */
+ public int getPageLimit() {
+ return pageLimit;
+ }
+
+ /**
+ * @return number of pages used in ledger cache
+ */
+ public int getNumUsedPages() {
+ return pageCount;
+ }
+
+ public int getNumCleanLedgers() {
+ return cleanLedgers.size();
+ }
+
+ public int getNumDirtyLedgers() {
+ return dirtyLedgers.size();
+ }
+
+ private void putIntoTable(HashMap<Long, HashMap<Long,LedgerEntryPage>> table, LedgerEntryPage lep) {
+ HashMap<Long, LedgerEntryPage> map = table.get(lep.getLedger());
+ if (map == null) {
+ map = new HashMap<Long, LedgerEntryPage>();
+ table.put(lep.getLedger(), map);
+ }
+ map.put(lep.getFirstEntry(), lep);
+ }
+
+ private static LedgerEntryPage getFromTable(HashMap<Long, HashMap<Long,LedgerEntryPage>> table,
+ Long ledger, Long firstEntry) {
+ HashMap<Long, LedgerEntryPage> map = table.get(ledger);
+ if (map != null) {
+ return map.get(firstEntry);
+ }
+ return null;
+ }
+
+    /** Looks up a cached page and marks it in use (caller must releasePage()); null if absent, or if onlyDirty and clean. */
+    synchronized protected LedgerEntryPage getLedgerEntryPage(Long ledger, Long firstEntry, boolean onlyDirty) {
+        LedgerEntryPage lep = getFromTable(pages, ledger, firstEntry);
+        if (lep == null) {
+            return null;
+        }
+        lep.usePage();
+        if (onlyDirty && lep.isClean()) {
+            // the caller never sees this page, so give back the use count taken above;
+            lep.releasePage(); // otherwise the page stays inUse() and grabCleanPage can never reclaim it
+            return null;
+        }
+        return lep;
+    }
+
+ /**
+ * Grab ledger entry page whose first entry is <code>pageEntry</code>.
+ *
+ * If the page didn't exist before, we allocate a memory page.
+ * Otherwise, we grab a clean page and read it from disk.
+ *
+ * @param ledger
+ * Ledger Id
+ * @param pageEntry
+ * Start entry of this entry page.
+ */
+ private LedgerEntryPage grabLedgerEntryPage(long ledger, long pageEntry) throws IOException {
+ LedgerEntryPage lep = grabCleanPage(ledger, pageEntry);
+ try {
+ // should update page before we put it into table
+ // otherwise we would put an empty page in it
+ indexPersistenceManager.updatePage(lep);
+ synchronized (this) {
+ putIntoTable(pages, lep);
+ }
+ } catch (IOException ie) {
+ // if we grab a clean page, but failed to update the page
+ // we are exhausting the count of ledger entry pages.
+ // since this page will be never used, so we need to decrement
+ // page count of ledger cache.
+ lep.releasePage();
+ synchronized (this) {
+ --pageCount;
+ }
+ throw ie;
+ }
+ return lep;
+ }
+
+ void removePagesForLedger(long ledgerId) {
+ // remove pages first to avoid page flushed when deleting file info
+ synchronized (this) {
+ Map<Long, LedgerEntryPage> lpages = pages.remove(ledgerId);
+ if (null != lpages) {
+ pageCount -= lpages.size();
+ if (pageCount < 0) {
+ LOG.error("Page count of ledger cache has been decremented to be less than zero.");
+ }
+ }
+ }
+ }
+
+ long getLastEntryInMem(long ledgerId) {
+ long lastEntry = 0;
+ // Find the last entry in the cache
+ synchronized (this) {
+ Map<Long, LedgerEntryPage> map = pages.get(ledgerId);
+ if (map != null) {
+ for (LedgerEntryPage lep : map.values()) {
+ if (lep.getFirstEntry() + entriesPerPage < lastEntry) {
+ continue;
+ }
+ lep.usePage();
+ long highest = lep.getLastEntry();
+ if (highest > lastEntry) {
+ lastEntry = highest;
+ }
+ lep.releasePage();
+ }
+ }
+ }
+ return lastEntry;
+ }
+
+ private LedgerEntryPage grabCleanPage(long ledger, long entry) throws IOException {
+ if (entry % entriesPerPage != 0) {
+ throw new IllegalArgumentException(entry + " is not a multiple of " + entriesPerPage);
+ }
+ outerLoop: while (true) {
+ synchronized (this) {
+ if (pageCount < pageLimit) {
+ // let's see if we can allocate something
+ LedgerEntryPage lep = new LedgerEntryPage(pageSize, entriesPerPage);
+ lep.setLedger(ledger);
+ lep.setFirstEntry(entry);
+
+ // note, this will not block since it is a new page
+ lep.usePage();
+ pageCount++;
+ return lep;
+ }
+ }
+
+ synchronized (cleanLedgers) {
+ if (cleanLedgers.isEmpty()) {
+ flushOneOrMoreLedgers(false);
+ synchronized (this) {
+ for (Long l : pages.keySet()) {
+ cleanLedgers.add(l);
+ }
+ }
+ }
+ synchronized (this) {
+ // if ledgers deleted between checking pageCount and putting
+ // ledgers into cleanLedgers list, the cleanLedgers list would be empty.
+ // so give it a chance to go back to check pageCount again because
+ // deleteLedger would decrement pageCount to return the number of pages
+ // occupied by deleted ledgers.
+ if (cleanLedgers.isEmpty()) {
+ continue outerLoop;
+ }
+ Long cleanLedger = cleanLedgers.getFirst();
+ Map<Long, LedgerEntryPage> map = pages.get(cleanLedger);
+ while (map == null || map.isEmpty()) {
+ cleanLedgers.removeFirst();
+ if (cleanLedgers.isEmpty()) {
+ continue outerLoop;
+ }
+ cleanLedger = cleanLedgers.getFirst();
+ map = pages.get(cleanLedger);
+ }
+ Iterator<Map.Entry<Long, LedgerEntryPage>> it = map.entrySet().iterator();
+ LedgerEntryPage lep = it.next().getValue();
+ while ((lep.inUse() || !lep.isClean())) {
+ if (!it.hasNext()) {
+ // no clean page found in this ledger
+ cleanLedgers.removeFirst();
+ continue outerLoop;
+ }
+ lep = it.next().getValue();
+ }
+ it.remove();
+ if (map.isEmpty()) {
+ pages.remove(lep.getLedger());
+ }
+ lep.usePage();
+ lep.zeroPage();
+ lep.setLedger(ledger);
+ lep.setFirstEntry(entry);
+ return lep;
+ }
+ }
+ }
+ }
+
+ void flushOneOrMoreLedgers(boolean doAll) throws IOException {
+ synchronized (dirtyLedgers) {
+ if (dirtyLedgers.isEmpty()) {
+ synchronized (this) {
+ for (Long l : pages.keySet()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Adding {} to dirty pages", Long.toHexString(l));
+ }
+ dirtyLedgers.add(l);
+ }
+ }
+ }
+ if (dirtyLedgers.isEmpty()) {
+ return;
+ }
+
+ indexPersistenceManager.relocateIndexFileIfDirFull(dirtyLedgers);
+
+ while (!dirtyLedgers.isEmpty()) {
+ Long l = dirtyLedgers.removeFirst();
+
+ flushSpecificLedger(l);
+
+ if (!doAll) {
+ break;
+ }
+ // Yield. if we are doing all the ledgers we don't want to block other flushes that
+ // need to happen
+ try {
+ dirtyLedgers.wait(1);
+ } catch (InterruptedException e) {
+ // just pass it on
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+
+ /**
+ * Flush a specified ledger
+ *
+ * @param l
+ * Ledger Id
+ * @throws IOException
+ */
+ private void flushSpecificLedger(long l) throws IOException {
+ LinkedList<Long> firstEntryList;
+ synchronized(this) {
+ HashMap<Long, LedgerEntryPage> pageMap = pages.get(l);
+ if (pageMap == null || pageMap.isEmpty()) {
+ indexPersistenceManager.flushLedgerHeader(l);
+ return;
+ }
+ firstEntryList = new LinkedList<Long>();
+ for(Map.Entry<Long, LedgerEntryPage> entry: pageMap.entrySet()) {
+ LedgerEntryPage lep = entry.getValue();
+ if (lep.isClean()) {
+ LOG.trace("Page is clean {}", lep);
+ continue;
+ }
+ firstEntryList.add(lep.getFirstEntry());
+ }
+ }
+
+ if (firstEntryList.size() == 0) {
+ LOG.debug("Nothing to flush for ledger {}.", l);
+ // nothing to do
+ return;
+ }
+
+ // Now flush all the pages of a ledger
+ List<LedgerEntryPage> entries = new ArrayList<LedgerEntryPage>(firstEntryList.size());
+ try {
+ for(Long firstEntry: firstEntryList) {
+ LedgerEntryPage lep = getLedgerEntryPage(l, firstEntry, true);
+ if (lep != null) {
+ entries.add(lep);
+ }
+ }
+ indexPersistenceManager.flushLedgerEntries(l, entries);
+ } finally {
+ for(LedgerEntryPage lep: entries) {
+ lep.releasePage();
+ }
+ }
+ }
+
+ void putEntryOffset(long ledger, long entry, long offset) throws IOException {
+ int offsetInPage = (int) (entry % entriesPerPage);
+ // find the id of the first entry of the page that has the entry
+ // we are looking for
+ long pageEntry = entry - offsetInPage;
+ LedgerEntryPage lep = getLedgerEntryPage(ledger, pageEntry, false);
+ if (lep == null) {
+ lep = grabLedgerEntryPage(ledger, pageEntry);
+ }
+ lep.setOffset(offset, offsetInPage * 8);
+ lep.releasePage();
+ }
+
+ long getEntryOffset(long ledger, long entry) throws IOException {
+ int offsetInPage = (int) (entry % entriesPerPage);
+ // find the id of the first entry of the page that has the entry
+ // we are looking for
+ long pageEntry = entry - offsetInPage;
+ LedgerEntryPage lep = getLedgerEntryPage(ledger, pageEntry, false);
+ try {
+ if (lep == null) {
+ lep = grabLedgerEntryPage(ledger, pageEntry);
+ }
+ return lep.getOffset(offsetInPage * 8);
+ } finally {
+ if (lep != null) {
+ lep.releasePage();
+ }
+ }
+ }
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java?rev=1531203&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java Fri Oct 11 06:36:27 2013
@@ -0,0 +1,547 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.util.SnapshotMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class IndexPersistenceMgr {
+ private final static Logger LOG = LoggerFactory.getLogger(IndexPersistenceMgr.class);
+
+ private static final String IDX = ".idx";
+ static final String RLOC = ".rloc";
+
+ @VisibleForTesting
+ public static final String getLedgerName(long ledgerId) {
+ int parent = (int) (ledgerId & 0xff);
+ int grandParent = (int) ((ledgerId & 0xff00) >> 8);
+ StringBuilder sb = new StringBuilder();
+ sb.append(Integer.toHexString(grandParent));
+ sb.append('/');
+ sb.append(Integer.toHexString(parent));
+ sb.append('/');
+ sb.append(Long.toHexString(ledgerId));
+ sb.append(IDX);
+ return sb.toString();
+ }
+
+ final HashMap<Long, FileInfo> fileInfoCache = new HashMap<Long, FileInfo>();
+ final int openFileLimit;
+ final int pageSize;
+ final int entriesPerPage;
+
+ // Manage all active ledgers in LedgerManager
+ // so LedgerManager has knowledge to garbage collect inactive/deleted ledgers
+ final SnapshotMap<Long, Boolean> activeLedgers;
+ private LedgerDirsManager ledgerDirsManager;
+ final LinkedList<Long> openLedgers = new LinkedList<Long>();
+ final private AtomicBoolean shouldRelocateIndexFile = new AtomicBoolean(false);
+
+ public IndexPersistenceMgr(int pageSize,
+ int entriesPerPage,
+ ServerConfiguration conf,
+ SnapshotMap<Long, Boolean> activeLedgers,
+ LedgerDirsManager ledgerDirsManager) throws IOException {
+ this.openFileLimit = conf.getOpenFileLimit();
+ this.activeLedgers = activeLedgers;
+ this.ledgerDirsManager = ledgerDirsManager;
+ this.pageSize = pageSize;
+ this.entriesPerPage = entriesPerPage;
+ LOG.info("openFileLimit = {}", openFileLimit);
+ // Retrieve all of the active ledgers.
+ getActiveLedgers();
+ ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+ }
+
+ FileInfo getFileInfo(Long ledger, byte masterKey[]) throws IOException {
+ synchronized (fileInfoCache) {
+ FileInfo fi = fileInfoCache.get(ledger);
+ if (fi == null) {
+ File lf = findIndexFile(ledger);
+ if (lf == null) {
+ if (masterKey == null) {
+ throw new Bookie.NoLedgerException(ledger);
+ }
+ lf = getNewLedgerIndexFile(ledger, null);
+ // A new ledger index file has been created for this Bookie.
+ // Add this new ledger to the set of active ledgers.
+ LOG.debug("New ledger index file created for ledgerId: {}", ledger);
+ activeLedgers.put(ledger, true);
+ }
+ evictFileInfoIfNecessary();
+ fi = new FileInfo(lf, masterKey);
+ fileInfoCache.put(ledger, fi);
+ openLedgers.add(ledger);
+ }
+ if (fi != null) {
+ fi.use();
+ }
+ return fi;
+ }
+ }
+
+ /**
+ * Get a new index file for ledger excluding directory <code>excludedDir</code>.
+ *
+ * @param ledger
+ * Ledger id.
+ * @param excludedDir
+ * The ledger directory to exclude.
+ * @return new index file object.
+ * @throws NoWritableLedgerDirException if there is no writable dir available.
+ */
+ private File getNewLedgerIndexFile(Long ledger, File excludedDir)
+ throws NoWritableLedgerDirException {
+ File dir = ledgerDirsManager.pickRandomWritableDir(excludedDir);
+ String ledgerName = getLedgerName(ledger);
+ return new File(dir, ledgerName);
+ }
+
+    /**
+     * Scans the ledger directories (two levels deep) for ledger index files and
+     * registers every ledger found in <code>activeLedgers</code>. A leftover
+     * <code>.rloc</code> file is deleted if the ledger already has an index file,
+     * otherwise it is renamed to one. Called only once, from the constructor.
+     */
+    private void getActiveLedgers() throws IOException {
+        // Ledger index files are stored in a file hierarchy with a parent and
+        // grandParent directory. We'll have to go two levels deep into these
+        // directories to find the index files.
+        for (File ledgerDirectory : ledgerDirsManager.getAllLedgerDirs()) {
+            for (File grandParent : ledgerDirectory.listFiles()) {
+                if (grandParent.isDirectory()) {
+                    for (File parent : grandParent.listFiles()) {
+                        if (parent.isDirectory()) {
+                            for (File index : parent.listFiles()) {
+                                if (!index.isFile()
+                                        || (!index.getName().endsWith(IDX) && !index.getName().endsWith(RLOC))) {
+                                    continue;
+                                }
+
+                                // We've found a ledger index file. The file
+                                // name is the HexString representation of the
+                                // ledgerId, so it must always be parsed with radix 16.
+                                String ledgerIdInHex = index.getName().replace(RLOC, "").replace(IDX, "");
+                                if (index.getName().endsWith(RLOC)) {
+                                    if (findIndexFile(Long.parseLong(ledgerIdInHex, 16)) != null) {
+                                        if (!index.delete()) {
+                                            LOG.warn("Deleting the rloc file " + index + " failed");
+                                        }
+                                        continue;
+                                    } else {
+                                        File dest = new File(index.getParentFile(), ledgerIdInHex + IDX);
+                                        if (!index.renameTo(dest)) {
+                                            throw new IOException("Renaming rloc file " + index
+                                                    + " to index file has failed");
+                                        }
+                                    }
+                                }
+                                activeLedgers.put(Long.parseLong(ledgerIdInHex, 16), true);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+ /**
+ * This method is called whenever a ledger is deleted by the BookKeeper Client
+ * and we want to remove all relevant data for it stored in the LedgerCache.
+ */
+ void removeLedger(long ledgerId) throws IOException {
+ // Delete the ledger's index file and close the FileInfo
+ FileInfo fi = null;
+ try {
+ fi = getFileInfo(ledgerId, null);
+ fi.close(false);
+ fi.delete();
+ } finally {
+ // should release use count
+ // otherwise the file channel would not be closed.
+ if (null != fi) {
+ fi.release();
+ }
+ }
+
+ // Remove it from the active ledger manager
+ activeLedgers.remove(ledgerId);
+
+ // Now remove it from all the other lists and maps.
+ // These data structures need to be synchronized first before removing entries.
+ synchronized (fileInfoCache) {
+ fileInfoCache.remove(ledgerId);
+ }
+ synchronized (openLedgers) {
+ openLedgers.remove(ledgerId);
+ }
+ }
+
+ private File findIndexFile(long ledgerId) throws IOException {
+ String ledgerName = getLedgerName(ledgerId);
+ for (File d : ledgerDirsManager.getAllLedgerDirs()) {
+ File lf = new File(d, ledgerName);
+ if (lf.exists()) {
+ return lf;
+ }
+ }
+ return null;
+ }
+
+ boolean ledgerExists(long ledgerId) throws IOException {
+ synchronized (fileInfoCache) {
+ FileInfo fi = fileInfoCache.get(ledgerId);
+ if (fi == null) {
+ File lf = findIndexFile(ledgerId);
+ if (lf == null) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ int getNumOpenLedgers() {
+ return openLedgers.size();
+ }
+
+ // evict file info if necessary
+ private void evictFileInfoIfNecessary() throws IOException {
+ synchronized (fileInfoCache) {
+ if (openLedgers.size() > openFileLimit) {
+ long ledgerToRemove = openLedgers.removeFirst();
+ // TODO Add a statistic here, we don't care really which
+ // ledger is evicted, but the rate at which they get evicted
+ fileInfoCache.remove(ledgerToRemove).close(true);
+ }
+ }
+ }
+
+ void close() throws IOException {
+ synchronized (fileInfoCache) {
+ for (Entry<Long, FileInfo> fileInfo : fileInfoCache.entrySet()) {
+ FileInfo value = fileInfo.getValue();
+ if (value != null) {
+ value.close(true);
+ }
+ }
+ fileInfoCache.clear();
+ }
+ }
+
+ byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
+ synchronized (fileInfoCache) {
+ FileInfo fi = fileInfoCache.get(ledgerId);
+ if (fi == null) {
+ File lf = findIndexFile(ledgerId);
+ if (lf == null) {
+ throw new Bookie.NoLedgerException(ledgerId);
+ }
+ evictFileInfoIfNecessary();
+ fi = new FileInfo(lf, null);
+ byte[] key = fi.getMasterKey();
+ fileInfoCache.put(ledgerId, fi);
+ openLedgers.add(ledgerId);
+ return key;
+ }
+ return fi.getMasterKey();
+ }
+ }
+
+ void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
+ FileInfo fi = null;
+ try {
+ fi = getFileInfo(ledgerId, masterKey);
+ } finally {
+ if (null != fi) {
+ fi.release();
+ }
+ }
+ }
+
+ boolean setFenced(long ledgerId) throws IOException {
+ FileInfo fi = null;
+ try {
+ fi = getFileInfo(ledgerId, null);
+ return fi.setFenced();
+ } finally {
+ if (null != fi) {
+ fi.release();
+ }
+ }
+ }
+
+ boolean isFenced(long ledgerId) throws IOException {
+ FileInfo fi = null;
+ try {
+ fi = getFileInfo(ledgerId, null);
+ return fi.isFenced();
+ } finally {
+ if (null != fi) {
+ fi.release();
+ }
+ }
+ }
+
+ int getOpenFileLimit() {
+ return openFileLimit;
+ }
+
+ private LedgerDirsListener getLedgerDirsListener() {
+ return new LedgerDirsListener() {
+ @Override
+ public void diskFull(File disk) {
+ // A ledger directory has become full: flag that index files located
+ // in full directories should be relocated (see relocateIndexFileIfDirFull).
+ shouldRelocateIndexFile.set(true);
+ }
+
+ @Override
+ public void diskFailed(File disk) {
+ // Nothing to handle here. Will be handled in Bookie
+ }
+
+ @Override
+ public void allDisksFull() {
+ // Nothing to handle here. Will be handled in Bookie
+ }
+
+ @Override
+ public void fatalError() {
+ // Nothing to handle here. Will be handled in Bookie
+ }
+ };
+ }
+
+ void relocateIndexFileIfDirFull(Collection<Long> dirtyLedgers) throws IOException {
+ if (shouldRelocateIndexFile.get()) {
+ // if some new dir detected as full, then move all corresponding
+ // open index files to new location
+ for (Long l : dirtyLedgers) {
+ FileInfo fi = null;
+ try {
+ fi = getFileInfo(l, null);
+ File currentDir = getLedgerDirForLedger(fi);
+ if (ledgerDirsManager.isDirFull(currentDir)) {
+ moveLedgerIndexFile(l, fi);
+ }
+ } finally {
+ if (null != fi) {
+ fi.release();
+ }
+ }
+ }
+ shouldRelocateIndexFile.set(false);
+ }
+ }
+
+ /**
+ * Get the ledger directory that the ledger index belongs to.
+ *
+ * @param fi File info of a ledger
+ * @return ledger directory that the ledger belongs to.
+ */
+ private File getLedgerDirForLedger(FileInfo fi) {
+ return fi.getLf().getParentFile().getParentFile().getParentFile();
+ }
+
+ private void moveLedgerIndexFile(Long l, FileInfo fi) throws NoWritableLedgerDirException, IOException {
+ File newLedgerIndexFile = getNewLedgerIndexFile(l, getLedgerDirForLedger(fi));
+ fi.moveToNewLocation(newLedgerIndexFile, fi.getSizeSinceLastwrite());
+ }
+
+ void flushLedgerHeader(long ledger) throws IOException {
+ FileInfo fi = null;
+ try {
+ fi = getFileInfo(ledger, null);
+ fi.flushHeader();
+ } catch (Bookie.NoLedgerException nle) {
+ // ledger has been deleted
+ return;
+ } finally {
+ if (null != fi) {
+ fi.release();
+ }
+ }
+ return;
+ }
+
+    /** Sorts the pages of ledger l by first entry, writes each run of consecutive pages in one call, then marks them clean. */
+    void flushLedgerEntries(long l, List<LedgerEntryPage> entries) throws IOException {
+        FileInfo fi = null;
+        try {
+            Collections.sort(entries, new Comparator<LedgerEntryPage>() {
+                @Override
+                public int compare(LedgerEntryPage o1, LedgerEntryPage o2) {
+                    // compare as longs: casting the difference to int can overflow or truncate to 0
+                    return Long.valueOf(o1.getFirstEntry()).compareTo(o2.getFirstEntry());
+                }
+            });
+            ArrayList<Integer> versions = new ArrayList<Integer>(entries.size());
+            try {
+                fi = getFileInfo(l, null);
+            } catch (Bookie.NoLedgerException nle) {
+                // ledger has been deleted
+                return;
+            }
+            // flush the header if necessary
+            fi.flushHeader();
+            int start = 0;
+            long lastOffset = -1;
+            for (int i = 0; i < entries.size(); i++) {
+                versions.add(i, entries.get(i).getVersion());
+                if (lastOffset != -1 && (entries.get(i).getFirstEntry() - lastOffset) != entriesPerPage) {
+                    // send up a sequential list
+                    int count = i - start;
+                    if (count == 0) {
+                        LOG.warn("Count cannot possibly be zero!");
+                    }
+                    writeBuffers(l, entries, fi, start, count);
+                    start = i;
+                }
+                lastOffset = entries.get(i).getFirstEntry();
+            }
+            if (entries.size() - start == 0 && entries.size() != 0) {
+                LOG.warn("Nothing to write, but there were entries!");
+            }
+            writeBuffers(l, entries, fi, start, entries.size() - start);
+            synchronized (this) {
+                for (int i = 0; i < entries.size(); i++) {
+                    entries.get(i).setClean(versions.get(i));
+                }
+            }
+        } finally {
+            if (fi != null) {
+                fi.release();
+            }
+        }
+    }
+
+ private void writeBuffers(Long ledger,
+ List<LedgerEntryPage> entries, FileInfo fi,
+ int start, int count) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Writing {} buffers of {}", count, Long.toHexString(ledger));
+ }
+ if (count == 0) {
+ return;
+ }
+ ByteBuffer buffs[] = new ByteBuffer[count];
+ for (int j = 0; j < count; j++) {
+ buffs[j] = entries.get(start + j).getPageToWrite();
+ if (entries.get(start + j).getLedger() != ledger) {
+ throw new IOException("Writing to " + ledger + " but page belongs to "
+ + entries.get(start + j).getLedger());
+ }
+ }
+ long totalWritten = 0;
+ while (buffs[buffs.length - 1].remaining() > 0) {
+ long rc = fi.write(buffs, entries.get(start + 0).getFirstEntry() * 8);
+ if (rc <= 0) {
+ throw new IOException("Short write to ledger " + ledger + " rc = " + rc);
+ }
+ totalWritten += rc;
+ }
+ if (totalWritten != (long) count * (long) pageSize) {
+ throw new IOException("Short write to ledger " + ledger + " wrote " + totalWritten
+ + " expected " + count * pageSize);
+ }
+ }
+
+ void updatePage(LedgerEntryPage lep) throws IOException {
+ if (!lep.isClean()) {
+ throw new IOException("Trying to update a dirty page");
+ }
+ FileInfo fi = null;
+ try {
+ fi = getFileInfo(lep.getLedger(), null);
+ long pos = lep.getFirstEntry() * 8;
+ if (pos >= fi.size()) {
+ lep.zeroPage();
+ } else {
+ lep.readPage(fi);
+ }
+ } finally {
+ if (fi != null) {
+ fi.release();
+ }
+ }
+ }
+
+ long getPersistEntryBeyondInMem(long ledgerId, long lastEntryInMem) throws IOException {
+ FileInfo fi = null;
+ long lastEntry = lastEntryInMem;
+ try {
+ fi = getFileInfo(ledgerId, null);
+ long size = fi.size();
+ // make sure the file size is aligned with index entry size
+ // otherwise we may read incorrect data
+ if (0 != size % 8) {
+ LOG.warn("Index file of ledger {} is not aligned with index entry size.", ledgerId);
+ size = size - size % 8;
+ }
+ // we may not have the last entry in the cache
+ if (size > lastEntry * 8) {
+ ByteBuffer bb = ByteBuffer.allocate(pageSize);
+ long position = size - pageSize;
+ if (position < 0) {
+ position = 0;
+ }
+ fi.read(bb, position);
+ bb.flip();
+ long startingEntryId = position / 8;
+ for (int i = entriesPerPage - 1; i >= 0; i--) {
+ if (bb.getLong(i * 8) != 0) {
+ if (lastEntry < startingEntryId + i) {
+ lastEntry = startingEntryId + i;
+ }
+ break;
+ }
+ }
+ }
+ } finally {
+ if (fi != null) {
+ fi.release();
+ }
+ }
+ return lastEntry;
+ }
+
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java?rev=1531203&r1=1531202&r2=1531203&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java Fri Oct 11 06:36:27 2013
@@ -21,26 +21,10 @@
package org.apache.bookkeeper.bookie;
-import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicBoolean;
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.bookkeeper.util.SnapshotMap;
-import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
-import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.util.SnapshotMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,55 +34,28 @@ import org.slf4j.LoggerFactory;
*/
public class LedgerCacheImpl implements LedgerCache {
private final static Logger LOG = LoggerFactory.getLogger(LedgerCacheImpl.class);
- private static final String IDX = ".idx";
- static final String RLOC = ".rloc";
- private LedgerDirsManager ledgerDirsManager;
- final private AtomicBoolean shouldRelocateIndexFile = new AtomicBoolean(false);
+ private final IndexInMemPageMgr indexPageManager;
+ private final IndexPersistenceMgr indexPersistenceManager;
+ private final int pageSize;
+ private final int entriesPerPage;
public LedgerCacheImpl(ServerConfiguration conf, SnapshotMap<Long, Boolean> activeLedgers,
- LedgerDirsManager ledgerDirsManager)
- throws IOException {
- this.ledgerDirsManager = ledgerDirsManager;
- this.openFileLimit = conf.getOpenFileLimit();
+ LedgerDirsManager ledgerDirsManager) throws IOException {
this.pageSize = conf.getPageSize();
this.entriesPerPage = pageSize / 8;
-
- if (conf.getPageLimit() <= 0) {
- // allocate half of the memory to the page cache
- this.pageLimit = (int)((Runtime.getRuntime().maxMemory() / 3) / this.pageSize);
- } else {
- this.pageLimit = conf.getPageLimit();
- }
- LOG.info("maxMemory = " + Runtime.getRuntime().maxMemory());
- LOG.info("openFileLimit is " + openFileLimit + ", pageSize is " + pageSize + ", pageLimit is " + pageLimit);
- this.activeLedgers = activeLedgers;
- // Retrieve all of the active ledgers.
- getActiveLedgers();
- ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+ this.indexPersistenceManager = new IndexPersistenceMgr(pageSize, entriesPerPage, conf, activeLedgers,
+ ledgerDirsManager);
+ this.indexPageManager = new IndexInMemPageMgr(pageSize, entriesPerPage, conf, indexPersistenceManager);
}
- /**
- * the list of potentially clean ledgers
- */
- LinkedList<Long> cleanLedgers = new LinkedList<Long>();
- /**
- * the list of potentially dirty ledgers
- */
- LinkedList<Long> dirtyLedgers = new LinkedList<Long>();
-
- HashMap<Long, FileInfo> fileInfoCache = new HashMap<Long, FileInfo>();
-
- LinkedList<Long> openLedgers = new LinkedList<Long>();
-
- // Manage all active ledgers in LedgerManager
- // so LedgerManager has knowledge to garbage collect inactive/deleted ledgers
- final SnapshotMap<Long, Boolean> activeLedgers;
+ IndexPersistenceMgr getIndexPersistenceManager() {
+ return indexPersistenceManager;
+ }
- final int openFileLimit;
- final int pageSize;
- final int pageLimit;
- final int entriesPerPage;
+ IndexInMemPageMgr getIndexPageManager() {
+ return indexPageManager;
+ }
/**
* @return page size used in ledger cache
@@ -107,613 +64,34 @@ public class LedgerCacheImpl implements
return pageSize;
}
- /**
- * @return entries per page used in ledger cache
- */
- public int getEntriesPerPage() {
- return entriesPerPage;
- }
-
- /**
- * @return page limitation in ledger cache
- */
- public int getPageLimit() {
- return pageLimit;
- }
-
- // The number of pages that have actually been used
- private int pageCount = 0;
- HashMap<Long, HashMap<Long,LedgerEntryPage>> pages = new HashMap<Long, HashMap<Long,LedgerEntryPage>>();
-
- /**
- * @return number of page used in ledger cache
- */
- public int getNumUsedPages() {
- return pageCount;
- }
-
- private void putIntoTable(HashMap<Long, HashMap<Long,LedgerEntryPage>> table, LedgerEntryPage lep) {
- HashMap<Long, LedgerEntryPage> map = table.get(lep.getLedger());
- if (map == null) {
- map = new HashMap<Long, LedgerEntryPage>();
- table.put(lep.getLedger(), map);
- }
- map.put(lep.getFirstEntry(), lep);
- }
-
- private static LedgerEntryPage getFromTable(HashMap<Long, HashMap<Long,LedgerEntryPage>> table,
- Long ledger, Long firstEntry) {
- HashMap<Long, LedgerEntryPage> map = table.get(ledger);
- if (map != null) {
- return map.get(firstEntry);
- }
- return null;
- }
-
- synchronized protected LedgerEntryPage getLedgerEntryPage(Long ledger, Long firstEntry, boolean onlyDirty) {
- LedgerEntryPage lep = getFromTable(pages, ledger, firstEntry);
- if (lep == null) {
- return null;
- }
-
- lep.usePage();
-
- if (onlyDirty && lep.isClean()) {
- return null;
- } else {
- return lep;
- }
- }
-
- /**
- * Grab ledger entry page whose first entry is <code>pageEntry</code>.
- *
- * If the page doesn't existed before, we allocate a memory page.
- * Otherwise, we grab a clean page and read it from disk.
- *
- * @param ledger
- * Ledger Id
- * @param pageEntry
- * Start entry of this entry page.
- */
- private LedgerEntryPage grabLedgerEntryPage(long ledger, long pageEntry) throws IOException {
- LedgerEntryPage lep = grabCleanPage(ledger, pageEntry);
- try {
- // should update page before we put it into table
- // otherwise we would put an empty page in it
- updatePage(lep);
- synchronized(this) {
- putIntoTable(pages, lep);
- }
- } catch (IOException ie) {
- // if we grab a clean page, but failed to update the page
- // we are exhausting the count of ledger entry pages.
- // since this page will be never used, so we need to decrement
- // page count of ledger cache.
- lep.releasePage();
- synchronized (this) {
- --pageCount;
- }
- throw ie;
- }
- return lep;
- }
-
@Override
public void putEntryOffset(long ledger, long entry, long offset) throws IOException {
- int offsetInPage = (int) (entry % entriesPerPage);
- // find the id of the first entry of the page that has the entry
- // we are looking for
- long pageEntry = entry-offsetInPage;
- LedgerEntryPage lep = getLedgerEntryPage(ledger, pageEntry, false);
- if (lep == null) {
- lep = grabLedgerEntryPage(ledger, pageEntry);
- }
- lep.setOffset(offset, offsetInPage*8);
- lep.releasePage();
+ indexPageManager.putEntryOffset(ledger, entry, offset);
}
@Override
public long getEntryOffset(long ledger, long entry) throws IOException {
- int offsetInPage = (int) (entry%entriesPerPage);
- // find the id of the first entry of the page that has the entry
- // we are looking for
- long pageEntry = entry-offsetInPage;
- LedgerEntryPage lep = getLedgerEntryPage(ledger, pageEntry, false);
- try {
- if (lep == null) {
- lep = grabLedgerEntryPage(ledger, pageEntry);
- }
- return lep.getOffset(offsetInPage*8);
- } finally {
- if (lep != null) {
- lep.releasePage();
- }
- }
- }
-
- @VisibleForTesting
- public static final String getLedgerName(long ledgerId) {
- int parent = (int) (ledgerId & 0xff);
- int grandParent = (int) ((ledgerId & 0xff00) >> 8);
- StringBuilder sb = new StringBuilder();
- sb.append(Integer.toHexString(grandParent));
- sb.append('/');
- sb.append(Integer.toHexString(parent));
- sb.append('/');
- sb.append(Long.toHexString(ledgerId));
- sb.append(IDX);
- return sb.toString();
- }
-
- FileInfo getFileInfo(Long ledger, byte masterKey[]) throws IOException {
- synchronized(fileInfoCache) {
- FileInfo fi = fileInfoCache.get(ledger);
- if (fi == null) {
- File lf = findIndexFile(ledger);
- if (lf == null) {
- if (masterKey == null) {
- throw new Bookie.NoLedgerException(ledger);
- }
- lf = getNewLedgerIndexFile(ledger, null);
- // A new ledger index file has been created for this Bookie.
- // Add this new ledger to the set of active ledgers.
- LOG.debug("New ledger index file created for ledgerId: {}", ledger);
- activeLedgers.put(ledger, true);
- }
- evictFileInfoIfNecessary();
- fi = new FileInfo(lf, masterKey);
- fileInfoCache.put(ledger, fi);
- openLedgers.add(ledger);
- }
- if (fi != null) {
- fi.use();
- }
- return fi;
- }
- }
-
- /**
- * Get a new index file for ledger excluding directory <code>excludedDir</code>.
- *
- * @param ledger
- * Ledger id.
- * @param excludedDir
- * The ledger directory to exclude.
- * @return new index file object.
- * @throws NoWritableLedgerDirException if there is no writable dir available.
- */
- private File getNewLedgerIndexFile(Long ledger, File excludedDir)
- throws NoWritableLedgerDirException {
- File dir = ledgerDirsManager.pickRandomWritableDir(excludedDir);
- String ledgerName = getLedgerName(ledger);
- return new File(dir, ledgerName);
- }
-
- private void updatePage(LedgerEntryPage lep) throws IOException {
- if (!lep.isClean()) {
- throw new IOException("Trying to update a dirty page");
- }
- FileInfo fi = null;
- try {
- fi = getFileInfo(lep.getLedger(), null);
- long pos = lep.getFirstEntry()*8;
- if (pos >= fi.size()) {
- lep.zeroPage();
- } else {
- lep.readPage(fi);
- }
- } finally {
- if (fi != null) {
- fi.release();
- }
- }
- }
-
- private LedgerDirsListener getLedgerDirsListener() {
- return new LedgerDirsListener() {
- @Override
- public void diskFull(File disk) {
- // If the current entry log disk is full, then create new entry
- // log.
- shouldRelocateIndexFile.set(true);
- }
-
- @Override
- public void diskFailed(File disk) {
- // Nothing to handle here. Will be handled in Bookie
- }
-
- @Override
- public void allDisksFull() {
- // Nothing to handle here. Will be handled in Bookie
- }
-
- @Override
- public void fatalError() {
- // Nothing to handle here. Will be handled in Bookie
- }
- };
+ return indexPageManager.getEntryOffset(ledger, entry);
}
@Override
public void flushLedger(boolean doAll) throws IOException {
- synchronized(dirtyLedgers) {
- if (dirtyLedgers.isEmpty()) {
- synchronized(this) {
- for(Long l: pages.keySet()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Adding {} to dirty pages", Long.toHexString(l));
- }
- dirtyLedgers.add(l);
- }
- }
- }
- if (dirtyLedgers.isEmpty()) {
- return;
- }
-
- if (shouldRelocateIndexFile.get()) {
- // if some new dir detected as full, then move all corresponding
- // open index files to new location
- for (Long l : dirtyLedgers) {
- FileInfo fi = null;
- try {
- fi = getFileInfo(l, null);
- File currentDir = getLedgerDirForLedger(fi);
- if (ledgerDirsManager.isDirFull(currentDir)) {
- moveLedgerIndexFile(l, fi);
- }
- } finally {
- if (null != fi) {
- fi.release();
- }
- }
- }
- shouldRelocateIndexFile.set(false);
- }
-
- while(!dirtyLedgers.isEmpty()) {
- Long l = dirtyLedgers.removeFirst();
-
- flushLedger(l);
-
- if (!doAll) {
- break;
- }
- // Yield. if we are doing all the ledgers we don't want to block other flushes that
- // need to happen
- try {
- dirtyLedgers.wait(1);
- } catch (InterruptedException e) {
- // just pass it on
- Thread.currentThread().interrupt();
- }
- }
- }
- }
-
- /**
- * Get the ledger directory that the ledger index belongs to.
- *
- * @param fi File info of a ledger
- * @return ledger directory that the ledger belongs to.
- */
- private File getLedgerDirForLedger(FileInfo fi) {
- return fi.getLf().getParentFile().getParentFile().getParentFile();
- }
-
- private void moveLedgerIndexFile(Long l, FileInfo fi) throws NoWritableLedgerDirException, IOException {
- File newLedgerIndexFile = getNewLedgerIndexFile(l, getLedgerDirForLedger(fi));
- fi.moveToNewLocation(newLedgerIndexFile, fi.getSizeSinceLastwrite());
- }
-
- /**
- * Flush a specified ledger
- *
- * @param l
- * Ledger Id
- * @throws IOException
- */
- private void flushLedger(long l) throws IOException {
- FileInfo fi = null;
- try {
- fi = getFileInfo(l, null);
- flushLedger(l, fi);
- } catch (Bookie.NoLedgerException nle) {
- // ledger has been deleted
- } finally {
- if (null != fi) {
- fi.release();
- }
- }
- }
-
- private void flushLedger(long l, FileInfo fi) throws IOException {
- LinkedList<Long> firstEntryList;
- synchronized(this) {
- HashMap<Long, LedgerEntryPage> pageMap = pages.get(l);
- if (pageMap == null || pageMap.isEmpty()) {
- fi.flushHeader();
- return;
- }
- firstEntryList = new LinkedList<Long>();
- for(Map.Entry<Long, LedgerEntryPage> entry: pageMap.entrySet()) {
- LedgerEntryPage lep = entry.getValue();
- if (lep.isClean()) {
- LOG.trace("Page is clean {}", lep);
- continue;
- }
- firstEntryList.add(lep.getFirstEntry());
- }
- }
-
- if (firstEntryList.size() == 0) {
- LOG.debug("Nothing to flush for ledger {}.", l);
- // nothing to do
- return;
- }
-
- // Now flush all the pages of a ledger
- List<LedgerEntryPage> entries = new ArrayList<LedgerEntryPage>(firstEntryList.size());
- try {
- for(Long firstEntry: firstEntryList) {
- LedgerEntryPage lep = getLedgerEntryPage(l, firstEntry, true);
- if (lep != null) {
- entries.add(lep);
- }
- }
- Collections.sort(entries, new Comparator<LedgerEntryPage>() {
- @Override
- public int compare(LedgerEntryPage o1, LedgerEntryPage o2) {
- return (int)(o1.getFirstEntry()-o2.getFirstEntry());
- }
- });
- ArrayList<Integer> versions = new ArrayList<Integer>(entries.size());
- // flush the header if necessary
- fi.flushHeader();
- int start = 0;
- long lastOffset = -1;
- for(int i = 0; i < entries.size(); i++) {
- versions.add(i, entries.get(i).getVersion());
- if (lastOffset != -1 && (entries.get(i).getFirstEntry() - lastOffset) != entriesPerPage) {
- // send up a sequential list
- int count = i - start;
- if (count == 0) {
- LOG.warn("Count cannot possibly be zero!");
- }
- writeBuffers(l, entries, fi, start, count);
- start = i;
- }
- lastOffset = entries.get(i).getFirstEntry();
- }
- if (entries.size()-start == 0 && entries.size() != 0) {
- LOG.warn("Nothing to write, but there were entries!");
- }
- writeBuffers(l, entries, fi, start, entries.size()-start);
- synchronized(this) {
- for(int i = 0; i < entries.size(); i++) {
- LedgerEntryPage lep = entries.get(i);
- lep.setClean(versions.get(i));
- }
- }
- } finally {
- for(LedgerEntryPage lep: entries) {
- lep.releasePage();
- }
- }
- }
-
- private void writeBuffers(Long ledger,
- List<LedgerEntryPage> entries, FileInfo fi,
- int start, int count) throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Writing {} buffers of {}", count, Long.toHexString(ledger));
- }
- if (count == 0) {
- return;
- }
- ByteBuffer buffs[] = new ByteBuffer[count];
- for(int j = 0; j < count; j++) {
- buffs[j] = entries.get(start+j).getPageToWrite();
- if (entries.get(start+j).getLedger() != ledger) {
- throw new IOException("Writing to " + ledger + " but page belongs to "
- + entries.get(start+j).getLedger());
- }
- }
- long totalWritten = 0;
- while(buffs[buffs.length-1].remaining() > 0) {
- long rc = fi.write(buffs, entries.get(start+0).getFirstEntry()*8);
- if (rc <= 0) {
- throw new IOException("Short write to ledger " + ledger + " rc = " + rc);
- }
- totalWritten += rc;
- }
- if (totalWritten != (long)count * (long)pageSize) {
- throw new IOException("Short write to ledger " + ledger + " wrote " + totalWritten
- + " expected " + count * pageSize);
- }
- }
- private LedgerEntryPage grabCleanPage(long ledger, long entry) throws IOException {
- if (entry % entriesPerPage != 0) {
- throw new IllegalArgumentException(entry + " is not a multiple of " + entriesPerPage);
- }
- outerLoop:
- while(true) {
- synchronized(this) {
- if (pageCount < pageLimit) {
- // let's see if we can allocate something
- LedgerEntryPage lep = new LedgerEntryPage(pageSize, entriesPerPage);
- lep.setLedger(ledger);
- lep.setFirstEntry(entry);
-
- // note, this will not block since it is a new page
- lep.usePage();
- pageCount++;
- return lep;
- }
- }
-
- synchronized(cleanLedgers) {
- if (cleanLedgers.isEmpty()) {
- flushLedger(false);
- synchronized(this) {
- for(Long l: pages.keySet()) {
- cleanLedgers.add(l);
- }
- }
- }
- synchronized(this) {
- // if ledgers deleted between checking pageCount and putting
- // ledgers into cleanLedgers list, the cleanLedgers list would be empty.
- // so give it a chance to go back to check pageCount again because
- // deleteLedger would decrement pageCount to return the number of pages
- // occupied by deleted ledgers.
- if (cleanLedgers.isEmpty()) {
- continue outerLoop;
- }
- Long cleanLedger = cleanLedgers.getFirst();
- Map<Long, LedgerEntryPage> map = pages.get(cleanLedger);
- while (map == null || map.isEmpty()) {
- cleanLedgers.removeFirst();
- if (cleanLedgers.isEmpty()) {
- continue outerLoop;
- }
- cleanLedger = cleanLedgers.getFirst();
- map = pages.get(cleanLedger);
- }
- Iterator<Map.Entry<Long, LedgerEntryPage>> it = map.entrySet().iterator();
- LedgerEntryPage lep = it.next().getValue();
- while((lep.inUse() || !lep.isClean())) {
- if (!it.hasNext()) {
- // no clean page found in this ledger
- cleanLedgers.removeFirst();
- continue outerLoop;
- }
- lep = it.next().getValue();
- }
- it.remove();
- if (map.isEmpty()) {
- pages.remove(lep.getLedger());
- }
- lep.usePage();
- lep.zeroPage();
- lep.setLedger(ledger);
- lep.setFirstEntry(entry);
- return lep;
- }
- }
- }
+ indexPageManager.flushOneOrMoreLedgers(doAll);
}
@Override
public long getLastEntry(long ledgerId) throws IOException {
- long lastEntry = 0;
- // Find the last entry in the cache
- synchronized(this) {
- Map<Long, LedgerEntryPage> map = pages.get(ledgerId);
- if (map != null) {
- for(LedgerEntryPage lep: map.values()) {
- if (lep.getFirstEntry() + entriesPerPage < lastEntry) {
- continue;
- }
- lep.usePage();
- long highest = lep.getLastEntry();
- if (highest > lastEntry) {
- lastEntry = highest;
- }
- lep.releasePage();
- }
- }
- }
-
- FileInfo fi = null;
- try {
- fi = getFileInfo(ledgerId, null);
- long size = fi.size();
- // make sure the file size is aligned with index entry size
- // otherwise we may read incorret data
- if (0 != size % 8) {
- LOG.warn("Index file of ledger {} is not aligned with index entry size.", ledgerId);
- size = size - size % 8;
- }
- // we may not have the last entry in the cache
- if (size > lastEntry*8) {
- ByteBuffer bb = ByteBuffer.allocate(getPageSize());
- long position = size - getPageSize();
- if (position < 0) {
- position = 0;
- }
- fi.read(bb, position);
- bb.flip();
- long startingEntryId = position/8;
- for(int i = getEntriesPerPage()-1; i >= 0; i--) {
- if (bb.getLong(i*8) != 0) {
- if (lastEntry < startingEntryId+i) {
- lastEntry = startingEntryId+i;
- }
- break;
- }
- }
- }
- } finally {
- if (fi != null) {
- fi.release();
- }
- }
-
+ // Get the highest entry from the pages that are in memory
+ long lastEntryInMem = indexPageManager.getLastEntryInMem(ledgerId);
+ // Some index pages may have been evicted from memory, so retrieve the last entry
+ // from the persistent store. We first check whether there could be an entry beyond the
+ // last in-memory entry and only then attempt to read the last persisted entry from the file.
+ // That check is just an optimization.
+ long lastEntry = indexPersistenceManager.getPersistEntryBeyondInMem(ledgerId, lastEntryInMem);
return lastEntry;
}
/**
- * This method will look within the ledger directories for the ledger index
- * files. That will comprise the set of active ledgers this particular
- * BookieServer knows about that have not yet been deleted by the BookKeeper
- * Client. This is called only once during initialization.
- */
- private void getActiveLedgers() throws IOException {
- // Ledger index files are stored in a file hierarchy with a parent and
- // grandParent directory. We'll have to go two levels deep into these
- // directories to find the index files.
- for (File ledgerDirectory : ledgerDirsManager.getAllLedgerDirs()) {
- for (File grandParent : ledgerDirectory.listFiles()) {
- if (grandParent.isDirectory()) {
- for (File parent : grandParent.listFiles()) {
- if (parent.isDirectory()) {
- for (File index : parent.listFiles()) {
- if (!index.isFile()
- || (!index.getName().endsWith(IDX) && !index.getName().endsWith(RLOC))) {
- continue;
- }
-
- // We've found a ledger index file. The file
- // name is the HexString representation of the
- // ledgerId.
- String ledgerIdInHex = index.getName().replace(RLOC, "").replace(IDX, "");
- if (index.getName().endsWith(RLOC)) {
- if (findIndexFile(Long.parseLong(ledgerIdInHex)) != null) {
- if (!index.delete()) {
- LOG.warn("Deleting the rloc file " + index + " failed");
- }
- continue;
- } else {
- File dest = new File(index.getParentFile(), ledgerIdInHex + IDX);
- if (!index.renameTo(dest)) {
- throw new IOException("Renaming rloc file " + index
- + " to index file has failed");
- }
- }
- }
- activeLedgers.put(Long.parseLong(ledgerIdInHex, 16), true);
- }
- }
- }
- }
- }
- }
- }
-
- /**
* This method is called whenever a ledger is deleted by the BookKeeper Client
* and we want to remove all relevant data for it stored in the LedgerCache.
*/
@@ -721,144 +99,33 @@ public class LedgerCacheImpl implements
public void deleteLedger(long ledgerId) throws IOException {
LOG.debug("Deleting ledgerId: {}", ledgerId);
- // remove pages first to avoid page flushed when deleting file info
- synchronized(this) {
- Map<Long, LedgerEntryPage> lpages = pages.remove(ledgerId);
- if (null != lpages) {
- pageCount -= lpages.size();
- if (pageCount < 0) {
- LOG.error("Page count of ledger cache has been decremented to be less than zero.");
- }
- }
- }
- // Delete the ledger's index file and close the FileInfo
- FileInfo fi = null;
- try {
- fi = getFileInfo(ledgerId, null);
- fi.close(false);
- fi.delete();
- } finally {
- // should release use count
- // otherwise the file channel would not be closed.
- if (null != fi) {
- fi.release();
- }
- }
-
- // Remove it from the active ledger manager
- activeLedgers.remove(ledgerId);
-
- // Now remove it from all the other lists and maps.
- // These data structures need to be synchronized first before removing entries.
- synchronized(fileInfoCache) {
- fileInfoCache.remove(ledgerId);
- }
- synchronized(cleanLedgers) {
- cleanLedgers.remove(ledgerId);
- }
- synchronized(dirtyLedgers) {
- dirtyLedgers.remove(ledgerId);
- }
- synchronized(openLedgers) {
- openLedgers.remove(ledgerId);
- }
- }
-
- private File findIndexFile(long ledgerId) throws IOException {
- String ledgerName = getLedgerName(ledgerId);
- for (File d : ledgerDirsManager.getAllLedgerDirs()) {
- File lf = new File(d, ledgerName);
- if (lf.exists()) {
- return lf;
- }
- }
- return null;
+ indexPageManager.removePagesForLedger(ledgerId);
+ indexPersistenceManager.removeLedger(ledgerId);
}
@Override
public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
- synchronized(fileInfoCache) {
- FileInfo fi = fileInfoCache.get(ledgerId);
- if (fi == null) {
- File lf = findIndexFile(ledgerId);
- if (lf == null) {
- throw new Bookie.NoLedgerException(ledgerId);
- }
- evictFileInfoIfNecessary();
- fi = new FileInfo(lf, null);
- byte[] key = fi.getMasterKey();
- fileInfoCache.put(ledgerId, fi);
- openLedgers.add(ledgerId);
- return key;
- }
- return fi.getMasterKey();
- }
- }
-
- // evict file info if necessary
- private void evictFileInfoIfNecessary() throws IOException {
- synchronized (fileInfoCache) {
- if (openLedgers.size() > openFileLimit) {
- long ledgerToRemove = openLedgers.removeFirst();
- // TODO Add a statistic here, we don't care really which
- // ledger is evicted, but the rate at which they get evicted
- LOG.debug("Ledger {} is evicted from file info cache.",
- ledgerToRemove);
- fileInfoCache.remove(ledgerToRemove).close(true);
- }
- }
+ return indexPersistenceManager.readMasterKey(ledgerId);
}
@Override
public boolean setFenced(long ledgerId) throws IOException {
- FileInfo fi = null;
- try {
- fi = getFileInfo(ledgerId, null);
- return fi.setFenced();
- } finally {
- if (null != fi) {
- fi.release();
- }
- }
+ return indexPersistenceManager.setFenced(ledgerId);
}
@Override
public boolean isFenced(long ledgerId) throws IOException {
- FileInfo fi = null;
- try {
- fi = getFileInfo(ledgerId, null);
- return fi.isFenced();
- } finally {
- if (null != fi) {
- fi.release();
- }
- }
+ return indexPersistenceManager.isFenced(ledgerId);
}
@Override
public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
- FileInfo fi = null;
- try {
- fi = getFileInfo(ledgerId, masterKey);
- } finally {
- if (null != fi) {
- fi.release();
- }
- }
+ indexPersistenceManager.setMasterKey(ledgerId, masterKey);
}
@Override
public boolean ledgerExists(long ledgerId) throws IOException {
- synchronized(fileInfoCache) {
- FileInfo fi = fileInfoCache.get(ledgerId);
- if (fi == null) {
- File lf = findIndexFile(ledgerId);
- if (lf == null) {
- return false;
- }
- }
- }
- return true;
+ return indexPersistenceManager.ledgerExists(ledgerId);
}
@Override
@@ -876,7 +143,7 @@ public class LedgerCacheImpl implements
@Override
public int getPageCount() {
- return LedgerCacheImpl.this.getNumUsedPages();
+ return LedgerCacheImpl.this.indexPageManager.getNumUsedPages();
}
@Override
@@ -886,41 +153,33 @@ public class LedgerCacheImpl implements
@Override
public int getOpenFileLimit() {
- return openFileLimit;
+ return LedgerCacheImpl.this.indexPersistenceManager.getOpenFileLimit();
}
@Override
public int getPageLimit() {
- return LedgerCacheImpl.this.getPageLimit();
+ return LedgerCacheImpl.this.indexPageManager.getPageLimit();
}
@Override
public int getNumCleanLedgers() {
- return cleanLedgers.size();
+ return LedgerCacheImpl.this.indexPageManager.getNumCleanLedgers();
}
@Override
public int getNumDirtyLedgers() {
- return dirtyLedgers.size();
+ return LedgerCacheImpl.this.indexPageManager.getNumDirtyLedgers();
}
@Override
public int getNumOpenLedgers() {
- return openLedgers.size();
+ return LedgerCacheImpl.this.indexPersistenceManager.getNumOpenLedgers();
}
};
}
@Override
public void close() throws IOException {
- synchronized (fileInfoCache) {
- for (Entry<Long, FileInfo> fileInfo : fileInfoCache.entrySet()) {
- FileInfo value = fileInfo.getValue();
- if (value != null) {
- value.close(true);
- }
- }
- fileInfoCache.clear();
- }
+ indexPersistenceManager.close();
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java?rev=1531203&r1=1531202&r2=1531203&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java Fri Oct 11 06:36:27 2013
@@ -21,35 +21,24 @@ package org.apache.bookkeeper.bookie;
*
*/
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
import java.io.File;
-import java.io.RandomAccessFile;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.Random;
-import java.util.Set;
import java.util.Arrays;
+import java.util.Random;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeperTestClient;
-import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.ClientUtil;
import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.junit.Before;
-import org.junit.Test;
-import static org.junit.Assert.*;
public class BookieJournalTest {
static Logger LOG = LoggerFactory.getLogger(BookieJournalTest.class);
@@ -59,7 +48,7 @@ public class BookieJournalTest {
private void writeIndexFileForLedger(File indexDir, long ledgerId,
byte[] masterKey)
throws Exception {
- File fn = new File(indexDir, LedgerCacheImpl.getLedgerName(ledgerId));
+ File fn = new File(indexDir, IndexPersistenceMgr.getLedgerName(ledgerId));
fn.getParentFile().mkdirs();
FileInfo fi = new FileInfo(fn, masterKey);
// force creation of index file
@@ -70,7 +59,7 @@ public class BookieJournalTest {
private void writePartialIndexFileForLedger(File indexDir, long ledgerId,
byte[] masterKey, boolean truncateToMasterKey)
throws Exception {
- File fn = new File(indexDir, LedgerCacheImpl.getLedgerName(ledgerId));
+ File fn = new File(indexDir, IndexPersistenceMgr.getLedgerName(ledgerId));
fn.getParentFile().mkdirs();
FileInfo fi = new FileInfo(fn, masterKey);
// force creation of index file
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java?rev=1531203&r1=1531202&r2=1531203&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java Fri Oct 11 06:36:27 2013
@@ -25,6 +25,8 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import junit.framework.TestCase;
+
import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
@@ -38,12 +40,9 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import junit.framework.TestCase;
-
/**
* LedgerCache related test cases
*/
@@ -266,11 +265,11 @@ public class LedgerCacheTest extends Tes
// Create ledger index file
ledgerStorage.setMasterKey(1, "key".getBytes());
- FileInfo fileInfo = ledgerCache.getFileInfo(Long.valueOf(1), null);
+ FileInfo fileInfo = ledgerCache.getIndexPersistenceManager().getFileInfo(Long.valueOf(1), null);
// Simulate the flush failure
FileInfo newFileInfo = new FileInfo(fileInfo.getLf(), fileInfo.getMasterKey());
- ledgerCache.fileInfoCache.put(Long.valueOf(1), newFileInfo);
+ ledgerCache.getIndexPersistenceManager().fileInfoCache.put(Long.valueOf(1), newFileInfo);
// Add entries
ledgerStorage.addEntry(generateEntry(1, 1));
ledgerStorage.addEntry(generateEntry(1, 2));
@@ -364,7 +363,7 @@ public class LedgerCacheTest extends Tes
public void testSyncThreadNPE() throws IOException {
newLedgerCache();
try {
- ((LedgerCacheImpl) ledgerCache).getLedgerEntryPage(0L, 0L, true);
+ ((LedgerCacheImpl) ledgerCache).getIndexPageManager().getLedgerEntryPage(0L, 0L, true);
} catch (Exception e) {
LOG.error("Exception when trying to get a ledger entry page", e);
fail("Shouldn't have thrown an exception");
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java?rev=1531203&r1=1531202&r2=1531203&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java Fri Oct 11 06:36:27 2013
@@ -21,33 +21,28 @@
package org.apache.bookkeeper.bookie;
-import java.util.Arrays;
-
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import java.io.BufferedWriter;
import java.io.File;
-import java.io.IOException;
-
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
-import java.io.BufferedWriter;
import java.io.PrintStream;
import java.io.RandomAccessFile;
-
-import org.junit.Before;
-import org.junit.After;
-import org.junit.Test;
-import static org.junit.Assert.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
import org.apache.bookkeeper.client.ClientUtil;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ServerConfiguration;
-
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.bookkeeper.test.ZooKeeperUtil;
import org.apache.bookkeeper.test.PortManager;
-
+import org.apache.bookkeeper.test.ZooKeeperUtil;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,7 +70,7 @@ public class UpgradeTest {
throws Exception {
long ledgerId = 1;
- File fn = new File(dir, LedgerCacheImpl.getLedgerName(ledgerId));
+ File fn = new File(dir, IndexPersistenceMgr.getLedgerName(ledgerId));
fn.getParentFile().mkdirs();
FileInfo fi = new FileInfo(fn, masterKey);
// force creation of index file
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java?rev=1531203&r1=1531202&r2=1531203&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java Fri Oct 11 06:36:27 2013
@@ -20,41 +20,37 @@
*/
package org.apache.bookkeeper.replication;
-import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.CountDownLatch;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.LinkedList;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieAccessor;
-import org.apache.bookkeeper.util.StringUtils;
-import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
+import org.apache.bookkeeper.bookie.IndexPersistenceMgr;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
-
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.StringUtils;
import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.bookkeeper.bookie.Bookie;
-import org.apache.bookkeeper.bookie.LedgerCacheImpl;
+import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
import org.apache.zookeeper.ZooKeeper;
-import org.junit.Before;
import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -191,7 +187,7 @@ public class AuditorPeriodicCheckTest ex
ledgerDir = Bookie.getCurrentDirectory(ledgerDir);
// corrupt of entryLogs
- File index = new File(ledgerDir, LedgerCacheImpl.getLedgerName(ledgerToCorrupt));
+ File index = new File(ledgerDir, IndexPersistenceMgr.getLedgerName(ledgerToCorrupt));
LOG.info("file to corrupt{}" , index);
ByteBuffer junk = ByteBuffer.allocate(1024*1024);
FileOutputStream out = new FileOutputStream(index);