You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2012/06/05 08:21:07 UTC
svn commit: r1346253 - in /zookeeper/bookkeeper/branches/branch-4.1: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/
bookkeeper-server/src/test/java/org/apache/bookkeeper/client/
bookkeeper-server/src/test/java/org/apache/bookkeeper/test/
Author: sijie
Date: Tue Jun 5 06:21:07 2012
New Revision: 1346253
URL: http://svn.apache.org/viewvc?rev=1346253&view=rev
Log:
BOOKKEEPER-281: BKClient is failing when zkclient connection delays (ivank via sijie)
Added:
zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java (with props)
Modified:
zookeeper/bookkeeper/branches/branch-4.1/CHANGES.txt
zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java
zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java
Modified: zookeeper/bookkeeper/branches/branch-4.1/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.1/CHANGES.txt?rev=1346253&r1=1346252&r2=1346253&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.1/CHANGES.txt (original)
+++ zookeeper/bookkeeper/branches/branch-4.1/CHANGES.txt Tue Jun 5 06:21:07 2012
@@ -120,6 +120,8 @@ Release 4.1.0 - 2012-05-31
BOOKKEEPER-273: LedgerHandle.deleteLedger() should be idempotent (Matteo Merli via ivank)
+ BOOKKEEPER-281: BKClient is failing when zkclient connection delays (ivank via sijie)
+
hedwig-client/
BOOKKEEPER-217: NPE in hedwig client when enable DEBUG (sijie via ivank)
Modified: zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1346253&r1=1346252&r2=1346253&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Tue Jun 5 06:21:07 2012
@@ -119,27 +119,35 @@ public class BookKeeper {
* @throws InterruptedException
* @throws KeeperException
*/
- public BookKeeper(ClientConfiguration conf)
+ public BookKeeper(final ClientConfiguration conf)
throws IOException, InterruptedException, KeeperException {
this.conf = conf;
+
+ final CountDownLatch zkConnectLatch = new CountDownLatch(1);
this.zk = new ZooKeeper(conf.getZkServers(), conf.getZkTimeout(),
new Watcher() {
@Override
public void process(WatchedEvent event) {
- if (event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
- connectLatch.countDown();
- }
+ // countdown the latch on all events, even if we haven't
+ // successfully connected.
+ zkConnectLatch.countDown();
+
// TODO: handle session disconnects and expires
LOG.debug("Process: {} {}", event.getType(), event.getPath());
}
});
+ if (!zkConnectLatch.await(conf.getZkTimeout(), TimeUnit.MILLISECONDS)
+ || !zk.getState().isConnected()) {
+ throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
+ }
+
this.channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
- bookieWatcher = new BookieWatcher(conf, this);
- bookieWatcher.readBookiesBlocking();
mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads());
bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
- // initialize ledger meta manager
+ bookieWatcher = new BookieWatcher(conf, this);
+ bookieWatcher.readBookiesBlocking();
+
ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
ownChannelFactory = true;
@@ -176,49 +184,33 @@ public class BookKeeper {
* {@link ClientConfiguration}
* @param zk
* Zookeeper client instance connected to the zookeeper with which
- * the bookies have registered
+ * the bookies have registered. The ZooKeeper client must be connected
+ * before it is passed to BookKeeper. Otherwise a KeeperException is thrown.
* @param channelFactory
* A factory that will be used to create connections to the bookies
* @throws IOException
* @throws InterruptedException
- * @throws KeeperException
+ * @throws KeeperException if the passed zk handle is not connected
*/
public BookKeeper(ClientConfiguration conf, ZooKeeper zk, ClientSocketChannelFactory channelFactory)
throws IOException, InterruptedException, KeeperException {
if (zk == null || channelFactory == null) {
throw new NullPointerException();
}
+ if (!zk.getState().isConnected()) {
+ LOG.error("Unconnected zookeeper handle passed to bookkeeper");
+ throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
+ }
this.conf = conf;
this.zk = zk;
this.channelFactory = channelFactory;
- bookieWatcher = new BookieWatcher(conf, this);
- bookieWatcher.readBookiesBlocking();
+
mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads());
bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
- // initialize ledger meta manager
- ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
- }
+ bookieWatcher = new BookieWatcher(conf, this);
+ bookieWatcher.readBookiesBlocking();
- void withZKConnected(final ZKConnectCallback cb) {
- if (ownZKHandle) {
- mainWorkerPool.submit(new SafeRunnable() {
- @Override
- public void safeRun() {
- try {
- if (!connectLatch.await(zkConnectTimeoutMs, TimeUnit.MILLISECONDS)) {
- cb.connectionFailed(BKException.Code.ZKException);
- } else {
- cb.connected();
- }
- } catch (InterruptedException ie) {
- // someone trying to kill the process
- cb.connectionFailed(BKException.Code.InterruptedException);
- }
- }
- });
- } else {
- cb.connected();
- }
+ ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
}
LedgerManager getLedgerManager() {
@@ -278,15 +270,8 @@ public class BookKeeper {
*/
public void asyncCreateLedger(final int ensSize, final int qSize, final DigestType digestType,
final byte[] passwd, final CreateCallback cb, final Object ctx) {
- withZKConnected(new ZKConnectCallback() {
- public void connected() {
- new LedgerCreateOp(BookKeeper.this, ensSize, qSize, digestType, passwd, cb, ctx)
- .initiate();
- }
- public void connectionFailed(int code) {
- cb.createComplete(code, null, ctx);
- }
- });
+ new LedgerCreateOp(BookKeeper.this, ensSize, qSize, digestType, passwd, cb, ctx)
+ .initiate();
}
@@ -370,14 +355,7 @@ public class BookKeeper {
*/
public void asyncOpenLedger(final long lId, final DigestType digestType, final byte passwd[],
final OpenCallback cb, final Object ctx) {
- withZKConnected(new ZKConnectCallback() {
- public void connected() {
- new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiate();
- }
- public void connectionFailed(int code) {
- cb.openComplete(code, null, ctx);
- }
- });
+ new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiate();
}
/**
@@ -409,14 +387,7 @@ public class BookKeeper {
*/
public void asyncOpenLedgerNoRecovery(final long lId, final DigestType digestType, final byte passwd[],
final OpenCallback cb, final Object ctx) {
- withZKConnected(new ZKConnectCallback() {
- public void connected() {
- new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiateWithoutRecovery();
- }
- public void connectionFailed(int code) {
- cb.openComplete(code, null, ctx);
- }
- });
+ new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiateWithoutRecovery();
}
@@ -502,14 +473,7 @@ public class BookKeeper {
* optional control object
*/
public void asyncDeleteLedger(final long lId, final DeleteCallback cb, final Object ctx) {
- withZKConnected(new ZKConnectCallback() {
- public void connected() {
- new LedgerDeleteOp(BookKeeper.this, lId, cb, ctx).initiate();
- }
- public void connectionFailed(int code) {
- cb.deleteComplete(code, ctx);
- }
- });
+ new LedgerDeleteOp(BookKeeper.this, lId, cb, ctx).initiate();
}
Modified: zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java?rev=1346253&r1=1346252&r2=1346253&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java Tue Jun 5 06:21:07 2012
@@ -30,6 +30,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
@@ -127,18 +129,25 @@ public class BookKeeperAdmin {
*/
public BookKeeperAdmin(ClientConfiguration conf) throws IOException, InterruptedException, KeeperException {
// Create the ZooKeeper client instance
+ final CountDownLatch latch = new CountDownLatch(1);
zk = new ZooKeeper(conf.getZkServers(), conf.getZkTimeout(), new Watcher() {
@Override
public void process(WatchedEvent event) {
+ latch.countDown();
if (LOG.isDebugEnabled()) {
LOG.debug("Process: " + event.getType() + " " + event.getPath());
}
}
});
+ if (!latch.await(conf.getZkTimeout(), TimeUnit.MILLISECONDS)
+ || !zk.getState().isConnected()) {
+ throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
+ }
// Create the bookie path
bookiesPath = conf.getZkAvailableBookiesPath();
// Create the BookKeeper client instance
- bkc = new BookKeeper(conf);
+ bkc = new BookKeeper(conf, zk);
+
DIGEST_TYPE = conf.getBookieRecoveryDigestType();
PASSWD = conf.getBookieRecoveryPasswd();
}
Modified: zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java?rev=1346253&r1=1346252&r2=1346253&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java Tue Jun 5 06:21:07 2012
@@ -115,10 +115,10 @@ class BookieWatcher implements Watcher,
newBookieAddrs.add(bookieAddr);
}
- HashSet<InetSocketAddress> deadBookies = (HashSet<InetSocketAddress>)knownBookies.clone();
- deadBookies.removeAll(newBookieAddrs);
-
+ final HashSet<InetSocketAddress> deadBookies;
synchronized (this) {
+ deadBookies = (HashSet<InetSocketAddress>)knownBookies.clone();
+ deadBookies.removeAll(newBookieAddrs);
knownBookies = newBookieAddrs;
}
Added: zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java?rev=1346253&view=auto
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java (added)
+++ zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java Tue Jun 5 06:21:07 2012
@@ -0,0 +1,88 @@
+package org.apache.bookkeeper.client;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.KeeperException;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests of the main BookKeeper client
+ */
+public class BookKeeperTest extends BookKeeperClusterTestCase {
+ public BookKeeperTest() {
+ super(4);
+ }
+
+ @Test
+ public void testConstructionZkDelay() throws Exception {
+ ClientConfiguration conf = new ClientConfiguration()
+ .setZkServers(zkUtil.getZooKeeperConnectString())
+ .setZkTimeout(20000);
+
+ CountDownLatch l = new CountDownLatch(1);
+ zkUtil.sleepServer(5, l);
+ l.await();
+
+ BookKeeper bkc = new BookKeeper(conf);
+ bkc.createLedger(DigestType.CRC32, "testPasswd".getBytes()).close();
+ bkc.close();
+ }
+
+ @Test
+ public void testConstructionNotConnectedExplicitZk() throws Exception {
+ ClientConfiguration conf = new ClientConfiguration()
+ .setZkServers(zkUtil.getZooKeeperConnectString())
+ .setZkTimeout(20000);
+
+ CountDownLatch l = new CountDownLatch(1);
+ zkUtil.sleepServer(5, l);
+ l.await();
+
+ ZooKeeper zk = new ZooKeeper(zkUtil.getZooKeeperConnectString(), 10000,
+ new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ }
+ });
+ assertFalse("ZK shouldn't have connected yet", zk.getState().isConnected());
+ try {
+ BookKeeper bkc = new BookKeeper(conf, zk);
+ fail("Shouldn't be able to construct with unconnected zk");
+ } catch (KeeperException.ConnectionLossException cle) {
+ // correct behaviour
+ }
+ }
+}
\ No newline at end of file
Propchange: zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java?rev=1346253&r1=1346252&r2=1346253&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java Tue Jun 5 06:21:07 2012
@@ -88,7 +88,7 @@ public class TestReadTimeout extends Boo
completed.set(true);
}
}, null);
- Thread.sleep((baseClientConf.getReadTimeout()*2)*1000);
+ Thread.sleep((baseClientConf.getReadTimeout()*3)*1000);
Assert.assertTrue("Write request did not finish", completed.get());
Set<InetSocketAddress> afterSet = new HashSet<InetSocketAddress>();
Modified: zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java?rev=1346253&r1=1346252&r2=1346253&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java Tue Jun 5 06:21:07 2012
@@ -22,6 +22,8 @@
package org.apache.bookkeeper.test;
import java.io.File;
+import java.io.IOException;
+
import java.net.InetSocketAddress;
import org.apache.commons.io.FileUtils;
@@ -115,6 +117,31 @@ public class ZooKeeperUtil {
zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
+ public void sleepServer(final int seconds, final CountDownLatch l)
+ throws InterruptedException, IOException {
+ Thread[] allthreads = new Thread[Thread.activeCount()];
+ Thread.enumerate(allthreads);
+ for (final Thread t : allthreads) {
+ if (t.getName().contains("SyncThread:0")) {
+ Thread sleeper = new Thread() {
+ public void run() {
+ try {
+ t.suspend();
+ l.countDown();
+ Thread.sleep(seconds*1000);
+ t.resume();
+ } catch (Exception e) {
+ LOG.error("Error suspending thread", e);
+ }
+ }
+ };
+ sleeper.start();
+ return;
+ }
+ }
+ throw new IOException("ZooKeeper thread not found");
+ }
+
public void killServer() throws Exception {
if (zkc != null) {
zkc.close();