You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2011/09/05 19:39:03 UTC
svn commit: r1165369 [2/9] - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/main/java/org/apache/bookkeeper/clie...
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Mon Sep 5 17:38:57 2011
@@ -1,7 +1,7 @@
package org.apache.bookkeeper.client;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -9,16 +9,16 @@ package org.apache.bookkeeper.client;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
import java.io.IOException;
@@ -41,429 +41,429 @@ import org.jboss.netty.channel.socket.ni
/**
* BookKeeper client. We assume there is one single writer to a ledger at any
* time.
- *
+ *
* There are four possible operations: start a new ledger, write to a ledger,
* read from a ledger and delete a ledger.
- *
+ *
* The exceptions resulting from synchronous calls and error code resulting from
* asynchronous calls can be found in the class {@link BKException}.
- *
- *
+ *
+ *
*/
public class BookKeeper implements OpenCallback, CreateCallback, DeleteCallback {
- static final Logger LOG = Logger.getLogger(BookKeeper.class);
+ static final Logger LOG = Logger.getLogger(BookKeeper.class);
- ZooKeeper zk = null;
- // whether the zk handle is one we created, or is owned by whoever
- // instantiated us
- boolean ownZKHandle = false;
-
- ClientSocketChannelFactory channelFactory;
- // whether the socket factory is one we created, or is owned by whoever
- // instantiated us
- boolean ownChannelFactory = false;
-
- BookieClient bookieClient;
- BookieWatcher bookieWatcher;
-
- OrderedSafeExecutor callbackWorker = new OrderedSafeExecutor(Runtime
- .getRuntime().availableProcessors());
- OrderedSafeExecutor mainWorkerPool = new OrderedSafeExecutor(Runtime
- .getRuntime().availableProcessors());
-
- /**
- * Create a bookkeeper client. A zookeeper client and a client socket factory
- * will be instantiated as part of this constructor.
- *
- * @param servers
- * A list of one of more servers on which zookeeper is running. The
- * client assumes that the running bookies have been registered with
- * zookeeper under the path
- * {@link BookieWatcher#BOOKIE_REGISTRATION_PATH}
- * @throws IOException
- * @throws InterruptedException
- * @throws KeeperException
- */
- public BookKeeper(String servers) throws IOException, InterruptedException,
- KeeperException {
- this(new ZooKeeper(servers, 10000, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- // TODO: handle session disconnects and expires
- if (LOG.isDebugEnabled()) {
- LOG.debug("Process: " + event.getType() + " " + event.getPath());
+ ZooKeeper zk = null;
+ // whether the zk handle is one we created, or is owned by whoever
+ // instantiated us
+ boolean ownZKHandle = false;
+
+ ClientSocketChannelFactory channelFactory;
+ // whether the socket factory is one we created, or is owned by whoever
+ // instantiated us
+ boolean ownChannelFactory = false;
+
+ BookieClient bookieClient;
+ BookieWatcher bookieWatcher;
+
+ OrderedSafeExecutor callbackWorker = new OrderedSafeExecutor(Runtime
+ .getRuntime().availableProcessors());
+ OrderedSafeExecutor mainWorkerPool = new OrderedSafeExecutor(Runtime
+ .getRuntime().availableProcessors());
+
+ /**
+ * Create a bookkeeper client. A zookeeper client and a client socket factory
+ * will be instantiated as part of this constructor.
+ *
+ * @param servers
+ * A list of one of more servers on which zookeeper is running. The
+ * client assumes that the running bookies have been registered with
+ * zookeeper under the path
+ * {@link BookieWatcher#BOOKIE_REGISTRATION_PATH}
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws KeeperException
+ */
+ public BookKeeper(String servers) throws IOException, InterruptedException,
+ KeeperException {
+ this(new ZooKeeper(servers, 10000, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ // TODO: handle session disconnects and expires
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Process: " + event.getType() + " " + event.getPath());
+ }
+ }
+ }), new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool()));
+
+ ownZKHandle = true;
+ ownChannelFactory = true;
+ }
+
+ /**
+ * Create a bookkeeper client but use the passed in zookeeper client instead
+ * of instantiating one.
+ *
+ * @param zk
+ * Zookeeper client instance connected to the zookeeper with which
+ * the bookies have registered
+ * @throws InterruptedException
+ * @throws KeeperException
+ */
+ public BookKeeper(ZooKeeper zk) throws InterruptedException, KeeperException {
+ this(zk, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool()));
+ ownChannelFactory = true;
+ }
+
+ /**
+ * Create a bookkeeper client but use the passed in zookeeper client and
+ * client socket channel factory instead of instantiating those.
+ *
+ * @param zk
+ * Zookeeper client instance connected to the zookeeper with which
+ * the bookies have registered
+ * @param channelFactory
+ * A factory that will be used to create connections to the bookies
+ * @throws InterruptedException
+ * @throws KeeperException
+ */
+ public BookKeeper(ZooKeeper zk, ClientSocketChannelFactory channelFactory)
+ throws InterruptedException, KeeperException {
+ if (zk == null || channelFactory == null) {
+ throw new NullPointerException();
}
- }
- }), new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool()));
-
- ownZKHandle = true;
- ownChannelFactory = true;
- }
-
- /**
- * Create a bookkeeper client but use the passed in zookeeper client instead
- * of instantiating one.
- *
- * @param zk
- * Zookeeper client instance connected to the zookeeper with which
- * the bookies have registered
- * @throws InterruptedException
- * @throws KeeperException
- */
- public BookKeeper(ZooKeeper zk) throws InterruptedException, KeeperException {
- this(zk, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool()));
- ownChannelFactory = true;
- }
-
- /**
- * Create a bookkeeper client but use the passed in zookeeper client and
- * client socket channel factory instead of instantiating those.
- *
- * @param zk
- * Zookeeper client instance connected to the zookeeper with which
- * the bookies have registered
- * @param channelFactory
- * A factory that will be used to create connections to the bookies
- * @throws InterruptedException
- * @throws KeeperException
- */
- public BookKeeper(ZooKeeper zk, ClientSocketChannelFactory channelFactory)
- throws InterruptedException, KeeperException {
- if (zk == null || channelFactory == null) {
- throw new NullPointerException();
- }
- this.zk = zk;
- this.channelFactory = channelFactory;
- bookieWatcher = new BookieWatcher(this);
- bookieWatcher.readBookiesBlocking();
- bookieClient = new BookieClient(channelFactory, mainWorkerPool);
- }
-
- /**
- * There are 2 digest types that can be used for verification. The CRC32 is
- * cheap to compute but does not protect against byzantine bookies (i.e., a
- * bookie might report fake bytes and a matching CRC32). The MAC code is more
- * expensive to compute, but is protected by a password, i.e., a bookie can't
- * report fake bytes with a mathching MAC unless it knows the password
- */
- public enum DigestType {
- MAC, CRC32
- };
-
- public ZooKeeper getZkHandle() {
- return zk;
- }
-
- /**
- * Get the BookieClient, currently used for doing bookie recovery.
- *
- * @return BookieClient for the BookKeeper instance.
- */
- public BookieClient getBookieClient() {
- return bookieClient;
- }
-
- /**
- * Creates a new ledger asynchronously. To create a ledger, we need to specify
- * the ensemble size, the quorum size, the digest type, a password, a callback
- * implementation, and an optional control object. The ensemble size is how
- * many bookies the entries should be striped among and the quorum size is the
- * degree of replication of each entry. The digest type is either a MAC or a
- * CRC. Note that the CRC option is not able to protect a client against a
- * bookie that replaces an entry. The password is used not only to
- * authenticate access to a ledger, but also to verify entries in ledgers.
- *
- * @param ensSize
- * ensemble size
- * @param qSize
- * quorum size
- * @param digestType
- * digest type, either MAC or CRC32
- * @param passwd
- * password
- * @param cb
- * createCallback implementation
- * @param ctx
- * optional control object
- */
- public void asyncCreateLedger(int ensSize, int qSize, DigestType digestType,
- byte[] passwd, CreateCallback cb, Object ctx) {
+ this.zk = zk;
+ this.channelFactory = channelFactory;
+ bookieWatcher = new BookieWatcher(this);
+ bookieWatcher.readBookiesBlocking();
+ bookieClient = new BookieClient(channelFactory, mainWorkerPool);
+ }
+
+ /**
+ * There are 2 digest types that can be used for verification. The CRC32 is
+ * cheap to compute but does not protect against byzantine bookies (i.e., a
+ * bookie might report fake bytes and a matching CRC32). The MAC code is more
+ * expensive to compute, but is protected by a password, i.e., a bookie can't
+ * report fake bytes with a mathching MAC unless it knows the password
+ */
+ public enum DigestType {
+ MAC, CRC32
+ };
+
+ public ZooKeeper getZkHandle() {
+ return zk;
+ }
+
+ /**
+ * Get the BookieClient, currently used for doing bookie recovery.
+ *
+ * @return BookieClient for the BookKeeper instance.
+ */
+ public BookieClient getBookieClient() {
+ return bookieClient;
+ }
+
+ /**
+ * Creates a new ledger asynchronously. To create a ledger, we need to specify
+ * the ensemble size, the quorum size, the digest type, a password, a callback
+ * implementation, and an optional control object. The ensemble size is how
+ * many bookies the entries should be striped among and the quorum size is the
+ * degree of replication of each entry. The digest type is either a MAC or a
+ * CRC. Note that the CRC option is not able to protect a client against a
+ * bookie that replaces an entry. The password is used not only to
+ * authenticate access to a ledger, but also to verify entries in ledgers.
+ *
+ * @param ensSize
+ * ensemble size
+ * @param qSize
+ * quorum size
+ * @param digestType
+ * digest type, either MAC or CRC32
+ * @param passwd
+ * password
+ * @param cb
+ * createCallback implementation
+ * @param ctx
+ * optional control object
+ */
+ public void asyncCreateLedger(int ensSize, int qSize, DigestType digestType,
+ byte[] passwd, CreateCallback cb, Object ctx) {
- new LedgerCreateOp(this, ensSize, qSize, digestType, passwd, cb, ctx)
+ new LedgerCreateOp(this, ensSize, qSize, digestType, passwd, cb, ctx)
.initiate();
- }
+ }
+
+ /**
+ * Create callback implementation for synchronous create call.
+ *
+ * @param rc
+ * return code
+ * @param lh
+ * ledger handle object
+ * @param ctx
+ * optional control object
+ */
+ public void createComplete(int rc, LedgerHandle lh, Object ctx) {
+ SyncCounter counter = (SyncCounter) ctx;
+ counter.setLh(lh);
+ counter.setrc(rc);
+ counter.dec();
+ }
+
+ /**
+ * Creates a new ledger. Default of 3 servers, and quorum of 2 servers.
+ *
+ * @param digestType
+ * digest type, either MAC or CRC32
+ * @param passwd
+ * password
+ * @return
+ * @throws KeeperException
+ * @throws InterruptedException
+ * @throws BKException
+ */
+ public LedgerHandle createLedger(DigestType digestType, byte passwd[])
+ throws KeeperException, BKException, InterruptedException, IOException {
+ return createLedger(3, 2, digestType, passwd);
+ }
+
+ /**
+ * Synchronous call to create ledger. Parameters match those of
+ * {@link #asyncCreateLedger(int, int, DigestType, byte[], CreateCallback, Object)}
+ *
+ * @param ensSize
+ * @param qSize
+ * @param digestType
+ * @param passwd
+ * @return
+ * @throws KeeperException
+ * @throws InterruptedException
+ * @throws IOException
+ * @throws BKException
+ */
+ public LedgerHandle createLedger(int ensSize, int qSize,
+ DigestType digestType, byte passwd[]) throws KeeperException,
+ InterruptedException, IOException, BKException {
+ SyncCounter counter = new SyncCounter();
+ counter.inc();
+ /*
+ * Calls asynchronous version
+ */
+ asyncCreateLedger(ensSize, qSize, digestType, passwd, this, counter);
+
+ /*
+ * Wait
+ */
+ counter.block(0);
+ if (counter.getLh() == null) {
+ LOG.error("ZooKeeper error: " + counter.getrc());
+ throw BKException.create(Code.ZKException);
+ }
+
+ return counter.getLh();
+ }
+
+ /**
+ * Open existing ledger asynchronously for reading.
+ *
+ * @param lId
+ * ledger identifier
+ * @param digestType
+ * digest type, either MAC or CRC32
+ * @param passwd
+ * password
+ * @param ctx
+ * optional control object
+ */
+ public void asyncOpenLedger(long lId, DigestType digestType, byte passwd[],
+ OpenCallback cb, Object ctx) {
+
+ new LedgerOpenOp(this, lId, digestType, passwd, false, cb, ctx).initiate();
+
+ }
+
+ /**
+ * Open existing ledger asynchronously for reading, but it does not try to
+ * recover the ledger if it is not yet closed. The application needs to use
+ * it carefully, since the writer might have crash and ledger will remain
+ * unsealed forever if there is no external mechanism to detect the failure
+ * of the writer and the ledger is not open in a safe manner, invoking the
+ * recovery procedure.
+ *
+ * @param lId
+ * ledger identifier
+ * @param digestType
+ * digest type, either MAC or CRC32
+ * @param passwd
+ * password
+ * @param ctx
+ * optional control object
+ */
+
+ public void asyncOpenLedgerNoRecovery(long lId, DigestType digestType, byte passwd[],
+ OpenCallback cb, Object ctx) {
+
+ new LedgerOpenOp(this, lId, digestType, passwd, true, cb, ctx).initiate();
+
+ }
+
+ /**
+ * Callback method for synchronous open operation
+ *
+ * @param rc
+ * return code
+ * @param lh
+ * ledger handle
+ * @param ctx
+ * optional control object
+ */
+ public void openComplete(int rc, LedgerHandle lh, Object ctx) {
+ SyncCounter counter = (SyncCounter) ctx;
+ counter.setLh(lh);
+
+ LOG.debug("Open complete: " + rc);
+
+ counter.setrc(rc);
+ counter.dec();
+ }
+
+ /**
+ * Synchronous open ledger call
+ *
+ * @param lId
+ * ledger identifier
+ * @param digestType
+ * digest type, either MAC or CRC32
+ * @param passwd
+ * password
+ * @return
+ * @throws InterruptedException
+ * @throws BKException
+ */
+
+ public LedgerHandle openLedger(long lId, DigestType digestType, byte passwd[])
+ throws BKException, InterruptedException {
+ SyncCounter counter = new SyncCounter();
+ counter.inc();
+
+ /*
+ * Calls async open ledger
+ */
+ asyncOpenLedger(lId, digestType, passwd, this, counter);
+
+ /*
+ * Wait
+ */
+ counter.block(0);
+ if (counter.getrc() != BKException.Code.OK)
+ throw BKException.create(counter.getrc());
+
+ return counter.getLh();
+ }
+
+ /**
+ * Synchronous, unsafe open ledger call
+ *
+ * @param lId
+ * ledger identifier
+ * @param digestType
+ * digest type, either MAC or CRC32
+ * @param passwd
+ * password
+ * @return
+ * @throws InterruptedException
+ * @throws BKException
+ */
- /**
- * Create callback implementation for synchronous create call.
- *
- * @param rc
- * return code
- * @param lh
- * ledger handle object
- * @param ctx
- * optional control object
- */
- public void createComplete(int rc, LedgerHandle lh, Object ctx) {
- SyncCounter counter = (SyncCounter) ctx;
- counter.setLh(lh);
- counter.setrc(rc);
- counter.dec();
- }
-
- /**
- * Creates a new ledger. Default of 3 servers, and quorum of 2 servers.
- *
- * @param digestType
- * digest type, either MAC or CRC32
- * @param passwd
- * password
- * @return
- * @throws KeeperException
- * @throws InterruptedException
- * @throws BKException
- */
- public LedgerHandle createLedger(DigestType digestType, byte passwd[])
- throws KeeperException, BKException, InterruptedException, IOException {
- return createLedger(3, 2, digestType, passwd);
- }
-
- /**
- * Synchronous call to create ledger. Parameters match those of
- * {@link #asyncCreateLedger(int, int, DigestType, byte[], CreateCallback, Object)}
- *
- * @param ensSize
- * @param qSize
- * @param digestType
- * @param passwd
- * @return
- * @throws KeeperException
- * @throws InterruptedException
- * @throws IOException
- * @throws BKException
- */
- public LedgerHandle createLedger(int ensSize, int qSize,
- DigestType digestType, byte passwd[]) throws KeeperException,
- InterruptedException, IOException, BKException {
- SyncCounter counter = new SyncCounter();
- counter.inc();
- /*
- * Calls asynchronous version
- */
- asyncCreateLedger(ensSize, qSize, digestType, passwd, this, counter);
-
- /*
- * Wait
- */
- counter.block(0);
- if (counter.getLh() == null) {
- LOG.error("ZooKeeper error: " + counter.getrc());
- throw BKException.create(Code.ZKException);
- }
-
- return counter.getLh();
- }
-
- /**
- * Open existing ledger asynchronously for reading.
- *
- * @param lId
- * ledger identifier
- * @param digestType
- * digest type, either MAC or CRC32
- * @param passwd
- * password
- * @param ctx
- * optional control object
- */
- public void asyncOpenLedger(long lId, DigestType digestType, byte passwd[],
- OpenCallback cb, Object ctx) {
-
- new LedgerOpenOp(this, lId, digestType, passwd, false, cb, ctx).initiate();
-
- }
-
- /**
- * Open existing ledger asynchronously for reading, but it does not try to
- * recover the ledger if it is not yet closed. The application needs to use
- * it carefully, since the writer might have crash and ledger will remain
- * unsealed forever if there is no external mechanism to detect the failure
- * of the writer and the ledger is not open in a safe manner, invoking the
- * recovery procedure.
- *
- * @param lId
- * ledger identifier
- * @param digestType
- * digest type, either MAC or CRC32
- * @param passwd
- * password
- * @param ctx
- * optional control object
- */
-
- public void asyncOpenLedgerNoRecovery(long lId, DigestType digestType, byte passwd[],
- OpenCallback cb, Object ctx) {
-
- new LedgerOpenOp(this, lId, digestType, passwd, true, cb, ctx).initiate();
-
- }
-
- /**
- * Callback method for synchronous open operation
- *
- * @param rc
- * return code
- * @param lh
- * ledger handle
- * @param ctx
- * optional control object
- */
- public void openComplete(int rc, LedgerHandle lh, Object ctx) {
- SyncCounter counter = (SyncCounter) ctx;
- counter.setLh(lh);
-
- LOG.debug("Open complete: " + rc);
-
- counter.setrc(rc);
- counter.dec();
- }
-
- /**
- * Synchronous open ledger call
- *
- * @param lId
- * ledger identifier
- * @param digestType
- * digest type, either MAC or CRC32
- * @param passwd
- * password
- * @return
- * @throws InterruptedException
- * @throws BKException
- */
-
- public LedgerHandle openLedger(long lId, DigestType digestType, byte passwd[])
- throws BKException, InterruptedException {
- SyncCounter counter = new SyncCounter();
- counter.inc();
-
- /*
- * Calls async open ledger
- */
- asyncOpenLedger(lId, digestType, passwd, this, counter);
-
- /*
- * Wait
- */
- counter.block(0);
- if (counter.getrc() != BKException.Code.OK)
- throw BKException.create(counter.getrc());
-
- return counter.getLh();
- }
-
- /**
- * Synchronous, unsafe open ledger call
- *
- * @param lId
- * ledger identifier
- * @param digestType
- * digest type, either MAC or CRC32
- * @param passwd
- * password
- * @return
- * @throws InterruptedException
- * @throws BKException
- */
-
- public LedgerHandle openLedgerNoRecovery(long lId, DigestType digestType, byte passwd[])
- throws BKException, InterruptedException {
- SyncCounter counter = new SyncCounter();
- counter.inc();
-
- /*
- * Calls async open ledger
- */
- asyncOpenLedgerNoRecovery(lId, digestType, passwd, this, counter);
-
- /*
- * Wait
- */
- counter.block(0);
- if (counter.getrc() != BKException.Code.OK)
- throw BKException.create(counter.getrc());
-
- return counter.getLh();
- }
-
- /**
- * Deletes a ledger asynchronously.
- *
- * @param lId
- * ledger Id
- * @param cb
- * deleteCallback implementation
- * @param ctx
- * optional control object
- */
- public void asyncDeleteLedger(long lId, DeleteCallback cb, Object ctx) {
- new LedgerDeleteOp(this, lId, cb, ctx).initiate();
- }
-
- /**
- * Delete callback implementation for synchronous delete call.
- *
- * @param rc
- * return code
- * @param ctx
- * optional control object
- */
- public void deleteComplete(int rc, Object ctx) {
- SyncCounter counter = (SyncCounter) ctx;
- counter.setrc(rc);
- counter.dec();
- }
-
- /**
- * Synchronous call to delete a ledger. Parameters match those of
- * {@link #asyncDeleteLedger(long, DeleteCallback, Object)}
- *
- * @param lId
- * ledgerId
- * @throws InterruptedException
- * @throws BKException
- */
- public void deleteLedger(long lId) throws InterruptedException, BKException {
- SyncCounter counter = new SyncCounter();
- counter.inc();
- // Call asynchronous version
- asyncDeleteLedger(lId, this, counter);
- // Wait
- counter.block(0);
- if (counter.getrc() != KeeperException.Code.OK.intValue()) {
- LOG.error("ZooKeeper error deleting ledger node: " + counter.getrc());
- throw BKException.create(Code.ZKException);
- }
- }
-
- /**
- * Shuts down client.
- *
- */
- public void halt() throws InterruptedException {
- bookieClient.close();
- bookieWatcher.halt();
- if (ownChannelFactory) {
- channelFactory.releaseExternalResources();
- }
- if (ownZKHandle) {
- zk.close();
- }
- callbackWorker.shutdown();
- mainWorkerPool.shutdown();
- }
+ public LedgerHandle openLedgerNoRecovery(long lId, DigestType digestType, byte passwd[])
+ throws BKException, InterruptedException {
+ SyncCounter counter = new SyncCounter();
+ counter.inc();
+
+ /*
+ * Calls async open ledger
+ */
+ asyncOpenLedgerNoRecovery(lId, digestType, passwd, this, counter);
+
+ /*
+ * Wait
+ */
+ counter.block(0);
+ if (counter.getrc() != BKException.Code.OK)
+ throw BKException.create(counter.getrc());
+
+ return counter.getLh();
+ }
+
+ /**
+ * Deletes a ledger asynchronously.
+ *
+ * @param lId
+ * ledger Id
+ * @param cb
+ * deleteCallback implementation
+ * @param ctx
+ * optional control object
+ */
+ public void asyncDeleteLedger(long lId, DeleteCallback cb, Object ctx) {
+ new LedgerDeleteOp(this, lId, cb, ctx).initiate();
+ }
+
+ /**
+ * Delete callback implementation for synchronous delete call.
+ *
+ * @param rc
+ * return code
+ * @param ctx
+ * optional control object
+ */
+ public void deleteComplete(int rc, Object ctx) {
+ SyncCounter counter = (SyncCounter) ctx;
+ counter.setrc(rc);
+ counter.dec();
+ }
+
+ /**
+ * Synchronous call to delete a ledger. Parameters match those of
+ * {@link #asyncDeleteLedger(long, DeleteCallback, Object)}
+ *
+ * @param lId
+ * ledgerId
+ * @throws InterruptedException
+ * @throws BKException
+ */
+ public void deleteLedger(long lId) throws InterruptedException, BKException {
+ SyncCounter counter = new SyncCounter();
+ counter.inc();
+ // Call asynchronous version
+ asyncDeleteLedger(lId, this, counter);
+ // Wait
+ counter.block(0);
+ if (counter.getrc() != KeeperException.Code.OK.intValue()) {
+ LOG.error("ZooKeeper error deleting ledger node: " + counter.getrc());
+ throw BKException.create(Code.ZKException);
+ }
+ }
+
+ /**
+ * Shuts down client.
+ *
+ */
+ public void halt() throws InterruptedException {
+ bookieClient.close();
+ bookieWatcher.halt();
+ if (ownChannelFactory) {
+ channelFactory.releaseExternalResources();
+ }
+ if (ownZKHandle) {
+ zk.close();
+ }
+ callbackWorker.shutdown();
+ mainWorkerPool.shutdown();
+ }
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java Mon Sep 5 17:38:57 2011
@@ -44,11 +44,11 @@ import org.apache.zookeeper.KeeperExcept
* are available by reading Zookeeper (and setting watches on the bookie nodes).
* When a bookie fails, the other parts of the code turn to this class to find a
* replacement
- *
+ *
*/
class BookieWatcher implements Watcher, ChildrenCallback {
static final Logger logger = Logger.getLogger(BookieWatcher.class);
-
+
public static final String BOOKIE_REGISTRATION_PATH = "/ledgers/available";
static final Set<InetSocketAddress> EMPTY_SET = new HashSet<InetSocketAddress>();
public static int ZK_CONNECT_BACKOFF_SEC = 1;
@@ -69,8 +69,8 @@ class BookieWatcher implements Watcher,
this.bk = bk;
this.scheduler = Executors.newSingleThreadScheduledExecutor();
}
-
- public void halt(){
+
+ public void halt() {
scheduler.shutdown();
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java Mon Sep 5 17:38:57 2011
@@ -24,7 +24,7 @@ import java.util.zip.CRC32;
class CRC32DigestManager extends DigestManager {
CRC32 crc = new CRC32();
-
+
public CRC32DigestManager(long ledgerId) {
super(ledgerId);
}
@@ -33,7 +33,7 @@ class CRC32DigestManager extends DigestM
int getMacCodeLength() {
return 8;
}
-
+
@Override
byte[] getValueAndReset() {
byte[] value = new byte[8];
@@ -42,7 +42,7 @@ class CRC32DigestManager extends DigestM
crc.reset();
return value;
}
-
+
@Override
void update(byte[] data, int offset, int length) {
crc.update(data, offset, length);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java Mon Sep 5 17:38:57 2011
@@ -37,29 +37,29 @@ import org.jboss.netty.buffer.ChannelBuf
public abstract class DigestManager {
static final Logger logger = Logger.getLogger(DigestManager.class);
-
+
static final int METADATA_LENGTH = 32;
-
+
long ledgerId;
-
+
abstract int getMacCodeLength();
-
- void update(byte[] data){
+
+ void update(byte[] data) {
update(data, 0, data.length);
}
-
+
abstract void update(byte[] data, int offset, int length);
abstract byte[] getValueAndReset();
-
+
final int macCodeLength;
public DigestManager(long ledgerId) {
this.ledgerId = ledgerId;
macCodeLength = getMacCodeLength();
}
-
- static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType) throws GeneralSecurityException{
- switch(digestType){
+
+ static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType) throws GeneralSecurityException {
+ switch(digestType) {
case MAC:
return new MacDigestManager(ledgerId, passwd);
case CRC32:
@@ -71,14 +71,14 @@ public abstract class DigestManager {
/**
* Computes the digest for an entry and put bytes together for sending.
- *
+ *
* @param entryId
* @param lastAddConfirmed
* @param length
* @param data
* @return
*/
-
+
public ChannelBuffer computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length, byte[] data, int doffset, int dlength) {
byte[] bufferArray = new byte[METADATA_LENGTH + macCodeLength];
@@ -133,21 +133,21 @@ public abstract class DigestManager {
if (actualLedgerId != ledgerId) {
logger.error("Ledger-id mismatch in authenticated message, expected: " + ledgerId + " , actual: "
- + actualLedgerId);
+ + actualLedgerId);
throw new BKDigestMatchException();
}
if (!skipEntryIdCheck && actualEntryId != entryId) {
logger.error("Entry-id mismatch in authenticated message, expected: " + entryId + " , actual: "
- + actualEntryId);
+ + actualEntryId);
throw new BKDigestMatchException();
}
}
-
+
/**
* Verify that the digest matches and returns the data in the entry.
- *
+ *
* @param entryId
* @param dataReceived
* @return
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java Mon Sep 5 17:38:57 2011
@@ -20,7 +20,7 @@ package org.apache.bookkeeper.client;
/**
* This interface determins how entries are distributed among bookies.
- *
+ *
* Every entry gets replicated to some number of replicas. The first replica for
* an entry is given a replicaIndex of 0, and so on. To distribute write load,
* not all entries go to all bookies. Given an entry-id and replica index, an
@@ -31,7 +31,7 @@ package org.apache.bookkeeper.client;
public interface DistributionSchedule {
/**
- *
+ *
* @param entryId
* @param replicaIndex
* @return index of bookie that should get this replica
@@ -39,7 +39,7 @@ public interface DistributionSchedule {
public int getBookieIndex(long entryId, int replicaIndex);
/**
- *
+ *
* @param entryId
* @param bookieIndex
* @return -1 if the given bookie index is not a replica for the given
@@ -53,7 +53,7 @@ public interface DistributionSchedule {
* sequence and an implementation of this interface should accumulate
* history about which bookie indexes we have heard from. Once this method
* has returned true, it wont be called again on the same instance
- *
+ *
* @param bookieIndexHeardFrom
* @return true if its ok to proceed with recovery
*/
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java Mon Sep 5 17:38:57 2011
@@ -1,5 +1,5 @@
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -7,16 +7,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
package org.apache.bookkeeper.client;
@@ -39,7 +39,7 @@ import org.apache.zookeeper.data.Stat;
/**
* Encapsulates asynchronous ledger create operation
- *
+ *
*/
class LedgerCreateOp implements StringCallback, StatCallback {
@@ -53,24 +53,24 @@ class LedgerCreateOp implements StringCa
BookKeeper bk;
DigestType digestType;
- /**
- * Constructor
- *
- * @param bk
- * BookKeeper object
- * @param ensembleSize
- * ensemble size
- * @param quorumSize
- * quorum size
- * @param digestType
- * digest type, either MAC or CRC32
- * @param passwd
- * passowrd
- * @param cb
- * callback implementation
- * @param ctx
- * optional control object
- */
+ /**
+ * Constructor
+ *
+ * @param bk
+ * BookKeeper object
+ * @param ensembleSize
+ * ensemble size
+ * @param quorumSize
+ * quorum size
+ * @param digestType
+ * digest type, either MAC or CRC32
+ * @param passwd
+ * passowrd
+ * @param cb
+ * callback implementation
+ * @param ctx
+ * optional control object
+ */
LedgerCreateOp(BookKeeper bk, int ensembleSize, int quorumSize, DigestType digestType, byte[] passwd, CreateCallback cb, Object ctx) {
this.bk = bk;
@@ -91,7 +91,7 @@ class LedgerCreateOp implements StringCa
*/
bk.getZkHandle().create(StringUtils.prefix, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT_SEQUENTIAL, this, null);
+ CreateMode.PERSISTENT_SEQUENTIAL, this, null);
// calls the children callback method below
}
@@ -99,7 +99,7 @@ class LedgerCreateOp implements StringCa
/**
* Implements ZooKeeper string callback.
- *
+ *
* @see org.apache.zookeeper.AsyncCallback.StringCallback#processResult(int, java.lang.String, java.lang.Object, java.lang.String)
*/
public void processResult(int rc, String path, Object ctx, String name) {
@@ -157,7 +157,7 @@ class LedgerCreateOp implements StringCa
/**
* Implements ZooKeeper stat callback.
- *
+ *
* @see org.apache.zookeeper.AsyncCallback.StatCallback#processResult(int, String, Object, Stat)
*/
public void processResult(int rc, String path, Object ctx, Stat stat) {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java Mon Sep 5 17:38:57 2011
@@ -1,5 +1,5 @@
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -7,16 +7,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
package org.apache.bookkeeper.client;
@@ -28,7 +28,7 @@ import org.apache.zookeeper.AsyncCallbac
/**
* Encapsulates asynchronous ledger delete operation
- *
+ *
*/
class LedgerDeleteOp implements VoidCallback {
@@ -41,7 +41,7 @@ class LedgerDeleteOp implements VoidCall
/**
* Constructor
- *
+ *
* @param bk
* BookKeeper object
* @param ledgerId
@@ -69,7 +69,7 @@ class LedgerDeleteOp implements VoidCall
/**
* Implements ZooKeeper Void Callback.
- *
+ *
* @see org.apache.zookeeper.AsyncCallback.VoidCallback#processResult(int,
* java.lang.String, java.lang.Object)
*/
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java Mon Sep 5 17:38:57 2011
@@ -1,7 +1,7 @@
package org.apache.bookkeeper.client;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -9,16 +9,16 @@ package org.apache.bookkeeper.client;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
import java.io.IOException;
@@ -30,54 +30,54 @@ import org.jboss.netty.buffer.ChannelBuf
/**
* Ledger entry. Its a simple tuple containing the ledger id, the entry-id, and
* the entry content.
- *
+ *
*/
public class LedgerEntry {
- Logger LOG = Logger.getLogger(LedgerEntry.class);
+ Logger LOG = Logger.getLogger(LedgerEntry.class);
- long ledgerId;
- long entryId;
- long length;
- ChannelBufferInputStream entryDataStream;
-
- int nextReplicaIndexToReadFrom = 0;
-
- LedgerEntry(long lId, long eId) {
- this.ledgerId = lId;
- this.entryId = eId;
- }
-
- public long getLedgerId() {
- return ledgerId;
- }
-
- public long getEntryId() {
- return entryId;
- }
-
- public long getLength() {
- return length;
- }
-
- public byte[] getEntry() {
- try {
- // In general, you can't rely on the available() method of an input
- // stream, but ChannelBufferInputStream is backed by a byte[] so it
- // accurately knows the # bytes available
- byte[] ret = new byte[entryDataStream.available()];
- entryDataStream.readFully(ret);
- return ret;
- } catch (IOException e) {
- // The channelbufferinput stream doesnt really throw the
- // ioexceptions, it just has to be in the signature because
- // InputStream says so. Hence this code, should never be reached.
- LOG.fatal("Unexpected IOException while reading from channel buffer", e);
- return new byte[0];
- }
- }
-
- public InputStream getEntryInputStream() {
- return entryDataStream;
- }
+ long ledgerId;
+ long entryId;
+ long length;
+ ChannelBufferInputStream entryDataStream;
+
+ int nextReplicaIndexToReadFrom = 0;
+
+ LedgerEntry(long lId, long eId) {
+ this.ledgerId = lId;
+ this.entryId = eId;
+ }
+
+ public long getLedgerId() {
+ return ledgerId;
+ }
+
+ public long getEntryId() {
+ return entryId;
+ }
+
+ public long getLength() {
+ return length;
+ }
+
+ public byte[] getEntry() {
+ try {
+ // In general, you can't rely on the available() method of an input
+ // stream, but ChannelBufferInputStream is backed by a byte[] so it
+ // accurately knows the # bytes available
+ byte[] ret = new byte[entryDataStream.available()];
+ entryDataStream.readFully(ret);
+ return ret;
+ } catch (IOException e) {
+ // The channelbufferinput stream doesnt really throw the
+ // ioexceptions, it just has to be in the signature because
+ // InputStream says so. Hence this code, should never be reached.
+ LOG.fatal("Unexpected IOException while reading from channel buffer", e);
+ return new byte[0];
+ }
+ }
+
+ public InputStream getEntryInputStream() {
+ return entryDataStream;
+ }
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Mon Sep 5 17:38:57 2011
@@ -1,7 +1,7 @@
package org.apache.bookkeeper.client;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -9,16 +9,16 @@ package org.apache.bookkeeper.client;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
import java.net.InetSocketAddress;
@@ -53,275 +53,275 @@ import org.jboss.netty.buffer.ChannelBuf
* write operations to a ledger.
*/
public class LedgerHandle implements ReadCallback, AddCallback, CloseCallback, ReadLastConfirmedCallback {
- final static Logger LOG = Logger.getLogger(LedgerHandle.class);
- final static long LAST_ADD_CONFIRMED = -1;
-
- final byte[] ledgerKey;
- final LedgerMetadata metadata;
- final BookKeeper bk;
- final long ledgerId;
- long lastAddPushed;
- long lastAddConfirmed;
- long length;
- final DigestManager macManager;
- final DistributionSchedule distributionSchedule;
-
- final Semaphore opCounterSem;
- private Integer throttling = 5000;
-
- final Queue<PendingAddOp> pendingAddOps = new ArrayDeque<PendingAddOp>();
-
- LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
- DigestType digestType, byte[] password)
- throws GeneralSecurityException, NumberFormatException {
- this.bk = bk;
- this.metadata = metadata;
- if (metadata.isClosed()) {
- lastAddConfirmed = lastAddPushed = metadata.close;
- length = metadata.length;
- } else {
- lastAddConfirmed = lastAddPushed = -1;
- length = 0;
- }
-
- this.ledgerId = ledgerId;
-
- String throttleValue = System.getProperty("throttle");
- if(throttleValue != null){
- this.throttling = new Integer(throttleValue);
- }
- this.opCounterSem = new Semaphore(throttling);
-
- macManager = DigestManager.instantiate(ledgerId, password, digestType);
- this.ledgerKey = MacDigestManager.genDigest("ledger", password);
- distributionSchedule = new RoundRobinDistributionSchedule(
- metadata.quorumSize, metadata.ensembleSize);
- }
-
- /**
- * Get the id of the current ledger
- *
- * @return
- */
- public long getId() {
- return ledgerId;
- }
-
- /**
- * Get the last confirmed entry id on this ledger
- *
- * @return
- */
- public long getLastAddConfirmed() {
- return lastAddConfirmed;
- }
-
- /**
- * Get the entry id of the last entry that has been enqueued for addition (but
- * may not have possibly been persited to the ledger)
- *
- * @return
- */
- public long getLastAddPushed() {
- return lastAddPushed;
- }
-
- /**
- * Get the Ledger's key/password.
- *
- * @return byte array for the ledger's key/password.
- */
- public byte[] getLedgerKey() {
- return ledgerKey;
- }
-
- /**
- * Get the LedgerMetadata
- *
- * @return LedgerMetadata for the LedgerHandle
- */
- public LedgerMetadata getLedgerMetadata() {
- return metadata;
- }
-
- /**
- * Get the DigestManager
- *
- * @return DigestManager for the LedgerHandle
- */
- public DigestManager getDigestManager() {
- return macManager;
- }
-
- /**
- * Return total number of available slots.
- *
- * @return int available slots
- */
- Semaphore getAvailablePermits(){
- return this.opCounterSem;
- }
-
- /**
- * Add to the length of the ledger in bytes.
- *
- * @param delta
- * @return
- */
- long addToLength(long delta){
- this.length += delta;
- return this.length;
- }
-
- /**
- * Returns the length of the ledger in bytes.
- *
- * @return
- */
- public long getLength(){
- return this.length;
- }
-
- /**
- * Get the Distribution Schedule
- *
- * @return DistributionSchedule for the LedgerHandle
- */
- public DistributionSchedule getDistributionSchedule() {
- return distributionSchedule;
- }
-
- public void writeLedgerConfig(StatCallback callback, Object ctx) {
- bk.getZkHandle().setData(StringUtils.getLedgerNodePath(ledgerId),
- metadata.serialize(), -1, callback, ctx);
- }
-
- /**
- * Close this ledger synchronously.
- *
- */
- public void close() throws InterruptedException {
- SyncCounter counter = new SyncCounter();
- counter.inc();
-
- asyncClose(this, counter);
-
- counter.block(0);
- }
-
- /**
- * Asynchronous close, any adds in flight will return errors
- *
- * @param cb
- * callback implementation
- * @param ctx
- * control object
- * @throws InterruptedException
- */
- public void asyncClose(CloseCallback cb, Object ctx) {
- asyncClose(cb, ctx, BKException.Code.LedgerClosedException);
- }
-
- /**
- * Same as public version of asynClose except that this one takes an
- * additional parameter which is the return code to hand to all the pending
- * add ops
- *
- * @param cb
- * @param ctx
- * @param rc
- */
- private void asyncClose(final CloseCallback cb, final Object ctx, final int rc) {
-
- bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
-
- @Override
- public void safeRun() {
- metadata.length = length;
- // Close operation is idempotent, so no need to check if we are
- // already closed
- metadata.close(lastAddConfirmed);
- errorOutPendingAdds(rc);
- lastAddPushed = lastAddConfirmed;
+ final static Logger LOG = Logger.getLogger(LedgerHandle.class);
+ final static long LAST_ADD_CONFIRMED = -1;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Closing ledger: " + ledgerId + " at entryId: "
- + metadata.close + " with this many bytes: " + metadata.length);
+ final byte[] ledgerKey;
+ final LedgerMetadata metadata;
+ final BookKeeper bk;
+ final long ledgerId;
+ long lastAddPushed;
+ long lastAddConfirmed;
+ long length;
+ final DigestManager macManager;
+ final DistributionSchedule distributionSchedule;
+
+ final Semaphore opCounterSem;
+ private Integer throttling = 5000;
+
+ final Queue<PendingAddOp> pendingAddOps = new ArrayDeque<PendingAddOp>();
+
+ LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
+ DigestType digestType, byte[] password)
+ throws GeneralSecurityException, NumberFormatException {
+ this.bk = bk;
+ this.metadata = metadata;
+ if (metadata.isClosed()) {
+ lastAddConfirmed = lastAddPushed = metadata.close;
+ length = metadata.length;
+ } else {
+ lastAddConfirmed = lastAddPushed = -1;
+ length = 0;
}
- writeLedgerConfig(new StatCallback() {
- @Override
- public void processResult(int rc, String path, Object subctx,
- Stat stat) {
- if (rc != KeeperException.Code.OK.intValue()) {
- cb.closeComplete(BKException.Code.ZKException, LedgerHandle.this,
- ctx);
- } else {
- cb.closeComplete(BKException.Code.OK, LedgerHandle.this, ctx);
+ this.ledgerId = ledgerId;
+
+ String throttleValue = System.getProperty("throttle");
+ if(throttleValue != null) {
+ this.throttling = new Integer(throttleValue);
+ }
+ this.opCounterSem = new Semaphore(throttling);
+
+ macManager = DigestManager.instantiate(ledgerId, password, digestType);
+ this.ledgerKey = MacDigestManager.genDigest("ledger", password);
+ distributionSchedule = new RoundRobinDistributionSchedule(
+ metadata.quorumSize, metadata.ensembleSize);
+ }
+
+ /**
+ * Get the id of the current ledger
+ *
+ * @return
+ */
+ public long getId() {
+ return ledgerId;
+ }
+
+ /**
+ * Get the last confirmed entry id on this ledger
+ *
+ * @return
+ */
+ public long getLastAddConfirmed() {
+ return lastAddConfirmed;
+ }
+
+ /**
+ * Get the entry id of the last entry that has been enqueued for addition (but
+ * may not have possibly been persited to the ledger)
+ *
+ * @return
+ */
+ public long getLastAddPushed() {
+ return lastAddPushed;
+ }
+
+ /**
+ * Get the Ledger's key/password.
+ *
+ * @return byte array for the ledger's key/password.
+ */
+ public byte[] getLedgerKey() {
+ return ledgerKey;
+ }
+
+ /**
+ * Get the LedgerMetadata
+ *
+ * @return LedgerMetadata for the LedgerHandle
+ */
+ public LedgerMetadata getLedgerMetadata() {
+ return metadata;
+ }
+
+ /**
+ * Get the DigestManager
+ *
+ * @return DigestManager for the LedgerHandle
+ */
+ public DigestManager getDigestManager() {
+ return macManager;
+ }
+
+ /**
+ * Return total number of available slots.
+ *
+ * @return int available slots
+ */
+ Semaphore getAvailablePermits() {
+ return this.opCounterSem;
+ }
+
+ /**
+ * Add to the length of the ledger in bytes.
+ *
+ * @param delta
+ * @return
+ */
+ long addToLength(long delta) {
+ this.length += delta;
+ return this.length;
+ }
+
+ /**
+ * Returns the length of the ledger in bytes.
+ *
+ * @return
+ */
+ public long getLength() {
+ return this.length;
+ }
+
+ /**
+ * Get the Distribution Schedule
+ *
+ * @return DistributionSchedule for the LedgerHandle
+ */
+ public DistributionSchedule getDistributionSchedule() {
+ return distributionSchedule;
+ }
+
+ public void writeLedgerConfig(StatCallback callback, Object ctx) {
+ bk.getZkHandle().setData(StringUtils.getLedgerNodePath(ledgerId),
+ metadata.serialize(), -1, callback, ctx);
+ }
+
+ /**
+ * Close this ledger synchronously.
+ *
+ */
+ public void close() throws InterruptedException {
+ SyncCounter counter = new SyncCounter();
+ counter.inc();
+
+ asyncClose(this, counter);
+
+ counter.block(0);
+ }
+
+ /**
+ * Asynchronous close, any adds in flight will return errors
+ *
+ * @param cb
+ * callback implementation
+ * @param ctx
+ * control object
+ * @throws InterruptedException
+ */
+ public void asyncClose(CloseCallback cb, Object ctx) {
+ asyncClose(cb, ctx, BKException.Code.LedgerClosedException);
+ }
+
+ /**
+ * Same as public version of asynClose except that this one takes an
+ * additional parameter which is the return code to hand to all the pending
+ * add ops
+ *
+ * @param cb
+ * @param ctx
+ * @param rc
+ */
+ private void asyncClose(final CloseCallback cb, final Object ctx, final int rc) {
+
+ bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
+
+ @Override
+ public void safeRun() {
+ metadata.length = length;
+ // Close operation is idempotent, so no need to check if we are
+ // already closed
+ metadata.close(lastAddConfirmed);
+ errorOutPendingAdds(rc);
+ lastAddPushed = lastAddConfirmed;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing ledger: " + ledgerId + " at entryId: "
+ + metadata.close + " with this many bytes: " + metadata.length);
+ }
+
+ writeLedgerConfig(new StatCallback() {
+ @Override
+ public void processResult(int rc, String path, Object subctx,
+ Stat stat) {
+ if (rc != KeeperException.Code.OK.intValue()) {
+ cb.closeComplete(BKException.Code.ZKException, LedgerHandle.this,
+ ctx);
+ } else {
+ cb.closeComplete(BKException.Code.OK, LedgerHandle.this, ctx);
+ }
+ }
+ }, null);
+
}
- }
- }, null);
+ });
+ }
+
+ /**
+ * Read a sequence of entries synchronously.
+ *
+ * @param firstEntry
+ * id of first entry of sequence (included)
+ * @param lastEntry
+ * id of last entry of sequence (included)
+ *
+ */
+ public Enumeration<LedgerEntry> readEntries(long firstEntry, long lastEntry)
+ throws InterruptedException, BKException {
+ SyncCounter counter = new SyncCounter();
+ counter.inc();
+
+ asyncReadEntries(firstEntry, lastEntry, this, counter);
+
+ counter.block(0);
+ if (counter.getrc() != BKException.Code.OK) {
+ throw BKException.create(counter.getrc());
+ }
+
+ return counter.getSequence();
+ }
- }
- });
- }
-
- /**
- * Read a sequence of entries synchronously.
- *
- * @param firstEntry
- * id of first entry of sequence (included)
- * @param lastEntry
- * id of last entry of sequence (included)
- *
- */
- public Enumeration<LedgerEntry> readEntries(long firstEntry, long lastEntry)
- throws InterruptedException, BKException {
- SyncCounter counter = new SyncCounter();
- counter.inc();
-
- asyncReadEntries(firstEntry, lastEntry, this, counter);
-
- counter.block(0);
- if (counter.getrc() != BKException.Code.OK) {
- throw BKException.create(counter.getrc());
- }
-
- return counter.getSequence();
- }
-
- /**
- * Read a sequence of entries asynchronously.
- *
- * @param firstEntry
- * id of first entry of sequence
- * @param lastEntry
- * id of last entry of sequence
- * @param cb
- * object implementing read callback interface
- * @param ctx
- * control object
- */
- public void asyncReadEntries(long firstEntry, long lastEntry,
- ReadCallback cb, Object ctx) {
- // Little sanity check
- if (firstEntry < 0 || lastEntry > lastAddConfirmed
- || firstEntry > lastEntry) {
- cb.readComplete(BKException.Code.ReadException, this, null, ctx);
- return;
- }
-
- try{
- new PendingReadOp(this, firstEntry, lastEntry, cb, ctx).initiate();
-
- } catch (InterruptedException e) {
- cb.readComplete(BKException.Code.InterruptedException, this, null, ctx);
+ /**
+ * Read a sequence of entries asynchronously.
+ *
+ * @param firstEntry
+ * id of first entry of sequence
+ * @param lastEntry
+ * id of last entry of sequence
+ * @param cb
+ * object implementing read callback interface
+ * @param ctx
+ * control object
+ */
+ public void asyncReadEntries(long firstEntry, long lastEntry,
+ ReadCallback cb, Object ctx) {
+ // Little sanity check
+ if (firstEntry < 0 || lastEntry > lastAddConfirmed
+ || firstEntry > lastEntry) {
+ cb.readComplete(BKException.Code.ReadException, this, null, ctx);
+ return;
+ }
+
+ try {
+ new PendingReadOp(this, firstEntry, lastEntry, cb, ctx).initiate();
+
+ } catch (InterruptedException e) {
+ cb.readComplete(BKException.Code.InterruptedException, this, null, ctx);
+ }
}
- }
/**
* Add entry synchronously to an open ledger.
- *
+ *
* @param data
* array of bytes to be written to the ledger
*/
@@ -331,7 +331,7 @@ public class LedgerHandle implements Rea
/**
* Add entry synchronously to an open ledger.
- *
+ *
* @param data
* array of bytes to be written to the ledger
* @param offset
@@ -339,36 +339,36 @@ public class LedgerHandle implements Rea
* @param length
* number of bytes to take from data
*/
- public long addEntry(byte[] data, int offset, int length)
+ public long addEntry(byte[] data, int offset, int length)
throws InterruptedException, BKException {
LOG.debug("Adding entry " + data);
SyncCounter counter = new SyncCounter();
counter.inc();
-
+
asyncAddEntry(data, offset, length, this, counter);
counter.block(0);
return counter.getrc();
}
- /**
- * Add entry asynchronously to an open ledger.
- *
- * @param data
- * array of bytes to be written
- * @param cb
- * object implementing callbackinterface
- * @param ctx
- * some control object
- */
- public void asyncAddEntry(final byte[] data, final AddCallback cb,
+ /**
+ * Add entry asynchronously to an open ledger.
+ *
+ * @param data
+ * array of bytes to be written
+ * @param cb
+ * object implementing callbackinterface
+ * @param ctx
+ * some control object
+ */
+ public void asyncAddEntry(final byte[] data, final AddCallback cb,
final Object ctx) {
asyncAddEntry(data, 0, data.length, cb, ctx);
}
/**
* Add entry asynchronously to an open ledger, using an offset and range.
- *
+ *
* @param data
* array of bytes to be written
* @param offset
@@ -379,285 +379,285 @@ public class LedgerHandle implements Rea
* object implementing callbackinterface
* @param ctx
* some control object
- * @throws ArrayIndexOutOfBoundsException if offset or length is negative or
+ * @throws ArrayIndexOutOfBoundsException if offset or length is negative or
* offset and length sum to a value higher than the length of data.
*/
- public void asyncAddEntry(final byte[] data, final int offset, final int length,
+ public void asyncAddEntry(final byte[] data, final int offset, final int length,
final AddCallback cb, final Object ctx) {
if (offset < 0 || length < 0
- || (offset + length) > data.length) {
+ || (offset + length) > data.length) {
throw new ArrayIndexOutOfBoundsException(
- "Invalid values for offset("+offset
- +") or length("+length+")");
+ "Invalid values for offset("+offset
+ +") or length("+length+")");
}
- try{
+ try {
opCounterSem.acquire();
} catch (InterruptedException e) {
cb.addComplete(BKException.Code.InterruptedException,
- LedgerHandle.this, -1, ctx);
+ LedgerHandle.this, -1, ctx);
}
-
- try{
+
+ try {
bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
- @Override
- public void safeRun() {
- if (metadata.isClosed()) {
- LOG.warn("Attempt to add to closed ledger: " + ledgerId);
- LedgerHandle.this.opCounterSem.release();
- cb.addComplete(BKException.Code.LedgerClosedException,
- LedgerHandle.this, -1, ctx);
- return;
- }
-
- long entryId = ++lastAddPushed;
- long currentLength = addToLength(length);
- PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx, entryId);
- pendingAddOps.add(op);
- ChannelBuffer toSend = macManager.computeDigestAndPackageForSending(
- entryId, lastAddConfirmed, currentLength, data, offset, length);
- op.initiate(toSend);
+ @Override
+ public void safeRun() {
+ if (metadata.isClosed()) {
+ LOG.warn("Attempt to add to closed ledger: " + ledgerId);
+ LedgerHandle.this.opCounterSem.release();
+ cb.addComplete(BKException.Code.LedgerClosedException,
+ LedgerHandle.this, -1, ctx);
+ return;
}
- });
+
+ long entryId = ++lastAddPushed;
+ long currentLength = addToLength(length);
+ PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx, entryId);
+ pendingAddOps.add(op);
+ ChannelBuffer toSend = macManager.computeDigestAndPackageForSending(
+ entryId, lastAddConfirmed, currentLength, data, offset, length);
+ op.initiate(toSend);
+ }
+ });
} catch (RuntimeException e) {
opCounterSem.release();
throw e;
}
}
- /**
- * Obtains last confirmed write from a quorum of bookies.
- *
- * @param cb
- * @param ctx
- */
-
- public void asyncReadLastConfirmed(ReadLastConfirmedCallback cb, Object ctx){
- new ReadLastConfirmedOp(this, cb, ctx).initiate();
- }
-
-
- /**
- * Context objects for synchronous call to read last confirmed.
- */
- class LastConfirmedCtx {
- long response;
- int rc;
-
- LastConfirmedCtx(){
- this.response = -1;
- }
-
- void setLastConfirmed(long lastConfirmed){
- this.response = lastConfirmed;
- }
-
- long getlastConfirmed(){
- return this.response;
- }
-
- void setRC(int rc){
- this.rc = rc;
- }
-
- int getRC(){
- return this.rc;
- }
-
- boolean ready(){
- return (this.response != -1);
- }
- }
-
- public long readLastConfirmed()
- throws InterruptedException, BKException {
- LastConfirmedCtx ctx = new LastConfirmedCtx();
- asyncReadLastConfirmed(this, ctx);
- synchronized(ctx){
- while(!ctx.ready()){
- ctx.wait();
- }
- }
-
- if(ctx.getRC() != BKException.Code.OK) throw BKException.create(ctx.getRC());
- return ctx.getlastConfirmed();
- }
-
- // close the ledger and send fails to all the adds in the pipeline
- void handleUnrecoverableErrorDuringAdd(int rc) {
- asyncClose(NoopCloseCallback.instance, null, rc);
- }
-
- void errorOutPendingAdds(int rc) {
- PendingAddOp pendingAddOp;
- while ((pendingAddOp = pendingAddOps.poll()) != null) {
- pendingAddOp.submitCallback(rc);
- }
- }
-
- void sendAddSuccessCallbacks() {
- // Start from the head of the queue and proceed while there are
- // entries that have had all their responses come back
- PendingAddOp pendingAddOp;
- while ((pendingAddOp = pendingAddOps.peek()) != null) {
- if (pendingAddOp.numResponsesPending != 0) {
- return;
- }
- pendingAddOps.remove();
- lastAddConfirmed = pendingAddOp.entryId;
- pendingAddOp.submitCallback(BKException.Code.OK);
- }
-
- }
-
- void handleBookieFailure(InetSocketAddress addr, final int bookieIndex) {
- InetSocketAddress newBookie;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Handling failure of bookie: " + addr + " index: "
- + bookieIndex);
- }
-
- try {
- newBookie = bk.bookieWatcher
- .getAdditionalBookie(metadata.currentEnsemble);
- } catch (BKNotEnoughBookiesException e) {
- LOG
- .error("Could not get additional bookie to remake ensemble, closing ledger: "
- + ledgerId);
- handleUnrecoverableErrorDuringAdd(e.getCode());
- return;
- }
-
- final ArrayList<InetSocketAddress> newEnsemble = new ArrayList<InetSocketAddress>(
- metadata.currentEnsemble);
- newEnsemble.set(bookieIndex, newBookie);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Changing ensemble from: " + metadata.currentEnsemble + " to: "
- + newEnsemble + " for ledger: " + ledgerId + " starting at entry: "
- + (lastAddConfirmed + 1));
- }
-
- metadata.addEnsemble(lastAddConfirmed + 1, newEnsemble);
-
- writeLedgerConfig(new StatCallback() {
- @Override
- public void processResult(final int rc, String path, Object ctx, Stat stat) {
+ /**
+ * Obtains last confirmed write from a quorum of bookies.
+ *
+ * @param cb
+ * @param ctx
+ */
+
+ public void asyncReadLastConfirmed(ReadLastConfirmedCallback cb, Object ctx) {
+ new ReadLastConfirmedOp(this, cb, ctx).initiate();
+ }
+
- bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
- @Override
- public void safeRun() {
- if (rc != KeeperException.Code.OK.intValue()) {
- LOG
- .error("Could not persist ledger metadata while changing ensemble to: "
- + newEnsemble + " , closing ledger");
- handleUnrecoverableErrorDuringAdd(BKException.Code.ZKException);
- return;
+ /**
+ * Context objects for synchronous call to read last confirmed.
+ */
+ class LastConfirmedCtx {
+ long response;
+ int rc;
+
+ LastConfirmedCtx() {
+ this.response = -1;
+ }
+
+ void setLastConfirmed(long lastConfirmed) {
+ this.response = lastConfirmed;
+ }
+
+ long getlastConfirmed() {
+ return this.response;
+ }
+
+ void setRC(int rc) {
+ this.rc = rc;
+ }
+
+ int getRC() {
+ return this.rc;
+ }
+
+ boolean ready() {
+ return (this.response != -1);
+ }
+ }
+
+ public long readLastConfirmed()
+ throws InterruptedException, BKException {
+ LastConfirmedCtx ctx = new LastConfirmedCtx();
+ asyncReadLastConfirmed(this, ctx);
+ synchronized(ctx) {
+ while(!ctx.ready()) {
+ ctx.wait();
}
+ }
+
+ if(ctx.getRC() != BKException.Code.OK) throw BKException.create(ctx.getRC());
+ return ctx.getlastConfirmed();
+ }
+
+ // close the ledger and send fails to all the adds in the pipeline
+ void handleUnrecoverableErrorDuringAdd(int rc) {
+ asyncClose(NoopCloseCallback.instance, null, rc);
+ }
- for (PendingAddOp pendingAddOp : pendingAddOps) {
- pendingAddOp.unsetSuccessAndSendWriteRequest(bookieIndex);
+ void errorOutPendingAdds(int rc) {
+ PendingAddOp pendingAddOp;
+ while ((pendingAddOp = pendingAddOps.poll()) != null) {
+ pendingAddOp.submitCallback(rc);
+ }
+ }
+
+ void sendAddSuccessCallbacks() {
+ // Start from the head of the queue and proceed while there are
+ // entries that have had all their responses come back
+ PendingAddOp pendingAddOp;
+ while ((pendingAddOp = pendingAddOps.peek()) != null) {
+ if (pendingAddOp.numResponsesPending != 0) {
+ return;
}
- }
- });
+ pendingAddOps.remove();
+ lastAddConfirmed = pendingAddOp.entryId;
+ pendingAddOp.submitCallback(BKException.Code.OK);
+ }
+
+ }
+
+ void handleBookieFailure(InetSocketAddress addr, final int bookieIndex) {
+ InetSocketAddress newBookie;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Handling failure of bookie: " + addr + " index: "
+ + bookieIndex);
+ }
+
+ try {
+ newBookie = bk.bookieWatcher
+ .getAdditionalBookie(metadata.currentEnsemble);
+ } catch (BKNotEnoughBookiesException e) {
+ LOG
+ .error("Could not get additional bookie to remake ensemble, closing ledger: "
+ + ledgerId);
+ handleUnrecoverableErrorDuringAdd(e.getCode());
+ return;
+ }
+
+ final ArrayList<InetSocketAddress> newEnsemble = new ArrayList<InetSocketAddress>(
+ metadata.currentEnsemble);
+ newEnsemble.set(bookieIndex, newBookie);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Changing ensemble from: " + metadata.currentEnsemble + " to: "
+ + newEnsemble + " for ledger: " + ledgerId + " starting at entry: "
+ + (lastAddConfirmed + 1));
+ }
+
+ metadata.addEnsemble(lastAddConfirmed + 1, newEnsemble);
+
+ writeLedgerConfig(new StatCallback() {
+ @Override
+ public void processResult(final int rc, String path, Object ctx, Stat stat) {
+
+ bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ if (rc != KeeperException.Code.OK.intValue()) {
+ LOG
+ .error("Could not persist ledger metadata while changing ensemble to: "
+ + newEnsemble + " , closing ledger");
+ handleUnrecoverableErrorDuringAdd(BKException.Code.ZKException);
+ return;
+ }
- }
- }, null);
+ for (PendingAddOp pendingAddOp : pendingAddOps) {
+ pendingAddOp.unsetSuccessAndSendWriteRequest(bookieIndex);
+ }
+ }
+ });
- }
+ }
+ }, null);
- void recover(GenericCallback<Void> cb) {
- if (metadata.isClosed()) {
- // We are already closed, nothing to do
- cb.operationComplete(BKException.Code.OK, null);
- return;
}
- new LedgerRecoveryOp(this, cb).initiate();
- }
+ void recover(GenericCallback<Void> cb) {
+ if (metadata.isClosed()) {
+ // We are already closed, nothing to do
+ cb.operationComplete(BKException.Code.OK, null);
+ return;
+ }
- static class NoopCloseCallback implements CloseCallback {
- static NoopCloseCallback instance = new NoopCloseCallback();
+ new LedgerRecoveryOp(this, cb).initiate();
+ }
- @Override
- public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
- // noop
+ static class NoopCloseCallback implements CloseCallback {
+ static NoopCloseCallback instance = new NoopCloseCallback();
+
+ @Override
+ public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
+ // noop
+ }
+ }
+
+ /**
+ * Implementation of callback interface for synchronous read method.
+ *
+ * @param rc
+ * return code
+ * @param leder
+ * ledger identifier
+ * @param seq
+ * sequence of entries
+ * @param ctx
+ * control object
+ */
+ public void readComplete(int rc, LedgerHandle lh,
+ Enumeration<LedgerEntry> seq, Object ctx) {
+
+ SyncCounter counter = (SyncCounter) ctx;
+ synchronized (counter) {
+ counter.setSequence(seq);
+ counter.setrc(rc);
+ counter.dec();
+ counter.notify();
+ }
}
- }
- /**
- * Implementation of callback interface for synchronous read method.
- *
- * @param rc
- * return code
- * @param leder
- * ledger identifier
- * @param seq
- * sequence of entries
- * @param ctx
- * control object
- */
- public void readComplete(int rc, LedgerHandle lh,
- Enumeration<LedgerEntry> seq, Object ctx) {
-
- SyncCounter counter = (SyncCounter) ctx;
- synchronized (counter) {
- counter.setSequence(seq);
- counter.setrc(rc);
- counter.dec();
- counter.notify();
- }
- }
-
- /**
- * Implementation of callback interface for synchronous read method.
- *
- * @param rc
- * return code
- * @param leder
- * ledger identifier
- * @param entry
- * entry identifier
- * @param ctx
- * control object
- */
- public void addComplete(int rc, LedgerHandle lh, long entry, Object ctx) {
- SyncCounter counter = (SyncCounter) ctx;
-
- counter.setrc(rc);
- counter.dec();
- }
-
-
-
- /**
- * Implementation of callback interface for synchronous read last confirmed method.
- */
- public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
- LastConfirmedCtx lcCtx = (LastConfirmedCtx) ctx;
-
- synchronized(lcCtx){
- lcCtx.setRC(rc);
- lcCtx.setLastConfirmed(lastConfirmed);
- lcCtx.notify();
- }
- }
-
- /**
- * Close callback method
- *
- * @param rc
- * @param lh
- * @param ctx
- */
- public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
-
- SyncCounter counter = (SyncCounter) ctx;
- counter.setrc(rc);
- synchronized (counter) {
- counter.dec();
- counter.notify();
+ /**
+ * Implementation of callback interface for synchronous read method.
+ *
+ * @param rc
+ * return code
+ * @param leder
+ * ledger identifier
+ * @param entry
+ * entry identifier
+ * @param ctx
+ * control object
+ */
+ public void addComplete(int rc, LedgerHandle lh, long entry, Object ctx) {
+ SyncCounter counter = (SyncCounter) ctx;
+
+ counter.setrc(rc);
+ counter.dec();
+ }
+
+
+
+ /**
+ * Implementation of callback interface for synchronous read last confirmed method.
+ */
+ public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
+ LastConfirmedCtx lcCtx = (LastConfirmedCtx) ctx;
+
+ synchronized(lcCtx) {
+ lcCtx.setRC(rc);
+ lcCtx.setLastConfirmed(lastConfirmed);
+ lcCtx.notify();
+ }
}
- }
+ /**
+ * Close callback method
+ *
+ * @param rc
+ * @param lh
+ * @param ctx
+ */
+ public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
+
+ SyncCounter counter = (SyncCounter) ctx;
+ counter.setrc(rc);
+ synchronized (counter) {
+ counter.dec();
+ counter.notify();
+ }
+
+ }
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java Mon Sep 5 17:38:57 2011
@@ -31,7 +31,7 @@ import org.apache.log4j.Logger;
/**
* This class encapsulates all the ledger metadata that is persistently stored
* in zookeeper. It provides parsing and serialization methods of such metadata.
- *
+ *
*/
public class LedgerMetadata {
static final Logger LOG = Logger.getLogger(LedgerMetadata.class);
@@ -53,9 +53,9 @@ public class LedgerMetadata {
public LedgerMetadata(int ensembleSize, int quorumSize) {
this.ensembleSize = ensembleSize;
this.quorumSize = quorumSize;
-
+
/*
- * It is set in PendingReadOp.readEntryComplete, and
+ * It is set in PendingReadOp.readEntryComplete, and
* we read it in LedgerRecoveryOp.readComplete.
*/
this.length = 0;
@@ -67,16 +67,16 @@ public class LedgerMetadata {
}
/**
- * Get the Map of bookie ensembles for the various ledger fragments
+ * Get the Map of bookie ensembles for the various ledger fragments
* that make up the ledger.
- *
- * @return SortedMap of Ledger Fragments and the corresponding
+ *
+ * @return SortedMap of Ledger Fragments and the corresponding
* bookie ensembles that store the entries.
*/
public SortedMap<Long, ArrayList<InetSocketAddress>> getEnsembles() {
return ensembles;
}
-
+
boolean isClosed() {
return close != NOTCLOSED;
}
@@ -84,7 +84,7 @@ public class LedgerMetadata {
void close(long entryId) {
close = entryId;
}
-
+
void addEnsemble(long startEntryId, ArrayList<InetSocketAddress> ensemble) {
assert ensembles.isEmpty() || startEntryId >= ensembles.lastKey();
@@ -101,7 +101,7 @@ public class LedgerMetadata {
/**
* the entry id > the given entry-id at which the next ensemble change takes
* place ( -1 if no further ensemble changes)
- *
+ *
* @param entryId
* @return
*/
@@ -117,7 +117,7 @@ public class LedgerMetadata {
/**
* Generates a byte array based on a LedgerConfig object received.
- *
+ *
* @param config
* LedgerConfig object
* @return byte[]
@@ -133,7 +133,7 @@ public class LedgerMetadata {
StringUtils.addrToString(s, addr);
}
}
-
+
if (close != NOTCLOSED) {
s.append(lSplitter).append(close).append(tSplitter).append(closed);
}
@@ -147,7 +147,7 @@ public class LedgerMetadata {
/**
* Parses a given byte array and transforms into a LedgerConfig object
- *
+ *
* @param array
* byte array to parse
* @return LedgerConfig
@@ -173,8 +173,8 @@ public class LedgerMetadata {
try {
lc.quorumSize = new Integer(lines[0]);
lc.ensembleSize = new Integer(lines[1]);
- lc.length = new Long(lines[2]);
-
+ lc.length = new Long(lines[2]);
+
for (int i = 3; i < lines.length; i++) {
String parts[] = lines[i].split(tSplitter);