You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2012/03/31 09:13:08 UTC

svn commit: r1307725 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/

Author: sijie
Date: Sat Mar 31 07:13:07 2012
New Revision: 1307725

URL: http://svn.apache.org/viewvc?rev=1307725&view=rev
Log:
BOOKKEEPER-193: Ledger is garbage collected by mistake. (sijie, ivank via sijie)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/SnapshotMap.java   (with props)
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java   (with props)
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java   (with props)
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1307725&r1=1307724&r2=1307725&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Sat Mar 31 07:13:07 2012
@@ -70,6 +70,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-166: Bookie will not recover its journal if the length prefix of an entry is truncated (ivank)
 
+        BOOKKEEPER-193: Ledger is garbage collected by mistake. (sijie, ivank via sijie)
+
       hedwig-server/
       
         BOOKKEEPER-140: Hub server doesn't subscribe remote region correctly when a region is down. (Sijie Gou via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java?rev=1307725&r1=1307724&r2=1307725&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java Sat Mar 31 07:13:07 2012
@@ -21,6 +21,8 @@ package org.apache.bookkeeper.meta;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.Map;
 
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
@@ -252,4 +254,25 @@ abstract class AbstractZkLedgerManager i
     @Override
     public void close() {
     }
+
+    /**
+     * Do garbage collection by comparing hosted ledgers with zk ledgers
+     *
+     * @param gc
+     *          Garbage collector to do garbage collection when inactive/deleted ledgers are found
+     * @param bkActiveLedgers
+     *          Active ledgers hosted in bookie server
+     * @param zkAllLedgers
+     *          All ledgers stored in zookeeper
+     */
+    void doGc(GarbageCollector gc, Map<Long, Boolean> bkActiveLedgers, Set<Long> zkAllLedgers) {
+        // remove any active ledgers that don't exist in zk
+        for (Long bkLid : bkActiveLedgers.keySet()) {
+            if (!zkAllLedgers.contains(bkLid)) {
+                // remove it from current active ledger
+                bkActiveLedgers.remove(bkLid);
+                gc.gc(bkLid);
+            }
+        }
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java?rev=1307725&r1=1307724&r2=1307725&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java Sat Mar 31 07:13:07 2012
@@ -19,9 +19,9 @@ package org.apache.bookkeeper.meta;
  */
 
 import java.io.IOException;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
@@ -67,7 +67,7 @@ class FlatLedgerManager extends Abstract
     // path prefix to store ledger znodes
     private final String ledgerPrefix;
     // hash map to store all active ledger ids
-    private ConcurrentMap<Long, Boolean> activeLedgers;
+    private SnapshotMap<Long, Boolean> activeLedgers;
 
     /**
      * Constructor
@@ -91,7 +91,7 @@ class FlatLedgerManager extends Abstract
         }
 
         ledgerPrefix = ledgerRootPath + "/" + LEDGER_NODE_PREFIX;
-        activeLedgers = new ConcurrentHashMap<Long, Boolean>();
+        activeLedgers = new SnapshotMap<Long, Boolean>();
     }
 
     @Override
@@ -158,8 +158,9 @@ class FlatLedgerManager extends Abstract
     @Override
     public void garbageCollectLedgers(GarbageCollector gc) {
         try {
-            HashSet<Long> zkActiveLedgers = getLedgersInSingleNode(ledgerRootPath);
-            ConcurrentMap<Long, Boolean> bkActiveLedgers = activeLedgers;
+            // create a snapshot first
+            Map<Long, Boolean> bkActiveLedgers = activeLedgers.snapshot();
+            Set<Long> zkActiveLedgers = getLedgersInSingleNode(ledgerRootPath);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("All active ledgers from ZK: " + zkActiveLedgers);
                 LOG.debug("Current active ledgers from Bookie: " + bkActiveLedgers.keySet());
@@ -171,26 +172,4 @@ class FlatLedgerManager extends Abstract
             LOG.warn("Interrupted during garbage collecting ledgers from " + ledgerRootPath, inte);
         }
     }
-
-    /**
-     * Do garbage collecting comparing hosted ledgers and zk ledgers
-     *
-     * @param gc
-     *          Garbage collector to do garbage collection when found inactive/deleted ledgers
-     * @param bkActiveLedgers
-     *          Active ledgers hosted in bookie server
-     * @param zkAllLedgers
-     *          All ledgers stored in zookeeper
-     */
-    void doGc(GarbageCollector gc, ConcurrentMap<Long, Boolean> bkActiveLedgers, HashSet<Long> zkAllLedgers) {
-        // remove any active ledgers that doesn't exist in zk
-        for (Long bkLid : bkActiveLedgers.keySet()) {
-            if (!zkAllLedgers.contains(bkLid)) {
-                // remove it from current active ledger
-                bkActiveLedgers.remove(bkLid);
-                gc.gc(bkLid);
-            }
-        }
-    }
-
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java?rev=1307725&r1=1307724&r2=1307725&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java Sat Mar 31 07:13:07 2012
@@ -19,11 +19,12 @@ package org.apache.bookkeeper.meta;
  */
 
 import java.io.IOException;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Set;
+import java.util.Map;
+import java.util.NavigableMap;
 import java.util.HashSet;
 import java.util.List;
 
@@ -92,7 +93,7 @@ class HierarchicalLedgerManager extends 
     // Path to generate global id
     private final String idGenPath;
     // A sorted map to stored all active ledger ids
-    private ConcurrentSkipListMap<Long, Boolean> activeLedgers;
+    private SnapshotMap<Long, Boolean> activeLedgers;
 
     // we use this to prevent long stack chains from building up in callbacks
     ScheduledExecutorService scheduler;
@@ -119,7 +120,8 @@ class HierarchicalLedgerManager extends 
         }
 
         this.idGenPath = ledgerRootPath + IDGENERATION_PREFIX;
-        this.activeLedgers = new ConcurrentSkipListMap<Long, Boolean>();
+        activeLedgers = new SnapshotMap<Long, Boolean>();
+
         this.scheduler = Executors.newSingleThreadScheduledExecutor();
         if (LOG.isDebugEnabled()) {
             LOG.debug("Using HierarchicalLedgerManager with root path : " + ledgerRootPath);
@@ -365,6 +367,8 @@ class HierarchicalLedgerManager extends 
 
     @Override
     public void garbageCollectLedgers(GarbageCollector gc) {
+        // create a snapshot before garbage collection
+        NavigableMap<Long, Boolean> snapshot = activeLedgers.snapshot();
         try {
             List<String> l1Nodes = zk.getChildren(ledgerRootPath, null);
             for (String l1Node : l1Nodes) {
@@ -374,7 +378,7 @@ class HierarchicalLedgerManager extends 
                 try {
                     List<String> l2Nodes = zk.getChildren(ledgerRootPath + "/" + l1Node, null);
                     for (String l2Node : l2Nodes) {
-                        doGcByLevel(gc, l1Node, l2Node);
+                        doGcByLevel(gc, l1Node, l2Node, snapshot);
                     }
                 } catch (Exception e) {
                     LOG.warn("Exception during garbage collecting ledgers for " + l1Node
@@ -395,10 +399,13 @@ class HierarchicalLedgerManager extends 
      *          1st level node name
      * @param level2
      *          2nd level node name
+     * @param snapshot
+     *          Snapshot of the active ledgers map.
      * @throws IOException
      * @throws InterruptedException
      */
-    void doGcByLevel(GarbageCollector gc, final String level1, final String level2)
+    void doGcByLevel(GarbageCollector gc, final String level1, final String level2,
+                     NavigableMap snapshot)
         throws IOException, InterruptedException {
 
         StringBuilder nodeBuilder = new StringBuilder();
@@ -406,12 +413,11 @@ class HierarchicalLedgerManager extends 
                    .append(level1).append("/").append(level2);
         String nodePath = nodeBuilder.toString();
 
-        HashSet<Long> zkActiveLedgers = getLedgersInSingleNode(nodePath);
+        Set<Long> zkActiveLedgers = getLedgersInSingleNode(nodePath);
         // get hosted ledgers in /level1/level2
         long startLedgerId = getStartLedgerIdByLevel(level1, level2);
         long endLedgerId = getEndLedgerIdByLevel(level1, level2);
-        ConcurrentMap<Long, Boolean> bkActiveLedgers =
-            activeLedgers.subMap(startLedgerId, true, endLedgerId, true);
+        Map<Long, Boolean> bkActiveLedgers = snapshot.subMap(startLedgerId, true, endLedgerId, true);
         if (LOG.isDebugEnabled()) {
             LOG.debug("All active ledgers from ZK for hash node "
                       + level1 + "/" + level2 + " : " + zkActiveLedgers);
@@ -423,28 +429,6 @@ class HierarchicalLedgerManager extends 
     }
 
     /**
-     * Do garbage collecting comparing hosted ledgers and zk ledgers
-     *
-     * @param gc
-     *          Garbage collector
-     * @param bkActiveLedgers
-     *          Active ledgers hosted in bookie server
-     * @param zkAllLedgers
-     *          All ledgers stored in zookeeper
-     */
-    void doGc(GarbageCollector gc, ConcurrentMap<Long, Boolean> bkActiveLedgers,
-              HashSet<Long> zkAllLedgers) {
-        // remove any active ledgers that doesn't exist in zk
-        for (Long lid : bkActiveLedgers.keySet()) {
-            if (!zkAllLedgers.contains(lid)) {
-                // remove it from current active ledger
-                bkActiveLedgers.remove(lid);
-                gc.gc(lid);
-            }
-        }
-    }
-
-    /**
      * Process list one by one in asynchronize way. Process will be stopped immediately
      * when error occurred.
      */

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/SnapshotMap.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/SnapshotMap.java?rev=1307725&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/SnapshotMap.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/SnapshotMap.java Sat Mar 31 07:13:07 2012
@@ -0,0 +1,132 @@
+package org.apache.bookkeeper.meta;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A snapshotable map.
+ */
+class SnapshotMap<K, V> {
+    // stores recent updates
+    volatile Map<K, V> updates;
+    volatile Map<K, V> updatesToMerge;
+    // map stores all snapshot data
+    volatile NavigableMap<K, V> snapshot;
+
+    final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public SnapshotMap() {
+        updates = new ConcurrentHashMap<K, V>();
+        updatesToMerge = new ConcurrentHashMap<K, V>();
+        snapshot = new ConcurrentSkipListMap<K, V>();
+    }
+
+    /**
+     * Create a snapshot of current map.
+     *
+     * @return a snapshot of current map.
+     */
+    public NavigableMap<K, V> snapshot() {
+        this.lock.writeLock().lock();
+        try {
+            if (updates.isEmpty()) {
+                return snapshot;
+            }
+            // set aside the current updates so they can be merged into the snapshot
+            updatesToMerge = updates;
+            updates = new ConcurrentHashMap<K, V>();
+        } finally {
+            this.lock.writeLock().unlock();
+        }
+        // merging the updates to snapshot
+        for (Map.Entry<K, V> entry : updatesToMerge.entrySet()) {
+            snapshot.put(entry.getKey(), entry.getValue());
+        }
+        // clear updatesToMerge
+        this.lock.writeLock().lock();
+        try {
+            updatesToMerge = new ConcurrentHashMap<K, V>();
+        } finally {
+            this.lock.writeLock().unlock();
+        }
+        return snapshot;
+    }
+
+    /**
+     * Associates the specified value with the specified key in this map.
+     *
+     * @param key
+     *          Key with which the specified value is to be associated.
+     * @param value
+     *          Value to be associated with the specified key.
+     */
+    public void put(K key, V value) {
+        this.lock.readLock().lock();
+        try {
+            updates.put(key, value);
+        } finally {
+            this.lock.readLock().unlock();
+        }
+
+    }
+
+    /**
+     * Removes the mapping for the key from this map if it is present.
+     *
+     * @param key
+     *          Key whose mapping is to be removed from this map.
+     */
+    public void remove(K key) {
+        this.lock.readLock().lock();
+        try {
+            // first remove updates
+            updates.remove(key);
+            updatesToMerge.remove(key);
+            // then remove snapshot
+            snapshot.remove(key);
+        } finally {
+            this.lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Returns true if this map contains a mapping for the specified key.
+     *
+     * @param key
+     *          Key whose presence is in the map to be tested.
+     * @return true if the map contains a mapping for the specified key.
+     */
+    public boolean containsKey(K key) {
+        this.lock.readLock().lock();
+        try {
+            return updates.containsKey(key)
+                 | updatesToMerge.containsKey(key)
+                 | snapshot.containsKey(key);
+        } finally {
+            this.lock.readLock().unlock();
+        }
+    }
+}

Propchange: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/SnapshotMap.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java?rev=1307725&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java Sat Mar 31 07:13:07 2012
@@ -0,0 +1,167 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.meta;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.meta.LedgerManager.GarbageCollector;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.junit.Test;
+
+/**
+ * Test garbage collection of ledgers in ledger manager
+ */
+public class GcLedgersTest extends LedgerManagerTestCase {
+    static final Logger LOG = LoggerFactory.getLogger(GcLedgersTest.class);
+
+    public GcLedgersTest(String ledgerManagerType) {
+        super(ledgerManagerType);
+    }
+
+    /**
+     * Create ledgers
+     */
+    private void createLedgers(int numLedgers, final Set<Long> createdLedgers) {
+        final AtomicInteger expected = new AtomicInteger(numLedgers);
+        for (int i=0; i<numLedgers; i++) {
+            ledgerManager.newLedgerPath(new GenericCallback<String>() {
+                @Override
+                public void operationComplete(int rc, String ledgerPath) {
+                    if (rc == BKException.Code.OK) {
+                        try {
+                            long ledgerId = ledgerManager.getLedgerId(ledgerPath);
+                            ledgerManager.addActiveLedger(ledgerId, true);
+                            createdLedgers.add(ledgerId);
+                        } catch (IOException ie) {
+                        }
+                    }
+                    synchronized (expected) {
+                        int num = expected.decrementAndGet();
+                        if (num == 0) {
+                            expected.notify();
+                        }
+                    }
+                }
+            }, new LedgerMetadata(1, 1));
+        }
+        synchronized (expected) {
+            try {
+                while (expected.get() > 0) {
+                    expected.wait(100);
+                }
+            } catch (InterruptedException ie) {
+            }
+        }
+    }
+
+    @Test
+    public void testGarbageCollectLedgers() throws Exception {
+        int numLedgers = 100;
+        int numRemovedLedgers = 10;
+
+        final Set<Long> createdLedgers = new HashSet<Long>();
+        final Set<Long> removedLedgers = new HashSet<Long>();
+
+        // create 100 ledgers
+        createLedgers(numLedgers, createdLedgers);
+
+        Random r = new Random(System.currentTimeMillis());
+        final List<Long> tmpList = new ArrayList<Long>();
+        tmpList.addAll(createdLedgers);
+        Collections.shuffle(tmpList, r);
+        // randomly remove several ledgers
+        for (int i=0; i<numRemovedLedgers; i++) {
+            long ledgerId = tmpList.get(i);
+            zkc.delete(ledgerManager.getLedgerPath(ledgerId), -1);
+            removedLedgers.add(ledgerId);
+            createdLedgers.remove(ledgerId);
+        }
+        final CountDownLatch inGcProgress = new CountDownLatch(1);
+        final CountDownLatch createLatch = new CountDownLatch(1);
+        final CountDownLatch endLatch = new CountDownLatch(2);
+
+        Thread gcThread = new Thread() {
+            @Override
+            public void run() {
+                ledgerManager.garbageCollectLedgers(new GarbageCollector() {
+                    boolean paused = false;
+                    @Override
+                    public void gc(long ledgerId) {
+                        if (!paused) {
+                            inGcProgress.countDown();
+                            try {
+                                createLatch.await();
+                            } catch (InterruptedException ie) {
+                            }
+                            paused = true;
+                        }
+                        LOG.info("Garbage Collected ledger {}", ledgerId);
+                    }
+                });
+                LOG.info("Gc Thread quits.");
+                endLatch.countDown();
+            }
+        };
+
+        Thread createThread = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    inGcProgress.await();
+                    // create 10 more ledgers
+                    createLedgers(10, createdLedgers);
+                    LOG.info("Finished creating 10 more ledgers.");
+                    createLatch.countDown();
+                } catch (Exception e) {
+                }
+                LOG.info("Create Thread quits.");
+                endLatch.countDown();
+            }
+        };
+
+        createThread.start();
+        gcThread.start();
+
+        endLatch.await();
+
+        // test ledgers
+        for (Long ledger : removedLedgers) {
+            assertFalse(ledgerManager.containsActiveLedger(ledger));
+        }
+        for (Long ledger : createdLedgers) {
+            assertTrue(ledgerManager.containsActiveLedger(ledger));
+        }
+    }
+}

Propchange: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java?rev=1307725&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java Sat Mar 31 07:13:07 2012
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.meta;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import junit.framework.TestCase;
+
+/**
+ * Test case to run over several ledger managers
+ */
+@RunWith(Parameterized.class)
+public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
+    static final Logger LOG = LoggerFactory.getLogger(LedgerManagerTestCase.class);
+
+    LedgerManager ledgerManager;
+
+    public LedgerManagerTestCase(String ledgerManagerType) {
+        super(0);
+        baseConf.setLedgerManagerType(ledgerManagerType);
+    }
+
+    @Parameters
+    public static Collection<Object[]> configs() {
+        return Arrays.asList(new Object[][] {
+            { FlatLedgerManager.NAME },
+            { HierarchicalLedgerManager.NAME }
+        });
+    }
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        ledgerManager = LedgerManagerFactory.newLedgerManager(baseConf, zkc);
+    }
+
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        ledgerManager.close();
+        super.tearDown();
+    }
+
+}

Propchange: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
------------------------------------------------------------------------------
    svn:eol-style = native