You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text rendering.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/26 12:51:44 UTC

svn commit: r1402464 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/

Author: ivank
Date: Fri Oct 26 10:51:44 2012
New Revision: 1402464

URL: http://svn.apache.org/viewvc?rev=1402464&view=rev
Log:
BOOKKEEPER-346: Detect IOExceptions in LedgerCache and bookie should look at next ledger dir(if any) (Vinay via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1402464&r1=1402463&r2=1402464&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Oct 26 10:51:44 2012
@@ -166,6 +166,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-345: Detect IOExceptions on entrylogger and bookie should consider next ledger dir(if any) (Vinay via ivank)
 
+        BOOKKEEPER-346: Detect IOExceptions in LedgerCache and bookie should look at next ledger dir(if any) (Vinay via ivank)
+
       hedwig-server:
 
         BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java?rev=1402464&r1=1402463&r2=1402464&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java Fri Oct 26 10:51:44 2012
@@ -54,7 +54,7 @@ class FileInfo {
     static final int NO_MASTER_KEY = -1;
 
     private FileChannel fc;
-    private final File lf;
+    private File lf;
     byte[] masterKey;
 
     /**
@@ -67,6 +67,7 @@ class FileInfo {
     private long size;
     private int useCount;
     private boolean isClosed;
+    private long sizeSinceLastwrite;
 
     // file access mode
     protected String mode;
@@ -78,6 +79,14 @@ class FileInfo {
         mode = "rw";
     }
 
+    public File getLf() {
+        return lf;
+    }
+
+    public long getSizeSinceLastwrite() {
+        return sizeSinceLastwrite;
+    }
+
     synchronized public void readHeader() throws IOException {
         if (lf.exists()) {
             if (fc != null) {
@@ -86,6 +95,7 @@ class FileInfo {
 
             fc = new RandomAccessFile(lf, mode).getChannel();
             size = fc.size();
+            sizeSinceLastwrite = size;
 
             // avoid hang on reading partial index
             ByteBuffer bb = ByteBuffer.allocate((int)(Math.min(size, START_OF_DATA)));
@@ -220,9 +230,61 @@ class FileInfo {
                 size = newsize;
             }
         }
+        sizeSinceLastwrite = fc.size();
         return total;
     }
 
+    /**
+     * Copies the current file contents, up to the specified size, to the
+     * target file and deletes the current file. Pass Long.MAX_VALUE as size
+     * to copy the complete file. The copy is staged in newFile + RLOC and only
+     * renamed to newFile after the old file has been deleted; a leftover RLOC
+     * file is picked up by LedgerCacheImpl when it scans the ledger dirs.
+     */
+    public synchronized void moveToNewLocation(File newFile, long size) throws IOException {
+        checkOpen(false);
+        size = Math.min(size, fc.size());
+        File rlocFile = new File(newFile.getParentFile(), newFile.getName() + LedgerCacheImpl.RLOC);
+        if (!rlocFile.exists()) {
+            checkParents(rlocFile);
+            if (!rlocFile.createNewFile()) {
+                throw new IOException("Creating new cache index file " + rlocFile + " failed ");
+            }
+        }
+        // copy contents from old.idx to new.idx.rloc
+        FileChannel newFc = new RandomAccessFile(rlocFile, "rw").getChannel();
+        try {
+            long written = 0;
+            while (written < size) {
+                // request only the bytes still missing so the copy never runs past size
+                long count = fc.transferTo(written, size - written, newFc);
+                if (count <= 0) {
+                    throw new IOException("Copying to new location " + rlocFile + " failed");
+                }
+                written += count;
+            }
+            // an .rloc left by an interrupted move may be longer than size
+            newFc.truncate(size);
+            newFc.force(true);
+        } finally {
+            newFc.close();
+        }
+        // delete old.idx
+        fc.close();
+        if (!delete()) {
+            LOG.error("Failed to delete the previous index file " + lf);
+            throw new IOException("Failed to delete the previous index file " + lf);
+        }
+
+        // rename new.idx.rloc to new.idx
+        if (!rlocFile.renameTo(newFile)) {
+            LOG.error("Failed to rename " + rlocFile + " to " + newFile);
+            throw new IOException("Failed to rename " + rlocFile + " to " + newFile);
+        }
+        fc = new RandomAccessFile(newFile, mode).getChannel();
+        lf = newFile;
+    }
+
     synchronized public byte[] getMasterKey() throws IOException {
         checkOpen(false);
         return masterKey;

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java?rev=1402464&r1=1402463&r2=1402464&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java Fri Oct 26 10:51:44 2012
@@ -54,7 +54,7 @@ class InterleavedLedgerStorage implement
             ActiveLedgerManager activeLedgerManager,
             LedgerDirsManager ledgerDirsManager) throws IOException {
         entryLogger = new EntryLogger(conf, ledgerDirsManager);
-        ledgerCache = new LedgerCacheImpl(conf, activeLedgerManager);
+        ledgerCache = new LedgerCacheImpl(conf, activeLedgerManager, ledgerDirsManager);
         gcThread = new GarbageCollectorThread(conf, ledgerCache, entryLogger,
                 activeLedgerManager, new EntryLogCompactionScanner());
     }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java?rev=1402464&r1=1402463&r2=1402464&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java Fri Oct 26 10:51:44 2012
@@ -32,10 +32,12 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.bookkeeper.meta.ActiveLedgerManager;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,11 +48,15 @@ import org.slf4j.LoggerFactory;
  */
 public class LedgerCacheImpl implements LedgerCache {
     private final static Logger LOG = LoggerFactory.getLogger(LedgerDescriptor.class);
+    private static final String IDX = ".idx";
+    static final String RLOC = ".rloc";
 
-    final File ledgerDirectories[];
+    private LedgerDirsManager ledgerDirsManager;
+    final private AtomicBoolean shouldRelocateIndexFile = new AtomicBoolean(false);
 
-    public LedgerCacheImpl(ServerConfiguration conf, ActiveLedgerManager alm) {
-        this.ledgerDirectories = Bookie.getCurrentDirectories(conf.getLedgerDirs());
+    public LedgerCacheImpl(ServerConfiguration conf, ActiveLedgerManager alm, LedgerDirsManager ledgerDirsManager)
+            throws IOException {
+        this.ledgerDirsManager = ledgerDirsManager;
         this.openFileLimit = conf.getOpenFileLimit();
         this.pageSize = conf.getPageSize();
         this.entriesPerPage = pageSize / 8;
@@ -66,6 +72,7 @@ public class LedgerCacheImpl implements 
         activeLedgerManager = alm;
         // Retrieve all of the active ledgers.
         getActiveLedgers();
+        ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
     }
     /**
      * the list of potentially clean ledgers
@@ -233,16 +240,10 @@ public class LedgerCacheImpl implements 
         sb.append(Integer.toHexString(parent));
         sb.append('/');
         sb.append(Long.toHexString(ledgerId));
-        sb.append(".idx");
+        sb.append(IDX);
         return sb.toString();
     }
 
-    static final private Random rand = new Random();
-
-    static final private File pickDirs(File dirs[]) {
-        return dirs[rand.nextInt(dirs.length)];
-    }
-
     FileInfo getFileInfo(Long ledger, byte masterKey[]) throws IOException {
         synchronized(fileInfoCache) {
             FileInfo fi = fileInfoCache.get(ledger);
@@ -252,9 +253,7 @@ public class LedgerCacheImpl implements 
                     if (masterKey == null) {
                         throw new Bookie.NoLedgerException(ledger);
                     }
-                    File dir = pickDirs(ledgerDirectories);
-                    String ledgerName = getLedgerName(ledger);
-                    lf = new File(dir, ledgerName);
+                    lf = getNewLedgerIndexFile(ledger);
                     // A new ledger index file has been created for this Bookie.
                     // Add this new ledger to the set of active ledgers.
                     LOG.debug("New ledger index file created for ledgerId: {}", ledger);
@@ -262,6 +261,10 @@ public class LedgerCacheImpl implements 
                 }
                 evictFileInfoIfNecessary();
                 fi = new FileInfo(lf, masterKey);
+                if (ledgerDirsManager.isDirFull(lf.getParentFile()
+                        .getParentFile().getParentFile())) {
+                    moveLedgerIndexFile(ledger, fi);
+                }
                 fileInfoCache.put(ledger, fi);
                 openLedgers.add(ledger);
             }
@@ -271,6 +274,13 @@ public class LedgerCacheImpl implements 
             return fi;
         }
     }
+
+    private File getNewLedgerIndexFile(Long ledger) throws NoWritableLedgerDirException {
+        File dir = ledgerDirsManager.pickRandomWritableDir();
+        String ledgerName = getLedgerName(ledger);
+        return new File(dir, ledgerName);
+    }
+
     private void updatePage(LedgerEntryPage lep) throws IOException {
         if (!lep.isClean()) {
             throw new IOException("Trying to update a dirty page");
@@ -291,6 +301,32 @@ public class LedgerCacheImpl implements 
         }
     }
 
+    private LedgerDirsListener getLedgerDirsListener() {
+        return new LedgerDirsListener() {
+            @Override
+            public void diskFull(File disk) {
+                // Index files on the full disk must move to a writable dir;
+                // flag it so the next flushLedger() relocates them.
+                shouldRelocateIndexFile.set(true);
+            }
+
+            @Override
+            public void diskFailed(File disk) {
+                // Nothing to handle here. Will be handled in Bookie
+            }
+
+            @Override
+            public void allDisksFull() {
+                // Nothing to handle here. Will be handled in Bookie
+            }
+
+            @Override
+            public void fatalError() {
+                // Nothing to handle here. Will be handled in Bookie
+            }
+        };
+    }
+
     @Override
     public void flushLedger(boolean doAll) throws IOException {
         synchronized(dirtyLedgers) {
@@ -307,6 +343,20 @@ public class LedgerCacheImpl implements 
             if (dirtyLedgers.isEmpty()) {
                 return;
             }
+
+            if (shouldRelocateIndexFile.get()) {
+                // if some new dir detected as full, then move all corresponding
+                // open index files to new location
+                for (Long l : dirtyLedgers) {
+                    FileInfo fi = getFileInfo(l, null);
+                    File currentDir = fi.getLf().getParentFile().getParentFile().getParentFile();
+                    if (ledgerDirsManager.isDirFull(currentDir)) {
+                        moveLedgerIndexFile(l, fi);
+                    }
+                }
+                shouldRelocateIndexFile.set(false);
+            }
+
             while(!dirtyLedgers.isEmpty()) {
                 Long l = dirtyLedgers.removeFirst();
 
@@ -327,6 +377,11 @@ public class LedgerCacheImpl implements 
         }
     }
 
+    private void moveLedgerIndexFile(Long l, FileInfo fi) throws NoWritableLedgerDirException, IOException {
+        File newLedgerIndexFile = getNewLedgerIndexFile(l);
+        fi.moveToNewLocation(newLedgerIndexFile, fi.getSizeSinceLastwrite());
+    }
+
     /**
      * Flush a specified ledger
      *
@@ -578,22 +633,39 @@ public class LedgerCacheImpl implements 
      * BookieServer knows about that have not yet been deleted by the BookKeeper
      * Client. This is called only once during initialization.
      */
-    private void getActiveLedgers() {
+    private void getActiveLedgers() throws IOException {
         // Ledger index files are stored in a file hierarchy with a parent and
         // grandParent directory. We'll have to go two levels deep into these
         // directories to find the index files.
-        for (File ledgerDirectory : ledgerDirectories) {
+        for (File ledgerDirectory : ledgerDirsManager.getAllLedgerDirs()) {
             for (File grandParent : ledgerDirectory.listFiles()) {
                 if (grandParent.isDirectory()) {
                     for (File parent : grandParent.listFiles()) {
                         if (parent.isDirectory()) {
                             for (File index : parent.listFiles()) {
-                                if (!index.isFile() || !index.getName().endsWith(".idx")) {
+                                if (!index.isFile()
+                                        || (!index.getName().endsWith(IDX) && !index.getName().endsWith(RLOC))) {
                                     continue;
                                 }
-                                // We've found a ledger index file. The file name is the
-                                // HexString representation of the ledgerId.
-                                String ledgerIdInHex = index.getName().substring(0, index.getName().length() - 4);
+
+                                // We've found a ledger index file. The file
+                                // name is the HexString representation of the
+                                // ledgerId.
+                                String ledgerIdInHex = index.getName().replace(RLOC, "").replace(IDX, "");
+                                if (index.getName().endsWith(RLOC)) {
+                                    if (findIndexFile(Long.parseLong(ledgerIdInHex, 16)) != null) {
+                                        if (!index.delete()) {
+                                            LOG.warn("Deleting the rloc file " + index + " failed");
+                                        }
+                                        continue;
+                                    } else {
+                                        File dest = new File(index.getParentFile(), ledgerIdInHex + IDX);
+                                        if (!index.renameTo(dest)) {
+                                            throw new IOException("Renaming rloc file " + index
+                                                    + " to index file has failed");
+                                        }
+                                    }
+                                }
                                 activeLedgerManager.addActiveLedger(Long.parseLong(ledgerIdInHex, 16), true);
                             }
                         }
@@ -656,7 +728,7 @@ public class LedgerCacheImpl implements 
 
     private File findIndexFile(long ledgerId) throws IOException {
         String ledgerName = getLedgerName(ledgerId);
-        for(File d: ledgerDirectories) {
+        for (File d : ledgerDirsManager.getAllLedgerDirs()) {
             File lf = new File(d, ledgerName);
             if (lf.exists()) {
                 return lf;

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java?rev=1402464&r1=1402463&r2=1402464&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java Fri Oct 26 10:51:44 2012
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.bookie;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -30,6 +31,7 @@ import org.apache.bookkeeper.meta.Active
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.commons.io.FileUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -50,6 +52,8 @@ public class LedgerCacheTest extends Tes
     ServerConfiguration conf;
     File txnDir, ledgerDir;
 
+    private Bookie bookie;
+
     @Override
     @Before
     public void setUp() throws Exception {
@@ -66,27 +70,34 @@ public class LedgerCacheTest extends Tes
         conf.setZkServers(null);
         conf.setJournalDirName(txnDir.getPath());
         conf.setLedgerDirNames(new String[] { ledgerDir.getPath() });
+        bookie = new Bookie(conf);
 
         ledgerManagerFactory =
             LedgerManagerFactory.newLedgerManagerFactory(conf, null);
         activeLedgerManager = ledgerManagerFactory.newActiveLedgerManager();
+        ledgerCache = ((InterleavedLedgerStorage) bookie.ledgerStorage).ledgerCache;
     }
 
     @Override
     @After
     public void tearDown() throws Exception {
+        bookie.ledgerStorage.shutdown();
         activeLedgerManager.close();
         ledgerManagerFactory.uninitialize();
         FileUtils.deleteDirectory(txnDir);
         FileUtils.deleteDirectory(ledgerDir);
     }
 
-    private void newLedgerCache() {
-        ledgerCache = new LedgerCacheImpl(conf, activeLedgerManager);
+    private void newLedgerCache() throws IOException {
+        if (ledgerCache != null) {
+            ledgerCache.close();
+        }
+        ledgerCache = ((InterleavedLedgerStorage) bookie.ledgerStorage).ledgerCache = new LedgerCacheImpl(
+                conf, activeLedgerManager, bookie.getLedgerDirsManager());
     }
 
     @Test
-    public void testAddEntryException() {
+    public void testAddEntryException() throws IOException {
         // set page limitation
         conf.setPageLimit(10);
         // create a ledger cache
@@ -211,4 +222,66 @@ public class LedgerCacheTest extends Tes
             fail("Failed to add entry.");
         }
     }
+
+    /**
+     * Verifies that when the disk holding a ledger's index file is marked
+     * full, the next flush relocates the index file to another ledger dir
+     * and all entries remain readable.
+     */
+    @Test
+    public void testLedgerCacheFlushFailureOnDiskFull() throws Exception {
+        File ledgerDir1 = File.createTempFile("bkTest", ".dir");
+        ledgerDir1.delete();
+        File ledgerDir2 = File.createTempFile("bkTest", ".dir");
+        ledgerDir2.delete();
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setLedgerDirNames(new String[] { ledgerDir1.getAbsolutePath(), ledgerDir2.getAbsolutePath() });
+
+        Bookie bookie = new Bookie(conf);
+        InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage);
+        try {
+            LedgerCacheImpl ledgerCache = (LedgerCacheImpl) ledgerStorage.ledgerCache;
+            // Create ledger index file
+            ledgerStorage.setMasterKey(1, "key".getBytes());
+
+            FileInfo fileInfo = ledgerCache.getFileInfo(Long.valueOf(1), null);
+
+            // Simulate the flush failure
+            FileInfo newFileInfo = new FileInfo(fileInfo.getLf(), fileInfo.getMasterKey());
+            ledgerCache.fileInfoCache.put(Long.valueOf(1), newFileInfo);
+            // Add entries
+            ledgerStorage.addEntry(generateEntry(1, 1));
+            ledgerStorage.addEntry(generateEntry(1, 2));
+            ledgerStorage.flush();
+
+            ledgerStorage.addEntry(generateEntry(1, 3));
+            // mark the dir holding the index file as full
+            bookie.getLedgerDirsManager().addToFilledDirs(
+                    newFileInfo.getLf().getParentFile().getParentFile().getParentFile());
+            File before = newFileInfo.getLf();
+            // flush after the disk has been marked full; this relocates the index file
+            ledgerStorage.flush();
+            File after = newFileInfo.getLf();
+
+            assertFalse("After flush index file should be changed", before.equals(after));
+            // Verify written entries
+            Assert.assertArrayEquals(generateEntry(1, 1).array(), ledgerStorage.getEntry(1, 1).array());
+            Assert.assertArrayEquals(generateEntry(1, 2).array(), ledgerStorage.getEntry(1, 2).array());
+            Assert.assertArrayEquals(generateEntry(1, 3).array(), ledgerStorage.getEntry(1, 3).array());
+        } finally {
+            ledgerStorage.shutdown();
+            FileUtils.deleteDirectory(ledgerDir1);
+            FileUtils.deleteDirectory(ledgerDir2);
+        }
+    }
+
+    private ByteBuffer generateEntry(long ledger, long entry) {
+        byte[] data = ("ledger-" + ledger + "-" + entry).getBytes();
+        ByteBuffer bb = ByteBuffer.wrap(new byte[8 + 8 + data.length]);
+        bb.putLong(ledger);
+        bb.putLong(entry);
+        bb.put(data);
+        bb.flip();
+        return bb;
+    }
 }