You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/03/29 15:18:34 UTC

svn commit: r1306839 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ bookkeeper...

Author: ivank
Date: Thu Mar 29 13:18:34 2012
New Revision: 1306839

URL: http://svn.apache.org/viewvc?rev=1306839&view=rev
Log:
BOOKKEEPER-190: Add entries would fail when number of open ledgers reaches more than openFileLimit. (sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
Removed:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCacheTest.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1306839&r1=1306838&r2=1306839&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Mar 29 13:18:34 2012
@@ -64,6 +64,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-195: HierarchicalLedgerManager doesn't consider idgen as a "specialNode" (ivank)
 
+        BOOKKEEPER-190: Add entries would fail when number of open ledgers reaches more than openFileLimit. (sijie via ivank)
+
       hedwig-server/
       
         BOOKKEEPER-140: Hub server doesn't subscribe remote region correctly when a region is down. (Sijie Gou via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1306839&r1=1306838&r2=1306839&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Thu Mar 29 13:18:34 2012
@@ -113,6 +113,7 @@ public class Bookie extends Thread {
         private static final long serialVersionUID = 1L;
         private long ledgerId;
         public NoLedgerException(long ledgerId) {
+            super("Ledger " + ledgerId + " not found");
             this.ledgerId = ledgerId;
         }
         public long getLedgerId() {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java?rev=1306839&r1=1306838&r2=1306839&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java Thu Mar 29 13:18:34 2012
@@ -182,8 +182,16 @@ class FileInfo {
         return total;
     }
 
-    synchronized public void close() throws IOException {
+    /**
+     * Close a file info
+     *
+     * @param force
+     *          if set to true, the index is forced to create before closed,
+     *          if set to false, the index is not forced to create.
+     */
+    synchronized public void close(boolean force) throws IOException {
         isClosed = true;
+        checkOpen(force);
         if (useCount == 0 && fc != null) {
             fc.close();
         }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java?rev=1306839&r1=1306838&r2=1306839&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java Thu Mar 29 13:18:34 2012
@@ -238,9 +238,7 @@ public class LedgerCacheImpl implements 
                     }
                     activeLedgerManager.addActiveLedger(ledger, true);
                 }
-                if (openLedgers.size() > openFileLimit) {
-                    fileInfoCache.remove(openLedgers.removeFirst()).close();
-                }
+                evictFileInfoIfNecessary();
                 fi = new FileInfo(lf, masterKey);
                 fileInfoCache.put(ledger, fi);
                 openLedgers.add(ledger);
@@ -366,7 +364,7 @@ public class LedgerCacheImpl implements 
                     // send up a sequential list
                     int count = i - start;
                     if (count == 0) {
-                        System.out.println("Count cannot possibly be zero!");
+                        LOG.warn("Count cannot possibly be zero!");
                     }
                     writeBuffers(l, entries, fi, start, count);
                     start = i;
@@ -374,7 +372,7 @@ public class LedgerCacheImpl implements 
                 lastOffset = entries.get(i).getFirstEntry();
             }
             if (entries.size()-start == 0 && entries.size() != 0) {
-                System.out.println("Nothing to write, but there were entries!");
+                LOG.warn("Nothing to write, but there were entries!");
             }
             writeBuffers(l, entries, fi, start, entries.size()-start);
             synchronized(this) {
@@ -400,7 +398,6 @@ public class LedgerCacheImpl implements 
             LOG.trace("Writing " + count + " buffers of " + Long.toHexString(ledger));
         }
         if (count == 0) {
-            //System.out.println("Count is zero!");
             return;
         }
         ByteBuffer buffs[] = new ByteBuffer[count];
@@ -417,7 +414,6 @@ public class LedgerCacheImpl implements 
             if (rc <= 0) {
                 throw new IOException("Short write to ledger " + ledger + " rc = " + rc);
             }
-            //System.out.println("Wrote " + rc + " to " + ledger);
             totalWritten += rc;
         }
         if (totalWritten != count * pageSize) {
@@ -457,14 +453,20 @@ public class LedgerCacheImpl implements 
                 synchronized(this) {
                     Long cleanLedger = cleanLedgers.getFirst();
                     Map<Long, LedgerEntryPage> map = pages.get(cleanLedger);
-                    if (map == null || map.isEmpty()) {
+                    while (map == null || map.isEmpty()) {
                         cleanLedgers.removeFirst();
-                        continue;
+                        if (cleanLedgers.isEmpty()) {
+                            continue outerLoop; 
+                        }
+                        cleanLedger = cleanLedgers.getFirst();
+                        map = pages.get(cleanLedger);
                     }
                     Iterator<Map.Entry<Long, LedgerEntryPage>> it = map.entrySet().iterator();
                     LedgerEntryPage lep = it.next().getValue();
                     while((lep.inUse() || !lep.isClean())) {
                         if (!it.hasNext()) {
+                            // no clean page found in this ledger
+                            cleanLedgers.removeFirst();
                             continue outerLoop;
                         }
                         lep = it.next().getValue();
@@ -581,19 +583,30 @@ public class LedgerCacheImpl implements 
     public void deleteLedger(long ledgerId) throws IOException {
         if (LOG.isDebugEnabled())
             LOG.debug("Deleting ledgerId: " + ledgerId);
+
+        // remove pages first to avoid page flushed when deleting file info
+        synchronized(this) {
+            pages.remove(ledgerId);
+        }
         // Delete the ledger's index file and close the FileInfo
-        FileInfo fi = getFileInfo(ledgerId, null);
-        fi.delete();
-        fi.close();
+        FileInfo fi = null;
+        try {
+            fi = getFileInfo(ledgerId, null);
+            fi.delete();
+            fi.close(false);
+        } finally {
+            // should release use count
+            // otherwise the file channel would not be closed.
+            if (null != fi) {
+                fi.release();
+            }
+        }
 
         // Remove it from the active ledger manager
         activeLedgerManager.removeActiveLedger(ledgerId);
 
         // Now remove it from all the other lists and maps.
         // These data structures need to be synchronized first before removing entries.
-        synchronized(this) {
-            pages.remove(ledgerId);
-        }
         synchronized(fileInfoCache) {
             fileInfoCache.remove(ledgerId);
         }
@@ -628,9 +641,7 @@ public class LedgerCacheImpl implements 
                 if (lf == null) {
                     throw new Bookie.NoLedgerException(ledgerId);
                 }
-                if (openLedgers.size() > openFileLimit) {
-                    fileInfoCache.remove(openLedgers.removeFirst()).close();
-                }
+                evictFileInfoIfNecessary();        
                 fi = new FileInfo(lf, null);
                 byte[] key = fi.getMasterKey();
                 fileInfoCache.put(ledgerId, fi);
@@ -641,9 +652,26 @@ public class LedgerCacheImpl implements 
         }
     }
 
+    // evict file info if necessary
+    private void evictFileInfoIfNecessary() throws IOException {
+        if (openLedgers.size() > openFileLimit) {
+            long ledgerToRemove = openLedgers.removeFirst();
+            LOG.info("Ledger {} is evicted from file info cache.",
+                     ledgerToRemove);
+            fileInfoCache.remove(ledgerToRemove).close(true);
+        }
+    }
+
     @Override
     public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
-        getFileInfo(ledgerId, masterKey);
+        FileInfo fi = null;
+        try {
+            fi = getFileInfo(ledgerId, masterKey);
+        } finally {
+            if (null != fi) {
+                fi.release();
+            }
+        }
     }
 
     @Override

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1306839&r1=1306838&r2=1306839&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Thu Mar 29 13:18:34 2012
@@ -154,6 +154,18 @@ public class ServerConfiguration extends
     }
 
     /**
+     * Set limitation of number of open files.
+     *
+     * @param fileLimit
+     *          Limitation of number of open files.
+     * @return server configuration
+     */
+    public ServerConfiguration setOpenFileLimit(int fileLimit) {
+        setProperty(OPEN_FILE_LIMIT, fileLimit);
+        return this;
+    }
+
+    /**
      * Get limitation number of index pages in ledger cache
      *
      * @return max number of index pages in ledger cache
@@ -163,6 +175,18 @@ public class ServerConfiguration extends
     }
 
     /**
+     * Set limitation number of index pages in ledger cache.
+     *
+     * @param pageLimit
+     *          Limitation of number of index pages in ledger cache.
+     * @return server configuration
+     */
+    public ServerConfiguration setPageLimit(int pageLimit) {
+        this.setProperty(PAGE_LIMIT, pageLimit);
+        return this;
+    }
+
+    /**
      * Get page size
      *
      * @return page size in ledger cache

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java?rev=1306839&r1=1306838&r2=1306839&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java Thu Mar 29 13:18:34 2012
@@ -64,7 +64,7 @@ public class BookieJournalTest {
         FileInfo fi = new FileInfo(fn, masterKey);
         // force creation of index file
         fi.write(new ByteBuffer[]{ ByteBuffer.allocate(0) }, 0);
-        fi.close();
+        fi.close(true);
     }
 
     private void writePartialIndexFileForLedger(File indexDir, long ledgerId,
@@ -75,7 +75,7 @@ public class BookieJournalTest {
         FileInfo fi = new FileInfo(fn, masterKey);
         // force creation of index file
         fi.write(new ByteBuffer[]{ ByteBuffer.allocate(0) }, 0);
-        fi.close();
+        fi.close(true);
         // file info header
         int headerLen = 8 + 4 + masterKey.length;
         // truncate the index file

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java?rev=1306839&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java Thu Mar 29 13:18:34 2012
@@ -0,0 +1,127 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import junit.framework.TestCase;
+
+/**
+ * LedgerCache related test cases
+ */
+public class LedgerCacheTest extends TestCase {
+    static Logger LOG = LoggerFactory.getLogger(LedgerCacheTest.class);
+
+    LedgerManager ledgerManager;
+    LedgerCache ledgerCache;
+    ServerConfiguration conf;
+    File txnDir, ledgerDir;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        txnDir = File.createTempFile("ledgercache", "txn");
+        txnDir.delete();
+        txnDir.mkdir();
+        ledgerDir = File.createTempFile("ledgercache", "ledger");
+        ledgerDir.delete();
+        ledgerDir.mkdir();
+        // create current dir
+        new File(ledgerDir, Bookie.CURRENT_DIR).mkdir();
+
+        conf = new ServerConfiguration();
+        conf.setZkServers(null);
+        conf.setJournalDirName(txnDir.getPath());
+        conf.setLedgerDirNames(new String[] { ledgerDir.getPath() });
+
+        ledgerManager = LedgerManagerFactory.newLedgerManager(conf, null);
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        ledgerManager.close();
+        FileUtils.deleteDirectory(txnDir);
+        FileUtils.deleteDirectory(ledgerDir);
+    }
+
+    private void newLedgerCache() {
+        ledgerCache = new LedgerCacheImpl(conf, ledgerManager);
+    }
+
+    @Test
+    public void testAddEntryException() {
+        // set page limitation
+        conf.setPageLimit(10);
+        // create a ledger cache
+        newLedgerCache();
+        /*
+         * Populate ledger cache.
+         */
+        try {
+            byte[] masterKey = "blah".getBytes();
+            for( int i = 0; i < 100; i++) {
+                ledgerCache.setMasterKey((long)i, masterKey);
+                ledgerCache.putEntryOffset(i, 0, i*8);
+            }
+        } catch (IOException e) {
+            LOG.error("Got IOException.", e);
+            fail("Failed to add entry.");
+        }
+    }
+
+    @Test
+    public void testLedgerEviction() throws Exception {
+        int numEntries = 10;
+        // limit open files & pages
+        conf.setOpenFileLimit(1).setPageLimit(2)
+            .setPageSize(8 * numEntries);
+        // create ledger cache
+        newLedgerCache();
+        try {
+            int numLedgers = 3;
+            byte[] masterKey = "blah".getBytes();
+            for (int i=1; i<=numLedgers; i++) {
+                ledgerCache.setMasterKey((long)i, masterKey);
+                for (int j=0; j<numEntries; j++) {
+                    ledgerCache.putEntryOffset(i, j, i * numEntries + j);
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Got Exception.", e);
+            fail("Failed to add entry.");
+        }
+    }
+
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java?rev=1306839&r1=1306838&r2=1306839&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java Thu Mar 29 13:18:34 2012
@@ -77,7 +77,7 @@ public class UpgradeTest {
         FileInfo fi = new FileInfo(fn, masterKey);
         // force creation of index file
         fi.write(new ByteBuffer[]{ ByteBuffer.allocate(0) }, 0);
-        fi.close();
+        fi.close(true);
 
         long logId = 0;
         ByteBuffer LOGFILE_HEADER = ByteBuffer.allocate(1024);
@@ -245,4 +245,4 @@ public class UpgradeTest {
             System.setErr(origerr);
         }
     }
-}
\ No newline at end of file
+}