You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2011/11/28 19:25:27 UTC
svn commit: r1207495 [1/3] - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/conf/
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/
bookkeeper-server/src/main/java/org/apac...
Author: ivank
Date: Mon Nov 28 18:25:18 2011
New Revision: 1207495
URL: http://svn.apache.org/viewvc?rev=1207495&view=rev
Log:
BOOKKEEPER-39: Bookie server failed to restart because of too many ledgers (more than ~50,000 ledgers) (Sijie via ivank)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerLayout.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalAsyncLedgerOpsTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalBookieFailureTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalBookieReadWriteTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalBookieRecoveryTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalLedgerDeleteTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerLayoutTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerManager.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Main.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCacheTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BaseTestCase.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
zookeeper/bookkeeper/trunk/doc/bookkeeperConfig.textile
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Nov 28 18:25:18 2011
@@ -80,6 +80,8 @@ BUGFIXES:
BOOKKEEPER-114: add a shutdown hook to shut down bookie server safely. (Sijie via ivank)
+ BOOKKEEPER-39: Bookie server failed to restart because of too many ledgers (more than ~50,000 ledgers) (Sijie via ivank)
+
hedwig-server/
BOOKKEEPER-43: NullPointException when releasing topic (Sijie Guo via breed)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf Mon Nov 28 18:25:18 2011
@@ -38,6 +38,16 @@ journalDirectory=/tmp/bk-txn
# It is possible to run with a single disk, but performance will be significantly lower.
ledgerDirectories=/tmp/bk-data
+# Ledger Manager Class
+# What kind of ledger manager is used to manage how ledgers are stored, managed
+# and garbage collected. See 'BookKeeper Overview' for details.
+# ledgerManagerType=flat
+
+# Root zookeeper path to store ledger metadata
+# This parameter is used by zookeeper-based ledger manager as a root znode to
+# store all ledgers.
+# zkLedgersRootPath=/ledgers
+
# Max file size of entry logger, in bytes
# A new entry log file will be created when the old one reaches the file size limitation
# logSizeLimit=2147483648
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Mon Nov 28 18:25:18 2011
@@ -43,20 +43,20 @@ import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
-
-
/**
* Implements a bookie.
*
@@ -78,6 +78,7 @@ public class Bookie extends Thread {
final ServerConfiguration conf;
final SyncThread syncThread;
+ final LedgerManager ledgerManager;
/**
* Current directory layout version. Increment this
@@ -242,23 +243,28 @@ public class Bookie extends Thread {
}
}
- public Bookie(ServerConfiguration conf) throws IOException {
+ public Bookie(ServerConfiguration conf)
+ throws IOException, KeeperException, InterruptedException {
+ this.conf = conf;
this.journalDirectory = conf.getJournalDir();
this.ledgerDirectories = conf.getLedgerDirs();
- this.conf = conf;
+ this.maxJournalSize = conf.getMaxJournalSize() * MB;
+ this.maxBackupJournals = conf.getMaxBackupJournals();
+ // check directory layouts
checkDirectoryLayoutVersion(journalDirectory);
for (File dir : ledgerDirectories) {
checkDirectoryLayoutVersion(dir);
}
- this.maxJournalSize = conf.getMaxJournalSize() * MB;
- this.maxBackupJournals = conf.getMaxBackupJournals();
+ // instantiate zookeeper client to initialize ledger manager
+ ZooKeeper newZk = instantiateZookeeperClient(conf.getZkServers());
+ ledgerManager = LedgerManagerFactory.newLedgerManager(conf, newZk);
syncThread = new SyncThread(conf);
entryLogger = new EntryLogger(conf, this);
- ledgerCache = new LedgerCache(conf);
-
+ ledgerCache = new LedgerCache(conf, ledgerManager);
+
lastLogMark.readLog();
if (LOG.isDebugEnabled()) {
LOG.debug("Last Log Mark : " + lastLogMark);
@@ -332,7 +338,12 @@ public class Bookie extends Thread {
}
}
}
- instantiateZookeeperClient(conf.getBookiePort(), conf.getZkServers());
+ // assign the zookeeper instance only here,
+ // since the GarbageCollector thread should only start after the journal
+ // has finished replaying
+ this.zk = newZk;
+ // make the bookie available
+ registerBookie(conf.getBookiePort());
setDaemon(true);
LOG.debug("I'm starting a bookie with journal directory " + journalDirectory.getName());
start();
@@ -375,20 +386,29 @@ public class Bookie extends Thread {
Collections.sort(logs);
return logs;
}
-
+
/**
* Instantiate the ZooKeeper client for the Bookie.
*/
- private void instantiateZookeeperClient(int port, String zkServers) throws IOException {
+ private ZooKeeper instantiateZookeeperClient(String zkServers) throws IOException {
if (zkServers == null) {
LOG.warn("No ZK servers passed to Bookie constructor so BookKeeper clients won't know about this server!");
- zk = null;
isZkExpired = false;
- return;
+ return null;
}
int zkTimeout = conf.getZkTimeout();
// Create the ZooKeeper client instance
- zk = newZookeeper(zkServers, zkTimeout);
+ return newZookeeper(zkServers, zkTimeout);
+ }
+
+ /**
+ * Register as an available bookie
+ */
+ private void registerBookie(int port) throws IOException {
+ if (null == zk) {
+ // zookeeper instance is null, so the bookie does not register itself in zk
+ return;
+ }
// Create the ZK ephemeral node for this Bookie.
try {
zk.create(BOOKIE_REGISTRATION_PATH + InetAddress.getLocalHost().getHostAddress() + ":" + port, new byte[0],
@@ -853,6 +873,8 @@ public class Bookie extends Thread {
}
// Shutdown the EntryLogger which has the GarbageCollector Thread running
entryLogger.shutdown();
+ // close Ledger Manager
+ ledgerManager.close();
// setting running to false here, so watch thread in bookie server know it only after bookie shut down
running = false;
}
@@ -983,8 +1005,8 @@ public class Bookie extends Thread {
* @throws IOException
* @throws InterruptedException
*/
- public static void main(String[] args) throws IOException,
- InterruptedException, BookieException {
+ public static void main(String[] args)
+ throws IOException, InterruptedException, BookieException, KeeperException {
Bookie b = new Bookie(new ServerConfiguration());
CounterCallback cb = new CounterCallback();
long start = System.currentTimeMillis();
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java Mon Nov 28 18:25:18 2011
@@ -35,7 +35,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -43,9 +42,7 @@ import java.util.concurrent.ConcurrentMa
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
+import org.apache.bookkeeper.meta.LedgerManager;
/**
* This class manages the writing of the bookkeeper entries. All the new
@@ -77,11 +74,6 @@ public class EntryLogger {
// this indicates that a write has happened since the last flush
private volatile boolean somethingWritten = false;
- // ZK ledgers related String constants
- static final String LEDGERS_PATH = "/ledgers";
- static final String LEDGER_NODE_PREFIX = "L";
- static final String AVAILABLE_NODE = "available";
-
// Maps entry log files to the set of ledgers that comprise the file.
private ConcurrentMap<Long, ConcurrentHashMap<Long, Boolean>> entryLogs2LedgersMap = new ConcurrentHashMap<Long, ConcurrentHashMap<Long, Boolean>>();
// This is the thread that garbage collects the entry logs that do not
@@ -146,105 +138,73 @@ public class EntryLogger {
}
}
// Initialization check. No need to run any logic if we are still starting up.
- if (bookie.zk == null || entryLogs2LedgersMap.isEmpty() || bookie.ledgerCache == null
- || bookie.ledgerCache.activeLedgers == null) {
+ if (bookie.zk == null || entryLogs2LedgersMap.isEmpty() ||
+ bookie.ledgerCache == null) {
continue;
}
- // First sync ZK to make sure we're reading the latest active/available ledger nodes.
- bookie.zk.sync(LEDGERS_PATH, new AsyncCallback.VoidCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx) {
- if (rc != Code.OK.intValue()) {
- LOG.error("ZK error syncing the ledgers node when getting children: ", KeeperException
- .create(KeeperException.Code.get(rc), path));
- return;
- }
- // Sync has completed successfully so now we can poll ZK
- // and read in the latest set of active ledger nodes.
- List<String> ledgerNodes;
+
+ // gc inactive/deleted ledgers
+ doGcLedgers();
+
+ // gc entry logs
+ doGcEntryLogs();
+ }
+ }
+
+ /**
+ * Garbage collect ledger index files
+ */
+ private void doGcLedgers() {
+ bookie.ledgerCache.activeLedgerManager.garbageCollectLedgers(
+ new LedgerManager.GarbageCollector() {
+ @Override
+ public void gc(long ledgerId) {
+ try {
+ bookie.ledgerCache.deleteLedger(ledgerId);
+ } catch (IOException e) {
+ LOG.error("Exception when deleting the ledger index file on the Bookie: ", e);
+ }
+ }
+ });
+ }
+
+ /**
+ * Garbage collect those entry logs which are not associated with any active ledgers
+ */
+ private void doGcEntryLogs() {
+ // Loop through all of the entry logs and remove the non-active ledgers.
+ for (Long entryLogId : entryLogs2LedgersMap.keySet()) {
+ ConcurrentHashMap<Long, Boolean> entryLogLedgers = entryLogs2LedgersMap.get(entryLogId);
+ for (Long entryLogLedger : entryLogLedgers.keySet()) {
+ // Remove the entry log ledger from the set if it isn't active.
+ if (!bookie.ledgerCache.activeLedgerManager.containsActiveLedger(entryLogLedger)) {
+ entryLogLedgers.remove(entryLogLedger);
+ }
+ }
+ if (entryLogLedgers.isEmpty()) {
+ // This means the entry log is not associated with any active ledgers anymore.
+ // We can remove this entry log file now.
+ LOG.info("Deleting entryLogId " + entryLogId + " as it has no active ledgers!");
+ BufferedChannel bc = channels.remove(entryLogId);
+ if (null != bc) {
+ // close its underlying file channel, so the entry log file can actually be deleted
try {
- ledgerNodes = bookie.zk.getChildren(LEDGERS_PATH, null);
- } catch (Exception e) {
- LOG.error("Error polling ZK for the available ledger nodes: ", e);
- // We should probably wait a certain amount of time before retrying in case of temporary issues.
- return;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Retrieved current set of ledger nodes: " + ledgerNodes);
- }
- // Convert the ZK retrieved ledger nodes to a HashSet for easier comparisons.
- HashSet<Long> allActiveLedgers = new HashSet<Long>(ledgerNodes.size(), 1.0f);
- for (String ledgerNode : ledgerNodes) {
- try {
- // The available node is also stored in this path so ignore that.
- // That node is the path for the set of available Bookie Servers.
- if (ledgerNode.equals(AVAILABLE_NODE))
- continue;
- String parts[] = ledgerNode.split(LEDGER_NODE_PREFIX);
- allActiveLedgers.add(Long.parseLong(parts[parts.length - 1]));
- } catch (NumberFormatException e) {
- LOG.error("Error extracting ledgerId from ZK ledger node: " + ledgerNode);
- // This is a pretty bad error as it indicates a ledger node in ZK
- // has an incorrect format. For now just continue and consider
- // this as a non-existent ledger.
- continue;
- }
- }
- ConcurrentMap<Long, Boolean> curActiveLedgers = bookie.ledgerCache.activeLedgers;
- if (LOG.isDebugEnabled()) {
- LOG.debug("All active ledgers from ZK: " + allActiveLedgers);
- LOG.debug("Current active ledgers from Bookie: " + curActiveLedgers.keySet());
- }
- // Remove any active ledgers that don't exist in ZK.
- for (Long ledger : curActiveLedgers.keySet()) {
- if (!allActiveLedgers.contains(ledger)) {
- // Remove it from the current active ledgers set and also from all
- // LedgerCache data references to the ledger, i.e. the physical ledger index file.
- LOG.info("Removing a non-active/deleted ledger: " + ledger);
- curActiveLedgers.remove(ledger);
- try {
- bookie.ledgerCache.deleteLedger(ledger);
- } catch (IOException e) {
- LOG.error("Exception when deleting the ledger index file on the Bookie: ", e);
- }
- }
+ bc.getFileChannel().close();
+ } catch (IOException ie) {
+ LOG.warn("Exception while closing garbage collected entryLog file : ", ie);
}
- // Loop through all of the entry logs and remove the non-active ledgers.
- for (Long entryLogId : entryLogs2LedgersMap.keySet()) {
- ConcurrentHashMap<Long, Boolean> entryLogLedgers = entryLogs2LedgersMap.get(entryLogId);
- for (Long entryLogLedger : entryLogLedgers.keySet()) {
- // Remove the entry log ledger from the set if it isn't active.
- if (!bookie.ledgerCache.activeLedgers.containsKey(entryLogLedger)) {
- entryLogLedgers.remove(entryLogLedger);
- }
- }
- if (entryLogLedgers.isEmpty()) {
- // This means the entry log is not associated with any active ledgers anymore.
- // We can remove this entry log file now.
- LOG.info("Deleting entryLogId " + entryLogId + " as it has no active ledgers!");
- BufferedChannel bc = channels.remove(entryLogId);
- if (null != bc) {
- // close its underlying file channel, so it could be deleted really
- try {
- bc.getFileChannel().close();
- } catch (IOException ie) {
- LOG.warn("Exception while closing garbage colected entryLog file : ", ie);
- }
- }
- File entryLogFile;
- try {
- entryLogFile = findFile(entryLogId);
- } catch (FileNotFoundException e) {
- LOG.error("Trying to delete an entryLog file that could not be found: "
- + entryLogId + ".log");
- continue;
- }
- entryLogFile.delete();
- entryLogs2LedgersMap.remove(entryLogId);
- }
- }
- };
- }, null);
+ }
+ File entryLogFile;
+ try {
+ entryLogFile = findFile(entryLogId);
+ } catch (FileNotFoundException e) {
+ LOG.error("Trying to delete an entryLog file that could not be found: "
+ + entryLogId + ".log");
+ continue;
+ }
+ entryLogFile.delete();
+ entryLogs2LedgersMap.remove(entryLogId);
+ }
}
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java Mon Nov 28 18:25:18 2011
@@ -33,11 +33,9 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.conf.ServerConfiguration;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +49,7 @@ public class LedgerCache {
final File ledgerDirectories[];
- public LedgerCache(ServerConfiguration conf) {
+ public LedgerCache(ServerConfiguration conf, LedgerManager alm) {
this.ledgerDirectories = conf.getLedgerDirs();
this.openFileLimit = conf.getOpenFileLimit();
this.pageSize = conf.getPageSize();
@@ -65,6 +63,7 @@ public class LedgerCache {
}
LOG.info("maxMemory = " + Runtime.getRuntime().maxMemory());
LOG.info("openFileLimit is " + openFileLimit + ", pageSize is " + pageSize + ", pageLimit is " + pageLimit);
+ activeLedgerManager = alm;
// Retrieve all of the active ledgers.
getActiveLedgers();
}
@@ -82,8 +81,9 @@ public class LedgerCache {
LinkedList<Long> openLedgers = new LinkedList<Long>();
- // Stores the set of active (non-deleted) ledgers.
- ConcurrentMap<Long, Boolean> activeLedgers = new ConcurrentHashMap<Long, Boolean>();
+ // Manage all active ledgers in LedgerManager
+ // so LedgerManager has knowledge to garbage collect inactive/deleted ledgers
+ final LedgerManager activeLedgerManager;
final int openFileLimit;
final int pageSize;
@@ -237,7 +237,7 @@ public class LedgerCache {
if (LOG.isDebugEnabled()) {
LOG.debug("New ledger index file created for ledgerId: " + ledger);
}
- activeLedgers.put(ledger, true);
+ activeLedgerManager.addActiveLedger(ledger, true);
}
if (openLedgers.size() > openFileLimit) {
fileInfoCache.remove(openLedgers.removeFirst()).close();
@@ -505,16 +505,13 @@ public class LedgerCache {
// We've found a ledger index file. The file name is the
// HexString representation of the ledgerId.
String ledgerIdInHex = index.getName().substring(0, index.getName().length() - 4);
- activeLedgers.put(Long.parseLong(ledgerIdInHex, 16), true);
+ activeLedgerManager.addActiveLedger(Long.parseLong(ledgerIdInHex, 16), true);
}
}
}
}
}
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Active ledgers found: " + activeLedgers);
- }
}
/**
@@ -529,8 +526,8 @@ public class LedgerCache {
fi.getFile().delete();
fi.close();
- // Remove it from the activeLedgers set
- activeLedgers.remove(ledgerId);
+ // Remove it from the active ledger manager
+ activeLedgerManager.removeActiveLedger(ledgerId);
// Now remove it from all the other lists and maps.
// These data structures need to be synchronized first before removing entries.
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Mon Nov 28 18:25:18 2011
@@ -23,16 +23,15 @@ package org.apache.bookkeeper.client;
import java.io.IOException;
import java.util.concurrent.Executors;
-import java.util.EnumSet;
-import java.util.Set;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.proto.BookieClient;
-import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,9 +77,11 @@ public class BookKeeper {
OrderedSafeExecutor mainWorkerPool = new OrderedSafeExecutor(Runtime
.getRuntime().availableProcessors());
+ // Ledger manager responsible for storing ledger metadata
+ final LedgerManager ledgerManager;
+
ClientConfiguration conf;
-
/**
* Create a bookkeeper client. A zookeeper client and a client socket factory
* will be instantiated as part of this constructor.
@@ -137,10 +138,12 @@ public class BookKeeper {
* @param zk
* Zookeeper client instance connected to the zookeeper with which
* the bookies have registered
+ * @throws IOException
* @throws InterruptedException
* @throws KeeperException
*/
- public BookKeeper(ClientConfiguration conf, ZooKeeper zk) throws InterruptedException, KeeperException {
+ public BookKeeper(ClientConfiguration conf, ZooKeeper zk)
+ throws IOException, InterruptedException, KeeperException {
this(conf, zk, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
ownChannelFactory = true;
@@ -158,11 +161,12 @@ public class BookKeeper {
* the bookies have registered
* @param channelFactory
* A factory that will be used to create connections to the bookies
+ * @throws IOException
* @throws InterruptedException
* @throws KeeperException
*/
public BookKeeper(ClientConfiguration conf, ZooKeeper zk, ClientSocketChannelFactory channelFactory)
- throws InterruptedException, KeeperException {
+ throws IOException, InterruptedException, KeeperException {
if (zk == null || channelFactory == null) {
throw new NullPointerException();
}
@@ -172,6 +176,12 @@ public class BookKeeper {
bookieWatcher = new BookieWatcher(this);
bookieWatcher.readBookiesBlocking();
bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
+ // initialize ledger meta manager
+ ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
+ }
+
+ LedgerManager getLedgerManager() {
+ return ledgerManager;
}
/**
@@ -463,6 +473,7 @@ public class BookKeeper {
*/
public void close() throws InterruptedException, BKException {
bookieClient.close();
+ ledgerManager.close();
bookieWatcher.halt();
if (ownChannelFactory) {
channelFactory.releaseExternalResources();
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java Mon Nov 28 18:25:18 2011
@@ -21,9 +21,6 @@ package org.apache.bookkeeper.client;
*
*/
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -33,14 +30,14 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.slf4j.Logger;
@@ -65,10 +62,7 @@ public class BookKeeperAdmin {
// ZK client instance
private ZooKeeper zk;
// ZK ledgers related String constants
- static final String LEDGERS_PATH = "/ledgers";
- static final String LEDGER_NODE_PREFIX = "L";
- static final String AVAILABLE_NODE = "available";
- static final String BOOKIES_PATH = LEDGERS_PATH + "/" + AVAILABLE_NODE;
+ static final String BOOKIES_PATH = BookieWatcher.BOOKIE_REGISTRATION_PATH;
// BookKeeper client instance
private BookKeeper bkc;
@@ -161,52 +155,6 @@ public class BookKeeperAdmin {
}
/**
- * This is a multi callback object for bookie recovery that waits for all of
- * the multiple async operations to complete. If any fail, then we invoke
- * the final callback with a BK LedgerRecoveryException.
- */
- class MultiCallback implements AsyncCallback.VoidCallback {
- // Number of expected callbacks
- final int expected;
- // Final callback and the corresponding context to invoke
- final AsyncCallback.VoidCallback cb;
- final Object context;
- // This keeps track of how many operations have completed
- final AtomicInteger done = new AtomicInteger();
- // List of the exceptions from operations that completed unsuccessfully
- final LinkedBlockingQueue<Integer> exceptions = new LinkedBlockingQueue<Integer>();
-
- MultiCallback(int expected, AsyncCallback.VoidCallback cb, Object context) {
- this.expected = expected;
- this.cb = cb;
- this.context = context;
- if (expected == 0) {
- cb.processResult(Code.OK.intValue(), null, context);
- }
- }
-
- private void tick() {
- if (done.incrementAndGet() == expected) {
- if (exceptions.isEmpty()) {
- cb.processResult(Code.OK.intValue(), null, context);
- } else {
- cb.processResult(BKException.Code.LedgerRecoveryException, null, context);
- }
- }
- }
-
- @Override
- public void processResult(int rc, String path, Object ctx) {
- if (rc != Code.OK.intValue()) {
- LOG.error("BK error recovering ledger data", BKException.create(rc));
- exceptions.add(rc);
- }
- tick();
- }
-
- }
-
- /**
* Method to get the input ledger's digest type. For now, this is just a
* placeholder function since there is no way we can get this information
* easily. In the future, BookKeeper should store this ledger metadata
@@ -316,8 +264,8 @@ public class BookKeeperAdmin {
*/
public void asyncRecoverBookieData(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest,
final RecoverCallback cb, final Object context) {
- // Sync ZK to make sure we're reading the latest bookie/ledger data.
- zk.sync(LEDGERS_PATH, new AsyncCallback.VoidCallback() {
+ // Sync ZK to make sure we're reading the latest bookie data.
+ zk.sync(BOOKIES_PATH, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
if (rc != Code.OK.intValue()) {
@@ -407,36 +355,30 @@ public class BookKeeperAdmin {
*/
private void getActiveLedgers(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest,
final RecoverCallback cb, final Object context, final List<InetSocketAddress> availableBookies) {
- zk.getChildren(LEDGERS_PATH, null, new AsyncCallback.ChildrenCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx, List<String> children) {
- if (rc != Code.OK.intValue()) {
- LOG.error("ZK error getting ledger nodes: ", KeeperException.create(KeeperException.Code.get(rc),
- path));
- cb.recoverComplete(BKException.Code.ZKException, context);
- return;
- }
- // Wrapper class around the RecoverCallback so it can be used
- // as the final VoidCallback to invoke within the MultiCallback.
- class RecoverCallbackWrapper implements AsyncCallback.VoidCallback {
- final RecoverCallback cb;
+ // Wrapper class around the RecoverCallback so it can be used
+ // as the final VoidCallback to process ledgers
+ class RecoverCallbackWrapper implements AsyncCallback.VoidCallback {
+ final RecoverCallback cb;
- RecoverCallbackWrapper(RecoverCallback cb) {
- this.cb = cb;
- }
+ RecoverCallbackWrapper(RecoverCallback cb) {
+ this.cb = cb;
+ }
- @Override
- public void processResult(int rc, String path, Object ctx) {
- cb.recoverComplete(rc, ctx);
- }
- }
- // Recover each of the ledgers asynchronously
- MultiCallback ledgerMcb = new MultiCallback(children.size(), new RecoverCallbackWrapper(cb), context);
- for (final String ledgerNode : children) {
- recoverLedger(bookieSrc, ledgerNode, ledgerMcb, availableBookies);
- }
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ cb.recoverComplete(rc, ctx);
}
- }, null);
+ }
+
+ Processor<Long> ledgerProcessor = new Processor<Long>() {
+ @Override
+ public void process(Long ledgerId, AsyncCallback.VoidCallback iterCallback) {
+ recoverLedger(bookieSrc, ledgerId, iterCallback, availableBookies);
+ }
+ };
+ bkc.getLedgerManager().asyncProcessLedgers(
+ ledgerProcessor, new RecoverCallbackWrapper(cb),
+ context, BKException.Code.OK, BKException.Code.LedgerRecoveryException);
}
/**
@@ -462,11 +404,10 @@ public class BookKeeperAdmin {
* @param bookieSrc
* Source bookie that had a failure. We want to replicate the
* ledger fragments that were stored there.
- * @param ledgerNode
- * Ledger Node name as retrieved from ZooKeeper we want to
- * recover.
- * @param ledgerMcb
- * MultiCallback to invoke once we've recovered the current
+ * @param lId
+ * Ledger id we want to recover.
+ * @param ledgerIterCb
+ * IterationCallback to invoke once we've recovered the current
* ledger.
* @param availableBookies
* List of Bookie Servers that are available to use for
@@ -474,30 +415,10 @@ public class BookKeeperAdmin {
* single bookie server if the user explicitly chose a bookie
* server to replicate data to.
*/
- private void recoverLedger(final InetSocketAddress bookieSrc, final String ledgerNode,
- final MultiCallback ledgerMcb, final List<InetSocketAddress> availableBookies) {
- /*
- * The available node is also stored in this path so ignore that. That
- * node is the path for the set of available Bookie Servers.
- */
- if (ledgerNode.equals(AVAILABLE_NODE)) {
- ledgerMcb.processResult(BKException.Code.OK, null, null);
- return;
- }
- // Parse out the ledgerId from the ZK ledger node.
- String parts[] = ledgerNode.split(LEDGER_NODE_PREFIX);
- if (parts.length < 2) {
- LOG.error("Ledger Node retrieved from ZK has invalid name format: " + ledgerNode);
- ledgerMcb.processResult(BKException.Code.ZKException, null, null);
- return;
- }
- final long lId;
- try {
- lId = Long.parseLong(parts[parts.length - 1]);
- } catch (NumberFormatException e) {
- LOG.error("Error retrieving ledgerId from ledgerNode: " + ledgerNode, e);
- ledgerMcb.processResult(BKException.Code.ZKException, null, null);
- return;
+ private void recoverLedger(final InetSocketAddress bookieSrc, final long lId,
+ final AsyncCallback.VoidCallback ledgerIterCb, final List<InetSocketAddress> availableBookies) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Recovering ledger : " + lId);
}
/*
* For the current ledger, open it to retrieve the LedgerHandle. This
@@ -512,7 +433,7 @@ public class BookKeeperAdmin {
public void openComplete(int rc, final LedgerHandle lh, Object ctx) {
if (rc != Code.OK.intValue()) {
LOG.error("BK error opening ledger: " + lId, BKException.create(rc));
- ledgerMcb.processResult(rc, null, null);
+ ledgerIterCb.processResult(rc, null, null);
return;
}
/*
@@ -550,17 +471,17 @@ public class BookKeeperAdmin {
* multiCallback and return.
*/
if (ledgerFragmentsToRecover.size() == 0) {
- ledgerMcb.processResult(BKException.Code.OK, null, null);
+ ledgerIterCb.processResult(BKException.Code.OK, null, null);
return;
}
/*
* Multicallback for ledger. Once all fragments for the ledger have been recovered
- * trigger the ledgerMcb
+ * trigger the ledgerIterCb
*/
- MultiCallback ledgerFragmentsMcb
- = new MultiCallback(ledgerFragmentsToRecover.size(), ledgerMcb, null);
-
+ MultiCallback ledgerFragmentsMcb
+ = new MultiCallback(ledgerFragmentsToRecover.size(), ledgerIterCb, null,
+ BKException.Code.OK, BKException.Code.LedgerRecoveryException);
/*
* Now recover all of the necessary ledger fragments
* asynchronously using a MultiCallback for every fragment.
@@ -657,7 +578,9 @@ public class BookKeeperAdmin {
* Now asynchronously replicate all of the entries for the ledger
* fragment that were on the dead bookie.
*/
- MultiCallback ledgerFragmentEntryMcb = new MultiCallback(entriesToReplicate.size(), cb, null);
+ MultiCallback ledgerFragmentEntryMcb =
+ new MultiCallback(entriesToReplicate.size(), cb, null,
+ BKException.Code.OK, BKException.Code.LedgerRecoveryException);
for (final Long entryId : entriesToReplicate) {
recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb, newBookie);
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java Mon Nov 28 18:25:18 2011
@@ -28,21 +28,18 @@ import java.util.ArrayList;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.apache.bookkeeper.util.StringUtils;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.AsyncCallback.StatCallback;
-import org.apache.zookeeper.AsyncCallback.StringCallback;
-import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
/**
* Encapsulates asynchronous ledger create operation
*
*/
-class LedgerCreateOp implements StringCallback, StatCallback {
+class LedgerCreateOp implements GenericCallback<String>, StatCallback {
static final Logger LOG = LoggerFactory.getLogger(LedgerCreateOp.class);
@@ -86,27 +83,18 @@ class LedgerCreateOp implements StringCa
* Initiates the operation
*/
public void initiate() {
- /*
- * Create ledger node on ZK. We get the id from the sequence number on
- * the node.
- */
-
- bk.getZkHandle().create(StringUtils.prefix, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL, this, null);
-
- // calls the children callback method below
+ bk.getLedgerManager().newLedgerPath(this);
}
-
/**
- * Implements ZooKeeper string callback.
- *
- * @see org.apache.zookeeper.AsyncCallback.StringCallback#processResult(int, java.lang.String, java.lang.Object, java.lang.String)
+ * Callback when created ledger path.
*/
- public void processResult(int rc, String path, Object ctx, String name) {
+ @Override
+ public void operationComplete(int rc, String ledgerPath) {
if (rc != KeeperException.Code.OK.intValue()) {
- LOG.error("Could not create node for ledger", KeeperException.create(KeeperException.Code.get(rc), path));
+ LOG.error("Could not create node for ledger",
+ KeeperException.create(KeeperException.Code.get(rc), ledgerPath));
cb.createComplete(BKException.Code.ZKException, null, this.ctx);
return;
}
@@ -116,9 +104,9 @@ class LedgerCreateOp implements StringCa
*/
long ledgerId;
try {
- ledgerId = StringUtils.getLedgerId(name);
+ ledgerId = bk.getLedgerManager().getLedgerId(ledgerPath);
} catch (IOException e) {
- LOG.error("Could not extract ledger-id from path:" + path, e);
+ LOG.error("Could not extract ledger-id from path:" + ledgerPath, e);
cb.createComplete(BKException.Code.ZKException, null, this.ctx);
return;
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java Mon Nov 28 18:25:18 2011
@@ -22,7 +22,6 @@
package org.apache.bookkeeper.client;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
-import org.apache.bookkeeper.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
@@ -65,7 +64,8 @@ class LedgerDeleteOp implements VoidCall
public void initiate() {
// Asynchronously delete the ledger node in ZK.
// When this completes, it will invoke the callback method below.
- bk.getZkHandle().delete(StringUtils.getLedgerNodePath(ledgerId), -1, this, null);
+
+ bk.getZkHandle().delete(bk.getLedgerManager().getLedgerPath(ledgerId), -1, this, null);
}
/**
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Mon Nov 28 18:25:18 2011
@@ -29,7 +29,6 @@ import java.util.Enumeration;
import java.util.Queue;
import java.util.concurrent.Semaphore;
-import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
@@ -40,7 +39,6 @@ import org.apache.bookkeeper.client.Book
import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.util.SafeRunnable;
-import org.apache.bookkeeper.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -196,9 +194,9 @@ public class LedgerHandle {
if (LOG.isDebugEnabled()) {
LOG.debug("Writing metadata to ZooKeeper: " + this.ledgerId + ", " + metadata.getZnodeVersion());
}
-
- bk.getZkHandle().setData(StringUtils.getLedgerNodePath(ledgerId),
- metadata.serialize(), metadata.getZnodeVersion(),
+
+ bk.getZkHandle().setData(bk.getLedgerManager().getLedgerPath(ledgerId),
+ metadata.serialize(), metadata.getZnodeVersion(),
callback, ctx);
}
@@ -609,11 +607,11 @@ public class LedgerHandle {
}, null);
}
-
+
void rereadMetadata(final GenericCallback<Void> cb) {
- bk.getZkHandle().getData(StringUtils.getLedgerNodePath(ledgerId), false,
+ bk.getZkHandle().getData(bk.getLedgerManager().getLedgerPath(ledgerId), false,
new DataCallback() {
- public void processResult(int rc, String path,
+ public void processResult(int rc, String path,
Object ctx, byte[] data, Stat stat) {
if (rc != KeeperException.Code.OK.intValue()) {
LOG.error("Error reading metadata from ledger, code =" + rc);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java Mon Nov 28 18:25:18 2011
@@ -26,7 +26,6 @@ import java.security.GeneralSecurityExce
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.apache.bookkeeper.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.KeeperException;
@@ -80,7 +79,7 @@ class LedgerOpenOp implements DataCallba
* Asynchronously read the ledger metadata node.
*/
- bk.getZkHandle().getData(StringUtils.getLedgerNodePath(ledgerId), false, this, ctx);
+ bk.getZkHandle().getData(bk.getLedgerManager().getLedgerPath(ledgerId), false, this, ctx);
}
/**
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java Mon Nov 28 18:25:18 2011
@@ -25,10 +25,16 @@ import org.apache.commons.configuration.
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.SystemConfiguration;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+
/**
* Abstract configuration
*/
-public abstract class AbstractConfiguration extends CompositeConfiguration {
+public abstract class AbstractConfiguration extends CompositeConfiguration {
+
+ // Ledger Manager
+ protected final static String LEDGER_MANAGER_TYPE = "ledgerManagerType";
+ protected final static String ZK_LEDGERS_ROOT_PATH = "zkLedgersRootPath";
protected AbstractConfiguration() {
super();
@@ -58,4 +64,42 @@ public abstract class AbstractConfigurat
addConfiguration(baseConf);
}
+ /**
+ * Set Ledger Manager Type.
+ *
+ * @param lmType
+ * Ledger Manager Type, stored under the
+ * <code>ledgerManagerType</code> property
+ */
+ public void setLedgerManagerType(String lmType) {
+ setProperty(LEDGER_MANAGER_TYPE, lmType);
+ }
+
+ /**
+ * Get Ledger Manager Type.
+ *
+ * @return ledger manager type, read from the
+ * <code>ledgerManagerType</code> property
+ */
+ public String getLedgerManagerType() {
+ return getString(LEDGER_MANAGER_TYPE);
+ }
+
+ /**
+ * Set Zk Ledgers Root Path.
+ *
+ * @param zkLedgersPath zk ledgers root path
+ */
+ public void setZkLedgersRootPath(String zkLedgersPath) {
+ setProperty(ZK_LEDGERS_ROOT_PATH, zkLedgersPath);
+ }
+
+ /**
+ * Get Zk Ledgers Root Path.
+ *
+ * @return zk ledgers root path, <code>/ledgers</code> when none is configured
+ */
+ public String getZkLedgersRootPath() {
+ return getString(ZK_LEDGERS_ROOT_PATH, "/ledgers");
+ }
}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java?rev=1207495&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java Mon Nov 28 18:25:18 2011
@@ -0,0 +1,251 @@
+package org.apache.bookkeeper.meta;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Abstract ledger manager based on zookeeper, which provides common methods such as query zk nodes.
+ */
+abstract class AbstractZkLedgerManager implements LedgerManager {
+
+ static Logger LOG = Logger.getLogger(AbstractZkLedgerManager.class);
+
+ // Ledger Node Prefix
+ static public final String LEDGER_NODE_PREFIX = "L";
+ static final String AVAILABLE_NODE = "available";
+
+ protected final AbstractConfiguration conf;
+ protected final ZooKeeper zk;
+ protected final String ledgerRootPath;
+
+ /**
+ * ZooKeeper-based Ledger Manager Constructor
+ *
+ * @param conf
+ * Configuration object
+ * @param zk
+ * ZooKeeper Client Handle
+ * @param ledgerRootPath
+ * ZooKeeper Path to store ledger metadata
+ */
+ protected AbstractZkLedgerManager(AbstractConfiguration conf, ZooKeeper zk,
+ String ledgerRootPath) {
+ this.conf = conf;
+ this.zk = zk;
+ this.ledgerRootPath = ledgerRootPath;
+ }
+
+ /**
+ * Get all the ledgers in a single zk node
+ *
+ * @param nodePath
+ * Zookeeper node path
+ * @param getLedgersCallback
+ * callback function to process ledgers in a single node
+ */
+ protected void asyncGetLedgersInSingleNode(final String nodePath, final GenericCallback<HashSet<Long>> getLedgersCallback) {
+ // First sync ZK to make sure we're reading the latest active/available ledger nodes.
+ zk.sync(nodePath, new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sync node path " + path + " return : " + rc);
+ }
+ if (rc != Code.OK.intValue()) {
+ LOG.error("ZK error syncing the ledgers node when getting children: ", KeeperException
+ .create(KeeperException.Code.get(rc), path));
+ getLedgersCallback.operationComplete(rc, null);
+ return;
+ }
+ // Sync has completed successfully so now we can poll ZK
+ // and read in the latest set of active ledger nodes.
+ doAsyncGetLedgersInSingleNode(nodePath, getLedgersCallback);
+ }
+ }, null);
+ }
+
+ private void doAsyncGetLedgersInSingleNode(final String nodePath,
+ final GenericCallback<HashSet<Long>> getLedgersCallback) {
+ zk.getChildren(nodePath, false, new AsyncCallback.ChildrenCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx, List<String> ledgerNodes) {
+ if (rc != Code.OK.intValue()) {
+ LOG.error("Error polling ZK for the available ledger nodes: ", KeeperException
+ .create(KeeperException.Code.get(rc), path));
+ getLedgersCallback.operationComplete(rc, null);
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Retrieved current set of ledger nodes: " + ledgerNodes);
+ }
+ // Convert the ZK retrieved ledger nodes to a HashSet for easier comparisons.
+ HashSet<Long> allActiveLedgers = new HashSet<Long>(ledgerNodes.size(), 1.0f);
+ for (String ledgerNode : ledgerNodes) {
+ if (isSpecialZnode(ledgerNode)) {
+ continue;
+ }
+ try {
+ // convert the node path to ledger id according to different ledger manager implementation
+ allActiveLedgers.add(getLedgerId(path + "/" + ledgerNode));
+ } catch (IOException ie) {
+ LOG.warn("Error extracting ledgerId from ZK ledger node: " + ledgerNode);
+ // This is a pretty bad error as it indicates a ledger node in ZK
+ // has an incorrect format. For now just continue and consider
+ // this as a non-existent ledger.
+ continue;
+ }
+ }
+
+ getLedgersCallback.operationComplete(rc, allActiveLedgers);
+
+ }
+ }, null);
+ }
+
+ private class GetLedgersCtx {
+ int rc;
+ HashSet<Long> ledgers = null;
+ }
+
+    /**
+     * Get all the ledgers in a single zk node, blocking until the zookeeper read completes.
+     *
+     * @param nodePath Zookeeper node path
+     * @return ids of the ledgers found under the node, never null
+     * @throws IOException if the ledgers could not be read from zookeeper
+     * @throws InterruptedException if interrupted while waiting for the result
+     */
+    protected HashSet<Long> getLedgersInSingleNode(final String nodePath) throws IOException, InterruptedException {
+        final GetLedgersCtx ctx = new GetLedgersCtx();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Try to get ledgers of node : " + nodePath);
+        }
+        synchronized (ctx) {
+            asyncGetLedgersInSingleNode(nodePath, new GenericCallback<HashSet<Long>>() {
+                @Override
+                public void operationComplete(int rc, HashSet<Long> zkActiveLedgers) {
+                    synchronized (ctx) {
+                        if (Code.OK.intValue() == rc) {
+                            ctx.ledgers = zkActiveLedgers;
+                        }
+                        ctx.rc = rc;
+                        ctx.notifyAll();
+                    }
+                }
+            });
+            ctx.wait();
+        }
+        // ledgers is only set on success, so a failed read shows up as a non-OK rc and/or null.
+        if (Code.OK.intValue() != ctx.rc || null == ctx.ledgers) {
+            throw new IOException("Error on getting ledgers from node " + nodePath);
+        }
+        return ctx.ledgers;
+    }
+
+ /**
+ * Process ledgers in a single zk node.
+ *
+ * <p>
+ * For each ledger found in this zk node, processor#process(ledgerId) will be triggered
+ * to process that ledger. After all ledgers have been processed, the finalCb will
+ * be called with the provided context object. The RC passed to finalCb is decided by:
+ * <ul>
+ * <li> If all ledgers are processed successfully, successRc will be passed.
+ * <li> If any ledger fails to be processed, failureRc will be passed.
+ * </ul>
+ * </p>
+ *
+ * @param path
+ * Zk node path to store ledgers
+ * @param processor
+ * Processor provided to process ledger
+ * @param finalCb
+ * Callback object when all ledgers are processed
+ * @param ctx
+ * Context object passed to finalCb
+ * @param successRc
+ * RC passed to finalCb when all ledgers are processed successfully
+ * @param failureRc
+ * RC passed to finalCb when any ledger fails to be processed
+ */
+ protected void asyncProcessLedgersInSingleNode(
+ final String path, final Processor<Long> processor,
+ final AsyncCallback.VoidCallback finalCb, final Object ctx,
+ final int successRc, final int failureRc) {
+ asyncGetLedgersInSingleNode(path, new GenericCallback<HashSet<Long>>() {
+ @Override
+ public void operationComplete(int rc, HashSet<Long> zkActiveLedgers) {
+ if (Code.OK.intValue() != rc) {
+ finalCb.processResult(failureRc, null, ctx);
+ return;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing ledgers : " + zkActiveLedgers);
+ }
+
+ // no ledgers found, return directly
+ if (zkActiveLedgers.size() == 0) {
+ finalCb.processResult(successRc, null, ctx);
+ return;
+ }
+
+ MultiCallback mcb = new MultiCallback(zkActiveLedgers.size(), finalCb, ctx,
+ successRc, failureRc);
+ // start loop over all ledgers
+ for (Long ledger : zkActiveLedgers) {
+ processor.process(ledger, mcb);
+ }
+ }
+ });
+ }
+
+    /**
+     * Tell whether a znode is a special znode.
+     *
+     * <p>
+     * The special znodes are <code>available</code> and the ledger layout znode.
+     * </p>
+     *
+     * @param znode
+     *            Znode Name
+     * @return true if the znode is a special znode otherwise false
+     */
+    protected boolean isSpecialZnode(String znode) {
+        return AVAILABLE_NODE.equals(znode) || LedgerLayout.LAYOUT_ZNODE.equals(znode);
+    }
+
+ @Override
+ public void close() {
+ }
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java?rev=1207495&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java Mon Nov 28 18:25:18 2011
@@ -0,0 +1,193 @@
+package org.apache.bookkeeper.meta;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.HashSet;
+
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manage all ledgers in a single zk node.
+ *
+ * <p>
+ * All ledgers' metadata are put in a single zk node, created using zk sequential node.
+ * Each ledger node is prefixed with 'L'.
+ * </p>
+ * <p>
+ * All active ledgers found on the bookie server side are managed in a hash map.
+ * </p>
+ * <p>
+ * Garbage collection in FlatLedgerManager is processed as below:
+ * <ul>
+ * <li> fetch all existing ledgers from zookeeper, called <b>zkActiveLedgers</b>
+ * <li> fetch all active ledgers from the bookie server, called <b>bkActiveLedgers</b>
+ * <li> loop over <b>bkActiveLedgers</b> to find those ledgers that don't exist in
+ * <b>zkActiveLedgers</b>, and do garbage collection on them.
+ * </ul>
+ * </p>
+ */
+class FlatLedgerManager extends AbstractZkLedgerManager {
+
+ static final Logger LOG = LoggerFactory.getLogger(FlatLedgerManager.class);
+ public static final String NAME = "flat";
+ public static final int CUR_VERSION = 1;
+
+ // path prefix to store ledger znodes
+ private final String ledgerPrefix;
+ // hash map to store all active ledger ids
+ private ConcurrentMap<Long, Boolean> activeLedgers;
+
+ /**
+ * Constructor
+ *
+ * @param conf
+ * Configuration object
+ * @param zk
+ * ZooKeeper Client Handle
+ * @param ledgerRootPath
+ * ZooKeeper Path to store ledger metadata
+ * @throws IOException when version is not compatible
+ */
+ public FlatLedgerManager(AbstractConfiguration conf, ZooKeeper zk,
+ String ledgerRootPath, int layoutVersion)
+ throws IOException {
+ super(conf, zk, ledgerRootPath);
+
+ if (layoutVersion != CUR_VERSION) {
+ throw new IOException("Incompatible layout version found : "
+ + layoutVersion);
+ }
+
+ ledgerPrefix = ledgerRootPath + "/" + LEDGER_NODE_PREFIX;
+ activeLedgers = new ConcurrentHashMap<Long, Boolean>();
+ }
+
+    @Override
+    public void newLedgerPath(final GenericCallback<String> cb) {
+        // Forward the zookeeper result to cb: the created znode name on success,
+        // null together with the failing rc otherwise.
+        final StringCallback createCb = new StringCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx, String name) {
+                boolean created = Code.OK.intValue() == rc;
+                cb.operationComplete(rc, created ? name : null);
+            }
+        };
+        // The ledger znode is created as a sequential node under the ledger prefix.
+        ZkUtils.createFullPathOptimistic(zk, ledgerPrefix, new byte[0],
+                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL,
+                createCb, null);
+    }
+
+    @Override
+    public String getLedgerPath(long ledgerId) {
+        // The ledger znode path is the ledger prefix immediately followed by
+        // the id string produced by StringUtils.getZKStringId; no separator
+        // is inserted because the prefix already ends with the node prefix.
+        return ledgerPrefix + StringUtils.getZKStringId(ledgerId);
+    }
+
+    @Override
+    public long getLedgerId(String nodeName) throws IOException {
+        // Strip the prefix literally; split() would treat it as a regex and fail on a bare prefix.
+        int prefixAt = nodeName.lastIndexOf(ledgerPrefix);
+        String idPart = prefixAt < 0 ? nodeName : nodeName.substring(prefixAt + ledgerPrefix.length());
+        try {
+            return Long.parseLong(idPart);
+        } catch (NumberFormatException e) {
+            throw new IOException(e);
+        }
+    }
+
+ @Override
+ public void asyncProcessLedgers(final Processor<Long> processor,
+ final AsyncCallback.VoidCallback finalCb, final Object ctx,
+ final int successRc, final int failureRc) {
+ asyncProcessLedgersInSingleNode(ledgerRootPath, processor, finalCb, ctx, successRc, failureRc);
+ }
+
+ @Override
+ public void addActiveLedger(long ledgerId, boolean active) {
+ activeLedgers.put(ledgerId, active);
+ }
+
+ @Override
+ public void removeActiveLedger(long ledgerId) {
+ activeLedgers.remove(ledgerId);
+ }
+
+ @Override
+ public boolean containsActiveLedger(long ledgerId) {
+ return activeLedgers.containsKey(ledgerId);
+ }
+
+    @Override
+    public void garbageCollectLedgers(GarbageCollector gc) {
+        try {
+            HashSet<Long> zkActiveLedgers = getLedgersInSingleNode(ledgerRootPath);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("All active ledgers from ZK: " + zkActiveLedgers);
+                LOG.debug("Current active ledgers from Bookie: " + activeLedgers.keySet());
+            }
+            doGc(gc, activeLedgers, zkActiveLedgers);
+        } catch (IOException ie) {
+            LOG.warn("Error during garbage collecting ledgers from " + ledgerRootPath, ie);
+        } catch (InterruptedException inte) {
+            Thread.currentThread().interrupt(); // restore the status so the caller can observe it
+            LOG.warn("Interrupted during garbage collecting ledgers from " + ledgerRootPath, inte);
+        }
+    }
+
+ /**
+ * Garbage collect every ledger hosted by the bookie that no longer exists in zookeeper.
+ *
+ * @param gc
+ * Garbage collector invoked for each ledger found to be inactive/deleted
+ * @param bkActiveLedgers
+ * Active ledgers hosted in bookie server; collected ledgers are removed from it
+ * @param zkAllLedgers
+ * All ledgers stored in zookeeper
+ */
+ void doGc(GarbageCollector gc, ConcurrentMap<Long, Boolean> bkActiveLedgers, HashSet<Long> zkAllLedgers) {
+ // drop every hosted ledger that no longer exists in zk
+ for (Long bkLid : bkActiveLedgers.keySet()) {
+ if (!zkAllLedgers.contains(bkLid)) {
+ // forget the ledger locally before handing it to the garbage collector
+ bkActiveLedgers.remove(bkLid);
+ gc.gc(bkLid);
+ }
+ }
+ }
+
+}