You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/26 12:51:44 UTC
svn commit: r1402464 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/
Author: ivank
Date: Fri Oct 26 10:51:44 2012
New Revision: 1402464
URL: http://svn.apache.org/viewvc?rev=1402464&view=rev
Log:
BOOKKEEPER-346: Detect IOExceptions in LedgerCache and bookie should look at next ledger dir(if any) (Vinay via ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1402464&r1=1402463&r2=1402464&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Oct 26 10:51:44 2012
@@ -166,6 +166,8 @@ Trunk (unreleased changes)
BOOKKEEPER-345: Detect IOExceptions on entrylogger and bookie should consider next ledger dir(if any) (Vinay via ivank)
+ BOOKKEEPER-346: Detect IOExceptions in LedgerCache and bookie should look at next ledger dir(if any) (Vinay via ivank)
+
hedwig-server:
BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java?rev=1402464&r1=1402463&r2=1402464&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java Fri Oct 26 10:51:44 2012
@@ -54,7 +54,7 @@ class FileInfo {
static final int NO_MASTER_KEY = -1;
private FileChannel fc;
- private final File lf;
+ private File lf;
byte[] masterKey;
/**
@@ -67,6 +67,7 @@ class FileInfo {
private long size;
private int useCount;
private boolean isClosed;
+ private long sizeSinceLastwrite;
// file access mode
protected String mode;
@@ -78,6 +79,14 @@ class FileInfo {
mode = "rw";
}
+ public File getLf() {
+ return lf;
+ }
+
+ public long getSizeSinceLastwrite() {
+ return sizeSinceLastwrite;
+ }
+
synchronized public void readHeader() throws IOException {
if (lf.exists()) {
if (fc != null) {
@@ -86,6 +95,7 @@ class FileInfo {
fc = new RandomAccessFile(lf, mode).getChannel();
size = fc.size();
+ sizeSinceLastwrite = size;
// avoid hang on reading partial index
ByteBuffer bb = ByteBuffer.allocate((int)(Math.min(size, START_OF_DATA)));
@@ -220,9 +230,61 @@ class FileInfo {
size = newsize;
}
}
+ sizeSinceLastwrite = fc.size();
return total;
}
+ /**
+ * Copies current file contents upto specified size to the target file and
+ * deletes the current file. If size not known then pass size as
+ * Long.MAX_VALUE to copy complete file.
+ */
+ public synchronized void moveToNewLocation(File newFile, long size) throws IOException {
+ checkOpen(false);
+ if (size > fc.size()) {
+ size = fc.size();
+ }
+ File rlocFile = new File(newFile.getParentFile(), newFile.getName() + LedgerCacheImpl.RLOC);
+ if (!rlocFile.exists()) {
+ checkParents(rlocFile);
+ if (!rlocFile.createNewFile()) {
+ throw new IOException("Creating new cache index file " + rlocFile + " failed ");
+ }
+ }
+ // copy contents from old.idx to new.idx.rloc
+ FileChannel newFc = new RandomAccessFile(rlocFile, "rw").getChannel();
+ try {
+ long written = 0;
+ while (written < size) {
+ long count = fc.transferTo(written, size, newFc);
+ if (count <= 0) {
+ throw new IOException("Copying to new location " + rlocFile + " failed");
+ }
+ written += count;
+ }
+ if (written <= 0 && size > 0) {
+ throw new IOException("Copying to new location " + rlocFile + " failed");
+ }
+ } finally {
+ newFc.force(true);
+ newFc.close();
+ }
+ // delete old.idx
+ fc.close();
+ if (!delete()) {
+ LOG.error("Failed to delete the previous index file " + lf);
+ throw new IOException("Failed to delete the previous index file " + lf);
+ }
+
+ // rename new.idx.rloc to new.idx
+ if (!rlocFile.renameTo(newFile)) {
+ LOG.error("Failed to rename " + rlocFile + " to " + newFile);
+ throw new IOException("Failed to rename " + rlocFile + " to " + newFile);
+ }
+ fc = new RandomAccessFile(newFile, mode).getChannel();
+ lf = newFile;
+ }
+
synchronized public byte[] getMasterKey() throws IOException {
checkOpen(false);
return masterKey;
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java?rev=1402464&r1=1402463&r2=1402464&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java Fri Oct 26 10:51:44 2012
@@ -54,7 +54,7 @@ class InterleavedLedgerStorage implement
ActiveLedgerManager activeLedgerManager,
LedgerDirsManager ledgerDirsManager) throws IOException {
entryLogger = new EntryLogger(conf, ledgerDirsManager);
- ledgerCache = new LedgerCacheImpl(conf, activeLedgerManager);
+ ledgerCache = new LedgerCacheImpl(conf, activeLedgerManager, ledgerDirsManager);
gcThread = new GarbageCollectorThread(conf, ledgerCache, entryLogger,
activeLedgerManager, new EntryLogCompactionScanner());
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java?rev=1402464&r1=1402463&r2=1402464&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java Fri Oct 26 10:51:44 2012
@@ -32,10 +32,12 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.meta.ActiveLedgerManager;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,11 +48,15 @@ import org.slf4j.LoggerFactory;
*/
public class LedgerCacheImpl implements LedgerCache {
private final static Logger LOG = LoggerFactory.getLogger(LedgerDescriptor.class);
+ private static final String IDX = ".idx";
+ static final String RLOC = ".rloc";
- final File ledgerDirectories[];
+ private LedgerDirsManager ledgerDirsManager;
+ final private AtomicBoolean shouldRelocateIndexFile = new AtomicBoolean(false);
- public LedgerCacheImpl(ServerConfiguration conf, ActiveLedgerManager alm) {
- this.ledgerDirectories = Bookie.getCurrentDirectories(conf.getLedgerDirs());
+ public LedgerCacheImpl(ServerConfiguration conf, ActiveLedgerManager alm, LedgerDirsManager ledgerDirsManager)
+ throws IOException {
+ this.ledgerDirsManager = ledgerDirsManager;
this.openFileLimit = conf.getOpenFileLimit();
this.pageSize = conf.getPageSize();
this.entriesPerPage = pageSize / 8;
@@ -66,6 +72,7 @@ public class LedgerCacheImpl implements
activeLedgerManager = alm;
// Retrieve all of the active ledgers.
getActiveLedgers();
+ ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
}
/**
* the list of potentially clean ledgers
@@ -233,16 +240,10 @@ public class LedgerCacheImpl implements
sb.append(Integer.toHexString(parent));
sb.append('/');
sb.append(Long.toHexString(ledgerId));
- sb.append(".idx");
+ sb.append(IDX);
return sb.toString();
}
- static final private Random rand = new Random();
-
- static final private File pickDirs(File dirs[]) {
- return dirs[rand.nextInt(dirs.length)];
- }
-
FileInfo getFileInfo(Long ledger, byte masterKey[]) throws IOException {
synchronized(fileInfoCache) {
FileInfo fi = fileInfoCache.get(ledger);
@@ -252,9 +253,7 @@ public class LedgerCacheImpl implements
if (masterKey == null) {
throw new Bookie.NoLedgerException(ledger);
}
- File dir = pickDirs(ledgerDirectories);
- String ledgerName = getLedgerName(ledger);
- lf = new File(dir, ledgerName);
+ lf = getNewLedgerIndexFile(ledger);
// A new ledger index file has been created for this Bookie.
// Add this new ledger to the set of active ledgers.
LOG.debug("New ledger index file created for ledgerId: {}", ledger);
@@ -262,6 +261,10 @@ public class LedgerCacheImpl implements
}
evictFileInfoIfNecessary();
fi = new FileInfo(lf, masterKey);
+ if (ledgerDirsManager.isDirFull(lf.getParentFile()
+ .getParentFile().getParentFile())) {
+ moveLedgerIndexFile(ledger, fi);
+ }
fileInfoCache.put(ledger, fi);
openLedgers.add(ledger);
}
@@ -271,6 +274,13 @@ public class LedgerCacheImpl implements
return fi;
}
}
+
+ private File getNewLedgerIndexFile(Long ledger) throws NoWritableLedgerDirException {
+ File dir = ledgerDirsManager.pickRandomWritableDir();
+ String ledgerName = getLedgerName(ledger);
+ return new File(dir, ledgerName);
+ }
+
private void updatePage(LedgerEntryPage lep) throws IOException {
if (!lep.isClean()) {
throw new IOException("Trying to update a dirty page");
@@ -291,6 +301,32 @@ public class LedgerCacheImpl implements
}
}
+ private LedgerDirsListener getLedgerDirsListener() {
+ return new LedgerDirsListener() {
+ @Override
+ public void diskFull(File disk) {
+ // If the current entry log disk is full, then create new entry
+ // log.
+ shouldRelocateIndexFile.set(true);
+ }
+
+ @Override
+ public void diskFailed(File disk) {
+ // Nothing to handle here. Will be handled in Bookie
+ }
+
+ @Override
+ public void allDisksFull() {
+ // Nothing to handle here. Will be handled in Bookie
+ }
+
+ @Override
+ public void fatalError() {
+ // Nothing to handle here. Will be handled in Bookie
+ }
+ };
+ }
+
@Override
public void flushLedger(boolean doAll) throws IOException {
synchronized(dirtyLedgers) {
@@ -307,6 +343,20 @@ public class LedgerCacheImpl implements
if (dirtyLedgers.isEmpty()) {
return;
}
+
+ if (shouldRelocateIndexFile.get()) {
+ // if some new dir detected as full, then move all corresponding
+ // open index files to new location
+ for (Long l : dirtyLedgers) {
+ FileInfo fi = getFileInfo(l, null);
+ File currentDir = fi.getLf().getParentFile().getParentFile().getParentFile();
+ if (ledgerDirsManager.isDirFull(currentDir)) {
+ moveLedgerIndexFile(l, fi);
+ }
+ }
+ shouldRelocateIndexFile.set(false);
+ }
+
while(!dirtyLedgers.isEmpty()) {
Long l = dirtyLedgers.removeFirst();
@@ -327,6 +377,11 @@ public class LedgerCacheImpl implements
}
}
+ private void moveLedgerIndexFile(Long l, FileInfo fi) throws NoWritableLedgerDirException, IOException {
+ File newLedgerIndexFile = getNewLedgerIndexFile(l);
+ fi.moveToNewLocation(newLedgerIndexFile, fi.getSizeSinceLastwrite());
+ }
+
/**
* Flush a specified ledger
*
@@ -578,22 +633,39 @@ public class LedgerCacheImpl implements
* BookieServer knows about that have not yet been deleted by the BookKeeper
* Client. This is called only once during initialization.
*/
- private void getActiveLedgers() {
+ private void getActiveLedgers() throws IOException {
// Ledger index files are stored in a file hierarchy with a parent and
// grandParent directory. We'll have to go two levels deep into these
// directories to find the index files.
- for (File ledgerDirectory : ledgerDirectories) {
+ for (File ledgerDirectory : ledgerDirsManager.getAllLedgerDirs()) {
for (File grandParent : ledgerDirectory.listFiles()) {
if (grandParent.isDirectory()) {
for (File parent : grandParent.listFiles()) {
if (parent.isDirectory()) {
for (File index : parent.listFiles()) {
- if (!index.isFile() || !index.getName().endsWith(".idx")) {
+ if (!index.isFile()
+ || (!index.getName().endsWith(IDX) && !index.getName().endsWith(RLOC))) {
continue;
}
- // We've found a ledger index file. The file name is the
- // HexString representation of the ledgerId.
- String ledgerIdInHex = index.getName().substring(0, index.getName().length() - 4);
+
+ // We've found a ledger index file. The file
+ // name is the HexString representation of the
+ // ledgerId.
+ String ledgerIdInHex = index.getName().replace(RLOC, "").replace(IDX, "");
+ if (index.getName().endsWith(RLOC)) {
+ if (findIndexFile(Long.parseLong(ledgerIdInHex)) != null) {
+ if (!index.delete()) {
+ LOG.warn("Deleting the rloc file " + index + " failed");
+ }
+ continue;
+ } else {
+ File dest = new File(index.getParentFile(), ledgerIdInHex + IDX);
+ if (!index.renameTo(dest)) {
+ throw new IOException("Renaming rloc file " + index
+ + " to index file has failed");
+ }
+ }
+ }
activeLedgerManager.addActiveLedger(Long.parseLong(ledgerIdInHex, 16), true);
}
}
@@ -656,7 +728,7 @@ public class LedgerCacheImpl implements
private File findIndexFile(long ledgerId) throws IOException {
String ledgerName = getLedgerName(ledgerId);
- for(File d: ledgerDirectories) {
+ for (File d : ledgerDirsManager.getAllLedgerDirs()) {
File lf = new File(d, ledgerName);
if (lf.exists()) {
return lf;
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java?rev=1402464&r1=1402463&r2=1402464&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java Fri Oct 26 10:51:44 2012
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.bookie;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -30,6 +31,7 @@ import org.apache.bookkeeper.meta.Active
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.commons.io.FileUtils;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -50,6 +52,8 @@ public class LedgerCacheTest extends Tes
ServerConfiguration conf;
File txnDir, ledgerDir;
+ private Bookie bookie;
+
@Override
@Before
public void setUp() throws Exception {
@@ -66,27 +70,34 @@ public class LedgerCacheTest extends Tes
conf.setZkServers(null);
conf.setJournalDirName(txnDir.getPath());
conf.setLedgerDirNames(new String[] { ledgerDir.getPath() });
+ bookie = new Bookie(conf);
ledgerManagerFactory =
LedgerManagerFactory.newLedgerManagerFactory(conf, null);
activeLedgerManager = ledgerManagerFactory.newActiveLedgerManager();
+ ledgerCache = ((InterleavedLedgerStorage) bookie.ledgerStorage).ledgerCache;
}
@Override
@After
public void tearDown() throws Exception {
+ bookie.ledgerStorage.shutdown();
activeLedgerManager.close();
ledgerManagerFactory.uninitialize();
FileUtils.deleteDirectory(txnDir);
FileUtils.deleteDirectory(ledgerDir);
}
- private void newLedgerCache() {
- ledgerCache = new LedgerCacheImpl(conf, activeLedgerManager);
+ private void newLedgerCache() throws IOException {
+ if (ledgerCache != null) {
+ ledgerCache.close();
+ }
+ ledgerCache = ((InterleavedLedgerStorage) bookie.ledgerStorage).ledgerCache = new LedgerCacheImpl(
+ conf, activeLedgerManager, bookie.getLedgerDirsManager());
}
@Test
- public void testAddEntryException() {
+ public void testAddEntryException() throws IOException {
// set page limitation
conf.setPageLimit(10);
// create a ledger cache
@@ -211,4 +222,57 @@ public class LedgerCacheTest extends Tes
fail("Failed to add entry.");
}
}
+
+ /**
+ * Test Ledger Cache flush failure
+ */
+ public void testLedgerCacheFlushFailureOnDiskFull() throws Exception {
+ File ledgerDir1 = File.createTempFile("bkTest", ".dir");
+ ledgerDir1.delete();
+ File ledgerDir2 = File.createTempFile("bkTest", ".dir");
+ ledgerDir2.delete();
+ ServerConfiguration conf = new ServerConfiguration();
+ conf.setLedgerDirNames(new String[] { ledgerDir1.getAbsolutePath(), ledgerDir2.getAbsolutePath() });
+
+ Bookie bookie = new Bookie(conf);
+ InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage);
+ LedgerCacheImpl ledgerCache = (LedgerCacheImpl) ledgerStorage.ledgerCache;
+ // Create ledger index file
+ ledgerStorage.setMasterKey(1, "key".getBytes());
+
+ FileInfo fileInfo = ledgerCache.getFileInfo(Long.valueOf(1), null);
+
+ // Simulate the flush failure
+ FileInfo newFileInfo = new FileInfo(fileInfo.getLf(), fileInfo.getMasterKey());
+ ledgerCache.fileInfoCache.put(Long.valueOf(1), newFileInfo);
+ // Add entries
+ ledgerStorage.addEntry(generateEntry(1, 1));
+ ledgerStorage.addEntry(generateEntry(1, 2));
+ ledgerStorage.flush();
+
+ ledgerStorage.addEntry(generateEntry(1, 3));
+ // add the dir to failed dirs
+ bookie.getLedgerDirsManager().addToFilledDirs(
+ newFileInfo.getLf().getParentFile().getParentFile().getParentFile());
+ File before = newFileInfo.getLf();
+ // flush after disk is added as failed.
+ ledgerStorage.flush();
+ File after = newFileInfo.getLf();
+
+ assertFalse("After flush index file should be changed", before.equals(after));
+ // Verify written entries
+ Assert.assertArrayEquals(generateEntry(1, 1).array(), ledgerStorage.getEntry(1, 1).array());
+ Assert.assertArrayEquals(generateEntry(1, 2).array(), ledgerStorage.getEntry(1, 2).array());
+ Assert.assertArrayEquals(generateEntry(1, 3).array(), ledgerStorage.getEntry(1, 3).array());
+ }
+
+ private ByteBuffer generateEntry(long ledger, long entry) {
+ byte[] data = ("ledger-" + ledger + "-" + entry).getBytes();
+ ByteBuffer bb = ByteBuffer.wrap(new byte[8 + 8 + data.length]);
+ bb.putLong(ledger);
+ bb.putLong(entry);
+ bb.put(data);
+ bb.flip();
+ return bb;
+ }
}