You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2012/06/07 08:37:47 UTC
svn commit: r1347417 - in /zookeeper/bookkeeper/tags/release-4.1.0: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/
bookkeeper-server/src/main/java/org/apache/bookkeeper/uti...
Author: sijie
Date: Thu Jun 7 06:37:46 2012
New Revision: 1347417
URL: http://svn.apache.org/viewvc?rev=1347417&view=rev
Log:
svn merge -c 1346253,1346258,1346328,1346343,1346976 https://svn.apache.org/repos/asf/zookeeper/bookkeeper/branches/branch-4.1
Added:
zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
- copied unchanged from r1346253, zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
Modified:
zookeeper/bookkeeper/tags/release-4.1.0/ (props changed)
zookeeper/bookkeeper/tags/release-4.1.0/CHANGES.txt
zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java
zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java
zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java
zookeeper/bookkeeper/tags/release-4.1.0/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java
zookeeper/bookkeeper/tags/release-4.1.0/pom.xml
Propchange: zookeeper/bookkeeper/tags/release-4.1.0/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Thu Jun 7 06:37:46 2012
@@ -0,0 +1 @@
+/zookeeper/bookkeeper/branches/branch-4.1:1346253,1346258,1346328,1346343,1346976
Modified: zookeeper/bookkeeper/tags/release-4.1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/CHANGES.txt?rev=1347417&r1=1347416&r2=1347417&view=diff
==============================================================================
--- zookeeper/bookkeeper/tags/release-4.1.0/CHANGES.txt (original)
+++ zookeeper/bookkeeper/tags/release-4.1.0/CHANGES.txt Thu Jun 7 06:37:46 2012
@@ -120,6 +120,14 @@ Release 4.1.0 - 2012-05-31
BOOKKEEPER-273: LedgerHandle.deleteLedger() should be idempotent (Matteo Merli via ivank)
+ BOOKKEEPER-281: BKClient is failing when zkclient connection delays (ivank via sijie)
+
+ BOOKKEEPER-279: LocalBookKeeper is failing intermittently due to zkclient connection establishment delay (Rakesh R via sijie)
+
+ BOOKKEEPER-286: Compilation warning (ivank via sijie)
+
+ BOOKKEEPER-287: NoSuchElementException in LedgerCacheImpl (sijie)
+
hedwig-client/
BOOKKEEPER-217: NPE in hedwig client when enable DEBUG (sijie via ivank)
@@ -146,6 +154,8 @@ Release 4.1.0 - 2012-05-31
BOOKKEEPER-146: TestConcurrentTopicAcquisition sometimes hangs (ivank)
+ BOOKKEEPER-285: TestZkSubscriptionManager quits due to NPE, so other tests are not run in hedwig server. (sijie)
+
bookkeeper-benchmark/
BOOKKEEPER-207: BenchBookie doesn't run correctly (ivank via fpj)
Modified: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java?rev=1347417&r1=1347416&r2=1347417&view=diff
==============================================================================
--- zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java (original)
+++ zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java Thu Jun 7 06:37:46 2012
@@ -176,7 +176,7 @@ public class LedgerCacheImpl implements
}
} catch (IOException ie) {
// if we grab a clean page, but failed to update the page
- // we are exhuasting the count of ledger entry pages.
+ // we are exhausting the count of ledger entry pages.
// since this page will be never used, so we need to decrement
// page count of ledger cache.
lep.releasePage();
@@ -317,7 +317,7 @@ public class LedgerCacheImpl implements
if (!doAll) {
break;
}
- // Yeild. if we are doing all the ledgers we don't want to block other flushes that
+ // Yield. if we are doing all the ledgers we don't want to block other flushes that
// need to happen
try {
dirtyLedgers.wait(1);
@@ -449,22 +449,22 @@ public class LedgerCacheImpl implements
if (entry % entriesPerPage != 0) {
throw new IllegalArgumentException(entry + " is not a multiple of " + entriesPerPage);
}
- synchronized(this) {
- if (pageCount < pageLimit) {
- // let's see if we can allocate something
- LedgerEntryPage lep = new LedgerEntryPage(pageSize, entriesPerPage);
- lep.setLedger(ledger);
- lep.setFirstEntry(entry);
-
- // note, this will not block since it is a new page
- lep.usePage();
- pageCount++;
- return lep;
- }
- }
-
outerLoop:
while(true) {
+ synchronized(this) {
+ if (pageCount < pageLimit) {
+ // let's see if we can allocate something
+ LedgerEntryPage lep = new LedgerEntryPage(pageSize, entriesPerPage);
+ lep.setLedger(ledger);
+ lep.setFirstEntry(entry);
+
+ // note, this will not block since it is a new page
+ lep.usePage();
+ pageCount++;
+ return lep;
+ }
+ }
+
synchronized(cleanLedgers) {
if (cleanLedgers.isEmpty()) {
flushLedger(false);
@@ -475,6 +475,14 @@ public class LedgerCacheImpl implements
}
}
synchronized(this) {
+ // If ledgers were deleted between checking pageCount and putting
+ // ledgers into the cleanLedgers list, the cleanLedgers list may be empty.
+ // In that case, go back and check pageCount again, because
+ // deleteLedger decrements pageCount by the number of pages
+ // occupied by the deleted ledgers.
+ if (cleanLedgers.isEmpty()) {
+ continue outerLoop;
+ }
Long cleanLedger = cleanLedgers.getFirst();
Map<Long, LedgerEntryPage> map = pages.get(cleanLedger);
while (map == null || map.isEmpty()) {
@@ -610,7 +618,13 @@ public class LedgerCacheImpl implements
// remove pages first to avoid page flushed when deleting file info
synchronized(this) {
- pages.remove(ledgerId);
+ Map<Long, LedgerEntryPage> lpages = pages.remove(ledgerId);
+ if (null != lpages) {
+ pageCount -= lpages.size();
+ if (pageCount < 0) {
+ LOG.error("Page count of ledger cache has been decremented to be less than zero.");
+ }
+ }
}
// Delete the ledger's index file and close the FileInfo
FileInfo fi = null;
Modified: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1347417&r1=1347416&r2=1347417&view=diff
==============================================================================
--- zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Thu Jun 7 06:37:46 2012
@@ -119,27 +119,35 @@ public class BookKeeper {
* @throws InterruptedException
* @throws KeeperException
*/
- public BookKeeper(ClientConfiguration conf)
+ public BookKeeper(final ClientConfiguration conf)
throws IOException, InterruptedException, KeeperException {
this.conf = conf;
+
+ final CountDownLatch zkConnectLatch = new CountDownLatch(1);
this.zk = new ZooKeeper(conf.getZkServers(), conf.getZkTimeout(),
new Watcher() {
@Override
public void process(WatchedEvent event) {
- if (event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
- connectLatch.countDown();
- }
+ // countdown the latch on all events, even if we haven't
+ // successfully connected.
+ zkConnectLatch.countDown();
+
// TODO: handle session disconnects and expires
LOG.debug("Process: {} {}", event.getType(), event.getPath());
}
});
+ if (!zkConnectLatch.await(conf.getZkTimeout(), TimeUnit.MILLISECONDS)
+ || !zk.getState().isConnected()) {
+ throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
+ }
+
this.channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
- bookieWatcher = new BookieWatcher(conf, this);
- bookieWatcher.readBookiesBlocking();
mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads());
bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
- // initialize ledger meta manager
+ bookieWatcher = new BookieWatcher(conf, this);
+ bookieWatcher.readBookiesBlocking();
+
ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
ownChannelFactory = true;
@@ -176,49 +184,33 @@ public class BookKeeper {
* {@link ClientConfiguration}
* @param zk
* Zookeeper client instance connected to the zookeeper with which
- * the bookies have registered
+ * the bookies have registered. The ZooKeeper client must be connected
+ * before it is passed to BookKeeper. Otherwise a KeeperException is thrown.
* @param channelFactory
* A factory that will be used to create connections to the bookies
* @throws IOException
* @throws InterruptedException
- * @throws KeeperException
+ * @throws KeeperException if the passed zk handle is not connected
*/
public BookKeeper(ClientConfiguration conf, ZooKeeper zk, ClientSocketChannelFactory channelFactory)
throws IOException, InterruptedException, KeeperException {
if (zk == null || channelFactory == null) {
throw new NullPointerException();
}
+ if (!zk.getState().isConnected()) {
+ LOG.error("Unconnected zookeeper handle passed to bookkeeper");
+ throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
+ }
this.conf = conf;
this.zk = zk;
this.channelFactory = channelFactory;
- bookieWatcher = new BookieWatcher(conf, this);
- bookieWatcher.readBookiesBlocking();
+
mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads());
bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
- // initialize ledger meta manager
- ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
- }
+ bookieWatcher = new BookieWatcher(conf, this);
+ bookieWatcher.readBookiesBlocking();
- void withZKConnected(final ZKConnectCallback cb) {
- if (ownZKHandle) {
- mainWorkerPool.submit(new SafeRunnable() {
- @Override
- public void safeRun() {
- try {
- if (!connectLatch.await(zkConnectTimeoutMs, TimeUnit.MILLISECONDS)) {
- cb.connectionFailed(BKException.Code.ZKException);
- } else {
- cb.connected();
- }
- } catch (InterruptedException ie) {
- // someone trying to kill the process
- cb.connectionFailed(BKException.Code.InterruptedException);
- }
- }
- });
- } else {
- cb.connected();
- }
+ ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
}
LedgerManager getLedgerManager() {
@@ -278,15 +270,8 @@ public class BookKeeper {
*/
public void asyncCreateLedger(final int ensSize, final int qSize, final DigestType digestType,
final byte[] passwd, final CreateCallback cb, final Object ctx) {
- withZKConnected(new ZKConnectCallback() {
- public void connected() {
- new LedgerCreateOp(BookKeeper.this, ensSize, qSize, digestType, passwd, cb, ctx)
- .initiate();
- }
- public void connectionFailed(int code) {
- cb.createComplete(code, null, ctx);
- }
- });
+ new LedgerCreateOp(BookKeeper.this, ensSize, qSize, digestType, passwd, cb, ctx)
+ .initiate();
}
@@ -370,14 +355,7 @@ public class BookKeeper {
*/
public void asyncOpenLedger(final long lId, final DigestType digestType, final byte passwd[],
final OpenCallback cb, final Object ctx) {
- withZKConnected(new ZKConnectCallback() {
- public void connected() {
- new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiate();
- }
- public void connectionFailed(int code) {
- cb.openComplete(code, null, ctx);
- }
- });
+ new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiate();
}
/**
@@ -409,14 +387,7 @@ public class BookKeeper {
*/
public void asyncOpenLedgerNoRecovery(final long lId, final DigestType digestType, final byte passwd[],
final OpenCallback cb, final Object ctx) {
- withZKConnected(new ZKConnectCallback() {
- public void connected() {
- new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiateWithoutRecovery();
- }
- public void connectionFailed(int code) {
- cb.openComplete(code, null, ctx);
- }
- });
+ new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiateWithoutRecovery();
}
@@ -502,14 +473,7 @@ public class BookKeeper {
* optional control object
*/
public void asyncDeleteLedger(final long lId, final DeleteCallback cb, final Object ctx) {
- withZKConnected(new ZKConnectCallback() {
- public void connected() {
- new LedgerDeleteOp(BookKeeper.this, lId, cb, ctx).initiate();
- }
- public void connectionFailed(int code) {
- cb.deleteComplete(code, ctx);
- }
- });
+ new LedgerDeleteOp(BookKeeper.this, lId, cb, ctx).initiate();
}
Modified: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java?rev=1347417&r1=1347416&r2=1347417&view=diff
==============================================================================
--- zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java (original)
+++ zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java Thu Jun 7 06:37:46 2012
@@ -30,6 +30,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
@@ -127,18 +129,25 @@ public class BookKeeperAdmin {
*/
public BookKeeperAdmin(ClientConfiguration conf) throws IOException, InterruptedException, KeeperException {
// Create the ZooKeeper client instance
+ final CountDownLatch latch = new CountDownLatch(1);
zk = new ZooKeeper(conf.getZkServers(), conf.getZkTimeout(), new Watcher() {
@Override
public void process(WatchedEvent event) {
+ latch.countDown();
if (LOG.isDebugEnabled()) {
LOG.debug("Process: " + event.getType() + " " + event.getPath());
}
}
});
+ if (!latch.await(conf.getZkTimeout(), TimeUnit.MILLISECONDS)
+ || !zk.getState().isConnected()) {
+ throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
+ }
// Create the bookie path
bookiesPath = conf.getZkAvailableBookiesPath();
// Create the BookKeeper client instance
- bkc = new BookKeeper(conf);
+ bkc = new BookKeeper(conf, zk);
+
DIGEST_TYPE = conf.getBookieRecoveryDigestType();
PASSWD = conf.getBookieRecoveryPasswd();
}
Modified: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java?rev=1347417&r1=1347416&r2=1347417&view=diff
==============================================================================
--- zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java (original)
+++ zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java Thu Jun 7 06:37:46 2012
@@ -115,10 +115,10 @@ class BookieWatcher implements Watcher,
newBookieAddrs.add(bookieAddr);
}
- HashSet<InetSocketAddress> deadBookies = (HashSet<InetSocketAddress>)knownBookies.clone();
- deadBookies.removeAll(newBookieAddrs);
-
+ final HashSet<InetSocketAddress> deadBookies;
synchronized (this) {
+ deadBookies = (HashSet<InetSocketAddress>)knownBookies.clone();
+ deadBookies.removeAll(newBookieAddrs);
knownBookies = newBookieAddrs;
}
Modified: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java?rev=1347417&r1=1347416&r2=1347417&view=diff
==============================================================================
--- zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java (original)
+++ zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java Thu Jun 7 06:37:46 2012
@@ -26,23 +26,20 @@ import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookieServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
@@ -68,6 +65,7 @@ public class LocalBookKeeper {
ZooKeeperServer zks;
ZooKeeper zkc;
int ZooKeeperDefaultPort = 2181;
+ static int zkSessionTimeOut = 5000;
File ZkTmpDir;
//BookKeeper variables
@@ -104,11 +102,14 @@ public class LocalBookKeeper {
LOG.debug("ZooKeeper server up: " + b);
}
- private void initializeZookeper() {
+ private void initializeZookeper() throws IOException {
LOG.info("Instantiate ZK Client");
//initialize the zk client with values
try {
- zkc = new ZooKeeper("127.0.0.1", ZooKeeperDefaultPort, new emptyWatcher());
+ ZKConnectionWatcher zkConnectionWatcher = new ZKConnectionWatcher();
+ zkc = new ZooKeeper(HOSTPORT, zkSessionTimeOut,
+ zkConnectionWatcher);
+ zkConnectionWatcher.waitForConnection();
zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// No need to create an entry for each requested bookie anymore as the
@@ -119,9 +120,6 @@ public class LocalBookKeeper {
} catch (InterruptedException e) {
// TODO Auto-generated catch block
LOG.error("Interrupted while creating znodes", e);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- LOG.error("Exception while creating znodes", e);
}
}
private void runBookies(ServerConfiguration baseConf)
@@ -184,9 +182,30 @@ public class LocalBookKeeper {
System.err.println("Usage: LocalBookKeeper number-of-bookies");
}
- /* User for testing purposes, void */
- static class emptyWatcher implements Watcher {
- public void process(WatchedEvent event) {}
+ /* Watching SyncConnected event from ZooKeeper */
+ static class ZKConnectionWatcher implements Watcher {
+ private CountDownLatch clientConnectLatch = new CountDownLatch(1);
+
+ @Override
+ public void process(WatchedEvent event) {
+ if (event.getState() == KeeperState.SyncConnected) {
+ clientConnectLatch.countDown();
+ }
+ }
+
+ // Waiting for the SyncConnected event from the ZooKeeper server
+ public void waitForConnection() throws IOException {
+ try {
+ if (!clientConnectLatch.await(zkSessionTimeOut,
+ TimeUnit.MILLISECONDS)) {
+ throw new IOException(
+ "Couldn't connect to zookeeper server");
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(
+ "Interrupted when connecting to zookeeper server", e);
+ }
+ }
}
public static boolean waitForServerUp(String hp, long timeout) {
Modified: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java?rev=1347417&r1=1347416&r2=1347417&view=diff
==============================================================================
--- zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java (original)
+++ zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java Thu Jun 7 06:37:46 2012
@@ -126,6 +126,41 @@ public class LedgerCacheTest extends Tes
}
@Test
+ public void testDeleteLedger() throws Exception {
+ int numEntries = 10;
+ // limit open files & pages
+ conf.setOpenFileLimit(999).setPageLimit(2)
+ .setPageSize(8 * numEntries);
+ // create ledger cache
+ newLedgerCache();
+ try {
+ int numLedgers = 2;
+ byte[] masterKey = "blah".getBytes();
+ for (int i=1; i<=numLedgers; i++) {
+ ledgerCache.setMasterKey((long)i, masterKey);
+ for (int j=0; j<numEntries; j++) {
+ ledgerCache.putEntryOffset(i, j, i*numEntries + j);
+ }
+ }
+ // ledger cache is exhausted
+ // delete ledgers
+ for (int i=1; i<=numLedgers; i++) {
+ ledgerCache.deleteLedger((long)i);
+ }
+ // create num ledgers to add entries
+ for (int i=numLedgers+1; i<=2*numLedgers; i++) {
+ ledgerCache.setMasterKey((long)i, masterKey);
+ for (int j=0; j<numEntries; j++) {
+ ledgerCache.putEntryOffset(i, j, i*numEntries + j);
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Got Exception.", e);
+ fail("Failed to add entry.");
+ }
+ }
+
+ @Test
public void testPageEviction() throws Exception {
int numLedgers = 10;
byte[] masterKey = "blah".getBytes();
Modified: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java?rev=1347417&r1=1347416&r2=1347417&view=diff
==============================================================================
--- zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java (original)
+++ zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java Thu Jun 7 06:37:46 2012
@@ -88,7 +88,7 @@ public class TestReadTimeout extends Boo
completed.set(true);
}
}, null);
- Thread.sleep((baseClientConf.getReadTimeout()*2)*1000);
+ Thread.sleep((baseClientConf.getReadTimeout()*3)*1000);
Assert.assertTrue("Write request did not finish", completed.get());
Set<InetSocketAddress> afterSet = new HashSet<InetSocketAddress>();
Modified: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java?rev=1347417&r1=1347416&r2=1347417&view=diff
==============================================================================
--- zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java (original)
+++ zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java Thu Jun 7 06:37:46 2012
@@ -22,6 +22,8 @@
package org.apache.bookkeeper.test;
import java.io.File;
+import java.io.IOException;
+
import java.net.InetSocketAddress;
import org.apache.commons.io.FileUtils;
@@ -115,6 +117,31 @@ public class ZooKeeperUtil {
zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
+ public void sleepServer(final int seconds, final CountDownLatch l)
+ throws InterruptedException, IOException {
+ Thread[] allthreads = new Thread[Thread.activeCount()];
+ Thread.enumerate(allthreads);
+ for (final Thread t : allthreads) {
+ if (t.getName().contains("SyncThread:0")) {
+ Thread sleeper = new Thread() {
+ public void run() {
+ try {
+ t.suspend();
+ l.countDown();
+ Thread.sleep(seconds*1000);
+ t.resume();
+ } catch (Exception e) {
+ LOG.error("Error suspending thread", e);
+ }
+ }
+ };
+ sleeper.start();
+ return;
+ }
+ }
+ throw new IOException("ZooKeeper thread not found");
+ }
+
public void killServer() throws Exception {
if (zkc != null) {
zkc.close();
Modified: zookeeper/bookkeeper/tags/release-4.1.0/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java?rev=1347417&r1=1347416&r2=1347417&view=diff
==============================================================================
--- zookeeper/bookkeeper/tags/release-4.1.0/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/tags/release-4.1.0/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java Thu Jun 7 06:37:46 2012
@@ -31,6 +31,7 @@ import org.apache.hedwig.protocol.PubSub
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
+import org.apache.hedwig.server.persistence.LocalDBPersistenceManager;
import org.apache.hedwig.util.ConcurrencyUtils;
import org.apache.hedwig.util.Either;
import org.apache.hedwig.util.Callback;
@@ -50,7 +51,8 @@ public class TestZkSubscriptionManager e
super.setUp();
cfg = new ServerConfiguration();
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
- sm = new ZkSubscriptionManager(zk, new TrivialOwnAllTopicManager(cfg, scheduler), null, cfg, scheduler);
+ sm = new ZkSubscriptionManager(zk, new TrivialOwnAllTopicManager(cfg, scheduler),
+ LocalDBPersistenceManager.instance(), cfg, scheduler);
msgIdCallback = new Callback<MessageSeqId>() {
@Override
public void operationFailed(Object ctx, final PubSubException exception) {
Modified: zookeeper/bookkeeper/tags/release-4.1.0/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/pom.xml?rev=1347417&r1=1347416&r2=1347417&view=diff
==============================================================================
--- zookeeper/bookkeeper/tags/release-4.1.0/pom.xml (original)
+++ zookeeper/bookkeeper/tags/release-4.1.0/pom.xml Thu Jun 7 06:37:46 2012
@@ -45,6 +45,11 @@
<build>
<plugins>
<plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>2.3.2</version>
+ </plugin>
+ <plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
@@ -122,11 +127,6 @@
<version>2.1</version>
</plugin>
<plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>findbugs-maven-plugin</artifactId>
- <version>2.1</version>
- </plugin>
- <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-pmd-plugin</artifactId>
<version>2.3</version>