You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2012/12/24 05:50:03 UTC
svn commit: r1425585 [1/2] - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/
bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ book...
Author: sijie
Date: Mon Dec 24 04:50:02 2012
New Revision: 1425585
URL: http://svn.apache.org/viewvc?rev=1425585&view=rev
Log:
BOOKKEEPER-463: Refactor garbage collection code for ease to plugin different GC algorithm. (Fangmin, ivank, fpj via sijie)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollector.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SnapshotMap.java
Removed:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ActiveLedgerManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/SnapshotMap.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Dec 24 04:50:02 2012
@@ -270,6 +270,8 @@ Trunk (unreleased changes)
BOOKKEEPER-490: add documentation for MetaStore interface (sijie, ivank via sijie)
+ BOOKKEEPER-463: Refactor garbage collection code for ease to plugin different GC algorithm. (Fangmin, ivank, fpj via sijie)
+
hedwig-server:
BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Mon Dec 24 04:50:02 2012
@@ -39,7 +39,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.bookkeeper.meta.ActiveLedgerManager;
+import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
@@ -82,8 +82,8 @@ public class Bookie extends Thread {
final ServerConfiguration conf;
final SyncThread syncThread;
- final LedgerManagerFactory activeLedgerManagerFactory;
- final ActiveLedgerManager activeLedgerManager;
+ final LedgerManagerFactory ledgerManagerFactory;
+ final LedgerManager ledgerManager;
final LedgerStorage ledgerStorage;
final Journal journal;
@@ -478,17 +478,14 @@ public class Bookie extends Thread {
this.conf = conf;
this.journalDirectory = getCurrentDirectory(conf.getJournalDir());
this.ledgerDirsManager = new LedgerDirsManager(conf);
-
// instantiate zookeeper client to initialize ledger manager
this.zk = instantiateZookeeperClient(conf);
checkEnvironment(this.zk);
-
- activeLedgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
- activeLedgerManager = activeLedgerManagerFactory.newActiveLedgerManager();
-
+ ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
+ LOG.info("instantiate ledger manager {}", ledgerManagerFactory.getClass().getName());
+ ledgerManager = ledgerManagerFactory.newLedgerManager();
syncThread = new SyncThread(conf);
- ledgerStorage = new InterleavedLedgerStorage(conf, activeLedgerManager,
- ledgerDirsManager);
+ ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager, ledgerDirsManager);
handles = new HandleFactoryImpl(ledgerStorage);
// instantiate the journal
journal = new Journal(conf, ledgerDirsManager);
@@ -910,8 +907,8 @@ public class Bookie extends Thread {
// close Ledger Manager
try {
- activeLedgerManager.close();
- activeLedgerManagerFactory.uninitialize();
+ ledgerManager.close();
+ ledgerManagerFactory.uninitialize();
} catch (IOException ie) {
LOG.error("Failed to close active ledger manager : ", ie);
}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollector.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollector.java?rev=1425585&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollector.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollector.java Mon Dec 24 04:50:02 2012
@@ -0,0 +1,51 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+/**
+ * This is the garbage collector interface, garbage collector implementers
+ * need to extends this class to remove the deleted ledgers.
+ */
+public interface GarbageCollector {
+ /**
+ * Do the garbage collector work
+ *
+ * @param garbageCleaner
+ * cleaner used to clean selected garbages
+ */
+ public abstract void gc(GarbageCleaner garbageCleaner);
+
+ /**
+ * A interface used to define customised garbage cleaner
+ */
+ public interface GarbageCleaner {
+
+ /**
+ * Clean a specific ledger
+ *
+ * @param ledgerId
+ * Ledger ID to be cleaned
+ */
+ public void clean(final long ledgerId) ;
+ }
+
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java Mon Dec 24 04:50:02 2012
@@ -32,9 +32,11 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.ActiveLedgerManager;
+import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.SnapshotMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,8 +73,7 @@ public class GarbageCollectorThread exte
// Ledger Cache Handle
final LedgerCache ledgerCache;
-
- final ActiveLedgerManager activeLedgerManager;
+ final SnapshotMap<Long, Boolean> activeLedgers;
// flag to ensure gc thread will not be interrupted during compaction
// to reduce the risk getting entry log corrupted
@@ -83,6 +84,9 @@ public class GarbageCollectorThread exte
// track the last scanned successfully log id
long scannedLogId = 0;
+ final GarbageCollector garbageCollector;
+ final GarbageCleaner garbageCleaner;
+
/**
* A scanner wrapper to check whether a ledger is alive in an entry log file
*/
@@ -114,19 +118,37 @@ public class GarbageCollectorThread exte
* @throws IOException
*/
public GarbageCollectorThread(ServerConfiguration conf,
- LedgerCache ledgerCache,
+ final LedgerCache ledgerCache,
EntryLogger entryLogger,
- ActiveLedgerManager activeLedgerManager,
- EntryLogScanner scanner)
+ SnapshotMap<Long, Boolean> activeLedgers,
+ EntryLogScanner scanner,
+ LedgerManager ledgerManager)
throws IOException {
super("GarbageCollectorThread");
this.ledgerCache = ledgerCache;
this.entryLogger = entryLogger;
- this.activeLedgerManager = activeLedgerManager;
+ this.activeLedgers = activeLedgers;
this.scanner = scanner;
this.gcWaitTime = conf.getGcWaitTime();
+
+ this.garbageCleaner = new GarbageCollector.GarbageCleaner() {
+ @Override
+ public void clean(long ledgerId) {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("delete ledger : " + ledgerId);
+ }
+ ledgerCache.deleteLedger(ledgerId);
+ } catch (IOException e) {
+ LOG.error("Exception when deleting the ledger index file on the Bookie: ", e);
+ }
+ }
+ };
+
+ this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, activeLedgers);
+
// compaction parameters
minorCompactionThreshold = conf.getMinorCompactionThreshold();
minorCompactionInterval = conf.getMinorCompactionInterval() * SECOND;
@@ -223,17 +245,7 @@ public class GarbageCollectorThread exte
* Do garbage collection ledger index files
*/
private void doGcLedgers() {
- activeLedgerManager.garbageCollectLedgers(
- new ActiveLedgerManager.GarbageCollector() {
- @Override
- public void gc(long ledgerId) {
- try {
- ledgerCache.deleteLedger(ledgerId);
- } catch (IOException e) {
- LOG.error("Exception when deleting the ledger index file on the Bookie: ", e);
- }
- }
- });
+ garbageCollector.gc(garbageCleaner);
}
/**
@@ -245,7 +257,7 @@ public class GarbageCollectorThread exte
EntryLogMetadata meta = entryLogMetaMap.get(entryLogId);
for (Long entryLogLedger : meta.ledgersMap.keySet()) {
// Remove the entry log ledger from the set if it isn't active.
- if (!activeLedgerManager.containsActiveLedger(entryLogLedger)) {
+ if (!activeLedgers.containsKey(entryLogLedger)) {
meta.removeLedger(entryLogLedger);
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java Mon Dec 24 04:50:02 2012
@@ -26,8 +26,10 @@ import java.io.IOException;
import org.apache.bookkeeper.jmx.BKMBeanInfo;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.ActiveLedgerManager;
+import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.util.SnapshotMap;
+import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +44,10 @@ class InterleavedLedgerStorage implement
EntryLogger entryLogger;
LedgerCache ledgerCache;
+
+ // A sorted map to stored all active ledger ids
+ protected final SnapshotMap<Long, Boolean> activeLedgers;
+
// This is the thread that garbage collects the entry logs that do not
// contain any active ledgers in them; and compacts the entry logs that
// has lower remaining percentage to reclaim disk space.
@@ -51,15 +57,16 @@ class InterleavedLedgerStorage implement
private volatile boolean somethingWritten = false;
InterleavedLedgerStorage(ServerConfiguration conf,
- ActiveLedgerManager activeLedgerManager,
- LedgerDirsManager ledgerDirsManager) throws IOException {
+ LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager)
+ throws IOException {
+ activeLedgers = new SnapshotMap<Long, Boolean>();
entryLogger = new EntryLogger(conf, ledgerDirsManager);
- ledgerCache = new LedgerCacheImpl(conf, activeLedgerManager, ledgerDirsManager);
+ ledgerCache = new LedgerCacheImpl(conf, activeLedgers, ledgerDirsManager);
gcThread = new GarbageCollectorThread(conf, ledgerCache, entryLogger,
- activeLedgerManager, new EntryLogCompactionScanner());
+ activeLedgers, new EntryLogCompactionScanner(), ledgerManager);
}
- @Override
+ @Override
public void start() {
gcThread.start();
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java Mon Dec 24 04:50:02 2012
@@ -35,7 +35,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.bookkeeper.meta.ActiveLedgerManager;
+import org.apache.bookkeeper.util.SnapshotMap;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -54,7 +54,8 @@ public class LedgerCacheImpl implements
private LedgerDirsManager ledgerDirsManager;
final private AtomicBoolean shouldRelocateIndexFile = new AtomicBoolean(false);
- public LedgerCacheImpl(ServerConfiguration conf, ActiveLedgerManager alm, LedgerDirsManager ledgerDirsManager)
+ public LedgerCacheImpl(ServerConfiguration conf, SnapshotMap<Long, Boolean> activeLedgers,
+ LedgerDirsManager ledgerDirsManager)
throws IOException {
this.ledgerDirsManager = ledgerDirsManager;
this.openFileLimit = conf.getOpenFileLimit();
@@ -69,7 +70,7 @@ public class LedgerCacheImpl implements
}
LOG.info("maxMemory = " + Runtime.getRuntime().maxMemory());
LOG.info("openFileLimit is " + openFileLimit + ", pageSize is " + pageSize + ", pageLimit is " + pageLimit);
- activeLedgerManager = alm;
+ this.activeLedgers = activeLedgers;
// Retrieve all of the active ledgers.
getActiveLedgers();
ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
@@ -90,7 +91,7 @@ public class LedgerCacheImpl implements
// Manage all active ledgers in LedgerManager
// so LedgerManager has knowledge to garbage collect inactive/deleted ledgers
- final ActiveLedgerManager activeLedgerManager;
+ final SnapshotMap<Long, Boolean> activeLedgers;
final int openFileLimit;
final int pageSize;
@@ -257,7 +258,7 @@ public class LedgerCacheImpl implements
// A new ledger index file has been created for this Bookie.
// Add this new ledger to the set of active ledgers.
LOG.debug("New ledger index file created for ledgerId: {}", ledger);
- activeLedgerManager.addActiveLedger(ledger, true);
+ activeLedgers.put(ledger, true);
}
evictFileInfoIfNecessary();
fi = new FileInfo(lf, masterKey);
@@ -697,7 +698,7 @@ public class LedgerCacheImpl implements
}
}
}
- activeLedgerManager.addActiveLedger(Long.parseLong(ledgerIdInHex, 16), true);
+ activeLedgers.put(Long.parseLong(ledgerIdInHex, 16), true);
}
}
}
@@ -739,7 +740,7 @@ public class LedgerCacheImpl implements
}
// Remove it from the active ledger manager
- activeLedgerManager.removeActiveLedger(ledgerId);
+ activeLedgers.remove(ledgerId);
// Now remove it from all the other lists and maps.
// These data structures need to be synchronized first before removing entries.
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java?rev=1425585&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java Mon Dec 24 04:50:02 2012
@@ -0,0 +1,108 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
+import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
+import org.apache.bookkeeper.util.SnapshotMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Garbage collector implementation using scan and compare.
+ *
+ * <p>
+ * Garbage collection is processed as below:
+ * <ul>
+ * <li> fetch all existing ledgers from zookeeper or metastore according to
+ * the LedgerManager, called <b>globalActiveLedgers</b>
+ * <li> fetch all active ledgers from bookie server, said <b>bkActiveLedgers</b>
+ * <li> loop over <b>bkActiveLedgers</b> to find those ledgers that are not in
+ * <b>globalActiveLedgers</b>, do garbage collection on them.
+ * </ul>
+ * </p>
+ */
+public class ScanAndCompareGarbageCollector implements GarbageCollector{
+
+ static final Logger LOG = LoggerFactory.getLogger(ScanAndCompareGarbageCollector.class);
+ private SnapshotMap<Long, Boolean> activeLedgers;
+ private LedgerManager ledgerManager;
+
+ public ScanAndCompareGarbageCollector(LedgerManager ledgerManager, SnapshotMap<Long, Boolean> activeLedgers) {
+ this.ledgerManager = ledgerManager;
+ this.activeLedgers = activeLedgers;
+ }
+
+ @Override
+ public void gc(GarbageCleaner garbageCleaner) {
+ // create a snapshot first
+ NavigableMap<Long, Boolean> bkActiveLedgersSnapshot =
+ this.activeLedgers.snapshot();
+ LedgerRangeIterator ledgerRangeIterator = ledgerManager.getLedgerRanges();
+ try {
+ // Empty global active ledgers, need to remove all local active ledgers.
+ if (!ledgerRangeIterator.hasNext()) {
+ for (Long bkLid : bkActiveLedgersSnapshot.keySet()) {
+ // remove it from current active ledger
+ bkActiveLedgersSnapshot.remove(bkLid);
+ garbageCleaner.clean(bkLid);
+ }
+ }
+ while(ledgerRangeIterator.hasNext()) {
+ LedgerRange lRange = ledgerRangeIterator.next();
+ Map<Long, Boolean> subBkActiveLedgers = null;
+ Long start = lRange.start();
+ Long end = lRange.end();
+ if (end != LedgerRange.NOLIMIT) {
+ subBkActiveLedgers = bkActiveLedgersSnapshot.subMap(start,
+ true, end, true);
+ } else {
+ if (start != LedgerRange.NOLIMIT) {
+ subBkActiveLedgers = bkActiveLedgersSnapshot.tailMap(start);
+ } else {
+ subBkActiveLedgers = bkActiveLedgersSnapshot;
+ }
+ }
+ Set<Long> globalActiveLedgers = lRange.getLedgers();
+ LOG.debug("All active ledgers for hash node {}, Current active ledgers from Bookie for hash node {}",
+ globalActiveLedgers, subBkActiveLedgers.keySet());
+ for (Long bkLid : subBkActiveLedgers.keySet()) {
+ if (!globalActiveLedgers.contains(bkLid)) {
+ // remove it from current active ledger
+ subBkActiveLedgers.remove(bkLid);
+ garbageCleaner.clean(bkLid);
+ }
+ }
+ }
+ } catch (Exception e) {
+ // ignore exception, collecting garbage next time
+ LOG.warn("Exception when iterating over the metadata {}", e);
+ }
+ }
+}
+
+
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java Mon Dec 24 04:50:02 2012
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.client;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
+import org.apache.bookkeeper.versioning.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,7 +66,7 @@ class LedgerDeleteOp extends OrderedSafe
public void initiate() {
// Asynchronously delete the ledger from meta manager
// When this completes, it will invoke the callback method below.
- bk.getLedgerManager().deleteLedger(ledgerId, this);
+ bk.getLedgerManager().removeLedgerMetadata(ledgerId, Version.ANY, this);
}
/**
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java Mon Dec 24 04:50:02 2012
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.Map;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.client.LedgerMetadata;
@@ -32,6 +31,7 @@ import org.apache.bookkeeper.proto.Bookk
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.util.ZkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +47,7 @@ import org.apache.zookeeper.data.Stat;
/**
* Abstract ledger manager based on zookeeper, which provides common methods such as query zk nodes.
*/
-abstract class AbstractZkLedgerManager implements LedgerManager, ActiveLedgerManager {
+abstract class AbstractZkLedgerManager implements LedgerManager {
static Logger LOG = LoggerFactory.getLogger(AbstractZkLedgerManager.class);
@@ -55,9 +55,6 @@ abstract class AbstractZkLedgerManager i
protected final ZooKeeper zk;
protected final String ledgerRootPath;
- // A sorted map to stored all active ledger ids
- protected final SnapshotMap<Long, Boolean> activeLedgers;
-
/**
* ZooKeeper-based Ledger Manager Constructor
*
@@ -70,8 +67,6 @@ abstract class AbstractZkLedgerManager i
this.conf = conf;
this.zk = zk;
this.ledgerRootPath = conf.getZkLedgersRootPath();
-
- activeLedgers = new SnapshotMap<Long, Boolean>();
}
/**
@@ -93,9 +88,32 @@ abstract class AbstractZkLedgerManager i
*/
protected abstract long getLedgerId(String ledgerPath) throws IOException;
+ /**
+ * Removes ledger metadata from ZooKeeper if version matches.
+ *
+ * @param ledgerId ledger identifier
+ * @param version local version of metadata znode
+ * @param cb callback object
+ */
@Override
- public void deleteLedger(final long ledgerId, final GenericCallback<Void> cb) {
- zk.delete(getLedgerPath(ledgerId), -1, new VoidCallback() {
+ public void removeLedgerMetadata(final long ledgerId, final Version version,
+ final GenericCallback<Void> cb) {
+ int znodeVersion = -1;
+ if (Version.NEW == version) {
+ LOG.error("Request to delete ledger {} metadata with version set to the initial one", ledgerId);
+ cb.operationComplete(BKException.Code.MetadataVersionException, (Void)null);
+ return;
+ } else if (Version.ANY != version) {
+ if (!(version instanceof ZkVersion)) {
+ LOG.info("Not an instance of ZKVersion: {}", ledgerId);
+ cb.operationComplete(BKException.Code.MetadataVersionException, (Void)null);
+ return;
+ } else {
+ znodeVersion = ((ZkVersion)version).getZnodeVersion();
+ }
+ }
+
+ zk.delete(getLedgerPath(ledgerId), znodeVersion, new VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
int bkRc;
@@ -174,112 +192,6 @@ abstract class AbstractZkLedgerManager i
}
/**
- * Get all the ledgers in a single zk node
- *
- * @param nodePath
- * Zookeeper node path
- * @param getLedgersCallback
- * callback function to process ledgers in a single node
- */
- protected void asyncGetLedgersInSingleNode(final String nodePath, final GenericCallback<HashSet<Long>> getLedgersCallback) {
- // First sync ZK to make sure we're reading the latest active/available ledger nodes.
- zk.sync(nodePath, new AsyncCallback.VoidCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx) {
- LOG.debug("Sync node path {} return : {}", path, rc);
- if (rc != Code.OK.intValue()) {
- LOG.error("ZK error syncing the ledgers node when getting children: ", KeeperException
- .create(KeeperException.Code.get(rc), path));
- getLedgersCallback.operationComplete(rc, null);
- return;
- }
- // Sync has completed successfully so now we can poll ZK
- // and read in the latest set of active ledger nodes.
- doAsyncGetLedgersInSingleNode(nodePath, getLedgersCallback);
- }
- }, null);
- }
-
- private void doAsyncGetLedgersInSingleNode(final String nodePath,
- final GenericCallback<HashSet<Long>> getLedgersCallback) {
- zk.getChildren(nodePath, false, new AsyncCallback.ChildrenCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx, List<String> ledgerNodes) {
- if (rc != Code.OK.intValue()) {
- LOG.error("Error polling ZK for the available ledger nodes: ", KeeperException
- .create(KeeperException.Code.get(rc), path));
- getLedgersCallback.operationComplete(rc, null);
- return;
- }
- LOG.debug("Retrieved current set of ledger nodes: {}", ledgerNodes);
- // Convert the ZK retrieved ledger nodes to a HashSet for easier comparisons.
- HashSet<Long> allActiveLedgers = new HashSet<Long>(ledgerNodes.size(), 1.0f);
- for (String ledgerNode : ledgerNodes) {
- if (isSpecialZnode(ledgerNode)) {
- continue;
- }
- try {
- // convert the node path to ledger id according to different ledger manager implementation
- allActiveLedgers.add(getLedgerId(path + "/" + ledgerNode));
- } catch (IOException ie) {
- LOG.warn("Error extracting ledgerId from ZK ledger node: " + ledgerNode);
- // This is a pretty bad error as it indicates a ledger node in ZK
- // has an incorrect format. For now just continue and consider
- // this as a non-existent ledger.
- continue;
- }
- }
-
- getLedgersCallback.operationComplete(rc, allActiveLedgers);
-
- }
- }, null);
- }
-
- private static class GetLedgersCtx {
- int rc;
- boolean done = false;
- HashSet<Long> ledgers = null;
- }
-
- /**
- * Get all the ledgers in a single zk node
- *
- * @param nodePath
- * Zookeeper node path
- * @throws IOException
- * @throws InterruptedException
- */
- protected HashSet<Long> getLedgersInSingleNode(final String nodePath)
- throws IOException, InterruptedException {
- final GetLedgersCtx ctx = new GetLedgersCtx();
- LOG.debug("Try to get ledgers of node : {}", nodePath);
- asyncGetLedgersInSingleNode(nodePath, new GenericCallback<HashSet<Long>>() {
- @Override
- public void operationComplete(int rc, HashSet<Long> zkActiveLedgers) {
- synchronized (ctx) {
- if (Code.OK.intValue() == rc) {
- ctx.ledgers = zkActiveLedgers;
- }
- ctx.rc = rc;
- ctx.done = true;
- ctx.notifyAll();
- }
- }
- });
-
- synchronized (ctx) {
- while (ctx.done == false) {
- ctx.wait();
- }
- }
- if (Code.OK.intValue() != ctx.rc) {
- throw new IOException("Error on getting ledgers from node " + nodePath);
- }
- return ctx.ledgers;
- }
-
- /**
* Process ledgers in a single zk node.
*
* <p>
@@ -309,14 +221,15 @@ abstract class AbstractZkLedgerManager i
final String path, final Processor<Long> processor,
final AsyncCallback.VoidCallback finalCb, final Object ctx,
final int successRc, final int failureRc) {
- asyncGetLedgersInSingleNode(path, new GenericCallback<HashSet<Long>>() {
+ ZkUtils.getChildrenInSingleNode(zk, path, new GenericCallback<List<String>>() {
@Override
- public void operationComplete(int rc, HashSet<Long> zkActiveLedgers) {
+ public void operationComplete(int rc, List<String> ledgerNodes) {
if (Code.OK.intValue() != rc) {
finalCb.processResult(failureRc, null, ctx);
return;
}
+ Set<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, path);
LOG.debug("Processing ledgers: {}", zkActiveLedgers);
// no ledgers found, return directly
@@ -353,43 +266,36 @@ abstract class AbstractZkLedgerManager i
return false;
}
- @Override
- public void close() {
- }
-
- @Override
- public void addActiveLedger(long ledgerId, boolean active) {
- activeLedgers.put(ledgerId, active);
- }
-
- @Override
- public void removeActiveLedger(long ledgerId) {
- activeLedgers.remove(ledgerId);
- }
-
- @Override
- public boolean containsActiveLedger(long ledgerId) {
- return activeLedgers.containsKey(ledgerId);
- }
-
/**
- * Do garbage collecting comparing hosted ledgers and zk ledgers
+ * Convert the ZK retrieved ledger nodes to a HashSet for easier comparisons.
*
- * @param gc
- * Garbage collector to do garbage collection when found inactive/deleted ledgers
- * @param bkActiveLedgers
- * Active ledgers hosted in bookie server
- * @param zkAllLedgers
- * All ledgers stored in zookeeper
+ * @param ledgerNodes
+ * zk ledger nodes
+ * @param path
+ * the prefix path of the ledger nodes
+ * @return ledger id hash set
*/
- void doGc(GarbageCollector gc, Map<Long, Boolean> bkActiveLedgers, Set<Long> zkAllLedgers) {
- // remove any active ledgers that doesn't exist in zk
- for (Long bkLid : bkActiveLedgers.keySet()) {
- if (!zkAllLedgers.contains(bkLid)) {
- // remove it from current active ledger
- bkActiveLedgers.remove(bkLid);
- gc.gc(bkLid);
+ protected Set<Long> ledgerListToSet(List<String> ledgerNodes, String path) {
+ Set<Long> zkActiveLedgers = new HashSet<Long>(ledgerNodes.size(), 1.0f);
+ for (String ledgerNode : ledgerNodes) {
+ if (isSpecialZnode(ledgerNode)) {
+ continue;
+ }
+ try {
+ // convert the node path to ledger id according to different ledger manager implementation
+ zkActiveLedgers.add(getLedgerId(path + "/" + ledgerNode));
+ } catch (IOException e) {
+ LOG.warn("Error extracting ledgerId from ZK ledger node: " + ledgerNode);
+ // This is a pretty bad error as it indicates a ledger node in ZK
+ // has an incorrect format. For now just continue and consider
+ // this as a non-existent ledger.
+ continue;
}
}
+ return zkActiveLedgers;
+ }
+
+ @Override
+ public void close() {
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java Mon Dec 24 04:50:02 2012
@@ -19,7 +19,7 @@ package org.apache.bookkeeper.meta;
*/
import java.io.IOException;
-import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.bookkeeper.client.BKException;
@@ -27,7 +27,6 @@ import org.apache.bookkeeper.client.Ledg
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
-import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.AsyncCallback;
@@ -47,18 +46,6 @@ import org.slf4j.LoggerFactory;
* All ledgers' metadata are put in a single zk node, created using zk sequential node.
* Each ledger node is prefixed with 'L'.
* </p>
- * <p>
- * All actived ledgers found in bookie server side is managed in a hash map.
- * </p>
- * <p>
- * Garbage collection in FlatLedgerManager is procssed as below:
- * <ul>
- * <li> fetch all existed ledgers from zookeeper, said <b>zkActiveLedgers</b>
- * <li> fetch all active ledgers from bookie server, said <b>bkActiveLedgers</b>
- * <li> loop over <b>bkActiveLedgers</b> to find those ledgers aren't existed in
- * <b>zkActiveLedgers</b>, do garbage collection on them.
- * </ul>
- * </p>
*/
class FlatLedgerManager extends AbstractZkLedgerManager {
@@ -80,8 +67,7 @@ class FlatLedgerManager extends Abstract
public FlatLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
super(conf, zk);
- ledgerPrefix = ledgerRootPath + "/"
- + BookKeeperConstants.LEDGER_NODE_PREFIX;
+ ledgerPrefix = ledgerRootPath + "/" + StringUtils.LEDGER_NODE_PREFIX;
}
@Override
@@ -139,24 +125,29 @@ class FlatLedgerManager extends Abstract
}
@Override
- public void garbageCollectLedgers(GarbageCollector gc) {
- if (null == zk) {
- LOG.warn("Skip garbage collecting ledgers because there is no ZooKeeper handle.");
- return;
- }
- try {
- // create a snapshot first
- Map<Long, Boolean> bkActiveLedgers = activeLedgers.snapshot();
- Set<Long> zkActiveLedgers = getLedgersInSingleNode(ledgerRootPath);
- if (LOG.isDebugEnabled()) {
- LOG.debug("All active ledgers from ZK: {}. Current active ledgers from Bookie: {}.",
- zkActiveLedgers, bkActiveLedgers.keySet());
+ public LedgerRangeIterator getLedgerRanges() {
+ return new LedgerRangeIterator() {
+ // single iterator, can visit only one time
+ boolean hasMoreElement = true;
+ @Override
+ public boolean hasNext() {
+ return hasMoreElement;
}
- doGc(gc, bkActiveLedgers, zkActiveLedgers);
- } catch (IOException ie) {
- LOG.warn("Error during garbage collecting ledgers from " + ledgerRootPath, ie);
- } catch (InterruptedException inte) {
- LOG.warn("Interrupted during garbage collecting ledgers from " + ledgerRootPath, inte);
- }
+ @Override
+ public LedgerRange next() throws IOException {
+ if (!hasMoreElement) {
+ throw new NoSuchElementException();
+ }
+ hasMoreElement = false;
+ Set<Long> zkActiveLedgers;
+ try {
+ zkActiveLedgers = ledgerListToSet(
+ ZkUtils.getChildrenInSingleNode(zk, ledgerRootPath), ledgerRootPath);
+ } catch (InterruptedException e) {
+ throw new IOException("Error when get child nodes from zk", e);
+ }
+ return new LedgerRange(zkActiveLedgers);
+ }
+ };
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java Mon Dec 24 04:50:02 2012
@@ -69,11 +69,6 @@ class FlatLedgerManagerFactory extends L
}
@Override
- public ActiveLedgerManager newActiveLedgerManager() {
- return new FlatLedgerManager(conf, zk);
- }
-
- @Override
public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
return new ZkLedgerUnderreplicationManager(conf, zk);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java Mon Dec 24 04:50:02 2012
@@ -22,16 +22,15 @@ import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.Set;
-import java.util.Map;
-import java.util.NavigableMap;
+import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
-import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.AsyncCallback;
@@ -58,25 +57,7 @@ import org.slf4j.LoggerFactory;
* <pre>(ledgersRootPath)/level1/level2/L(level3)</pre>
* E.g Ledger 0000000001 is split into 3 parts <i>00</i>, <i>0000</i>, <i>0001</i>, which is stored in
* <i>(ledgersRootPath)/00/0000/L0001</i>. So each znode could have at most 10000 ledgers, which avoids
- * failed to get children list of a too big znode during garbage collection.
- * <p>
- * All actived ledgers found in bookie server is managed in a sorted map, which ease us to pick
- * up all actived ledgers belongs to (level1, level2).
- * </p>
- * <p>
- * Garbage collection in HierarchicalLedgerManager is processed node by node as below:
- * <ul>
- * fetching all level1 nodes, by calling zk#getChildren(ledgerRootPath).
- * <ul>
- * for each level1 node, fetching their level2 nodes, by calling zk#getChildren(ledgerRootPath + "/" + level1)
- * <li> fetch all existed ledgers from zookeeper in level1/level2 node, said <b>zkActiveLedgers</b>
- * <li> fetch all active ledgers from bookie server in level1/level2, said <b>bkActiveLedgers</b>
- * <li> loop over <b>bkActiveLedgers</b> to find those ledgers aren't existed in <b>zkActiveLedgers</b>, do garbage collection on them.
- * </ul>
- * </ul>
- * Since garbage collection is running in background, HierarchicalLedgerManager did gc on single hash
- * node at a time to avoid consuming too much resources.
- * </p>
+ * errors during garbage collection due to lists of children that are too long.
*/
class HierarchicalLedgerManager extends AbstractZkLedgerManager {
@@ -196,15 +177,7 @@ class HierarchicalLedgerManager extends
@Override
public String getLedgerPath(long ledgerId) {
- String ledgerIdStr = StringUtils.getZKStringId(ledgerId);
- // do 2-4-4 split
- StringBuilder sb = new StringBuilder();
- sb.append(ledgerRootPath).append("/").append(
- ledgerIdStr.substring(0, 2)).append("/").append(
- ledgerIdStr.substring(2, 6)).append("/").append(
- BookKeeperConstants.LEDGER_NODE_PREFIX).append(
- ledgerIdStr.substring(6, 10));
- return sb.toString();
+ return ledgerRootPath + StringUtils.getHierarchicalLedgerPath(ledgerId);
}
@Override
@@ -213,26 +186,12 @@ class HierarchicalLedgerManager extends
throw new IOException("it is not a valid hashed path name : " + pathName);
}
String hierarchicalPath = pathName.substring(ledgerRootPath.length() + 1);
- String[] hierarchicalParts = hierarchicalPath.split("/");
- if (hierarchicalParts.length != 3) {
- throw new IOException("it is not a valid hierarchical path name : " + pathName);
- }
- hierarchicalParts[2] = hierarchicalParts[2]
- .substring(BookKeeperConstants.LEDGER_NODE_PREFIX.length());
- return getLedgerId(hierarchicalParts);
+ return StringUtils.stringToHierarchicalLedgerId(hierarchicalPath);
}
// get ledger from all level nodes
private long getLedgerId(String...levelNodes) throws IOException {
- try {
- StringBuilder sb = new StringBuilder();
- for (String node : levelNodes) {
- sb.append(node);
- }
- return Long.parseLong(sb.toString());
- } catch (NumberFormatException e) {
- throw new IOException(e);
- }
+ return StringUtils.stringToHierarchicalLedgerId(levelNodes);
}
//
@@ -333,71 +292,6 @@ class HierarchicalLedgerManager extends
}, null);
}
- @Override
- public void garbageCollectLedgers(GarbageCollector gc) {
- if (null == zk) {
- LOG.warn("Skip garbage collecting ledgers because there is no ZooKeeper handle.");
- return;
- }
- // create a snapshot before garbage collection
- NavigableMap<Long, Boolean> snapshot = activeLedgers.snapshot();
- try {
- List<String> l1Nodes = zk.getChildren(ledgerRootPath, null);
- for (String l1Node : l1Nodes) {
- if (isSpecialZnode(l1Node)) {
- continue;
- }
- try {
- List<String> l2Nodes = zk.getChildren(ledgerRootPath + "/" + l1Node, null);
- for (String l2Node : l2Nodes) {
- doGcByLevel(gc, l1Node, l2Node, snapshot);
- }
- } catch (Exception e) {
- LOG.warn("Exception during garbage collecting ledgers for " + l1Node
- + " of " + ledgerRootPath, e);
- }
- }
- } catch (Exception e) {
- LOG.warn("Exception during garbage collecting inactive/deleted ledgers", e);
- }
- }
-
- /**
- * Garbage collection a single node level1/level2
- *
- * @param gc
- * Garbage collector
- * @param level1
- * 1st level node name
- * @param level2
- * 2nd level node name
- * @param snapshot
- * Snapshot of the active ledgers map.
- * @throws IOException
- * @throws InterruptedException
- */
- void doGcByLevel(GarbageCollector gc, final String level1, final String level2,
- NavigableMap snapshot)
- throws IOException, InterruptedException {
-
- StringBuilder nodeBuilder = new StringBuilder();
- nodeBuilder.append(ledgerRootPath).append("/")
- .append(level1).append("/").append(level2);
- String nodePath = nodeBuilder.toString();
-
- Set<Long> zkActiveLedgers = getLedgersInSingleNode(nodePath);
- // get hosted ledgers in /level1/level2
- long startLedgerId = getStartLedgerIdByLevel(level1, level2);
- long endLedgerId = getEndLedgerIdByLevel(level1, level2);
- Map<Long, Boolean> bkActiveLedgers = snapshot.subMap(startLedgerId, true, endLedgerId, true);
- if (LOG.isDebugEnabled()) {
- LOG.debug("For hash node: " + level1 + "/" + level2 + ": All active ledgers from ZK: "
- + zkActiveLedgers + ". Current active ledgers from Bookie: "+ bkActiveLedgers);
- }
-
- doGc(gc, bkActiveLedgers, zkActiveLedgers);
- }
-
/**
* Process list one by one in asynchronize way. Process will be stopped immediately
* when error occurred.
@@ -474,4 +368,99 @@ class HierarchicalLedgerManager extends
protected boolean isSpecialZnode(String znode) {
return IDGEN_ZNODE.equals(znode) || super.isSpecialZnode(znode);
}
+
+ @Override
+ public LedgerRangeIterator getLedgerRanges() {
+ return new HierarchicalLedgerRangeIterator();
+ }
+
+ /**
+ * Iterator through each metadata bucket with hierarchical mode
+ */
+ private class HierarchicalLedgerRangeIterator implements LedgerRangeIterator {
+ private Iterator<String> l1NodesIter = null;
+ private Iterator<String> l2NodesIter = null;
+ private String curL1Nodes = "";
+ private boolean hasMoreElement = true;
+
+ /**
+ * iterate next level1 znode
+ *
+ * @return false if have visited all level1 nodes
+ * @throws InterruptedException/KeeperException if error occurs reading zookeeper children
+ */
+ private boolean nextL1Node() throws KeeperException, InterruptedException {
+ l2NodesIter = null;
+ while (l2NodesIter == null) {
+ if (l1NodesIter.hasNext()) {
+ curL1Nodes = l1NodesIter.next();
+ } else {
+ return false;
+ }
+ if (isSpecialZnode(curL1Nodes)) {
+ continue;
+ }
+ List<String> l2Nodes = zk.getChildren(ledgerRootPath + "/" + curL1Nodes, null);
+ l2NodesIter = l2Nodes.iterator();
+ if (!l2NodesIter.hasNext()) {
+ l2NodesIter = null;
+ continue;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ try {
+ if (l1NodesIter == null) {
+ l1NodesIter = zk.getChildren(ledgerRootPath, null).iterator();
+ hasMoreElement = nextL1Node();
+ } else if (!l2NodesIter.hasNext()) {
+ hasMoreElement = nextL1Node();
+ }
+ } catch (Exception e) {
+ throw new IOException("Error when check more elements", e);
+ }
+ return hasMoreElement;
+ }
+
+ @Override
+ public LedgerRange next() throws IOException {
+ if (!hasMoreElement) {
+ throw new NoSuchElementException();
+ }
+ return getLedgerRangeByLevel(curL1Nodes, l2NodesIter.next());
+ }
+
+ /**
+ * Get a single node level1/level2
+ *
+ * @param level1
+ * 1st level node name
+ * @param level2
+ * 2nd level node name
+ * @throws IOException
+ */
+ LedgerRange getLedgerRangeByLevel(final String level1, final String level2)
+ throws IOException {
+ StringBuilder nodeBuilder = new StringBuilder();
+ nodeBuilder.append(ledgerRootPath).append("/")
+ .append(level1).append("/").append(level2);
+ String nodePath = nodeBuilder.toString();
+ List<String> ledgerNodes = null;
+ try {
+ ledgerNodes = ZkUtils.getChildrenInSingleNode(zk, nodePath);
+ } catch (InterruptedException e) {
+ throw new IOException("Error when get child nodes from zk", e);
+ }
+ Set<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, nodePath);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("All active ledgers from ZK for hash node "
+ + level1 + "/" + level2 + " : " + zkActiveLedgers);
+ }
+ return new LedgerRange(zkActiveLedgers,
+ getStartLedgerIdByLevel(level1, level2), getEndLedgerIdByLevel(level1, level2));
+ }
+ }
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java Mon Dec 24 04:50:02 2012
@@ -69,11 +69,6 @@ class HierarchicalLedgerManagerFactory e
}
@Override
- public ActiveLedgerManager newActiveLedgerManager() {
- return new HierarchicalLedgerManager(conf, zk);
- }
-
- @Override
public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
throws KeeperException, InterruptedException, ReplicationException.CompatibilityException{
return new ZkLedgerUnderreplicationManager(conf, zk);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java Mon Dec 24 04:50:02 2012
@@ -19,11 +19,15 @@ package org.apache.bookkeeper.meta;
*/
import java.io.Closeable;
+import java.io.IOException;
+import java.util.Set;
import org.apache.zookeeper.AsyncCallback;
+import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.versioning.Version;
/**
* LedgerManager takes responsibility of ledger management in client side.
@@ -41,18 +45,25 @@ public interface LedgerManager extends C
* Metadata provided when creating a new ledger
* @param cb
* Callback when creating a new ledger.
+ * {@link BKException.Code.ZKException} return code when can't generate
+ * or extract new ledger id
*/
- public abstract void createLedger(LedgerMetadata metadata, GenericCallback<Long> cb);
+ public void createLedger(LedgerMetadata metadata, GenericCallback<Long> cb);
/**
- * Delete a specified ledger by ledgerId.
+ * Remove a specified ledger metadata by ledgerId and version.
*
* @param ledgerId
* Ledger Id
+ * @param version
+ * Ledger metadata version
* @param cb
- * Callback when deleted ledger.
+ * Callback when removed ledger metadata.
+ * {@link BKException.Code.MetadataVersionException} return code when version doesn't match,
+ * {@link BKException.Code.NoSuchLedgerExistsException} return code when ledger doesn't exist,
+ * {@link BKException.Code.ZKException} return code when other issues happen.
*/
- public abstract void deleteLedger(long ledgerId, GenericCallback<Void> cb);
+ public void removeLedgerMetadata(long ledgerId, Version version, GenericCallback<Void> vb);
/**
* Read ledger metadata of a specified ledger.
@@ -61,8 +72,10 @@ public interface LedgerManager extends C
* Ledger Id
* @param readCb
* Callback when read ledger metadata.
+ * {@link BKException.Code.NoSuchLedgerExistsException} return code when ledger doesn't exist,
+ * {@link BKException.Code.ZKException} return code when other issues happen.
*/
- public abstract void readLedgerMetadata(long ledgerId, GenericCallback<LedgerMetadata> readCb);
+ public void readLedgerMetadata(long ledgerId, GenericCallback<LedgerMetadata> readCb);
/**
* Write ledger metadata.
@@ -73,8 +86,10 @@ public interface LedgerManager extends C
* Ledger Metadata to write
* @param cb
* Callback when finished writing ledger metadata.
+ * {@link BKException.Code.MetadataVersionException} return code when version doesn't match,
+ * {@link BKException.Code.ZKException} return code when other issues happen.
*/
- public abstract void writeLedgerMetadata(long ledgerId, LedgerMetadata metadata, GenericCallback<Void> cb);
+ public void writeLedgerMetadata(long ledgerId, LedgerMetadata metadata, GenericCallback<Void> cb);
/**
* Loop to process all ledgers.
@@ -99,4 +114,79 @@ public interface LedgerManager extends C
*/
public void asyncProcessLedgers(Processor<Long> processor, AsyncCallback.VoidCallback finalCb,
Object context, int successRc, int failureRc);
+
+ /**
+ * Loop to scan a range of metadata from metadata storage
+ *
+ * @return will return a iterator of the Ranges
+ */
+ public LedgerRangeIterator getLedgerRanges();
+
+ /*
+ * Used to represent the Ledgers range returned from the
+ * current scan.
+ */
+ public static class LedgerRange {
+ // ledger start and end ranges
+ private final long start;
+ private final long end;
+ public final static long NOLIMIT = -1;
+
+ // returned ledgers
+ private Set<Long> ledgers;
+
+ public LedgerRange(Set<Long> ledgers) {
+ this(ledgers, NOLIMIT, NOLIMIT);
+ }
+
+ public LedgerRange(Set<Long> ledgers, long start) {
+ this(ledgers, start, NOLIMIT);
+ }
+
+ public LedgerRange(Set<Long> ledgers, long start, long end) {
+ this.ledgers = ledgers;
+ this.start = start;
+ this.end = end;
+ }
+
+ public Long start() {
+ return this.start;
+ }
+
+ public Long end() {
+ return this.end;
+ }
+
+ public Set<Long> getLedgers() {
+ return this.ledgers;
+ }
+ }
+
+ /**
+ * Interface of the ledger meta range iterator from
+ * storage (e.g. in ZooKeeper or other key/value store)
+ */
+ interface LedgerRangeIterator {
+
+ /**
+ * @return true if there are records in the ledger metadata store. false
+ * only when there are indeed no records in ledger metadata store.
+ * @throws IOException thrown when there is any problem accessing the ledger
+ * metadata store. It is critical that it doesn't return false in the case
+ * in the case it fails to access the ledger metadata store. Otherwise it
+ * will end up deleting all ledgers by accident.
+ */
+ public boolean hasNext() throws IOException;
+
+ /**
+ * Get the next element.
+ *
+ * @return the next element.
+ * @throws IOException thrown when there is a problem accessing the ledger
+ * metadata store. It is critical that it doesn't return false in the case
+ * in the case it fails to access the ledger metadata store. Otherwise it
+ * will end up deleting all ledgers by accident.
+ */
+ public LedgerRange next() throws IOException;
+ }
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java Mon Dec 24 04:50:02 2012
@@ -76,15 +76,7 @@ public abstract class LedgerManagerFacto
public abstract LedgerManager newLedgerManager();
/**
- * return active ledger manager for server side to manage active ledgers.
- *
- * @return active ledger manager
- * @see ActiveLedgerManager
- */
- public abstract ActiveLedgerManager newActiveLedgerManager();
-
- /**
- * Return a ledger underreplication manager, which is used to
+ * Return a ledger underreplication manager, which is used to
* mark ledgers as unreplicated, and to retrieve a ledger which
* is underreplicated so that it can be rereplicated.
*
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java Mon Dec 24 04:50:02 2012
@@ -17,20 +17,17 @@
*/
package org.apache.bookkeeper.meta;
-import static org.apache.bookkeeper.metastore.MetastoreScannableTable.EMPTY_END_KEY;
import static org.apache.bookkeeper.metastore.MetastoreTable.ALL_FIELDS;
import static org.apache.bookkeeper.metastore.MetastoreTable.NON_FIELDS;
-import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
-import java.util.Map;
-import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.CountDownLatch;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerMetadata;
@@ -43,7 +40,6 @@ import org.apache.bookkeeper.metastore.M
import org.apache.bookkeeper.metastore.MetastoreException;
import org.apache.bookkeeper.metastore.MetastoreFactory;
import org.apache.bookkeeper.metastore.MetastoreScannableTable;
-import org.apache.bookkeeper.metastore.MetastoreScannableTable.Order;
import org.apache.bookkeeper.metastore.MetastoreTableItem;
import org.apache.bookkeeper.metastore.Value;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -171,8 +167,7 @@ public class MSLedgerManagerFactory exte
}
}
- static abstract class AbstractMsLedgerManager implements Closeable {
-
+ static class MsLedgerManager implements LedgerManager {
final ZooKeeper zk;
final AbstractConfiguration conf;
@@ -180,7 +175,17 @@ public class MSLedgerManagerFactory exte
final MetastoreScannableTable ledgerTable;
final int maxEntriesPerScan;
- AbstractMsLedgerManager(final AbstractConfiguration conf, final ZooKeeper zk, final MetaStore metastore) {
+ static final String IDGEN_ZNODE = "ms-idgen";
+ static final String IDGENERATION_PREFIX = "/" + IDGEN_ZNODE + "/ID-";
+
+ // Path to generate global id
+ private final String idGenPath;
+
+ // we use this to prevent long stack chains from building up in
+ // callbacks
+ ScheduledExecutorService scheduler;
+
+ MsLedgerManager(final AbstractConfiguration conf, final ZooKeeper zk, final MetaStore metastore) {
this.conf = conf;
this.zk = zk;
this.metastore = metastore;
@@ -194,29 +199,7 @@ public class MSLedgerManagerFactory exte
}
// configuration settings
maxEntriesPerScan = conf.getMetastoreMaxEntriesPerScan();
- }
-
- @Override
- public void close() {
- ledgerTable.close();
- }
-
- }
- static class MsLedgerManager extends AbstractMsLedgerManager implements LedgerManager {
-
- static final String IDGEN_ZNODE = "ms-idgen";
- static final String IDGENERATION_PREFIX = "/" + IDGEN_ZNODE + "/ID-";
-
- // Path to generate global id
- private final String idGenPath;
-
- // we use this to prevent long stack chains from building up in
- // callbacks
- ScheduledExecutorService scheduler;
-
- MsLedgerManager(final AbstractConfiguration conf, final ZooKeeper zk, final MetaStore metastore) {
- super(conf, zk, metastore);
this.idGenPath = conf.getZkLedgersRootPath() + IDGENERATION_PREFIX;
this.scheduler = Executors.newSingleThreadScheduledExecutor();
}
@@ -228,7 +211,7 @@ public class MSLedgerManagerFactory exte
} catch (Exception e) {
LOG.warn("Error when closing MsLedgerManager : ", e);
}
- super.close();
+ ledgerTable.close();
}
@Override
@@ -305,7 +288,8 @@ public class MSLedgerManagerFactory exte
}
@Override
- public void deleteLedger(final long ledgerId, final GenericCallback<Void> cb) {
+ public void removeLedgerMetadata(final long ledgerId, final Version version,
+ final GenericCallback<Void> cb) {
MetastoreCallback<Void> msCallback = new MetastoreCallback<Void>() {
@Override
public void complete(int rc, Void value, Object ctx) {
@@ -321,7 +305,7 @@ public class MSLedgerManagerFactory exte
cb.operationComplete(bkRc, (Void) null);
}
};
- ledgerTable.remove(ledgerId2Key(ledgerId), Version.ANY, msCallback, null);
+ ledgerTable.remove(ledgerId2Key(ledgerId), version, msCallback, null);
}
@Override
@@ -475,129 +459,60 @@ public class MSLedgerManagerFactory exte
};
cursor.asyncReadEntries(maxEntriesPerScan, msCallback, null);
}
- }
-
- static class MsActiveLedgerManager extends AbstractMsLedgerManager implements ActiveLedgerManager {
- // A sorted map to stored all active ledger ids
- protected final SnapshotMap<Long, Boolean> activeLedgers;
-
- MsActiveLedgerManager(final AbstractConfiguration conf, final ZooKeeper zk, final MetaStore metastore) {
- super(conf, zk, metastore);
- activeLedgers = new SnapshotMap<Long, Boolean>();
- }
-
- @Override
- public void addActiveLedger(long ledgerId, boolean active) {
- activeLedgers.put(ledgerId, active);
- }
-
- @Override
- public void removeActiveLedger(long ledgerId) {
- activeLedgers.remove(ledgerId);
- }
-
- @Override
- public boolean containsActiveLedger(long ledgerId) {
- return activeLedgers.containsKey(ledgerId);
- }
-
- @Override
- public void garbageCollectLedgers(GarbageCollector gc) {
- LOG.debug("Start garbage collect ledgers.");
- NavigableMap<Long, Boolean> snapshot = activeLedgers.snapshot();
- Long nextLedger = 0L;
- int numRetries = 3;
- do {
- nextLedger = doGcLedgers(nextLedger, snapshot, gc);
- } while (null != nextLedger && --numRetries > 0);
- LOG.debug("End garbage collect ledgers.");
- }
-
- /**
- * Do garbage collection starting from <code>startLedger</code>.
- *
- * @param startLedgerId
- * Start Ledger id
- * @param snapshot
- * Current snapshot of active ledgers
- * @param gc
- * Garbage collector
- * @return null if finished scanning all ledgers, the next ledger id to
- * scan
- */
- private Long doGcLedgers(Long startLedgerId, NavigableMap<Long, Boolean> snapshot, GarbageCollector gc) {
- final SyncResult<MetastoreCursor> result = new SyncResult<MetastoreCursor>();
- MetastoreCallback<MetastoreCursor> openCursorCb = new MetastoreCallback<MetastoreCursor>() {
- @Override
- public void complete(int rc, MetastoreCursor cursor, Object ctx) {
- result.complete(rc, cursor);
- }
- };
- ledgerTable.openCursor(ledgerId2Key(startLedgerId), true, EMPTY_END_KEY, true, Order.ASC, NON_FIELDS,
- openCursorCb, null);
- result.block();
- if (MSException.Code.OK.getCode() != result.getRetCode()) {
- LOG.warn("Failed to open metastore cursor to run garbage collection : ",
- MSException.create(MSException.Code.get(result.getRetCode())));
- // failed to open a cursor, not continue until next gc
- return null;
+ class MSLedgerRangeIterator implements LedgerRangeIterator {
+ final CountDownLatch openCursorLatch = new CountDownLatch(1);
+ MetastoreCursor cursor = null;
+
+ MSLedgerRangeIterator() {
+ MetastoreCallback<MetastoreCursor> openCursorCb = new MetastoreCallback<MetastoreCursor>() {
+ @Override
+ public void complete(int rc, MetastoreCursor newCursor, Object ctx) {
+ if (MSException.Code.OK.getCode() != rc) {
+ LOG.error("Error opening cursor for ledger range iterator {}", rc);
+ } else {
+ cursor = newCursor;
+ }
+ openCursorLatch.countDown();
+ }
+ };
+ ledgerTable.openCursor(NON_FIELDS, openCursorCb, null);
}
- MetastoreCursor cursor = result.getResult();
-
- while (cursor.hasMoreEntries()) {
- Iterator<MetastoreTableItem> entries;
+ @Override
+ public boolean hasNext() {
try {
- entries = cursor.readEntries(maxEntriesPerScan);
- } catch (MSException mse) {
- LOG.warn("Exception when garbage collecting deleted ledgers : ", mse);
- return startLedgerId;
+ openCursorLatch.await();
+ } catch (InterruptedException ie) {
+ LOG.error("Interrupted waiting for cursor to open", ie);
+ Thread.currentThread().interrupt();
+ return false;
}
-
- if (!entries.hasNext()) {
- continue;
+ if (cursor == null) {
+ return false;
}
+ return cursor.hasMoreEntries();
+ }
- SortedSet<Long> msActiveLedgers = entries2Ledgers(entries);
-
- Long endLedgerId = msActiveLedgers.last();
- Map<Long, Boolean> bkActiveLedgers = snapshot.subMap(startLedgerId, true, endLedgerId, true);
- if (LOG.isDebugEnabled()) {
- LOG.debug("All active ledgers from Metastore between {} and {} : {}", new Object[] { startLedgerId,
- endLedgerId, msActiveLedgers });
- LOG.debug("Current active ledgers from Bookie between {} and {} : {}", new Object[] {
- startLedgerId, endLedgerId, bkActiveLedgers });
+ @Override
+ public LedgerRange next() throws IOException {
+ try {
+ Set<Long> ledgerIds = new TreeSet<Long>();
+ Iterator<MetastoreTableItem> iter = cursor.readEntries(maxEntriesPerScan);
+ while (iter.hasNext()) {
+ ledgerIds.add(key2LedgerId(iter.next().getKey()));
+ }
+ return new LedgerRange(ledgerIds);
+ } catch (MSException mse) {
+ LOG.error("Exception occurred reading from metastore", mse);
+ throw new IOException("Couldn't read from metastore", mse);
}
- doGc(gc, bkActiveLedgers, msActiveLedgers);
- // move the pointer
- startLedgerId = endLedgerId + 1;
}
- doGc(gc, snapshot.tailMap(startLedgerId), new TreeSet<Long>());
- return null;
}
- /**
- * Do garbage collecting comparing hosted ledgers and metastore ledgers
- *
- * @param gc
- * Garbage collector to do garbage collection when found
- * inactive/deleted ledgers
- * @param bkActiveLedgers
- * Active ledgers hosted in bookie server
- * @param msAllLedgers
- * All ledgers stored in metastore
- */
- void doGc(GarbageCollector gc, Map<Long, Boolean> bkActiveLedgers, Set<Long> msAllLedgers) {
- // remove any active ledgers that doesn't exist in zk
- for (Long bkLid : bkActiveLedgers.keySet()) {
- if (!msAllLedgers.contains(bkLid)) {
- // remove it from current active ledger
- LOG.debug("gc ledger: {}", bkLid);
- bkActiveLedgers.remove(bkLid);
- gc.gc(bkLid);
- }
- }
+ @Override
+ public LedgerRangeIterator getLedgerRanges() {
+ return new MSLedgerRangeIterator();
}
}
@@ -607,11 +522,6 @@ public class MSLedgerManagerFactory exte
}
@Override
- public ActiveLedgerManager newActiveLedgerManager() {
- return new MsActiveLedgerManager(conf, zk, metastore);
- }
-
- @Override
public LedgerUnderreplicationManager newLedgerUnderreplicationManager() throws KeeperException,
InterruptedException, ReplicationException.CompatibilityException {
// TODO: currently just use zk ledger underreplication manager
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SnapshotMap.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SnapshotMap.java?rev=1425585&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SnapshotMap.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SnapshotMap.java Mon Dec 24 04:50:02 2012
@@ -0,0 +1,131 @@
+package org.apache.bookkeeper.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A snapshotable map.
+ */
+public class SnapshotMap<K, V> {
+ // stores recent updates
+ volatile Map<K, V> updates;
+ volatile Map<K, V> updatesToMerge;
+ // map stores all snapshot data
+ volatile NavigableMap<K, V> snapshot;
+
+ final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+ public SnapshotMap() {
+ updates = new ConcurrentHashMap<K, V>();
+ updatesToMerge = new ConcurrentHashMap<K, V>();
+ snapshot = new ConcurrentSkipListMap<K, V>();
+ }
+
+ /**
+ * Create a snapshot of current map.
+ *
+ * @return a snapshot of current map.
+ */
+ public NavigableMap<K, V> snapshot() {
+ this.lock.writeLock().lock();
+ try {
+ if (updates.isEmpty()) {
+ return snapshot;
+ }
+ // put updates for merge to snapshot
+ updatesToMerge = updates;
+ updates = new ConcurrentHashMap<K, V>();
+ } finally {
+ this.lock.writeLock().unlock();
+ }
+ // merging the updates to snapshot
+ for (Map.Entry<K, V> entry : updatesToMerge.entrySet()) {
+ snapshot.put(entry.getKey(), entry.getValue());
+ }
+ // clear updatesToMerge
+ this.lock.writeLock().lock();
+ try {
+ updatesToMerge = new ConcurrentHashMap<K, V>();
+ } finally {
+ this.lock.writeLock().unlock();
+ }
+ return snapshot;
+ }
+
+ /**
+ * Associates the specified value with the specified key in this map.
+ *
+ * @param key
+ * Key with which the specified value is to be associated.
+ * @param value
+ * Value to be associated with the specified key.
+ */
+ public void put(K key, V value) {
+ this.lock.readLock().lock();
+ try {
+ updates.put(key, value);
+ } finally {
+ this.lock.readLock().unlock();
+ }
+
+ }
+
+ /**
+ * Removes the mapping for the key from this map if it is present.
+ *
+ * @param key
+ * Key whose mapping is to be removed from this map.
+ */
+ public void remove(K key) {
+ this.lock.readLock().lock();
+ try {
+ // first remove updates
+ updates.remove(key);
+ updatesToMerge.remove(key);
+ // then remove snapshot
+ snapshot.remove(key);
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Returns true if this map contains a mapping for the specified key.
+ *
+ * @param key
+ * Key whose presence is in the map to be tested.
+ * @return true if the map contains a mapping for the specified key.
+ */
+ public boolean containsKey(K key) {
+ this.lock.readLock().lock();
+ try {
+ return updates.containsKey(key)
+ | updatesToMerge.containsKey(key)
+ | snapshot.containsKey(key);
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java Mon Dec 24 04:50:02 2012
@@ -20,8 +20,6 @@ package org.apache.bookkeeper.util;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import org.apache.bookkeeper.conf.ServerConfiguration;
/**
* Provided utilites for parsing network addresses, ledger-id from node paths
@@ -30,6 +28,9 @@ import org.apache.bookkeeper.conf.Server
*/
public class StringUtils {
+ // Ledger Node Prefix
+ static public final String LEDGER_NODE_PREFIX = "L";
+
/**
* Parses address into IP and port.
*
@@ -68,4 +69,61 @@ public class StringUtils {
return String.format("%010d", id);
}
+ /**
+ * Get the hierarchical ledger path according to the ledger id
+ *
+ * @param ledgerId
+ * ledger id
+ * @return the hierarchical path
+ */
+ public static String getHierarchicalLedgerPath(long ledgerId) {
+ String ledgerIdStr = getZKStringId(ledgerId);
+ // do 2-4-4 split
+ StringBuilder sb = new StringBuilder();
+ sb.append("/")
+ .append(ledgerIdStr.substring(0, 2)).append("/")
+ .append(ledgerIdStr.substring(2, 6)).append("/")
+ .append(LEDGER_NODE_PREFIX)
+ .append(ledgerIdStr.substring(6, 10));
+ return sb.toString();
+ }
+
+ /**
+ * Parse the hierarchical ledger path to its ledger id
+ *
+ * @param hierarchicalLedgerPath
+ * @return the ledger id
+ * @throws IOException
+ */
+ public static long stringToHierarchicalLedgerId(String hierarchicalLedgerPath)
+ throws IOException {
+ String[] hierarchicalParts = hierarchicalLedgerPath.split("/");
+ if (hierarchicalParts.length != 3) {
+ throw new IOException("it is not a valid hierarchical path name : " + hierarchicalLedgerPath);
+ }
+ hierarchicalParts[2] =
+ hierarchicalParts[2].substring(LEDGER_NODE_PREFIX.length());
+ return stringToHierarchicalLedgerId(hierarchicalParts);
+ }
+
+ /**
+ * Get ledger id
+ *
+ * @param levelNodes
+ * level of the ledger path
+ * @return ledger id
+ * @throws IOException
+ */
+ public static long stringToHierarchicalLedgerId(String...levelNodes) throws IOException {
+ try {
+ StringBuilder sb = new StringBuilder();
+ for (String node : levelNodes) {
+ sb.append(node);
+ }
+ return Long.parseLong(sb.toString());
+ } catch (NumberFormatException e) {
+ throw new IOException(e);
+ }
+ }
+
}