You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/02/24 19:08:01 UTC

svn commit: r1293369 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ bookkeeper-server/src/test/java/org/apache/bookkeeper/test/

Author: ivank
Date: Fri Feb 24 18:08:01 2012
New Revision: 1293369

URL: http://svn.apache.org/viewvc?rev=1293369&view=rev
Log:
BOOKKEEPER-177: Index file is lost or some index pages aren't flushed. (sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/IndexCorruptionTest.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1293369&r1=1293368&r2=1293369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Feb 24 18:08:01 2012
@@ -44,6 +44,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-174: Bookie can't start when replaying entries whose ledger were deleted and garbage collected. (sijie via ivank)
 
+        BOOKKEEPER-177: Index file is lost or some index pages aren't flushed. (sijie via ivank)
+
       hedwig-server/
       
         BOOKKEEPER-140: Hub server doesn't subscribe remote region correctly when a region is down. (Sijie Gou via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1293369&r1=1293368&r2=1293369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Fri Feb 24 18:08:01 2012
@@ -197,42 +197,53 @@ public class Bookie extends Thread {
                 }
 
                 lastLogMark.markLog();
+
+                boolean flushFailed = false; // set if either flush below throws
                 try {
                     ledgerCache.flushLedger(true);
                 } catch (IOException e) {
                     LOG.error("Exception flushing Ledger", e);
+                    flushFailed = true;
                 }
                 try {
                     entryLogger.flush();
                 } catch (IOException e) {
                     LOG.error("Exception flushing entry logger", e);
+                    flushFailed = true;
                 }
-                lastLogMark.rollLog();
 
-                // list the journals that have been marked
-                List<Long> logs = listJournalIds(journalDirectory, new JournalIdFilter() {
-                    @Override
-                    public boolean accept(long journalId) {
-                        if (journalId < lastLogMark.lastMark.txnLogId) {
-                            return true;
-                        } else {
-                            return false;
+                // if either flush failed, do not roll the last log mark; otherwise journals
+                // holding entries of unflushed ledgers could be garbage collected and lost
+                if (!flushFailed) {
+
+                    lastLogMark.rollLog();
+
+                    // list the journals older than the marked journal
+                    List<Long> logs = listJournalIds(journalDirectory, new JournalIdFilter() {
+                        @Override
+                        public boolean accept(long journalId) {
+                            if (journalId < lastLogMark.lastMark.txnLogId) {
+                                return true;
+                            } else {
+                                return false;
+                            }
                         }
-                    }
-                });
+                    });
 
-                // keep MAX_BACKUP_JOURNALS journal files before marked journal
-                if (logs.size() >= maxBackupJournals) {
-                    int maxIdx = logs.size() - maxBackupJournals;
-                    for (int i=0; i<maxIdx; i++) {
-                        long id = logs.get(i);
-                        // make sure the journal id is smaller than marked journal id
-                        if (id < lastLogMark.lastMark.txnLogId) {
-                            File journalFile = new File(journalDirectory, Long.toHexString(id) + ".txn");
-                            journalFile.delete();
-                            LOG.info("garbage collected journal " + journalFile.getName());
+                    // keep maxBackupJournals journal files before the marked journal; delete older ones
+                    if (logs.size() >= maxBackupJournals) {
+                        int maxIdx = logs.size() - maxBackupJournals;
+                        for (int i=0; i<maxIdx; i++) {
+                            long id = logs.get(i);
+                            // make sure the journal id is smaller than marked journal id
+                            if (id < lastLogMark.lastMark.txnLogId) {
+                                File journalFile = new File(journalDirectory, Long.toHexString(id) + ".txn");
+                                journalFile.delete();
+                                LOG.info("garbage collected journal " + journalFile.getName());
+                            }
                         }
                     }
+
                 }
 
                 // clear flushing flag

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java?rev=1293369&r1=1293368&r2=1293369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java Fri Feb 24 18:08:01 2012
@@ -183,11 +183,12 @@ public class LedgerCache {
         try {
             if (lep == null) {
                 lep = grabCleanPage(ledger, pageEntry);
+                // fill the freshly grabbed page (updatePage) before putting it into the table;
+                // otherwise the table would hold an empty page that other lookups could see
+                updatePage(lep);
                 synchronized(this) {
                     putIntoTable(pages, lep);
                 }
-                updatePage(lep);
-
             }
             return lep.getOffset(offsetInPage*8);
         } finally {
@@ -303,75 +304,9 @@ public class LedgerCache {
             }
             while(!dirtyLedgers.isEmpty()) {
                 Long l = dirtyLedgers.removeFirst();
-                LinkedList<Long> firstEntryList;
-                synchronized(this) {
-                    HashMap<Long, LedgerEntryPage> pageMap = pages.get(l);
-                    if (pageMap == null || pageMap.isEmpty()) {
-                        continue;
-                    }
-                    firstEntryList = new LinkedList<Long>();
-                    for(Map.Entry<Long, LedgerEntryPage> entry: pageMap.entrySet()) {
-                        LedgerEntryPage lep = entry.getValue();
-                        if (lep.isClean()) {
-                            if (LOG.isTraceEnabled()) {
-                                LOG.trace("Page is clean " + lep);
-                            }
-                            continue;
-                        }
-                        firstEntryList.add(lep.getFirstEntry());
-                    }
-                }
-                // Now flush all the pages of a ledger
-                List<LedgerEntryPage> entries = new ArrayList<LedgerEntryPage>(firstEntryList.size());
-                FileInfo fi = null;
-                try {
-                    for(Long firstEntry: firstEntryList) {
-                        LedgerEntryPage lep = getLedgerEntryPage(l, firstEntry, true);
-                        if (lep != null) {
-                            entries.add(lep);
-                        }
-                    }
-                    Collections.sort(entries, new Comparator<LedgerEntryPage>() {
-                        @Override
-                        public int compare(LedgerEntryPage o1, LedgerEntryPage o2) {
-                            return (int)(o1.getFirstEntry()-o2.getFirstEntry());
-                        }
-                    });
-                    ArrayList<Integer> versions = new ArrayList<Integer>(entries.size());
-                    fi = getFileInfo(l, null);
-                    int start = 0;
-                    long lastOffset = -1;
-                    for(int i = 0; i < entries.size(); i++) {
-                        versions.add(i, entries.get(i).getVersion());
-                        if (lastOffset != -1 && (entries.get(i).getFirstEntry() - lastOffset) != entriesPerPage) {
-                            // send up a sequential list
-                            int count = i - start;
-                            if (count == 0) {
-                                System.out.println("Count cannot possibly be zero!");
-                            }
-                            writeBuffers(l, entries, fi, start, count);
-                            start = i;
-                        }
-                        lastOffset = entries.get(i).getFirstEntry();
-                    }
-                    if (entries.size()-start == 0 && entries.size() != 0) {
-                        System.out.println("Nothing to write, but there were entries!");
-                    }
-                    writeBuffers(l, entries, fi, start, entries.size()-start);
-                    synchronized(this) {
-                        for(int i = 0; i < entries.size(); i++) {
-                            LedgerEntryPage lep = entries.get(i);
-                            lep.setClean(versions.get(i));
-                        }
-                    }
-                } finally {
-                    for(LedgerEntryPage lep: entries) {
-                        lep.releasePage();
-                    }
-                    if (fi != null) {
-                        fi.release();
-                    }
-                }
+
+                flushLedger(l);
+
                 if (!doAll) {
                     break;
                 }
@@ -387,6 +322,92 @@ public class LedgerCache {
         }
     }
 
+    /**
+     * Flush a specified ledger
+     *
+     * @param l
+     *          Ledger Id
+     * @throws IOException
+     */
+    private void flushLedger(long l) throws IOException {
+        LinkedList<Long> firstEntryList;
+        synchronized(this) {
+            HashMap<Long, LedgerEntryPage> pageMap = pages.get(l);
+            if (pageMap == null || pageMap.isEmpty()) {
+                return;
+            }
+            firstEntryList = new LinkedList<Long>();
+            for(Map.Entry<Long, LedgerEntryPage> entry: pageMap.entrySet()) {
+                LedgerEntryPage lep = entry.getValue();
+                if (lep.isClean()) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Page is clean " + lep);
+                    }
+                    continue;
+                }
+                firstEntryList.add(lep.getFirstEntry());
+            }
+        }
+
+        if (firstEntryList.size() == 0) {
+            LOG.debug("Nothing to flush for ledger {}.", l);
+            // nothing to do
+            return;
+        }
+
+        // Now flush all the pages of a ledger
+        List<LedgerEntryPage> entries = new ArrayList<LedgerEntryPage>(firstEntryList.size());
+        FileInfo fi = null;
+        try {
+            for(Long firstEntry: firstEntryList) {
+                LedgerEntryPage lep = getLedgerEntryPage(l, firstEntry, true);
+                if (lep != null) {
+                    entries.add(lep);
+                }
+            }
+            Collections.sort(entries, new Comparator<LedgerEntryPage>() {
+                    @Override
+                    public int compare(LedgerEntryPage o1, LedgerEntryPage o2) {
+                    return (int)(o1.getFirstEntry()-o2.getFirstEntry());
+                    }
+                    });
+            ArrayList<Integer> versions = new ArrayList<Integer>(entries.size());
+            fi = getFileInfo(l, null);
+            int start = 0;
+            long lastOffset = -1;
+            for(int i = 0; i < entries.size(); i++) {
+                versions.add(i, entries.get(i).getVersion());
+                if (lastOffset != -1 && (entries.get(i).getFirstEntry() - lastOffset) != entriesPerPage) {
+                    // send up a sequential list
+                    int count = i - start;
+                    if (count == 0) {
+                        System.out.println("Count cannot possibly be zero!");
+                    }
+                    writeBuffers(l, entries, fi, start, count);
+                    start = i;
+                }
+                lastOffset = entries.get(i).getFirstEntry();
+            }
+            if (entries.size()-start == 0 && entries.size() != 0) {
+                System.out.println("Nothing to write, but there were entries!");
+            }
+            writeBuffers(l, entries, fi, start, entries.size()-start);
+            synchronized(this) {
+                for(int i = 0; i < entries.size(); i++) {
+                    LedgerEntryPage lep = entries.get(i);
+                    lep.setClean(versions.get(i));
+                }
+            }
+        } finally {
+            for(LedgerEntryPage lep: entries) {
+                lep.releasePage();
+            }
+            if (fi != null) {
+                fi.release();
+            }
+        }
+    }
+
     private void writeBuffers(Long ledger,
                               List<LedgerEntryPage> entries, FileInfo fi,
                               int start, int count) throws IOException {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java?rev=1293369&r1=1293368&r2=1293369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java Fri Feb 24 18:08:01 2012
@@ -141,6 +141,12 @@ public class LedgerDescriptor {
             try {
                 fi = ledgerCache.getFileInfo(ledgerId, null);
                 long size = fi.size();
+                // make sure the file size is aligned with the index entry size (8 bytes);
+                // otherwise we may read incorrect data from a trailing partial entry
+                if (0 != size % 8) {
+                    LOG.warn("Index file of ledger {} is not aligned with index entry size.", ledgerId);
+                    size = size - size % 8; // round down to a whole number of index entries
+                }
                 // we may not have the last entry in the cache
                 if (size > lastEntry*8) {
                     ByteBuffer bb = ByteBuffer.allocate(ledgerCache.getPageSize());

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1293369&r1=1293368&r2=1293369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Fri Feb 24 18:08:01 2012
@@ -168,6 +168,20 @@ public class ServerConfiguration extends
     }
 
     /**
+     * Set the index page size, in bytes.
+     *
+     * @see #getPageSize()
+     *
+     * @param pageSize
+     *          index page size in bytes
+     * @return this configuration, to allow call chaining
+     */
+    public ServerConfiguration setPageSize(int pageSize) {
+        setProperty(PAGE_SIZE, pageSize);
+        return this;
+    }
+
+    /**
      * Max journal file size
      *
      * @return max journal file size

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/IndexCorruptionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/IndexCorruptionTest.java?rev=1293369&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/IndexCorruptionTest.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/IndexCorruptionTest.java Fri Feb 24 18:08:01 2012
@@ -0,0 +1,166 @@
+package org.apache.bookkeeper.test;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.util.Enumeration;
+import java.util.List;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests index corruption cases: lost index files and unflushed index pages (BOOKKEEPER-177).
+ */
+public class IndexCorruptionTest extends BaseTestCase {
+    static final Logger LOG = LoggerFactory.getLogger(IndexCorruptionTest.class);
+
+    DigestType digestType;
+
+    final int pageSize = 1024; // index page size in bytes, applied to baseConf in the constructor
+
+    public IndexCorruptionTest(DigestType digestType) {
+        super(1);
+        this.digestType = digestType;
+        baseConf.setPageSize(pageSize);
+    }
+
+    private Thread findSyncThread() { // returns the thread named "SyncThread", or null
+        // oversize the array: enumerate() silently drops threads that do not fit
+        Thread[] allthreads = new Thread[Thread.activeCount() * 2];
+        int found = Thread.enumerate(allthreads); // slots past 'found' stay null
+        for (int i = 0; i < found; i++) {
+            if ("SyncThread".equals(allthreads[i].getName())) {
+                return allthreads[i];
+            }
+        }
+        return null;
+    }
+
+    @Test
+    public void testNoSuchLedger() throws Exception {
+        LOG.debug("Testing NoSuchLedger");
+
+        Thread syncThread = findSyncThread();
+        assertNotNull("Not found SyncThread.", syncThread);
+
+        String dummyMsg = "NoSuchLedger";
+        int numMsgs = 3;
+        LedgerHandle wlh;
+        // hold SyncThread so no flush happens while the ledgers are set up
+        syncThread.suspend();
+        try {
+            // Create a ledger, then open (and thus close) it, which causes a readEntry(0) call
+            LedgerHandle lh = bkc.createLedger(1, 1, digestType, "".getBytes());
+            bkc.openLedger(lh.getId(), digestType, "".getBytes());
+            // Create a new ledger to write entries
+            wlh = bkc.createLedger(1, 1, digestType, "".getBytes());
+            for (int i=0; i<numMsgs; i++) {
+                wlh.addEntry(dummyMsg.getBytes());
+            }
+        } finally {
+            syncThread.resume(); // never leave SyncThread suspended, even on failure
+        }
+
+        // trigger sync
+        Thread.sleep(2 * baseConf.getFlushInterval());
+
+        // restart bookies
+        restartBookies();
+
+        Enumeration<LedgerEntry> seq = wlh.readEntries(0, numMsgs - 1);
+        assertTrue("Enumeration of ledger entries has no element", seq.hasMoreElements());
+        int entryId = 0;
+        while (seq.hasMoreElements()) {
+            LedgerEntry e = seq.nextElement();
+            assertEquals(entryId, e.getEntryId());
+            Assert.assertArrayEquals(dummyMsg.getBytes(), e.getEntry());
+            ++entryId;
+        }
+        assertEquals(numMsgs, entryId);
+    }
+
+    @Test
+    public void testEmptyIndexPage() throws Exception {
+        LOG.debug("Testing EmptyIndexPage");
+
+        Thread syncThread = findSyncThread();
+        assertNotNull("Not found SyncThread.", syncThread);
+
+        String dummyMsg = "NoSuchLedger";
+        // two index pages worth of entries (8 bytes per index entry) for ledger 2
+        int numMsgs = 2 * pageSize / 8;
+        LedgerHandle lh1;
+        LedgerHandle lh2;
+        syncThread.suspend();
+        try {
+            // Create ledger 1 (left empty) and ledger 2, then fill ledger 2
+            lh1 = bkc.createLedger(1, 1, digestType, "".getBytes());
+            lh2 = bkc.createLedger(1, 1, digestType, "".getBytes());
+            for (int i=0; i<numMsgs; i++) {
+                lh2.addEntry(dummyMsg.getBytes());
+            }
+        } finally {
+            syncThread.resume(); // never leave SyncThread suspended, even on failure
+        }
+
+        // trigger sync
+        Thread.sleep(2 * baseConf.getFlushInterval());
+
+        syncThread.suspend();
+        try {
+            // Open (and thus close) ledger 1, which causes a readEntry(0) call
+            bkc.openLedger(lh1.getId(), digestType, "".getBytes());
+            // write another 3 entries to ledger 2
+            for (int i=0; i<3; i++) {
+                lh2.addEntry(dummyMsg.getBytes());
+            }
+        } finally {
+            syncThread.resume();
+        }
+
+        // wait for sync again
+        Thread.sleep(2 * baseConf.getFlushInterval());
+
+        // restart bookies
+        restartBookies();
+
+        numMsgs += 3;
+        Enumeration<LedgerEntry> seq = lh2.readEntries(0, numMsgs - 1);
+        assertTrue("Enumeration of ledger entries has no element", seq.hasMoreElements());
+        int entryId = 0;
+        while (seq.hasMoreElements()) {
+            LedgerEntry e = seq.nextElement();
+            assertEquals(entryId, e.getEntryId());
+            Assert.assertArrayEquals(dummyMsg.getBytes(), e.getEntry());
+            ++entryId;
+        }
+        assertEquals(numMsgs, entryId);
+    }
+}