You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2012/03/01 15:58:08 UTC
svn commit: r1295604 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/
bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/
hedwig-client/src/main/cpp/scripts/
Author: fpj
Date: Thu Mar 1 14:58:08 2012
New Revision: 1295604
URL: http://svn.apache.org/viewvc?rev=1295604&view=rev
Log:
BOOKKEEPER-176: HierarchicalBookieFailureTest Hung (ivank via fpj)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/network-delays.sh
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1295604&r1=1295603&r2=1295604&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Mar 1 14:58:08 2012
@@ -48,6 +48,8 @@ Trunk (unreleased changes)
BOOKKEEPER-113: NPE In BookKeeper test (fpj via ivank)
+ BOOKKEEPER-176: HierarchicalBookieFailureTest Hung (ivank via fpj)
+
hedwig-server/
BOOKKEEPER-140: Hub server doesn't subscribe remote region correctly when a region is down. (Sijie Gou via ivank)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1295604&r1=1295603&r2=1295604&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Thu Mar 1 14:58:08 2012
@@ -24,6 +24,9 @@ package org.apache.bookkeeper.client;
import java.io.IOException;
import java.util.concurrent.Executors;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -41,6 +44,7 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.apache.bookkeeper.util.SafeRunnable;
/**
* BookKeeper client. We assume there is one single writer to a ledger at any
@@ -59,28 +63,35 @@ public class BookKeeper {
static final Logger LOG = LoggerFactory.getLogger(BookKeeper.class);
- ZooKeeper zk = null;
- // whether the zk handle is one we created, or is owned by whoever
- // instantiated us
- boolean ownZKHandle = false;
+ final ZooKeeper zk;
+ final CountDownLatch connectLatch = new CountDownLatch(1);
+ final static int zkConnectTimeoutMs = 5000;
+ final ClientSocketChannelFactory channelFactory;
- ClientSocketChannelFactory channelFactory;
// whether the socket factory is one we created, or is owned by whoever
// instantiated us
boolean ownChannelFactory = false;
+ // whether the zk handle is one we created, or is owned by whoever
+ // instantiated us
+ boolean ownZKHandle = false;
- BookieClient bookieClient;
- BookieWatcher bookieWatcher;
+ final BookieClient bookieClient;
+ final BookieWatcher bookieWatcher;
OrderedSafeExecutor callbackWorker = new OrderedSafeExecutor(Runtime
.getRuntime().availableProcessors());
- OrderedSafeExecutor mainWorkerPool = new OrderedSafeExecutor(Runtime
+ final OrderedSafeExecutor mainWorkerPool = new OrderedSafeExecutor(Runtime
.getRuntime().availableProcessors());
// Ledger manager responsible for how to store ledger meta data
final LedgerManager ledgerManager;
- ClientConfiguration conf;
+ final ClientConfiguration conf;
+
+ interface ZKConnectCallback {
+ public void connected();
+ public void connectionFailed(int code);
+ }
/**
* Create a bookkeeper client. A zookeeper client and a client socket factory
@@ -111,21 +122,30 @@ public class BookKeeper {
* @throws InterruptedException
* @throws KeeperException
*/
- public BookKeeper(ClientConfiguration conf) throws IOException, InterruptedException,
- KeeperException {
- this(conf, new ZooKeeper(conf.getZkServers(), conf.getZkTimeout(), new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- // TODO: handle session disconnects and expires
- if (LOG.isDebugEnabled()) {
- LOG.debug("Process: " + event.getType() + " " + event.getPath());
- }
- }
- }), new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool()));
+ public BookKeeper(ClientConfiguration conf)
+ throws IOException, InterruptedException, KeeperException {
+ this.conf = conf;
+ this.zk = new ZooKeeper(conf.getZkServers(), conf.getZkTimeout(),
+ new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ if (event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
+ connectLatch.countDown();
+ }
+ // TODO: handle session disconnects and expires
+ LOG.debug("Process: {} {}", event.getType(), event.getPath());
+ }
+ });
+ this.channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool());
+ bookieWatcher = new BookieWatcher(this);
+ bookieWatcher.readBookiesBlocking();
+ bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
+ // initialize ledger meta manager
+ ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
- ownZKHandle = true;
ownChannelFactory = true;
+ ownZKHandle = true;
}
/**
@@ -176,10 +196,32 @@ public class BookKeeper {
bookieWatcher = new BookieWatcher(this);
bookieWatcher.readBookiesBlocking();
bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
- // intialize ledger meta manager
+ // initialize ledger meta manager
ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
}
+ void withZKConnected(final ZKConnectCallback cb) {
+ if (ownZKHandle) {
+ mainWorkerPool.submit(new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ try {
+ if (!connectLatch.await(zkConnectTimeoutMs, TimeUnit.MILLISECONDS)) {
+ cb.connectionFailed(BKException.Code.ZKException);
+ } else {
+ cb.connected();
+ }
+ } catch (InterruptedException ie) {
+ // someone trying to kill the process
+ cb.connectionFailed(BKException.Code.InterruptedException);
+ }
+ }
+ });
+ } else {
+ cb.connected();
+ }
+ }
+
LedgerManager getLedgerManager() {
return ledgerManager;
}
@@ -235,12 +277,17 @@ public class BookKeeper {
* @param ctx
* optional control object
*/
- public void asyncCreateLedger(int ensSize, int qSize, DigestType digestType,
- byte[] passwd, CreateCallback cb, Object ctx) {
-
- new LedgerCreateOp(this, ensSize, qSize, digestType, passwd, cb, ctx)
- .initiate();
-
+ public void asyncCreateLedger(final int ensSize, final int qSize, final DigestType digestType,
+ final byte[] passwd, final CreateCallback cb, final Object ctx) {
+ withZKConnected(new ZKConnectCallback() {
+ public void connected() {
+ new LedgerCreateOp(BookKeeper.this, ensSize, qSize, digestType, passwd, cb, ctx)
+ .initiate();
+ }
+ public void connectionFailed(int code) {
+ cb.createComplete(code, null, ctx);
+ }
+ });
}
@@ -322,9 +369,16 @@ public class BookKeeper {
* @param ctx
* optional control object
*/
- public void asyncOpenLedger(long lId, DigestType digestType, byte passwd[],
- OpenCallback cb, Object ctx) {
- new LedgerOpenOp(this, lId, digestType, passwd, cb, ctx).initiate();
+ public void asyncOpenLedger(final long lId, final DigestType digestType, final byte passwd[],
+ final OpenCallback cb, final Object ctx) {
+ withZKConnected(new ZKConnectCallback() {
+ public void connected() {
+ new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiate();
+ }
+ public void connectionFailed(int code) {
+ cb.openComplete(code, null, ctx);
+ }
+ });
}
/**
@@ -354,9 +408,16 @@ public class BookKeeper {
* @param ctx
* optional control object
*/
- public void asyncOpenLedgerNoRecovery(long lId, DigestType digestType, byte passwd[],
- OpenCallback cb, Object ctx) {
- new LedgerOpenOp(this, lId, digestType, passwd, cb, ctx).initiateWithoutRecovery();
+ public void asyncOpenLedgerNoRecovery(final long lId, final DigestType digestType, final byte passwd[],
+ final OpenCallback cb, final Object ctx) {
+ withZKConnected(new ZKConnectCallback() {
+ public void connected() {
+ new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiateWithoutRecovery();
+ }
+ public void connectionFailed(int code) {
+ cb.openComplete(code, null, ctx);
+ }
+ });
}
@@ -441,8 +502,15 @@ public class BookKeeper {
* @param ctx
* optional control object
*/
- public void asyncDeleteLedger(long lId, DeleteCallback cb, Object ctx) {
- new LedgerDeleteOp(this, lId, cb, ctx).initiate();
+ public void asyncDeleteLedger(final long lId, final DeleteCallback cb, final Object ctx) {
+ withZKConnected(new ZKConnectCallback() {
+ public void connected() {
+ new LedgerDeleteOp(BookKeeper.this, lId, cb, ctx).initiate();
+ }
+ public void connectionFailed(int code) {
+ cb.deleteComplete(code, ctx);
+ }
+ });
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1295604&r1=1295603&r2=1295604&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Thu Mar 1 14:58:08 2012
@@ -23,7 +23,7 @@ package org.apache.bookkeeper.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.GeneralSecurityException;
-import java.util.ArrayDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Queue;
@@ -69,7 +69,7 @@ public class LedgerHandle {
final Semaphore opCounterSem;
private final Integer throttling;
- final Queue<PendingAddOp> pendingAddOps = new ArrayDeque<PendingAddOp>();
+ final Queue<PendingAddOp> pendingAddOps = new ConcurrentLinkedQueue<PendingAddOp>();
LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
DigestType digestType, byte[] password)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java?rev=1295604&r1=1295603&r2=1295604&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java Thu Mar 1 14:58:08 2012
@@ -168,7 +168,7 @@ abstract class AbstractZkLedgerManager i
});
ctx.wait();
}
- if (Code.OK.intValue() != ctx.rc && null != ctx.ledgers) {
+ if (Code.OK.intValue() != ctx.rc) {
throw new IOException("Error on getting ledgers from node " + nodePath);
}
return ctx.ledgers;
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/network-delays.sh
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/network-delays.sh?rev=1295604&r1=1295603&r2=1295604&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/network-delays.sh (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/network-delays.sh Thu Mar 1 14:58:08 2012
@@ -26,10 +26,10 @@ setup_delays() {
Darwin|FreeBSD)
sudo ipfw pipe 1 config delay ${1}ms
sudo ipfw add pipe 1 dst-port 4081
- sudo ipfw add pipe 1 dst-port 4081
- sudo ipfw add pipe 1 src-port 4082
+ sudo ipfw add pipe 1 src-port 4081
+ sudo ipfw add pipe 1 dst-port 4082
sudo ipfw add pipe 1 src-port 4082
- sudo ipfw add pipe 1 src-port 4083
+ sudo ipfw add pipe 1 dst-port 4083
sudo ipfw add pipe 1 src-port 4083
;;
Linux)