You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2012/03/01 15:58:08 UTC

svn commit: r1295604 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ hedwig-client/src/main/cpp/scripts/

Author: fpj
Date: Thu Mar  1 14:58:08 2012
New Revision: 1295604

URL: http://svn.apache.org/viewvc?rev=1295604&view=rev
Log:
BOOKKEEPER-176: HierarchicalBookieFailureTest Hung (ivank via fpj)


Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/network-delays.sh

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1295604&r1=1295603&r2=1295604&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Mar  1 14:58:08 2012
@@ -48,6 +48,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-113: NPE In BookKeeper test (fpj via ivank)
 
+	BOOKKEEPER-176: HierarchicalBookieFailureTest Hung (ivank via fpj)
+
       hedwig-server/
       
         BOOKKEEPER-140: Hub server doesn't subscribe remote region correctly when a region is down. (Sijie Gou via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1295604&r1=1295603&r2=1295604&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Thu Mar  1 14:58:08 2012
@@ -24,6 +24,9 @@ package org.apache.bookkeeper.client;
 import java.io.IOException;
 import java.util.concurrent.Executors;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -41,6 +44,7 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.apache.bookkeeper.util.SafeRunnable;
 
 /**
  * BookKeeper client. We assume there is one single writer to a ledger at any
@@ -59,28 +63,35 @@ public class BookKeeper {
 
     static final Logger LOG = LoggerFactory.getLogger(BookKeeper.class);
 
-    ZooKeeper zk = null;
-    // whether the zk handle is one we created, or is owned by whoever
-    // instantiated us
-    boolean ownZKHandle = false;
+    final ZooKeeper zk;
+    final CountDownLatch connectLatch = new CountDownLatch(1);
+    final static int zkConnectTimeoutMs = 5000;
+    final ClientSocketChannelFactory channelFactory;
 
-    ClientSocketChannelFactory channelFactory;
     // whether the socket factory is one we created, or is owned by whoever
     // instantiated us
     boolean ownChannelFactory = false;
+    // whether the zk handle is one we created, or is owned by whoever
+    // instantiated us
+    boolean ownZKHandle = false;
 
-    BookieClient bookieClient;
-    BookieWatcher bookieWatcher;
+    final BookieClient bookieClient;
+    final BookieWatcher bookieWatcher;
 
     OrderedSafeExecutor callbackWorker = new OrderedSafeExecutor(Runtime
             .getRuntime().availableProcessors());
-    OrderedSafeExecutor mainWorkerPool = new OrderedSafeExecutor(Runtime
+    final OrderedSafeExecutor mainWorkerPool = new OrderedSafeExecutor(Runtime
             .getRuntime().availableProcessors());
 
     // Ledger manager responsible for how to store ledger meta data
     final LedgerManager ledgerManager;
 
-    ClientConfiguration conf;
+    final ClientConfiguration conf;
+
+    interface ZKConnectCallback {
+        public void connected();
+        public void connectionFailed(int code);
+    }
 
     /**
      * Create a bookkeeper client. A zookeeper client and a client socket factory
@@ -111,21 +122,30 @@ public class BookKeeper {
      * @throws InterruptedException
      * @throws KeeperException
      */
-    public BookKeeper(ClientConfiguration conf) throws IOException, InterruptedException,
-        KeeperException {
-        this(conf, new ZooKeeper(conf.getZkServers(), conf.getZkTimeout(), new Watcher() {
-            @Override
-            public void process(WatchedEvent event) {
-                // TODO: handle session disconnects and expires
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Process: " + event.getType() + " " + event.getPath());
-                }
-            }
-        }), new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
-                                              Executors.newCachedThreadPool()));
+    public BookKeeper(ClientConfiguration conf)
+            throws IOException, InterruptedException, KeeperException {
+        this.conf = conf;
+        this.zk = new ZooKeeper(conf.getZkServers(), conf.getZkTimeout(),
+                new Watcher() {
+                    @Override
+                    public void process(WatchedEvent event) {
+                        if (event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
+                            connectLatch.countDown();
+                        }
+                        // TODO: handle session disconnects and expires
+                        LOG.debug("Process: {} {}", event.getType(), event.getPath());
+                    }
+                });
+        this.channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+                                                                Executors.newCachedThreadPool());
+        bookieWatcher = new BookieWatcher(this);
+        bookieWatcher.readBookiesBlocking();
+        bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
+        // initialize ledger meta manager
+        ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
 
-        ownZKHandle = true;
         ownChannelFactory = true;
+        ownZKHandle = true;
      }
 
     /**
@@ -176,10 +196,32 @@ public class BookKeeper {
         bookieWatcher = new BookieWatcher(this);
         bookieWatcher.readBookiesBlocking();
         bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
-        // intialize ledger meta manager
+        // initialize ledger meta manager
         ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
     }
 
+    void withZKConnected(final ZKConnectCallback cb) {
+        if (ownZKHandle) {
+            mainWorkerPool.submit(new SafeRunnable() {
+                    @Override
+                    public void safeRun() {
+                        try {
+                            if (!connectLatch.await(zkConnectTimeoutMs, TimeUnit.MILLISECONDS)) {
+                                cb.connectionFailed(BKException.Code.ZKException);
+                            } else {
+                                cb.connected();
+                            }
+                        } catch (InterruptedException ie) {
+                            // someone trying to kill the process
+                            cb.connectionFailed(BKException.Code.InterruptedException);
+                        }
+                    }
+                });
+        } else {
+            cb.connected();
+        }
+    }
+
     LedgerManager getLedgerManager() {
         return ledgerManager;
     }
@@ -235,12 +277,17 @@ public class BookKeeper {
      * @param ctx
      *          optional control object
      */
-    public void asyncCreateLedger(int ensSize, int qSize, DigestType digestType,
-                                  byte[] passwd, CreateCallback cb, Object ctx) {
-
-        new LedgerCreateOp(this, ensSize, qSize, digestType, passwd, cb, ctx)
-        .initiate();
-
+    public void asyncCreateLedger(final int ensSize, final int qSize, final DigestType digestType,
+                                  final byte[] passwd, final CreateCallback cb, final Object ctx) {
+        withZKConnected(new ZKConnectCallback() {
+                public void connected() {
+                    new LedgerCreateOp(BookKeeper.this, ensSize, qSize, digestType, passwd, cb, ctx)
+                        .initiate();
+                }
+                public void connectionFailed(int code) {
+                    cb.createComplete(code, null, ctx);
+                }
+            });
     }
 
 
@@ -322,9 +369,16 @@ public class BookKeeper {
      * @param ctx
      *          optional control object
      */
-    public void asyncOpenLedger(long lId, DigestType digestType, byte passwd[],
-                                OpenCallback cb, Object ctx) {
-        new LedgerOpenOp(this, lId, digestType, passwd, cb, ctx).initiate();
+    public void asyncOpenLedger(final long lId, final DigestType digestType, final byte passwd[],
+                                final OpenCallback cb, final Object ctx) {
+        withZKConnected(new ZKConnectCallback() {
+                public void connected() {
+                    new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiate();
+                }
+                public void connectionFailed(int code) {
+                    cb.openComplete(code, null, ctx);
+                }
+            });
     }
 
     /**
@@ -354,9 +408,16 @@ public class BookKeeper {
      * @param ctx
      *          optional control object
      */
-    public void asyncOpenLedgerNoRecovery(long lId, DigestType digestType, byte passwd[],
-                                          OpenCallback cb, Object ctx) {
-        new LedgerOpenOp(this, lId, digestType, passwd, cb, ctx).initiateWithoutRecovery();
+    public void asyncOpenLedgerNoRecovery(final long lId, final DigestType digestType, final byte passwd[],
+                                          final OpenCallback cb, final Object ctx) {
+        withZKConnected(new ZKConnectCallback() {
+                public void connected() {
+                    new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiateWithoutRecovery();
+                }
+                public void connectionFailed(int code) {
+                    cb.openComplete(code, null, ctx);
+                }
+            });
     }
 
 
@@ -441,8 +502,15 @@ public class BookKeeper {
      * @param ctx
      *            optional control object
      */
-    public void asyncDeleteLedger(long lId, DeleteCallback cb, Object ctx) {
-        new LedgerDeleteOp(this, lId, cb, ctx).initiate();
+    public void asyncDeleteLedger(final long lId, final DeleteCallback cb, final Object ctx) {
+        withZKConnected(new ZKConnectCallback() {
+                public void connected() {
+                    new LedgerDeleteOp(BookKeeper.this, lId, cb, ctx).initiate();
+                }
+                public void connectionFailed(int code) {
+                    cb.deleteComplete(code, ctx);
+                }
+            });
     }
 
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1295604&r1=1295603&r2=1295604&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Thu Mar  1 14:58:08 2012
@@ -23,7 +23,7 @@ package org.apache.bookkeeper.client;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.GeneralSecurityException;
-import java.util.ArrayDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.Queue;
@@ -69,7 +69,7 @@ public class LedgerHandle {
     final Semaphore opCounterSem;
     private final Integer throttling;
 
-    final Queue<PendingAddOp> pendingAddOps = new ArrayDeque<PendingAddOp>();
+    final Queue<PendingAddOp> pendingAddOps = new ConcurrentLinkedQueue<PendingAddOp>();
 
     LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
                  DigestType digestType, byte[] password)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java?rev=1295604&r1=1295603&r2=1295604&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java Thu Mar  1 14:58:08 2012
@@ -168,7 +168,7 @@ abstract class AbstractZkLedgerManager i
             });
             ctx.wait();
         }
-        if (Code.OK.intValue() != ctx.rc && null != ctx.ledgers) {
+        if (Code.OK.intValue() != ctx.rc) {
             throw new IOException("Error on getting ledgers from node " + nodePath);
         }
         return ctx.ledgers;

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/network-delays.sh
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/network-delays.sh?rev=1295604&r1=1295603&r2=1295604&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/network-delays.sh (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/network-delays.sh Thu Mar  1 14:58:08 2012
@@ -26,10 +26,10 @@ setup_delays() {
 	Darwin|FreeBSD)
 	    sudo ipfw pipe 1 config delay ${1}ms
 	    sudo ipfw add pipe 1 dst-port 4081
-	    sudo ipfw add pipe 1 dst-port 4081
-	    sudo ipfw add pipe 1 src-port 4082 
+	    sudo ipfw add pipe 1 src-port 4081
+	    sudo ipfw add pipe 1 dst-port 4082 
 	    sudo ipfw add pipe 1 src-port 4082 
-	    sudo ipfw add pipe 1 src-port 4083 
+	    sudo ipfw add pipe 1 dst-port 4083 
 	    sudo ipfw add pipe 1 src-port 4083 
             ;;
 	Linux)