You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/05/09 09:43:50 UTC

svn commit: r1335958 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ hedwig-client/src/main/java/org/apache/hedwig/conf/ hedwig-server/src/m...

Author: ivank
Date: Wed May  9 07:43:50 2012
New Revision: 1335958

URL: http://svn.apache.org/viewvc?rev=1335958&view=rev
Log:
BOOKKEEPER-215: Deadlock occurs under high load (sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestDeadlock.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/conf/AbstractConfiguration.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1335958&r1=1335957&r2=1335958&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed May  9 07:43:50 2012
@@ -114,6 +114,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-56: Race condition of message handler in connection recovery in Hedwig client (sijie & Gavin Li via ivank)
 
+        BOOKKEEPER-215: Deadlock occurs under high load (sijie via ivank)
+
       bookkeeper-benchmark/
 	
 	BOOKKEEPER-207: BenchBookie doesn't run correctly (ivank via fpj)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1335958&r1=1335957&r2=1335958&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Wed May  9 07:43:50 2012
@@ -78,8 +78,6 @@ public class BookKeeper {
     final BookieClient bookieClient;
     final BookieWatcher bookieWatcher;
 
-    // used to call bookkeeper op in callback
-    final OrderedSafeExecutor callbackWorker;
     final OrderedSafeExecutor mainWorkerPool;
 
     // Ledger manager responsible for how to store ledger meta data
@@ -140,7 +138,6 @@ public class BookKeeper {
         bookieWatcher = new BookieWatcher(this);
         bookieWatcher.readBookiesBlocking();
         mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads());
-        callbackWorker = new OrderedSafeExecutor(conf.getNumWorkerThreads());
         bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
         // initialize ledger meta manager
         ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
@@ -197,7 +194,6 @@ public class BookKeeper {
         bookieWatcher = new BookieWatcher(this);
         bookieWatcher.readBookiesBlocking();
         mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads());
-        callbackWorker = new OrderedSafeExecutor(conf.getNumWorkerThreads());
         bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
         // initialize ledger meta manager
         ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
@@ -553,7 +549,6 @@ public class BookKeeper {
         if (ownZKHandle) {
             zk.close();
         }
-        callbackWorker.shutdown();
         mainWorkerPool.shutdown();
     }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java?rev=1335958&r1=1335957&r2=1335958&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java Wed May  9 07:43:50 2012
@@ -72,11 +72,16 @@ public class BookieServer implements NIO
     public BookieServer(ServerConfiguration conf) 
             throws IOException, KeeperException, InterruptedException, BookieException {
         this.conf = conf;
-        this.bookie = new Bookie(conf);
+        this.bookie = newBookie(conf);
 
         isStatsEnabled = conf.isStatisticsEnabled();
     }
 
+    protected Bookie newBookie(ServerConfiguration conf)
+        throws IOException, KeeperException, InterruptedException, BookieException {
+        return new Bookie(conf);
+    }
+
     public void start() throws IOException {
         this.bookie.start();
 

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/conf/AbstractConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/conf/AbstractConfiguration.java?rev=1335958&r1=1335957&r2=1335958&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/conf/AbstractConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/conf/AbstractConfiguration.java Wed May  9 07:43:50 2012
@@ -51,4 +51,13 @@ public abstract class AbstractConfigurat
         conf.addConfiguration(loadedConf);
 
     }
+
+    /**
+     * Add configuration object.
+     *
+     * @param otherConf configuration object to add
+     */
+    public void addConf(Configuration otherConf) throws ConfigurationException {
+        conf.addConfiguration(otherConf);
+    }
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java?rev=1335958&r1=1335957&r2=1335958&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java Wed May  9 07:43:50 2012
@@ -170,10 +170,16 @@ public class BookkeeperPersistenceManage
         int numMessagesRead = 0;
         long totalSizeRead = 0;
         TopicInfo topicInfo;
+        long startSeqIdToScan;
 
         public RangeScanOp(RangeScanRequest request) {
+            this(request, -1L);
+        }
+
+        public RangeScanOp(RangeScanRequest request, long startSeqId) {
             queuer.super(request.topic);
             this.request = request;
+            this.startSeqIdToScan = startSeqId;
         }
 
         @Override
@@ -185,8 +191,9 @@ public class BookkeeperPersistenceManage
                 return;
             }
 
-            startReadingFrom(request.startSeqId);
-
+            // if startSeqIdToScan is less than zero, this is a new scan request and we start from
+            // request.startSeqId; otherwise we continue an unfinished scan from startSeqIdToScan
+            startReadingFrom(startSeqIdToScan < 0 ? request.startSeqId : startSeqIdToScan);
         }
 
         protected void read(final InMemoryLedgerRange imlr, final long startSeqId, final long endSeqId) {
@@ -275,8 +282,8 @@ public class BookkeeperPersistenceManage
                         }
                     }
 
-                    startReadingFrom(imlr.startSeqIdIncluded + entry.getEntryId() + 1);
-
+                    // continue scanning messages
+                    scanMessages(request, imlr.startSeqIdIncluded + entry.getEntryId() + 1);
                 }
             }, request.ctx);
         }
@@ -310,6 +317,10 @@ public class BookkeeperPersistenceManage
         queuer.pushAndMaybeRun(request.topic, new RangeScanOp(request));
     }
 
+    protected void scanMessages(RangeScanRequest request, long scanSeqId) {
+        queuer.pushAndMaybeRun(request.topic, new RangeScanOp(request, scanSeqId));
+    }
+
     public void deliveredUntil(ByteString topic, Long seqId) {
         // Nothing to do here. this is just a hint that we cannot use.
     }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java?rev=1335958&r1=1335957&r2=1335958&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java Wed May  9 07:43:50 2012
@@ -44,6 +44,7 @@ public abstract class HedwigHubTestBase 
     // Default number of bookie servers to setup. Extending classes can
     // override this.
     protected int numBookies = 3;
+    protected long readDelay = 0L;
     protected BookKeeperTestBase bktb;
 
     // PubSubServer variables
@@ -132,7 +133,7 @@ public abstract class HedwigHubTestBase 
     @Before
     public void setUp() throws Exception {
         logger.info("STARTING " + getName());
-        bktb = new BookKeeperTestBase(numBookies);
+        bktb = new BookKeeperTestBase(numBookies, readDelay);
         bktb.setUp();
         startHubServers();
         logger.info("HedwigHub test setup finished");

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java?rev=1335958&r1=1335957&r2=1335958&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java Wed May  9 07:43:50 2012
@@ -18,11 +18,15 @@
 package org.apache.hedwig.server.persistence;
 
 import java.net.InetAddress;
+import java.nio.ByteBuffer;
 import java.io.File;
+import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
 
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -48,6 +52,40 @@ import org.slf4j.LoggerFactory;
 public class BookKeeperTestBase extends ZooKeeperTestBase {
     private static Logger LOG = LoggerFactory.getLogger(BookKeeperTestBase.class);
 
+    class TestBookie extends Bookie {
+        final long readDelay;
+
+        public TestBookie(ServerConfiguration conf, long readDelay)
+            throws IOException, KeeperException, InterruptedException, BookieException {
+            super(conf);
+            this.readDelay = readDelay;
+        }
+
+        @Override
+        public ByteBuffer readEntry(long ledgerId, long entryId)
+            throws IOException, NoLedgerException {
+            if (readDelay > 0) {
+                try {
+                    Thread.sleep(readDelay);
+                } catch (InterruptedException ie) {
+                }
+            }
+            return super.readEntry(ledgerId, entryId);
+        }
+    }
+
+    class TestBookieServer extends BookieServer {
+        public TestBookieServer(ServerConfiguration conf)
+            throws IOException, KeeperException, InterruptedException, BookieException {
+            super(conf);
+        }
+
+        protected Bookie newBookie(ServerConfiguration conf)
+            throws IOException, KeeperException, InterruptedException, BookieException {
+            return new TestBookie(conf, readDelay);
+        }
+    }
+
     // BookKeeper Server variables
     private List<BookieServer> bookiesList;
     private List<ServerConfiguration> bkConfsList;
@@ -58,6 +96,9 @@ public class BookKeeperTestBase extends 
     private static final String PREFIX = "bookie";
     private static final String SUFFIX = "test";
 
+    // Delay in milliseconds that each test bookie sleeps before serving a readEntry call (0 disables it)
+    protected long readDelay;
+
     // Variable to decide how many bookie servers to set up.
     private final int numBookies;
     // BookKeeper client instance
@@ -68,7 +109,12 @@ public class BookKeeperTestBase extends 
 
     // Constructor
     public BookKeeperTestBase(int numBookies) {
+        this(numBookies, 0L);
+    }
+
+    public BookKeeperTestBase(int numBookies, long readDelay) {
         this.numBookies = numBookies;
+        this.readDelay = readDelay;
     }
 
     public BookKeeperTestBase() {
@@ -168,7 +214,7 @@ public class BookKeeperTestBase extends 
      *
      */
     private BookieServer startBookie(ServerConfiguration conf) throws Exception {
-        BookieServer server = new BookieServer(conf);
+        BookieServer server = new TestBookieServer(conf);
         server.start();
 
         int port = conf.getBookiePort();

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestDeadlock.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestDeadlock.java?rev=1335958&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestDeadlock.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestDeadlock.java Wed May  9 07:43:50 2012
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.SynchronousQueue;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.api.Publisher;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.HedwigClient;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.server.HedwigHubTestBase;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.ConcurrencyUtils;
+
+public class TestDeadlock extends HedwigHubTestBase {
+
+    protected static Logger logger = LoggerFactory.getLogger(TestDeadlock.class);
+
+    // Client side variables
+    protected HedwigClient client;
+    protected Publisher publisher;
+    protected Subscriber subscriber;
+
+    ByteString topic = ByteString.copyFromUtf8("DeadLockTopic");
+    ByteString subscriberId = ByteString.copyFromUtf8("dl");
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        numServers = 1;
+        numBookies = 1;
+        readDelay = 1000L; // 1s
+        super.setUp();
+        client = new HedwigClient(new ClientConfiguration());
+        publisher = client.getPublisher();
+        subscriber = client.getSubscriber();
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        client.close();
+        super.tearDown();
+    }
+
+    // Test implementation of Callback for async client actions.
+    static class TestCallback implements Callback<Void> {
+        private final SynchronousQueue<Boolean> queue;
+
+        public TestCallback(SynchronousQueue<Boolean> queue) {
+            this.queue = queue;
+        }
+
+        @Override
+        public void operationFinished(Object ctx, Void resultOfOperation) {
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    if (logger.isDebugEnabled())
+                        logger.debug("Operation finished!");
+                    ConcurrencyUtils.put(queue, true);
+                }
+            }).start();
+        }
+
+        @Override
+        public void operationFailed(Object ctx, final PubSubException exception) {
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    logger.error("Operation failed!", exception);
+                    ConcurrencyUtils.put(queue, false);
+                }
+            }).start();
+        }
+    }
+
+    // Test implementation of subscriber's message handler.
+    class TestMessageHandler implements MessageHandler {
+        private final SynchronousQueue<Boolean> consumeQueue;
+        boolean doAdd = false;
+
+        public TestMessageHandler(SynchronousQueue<Boolean> consumeQueue) {
+            this.consumeQueue = consumeQueue;
+        }
+
+        public void deliver(ByteString t, ByteString sub, final Message msg, Callback<Void> callback,
+                            Object context) {
+            if (!doAdd) {
+                // after receiving first message, we send a publish
+                // to obtain permit of second ledger
+                doAdd = true;
+                new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        // publish messages again to obtain permits
+                        logger.info("Start publishing message to obtain permit");
+                        // it obtains the permit and waits for a response,
+                        // but the response is delayed and readEntries is called
+                        // in the readComplete callback to read entries of the
+                        // same ledger. Since there is no permit left, it blocks
+                        try {
+                            CountDownLatch latch = new CountDownLatch(1);
+                            sleepBookie(8, latch);
+                            latch.await();
+                            SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>();
+                            for (int i=0; i<3; i++) {
+                                publisher.asyncPublish(topic, getMsg(9999), new TestCallback(queue), null);
+                            }
+                            for (int i=0; i<3; i++) {
+                                assertTrue(queue.take());
+                            }
+                        } catch (Exception e) {
+                            logger.error("Failed to publish message to obtain permit.");
+                        }
+                    }
+                }).start();
+            }
+
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    ConcurrencyUtils.put(consumeQueue, true);
+                }
+            }).start();
+            callback.operationFinished(context, null);
+        }
+    }
+
+    // Helper function to generate Messages
+    protected Message getMsg(int msgNum) {
+        return Message.newBuilder().setBody(ByteString.copyFromUtf8("Message" + msgNum)).build();
+    }
+
+    // Helper function to generate Topics
+    protected ByteString getTopic(int topicNum) {
+        return ByteString.copyFromUtf8("DeadLockTopic" + topicNum);
+    }
+
+    class TestServerConfiguration extends HubServerConfiguration {
+        public TestServerConfiguration(int serverPort, int sslServerPort) {
+            super(serverPort, sslServerPort);
+        }
+        @Override
+        public int getBkEnsembleSize() {
+            return 1;
+        }
+        @Override
+        public int getBkQuorumSize() {
+            return 1;
+        }
+        @Override
+        public int getReadAheadCount() {
+            return 4;
+        }
+        @Override
+        public long getMaximumCacheSize() {
+            return 32;
+        }
+    }
+
+    @Override
+    protected ServerConfiguration getServerConfiguration(int serverPort, int sslServerPort) {
+        ServerConfiguration serverConf = new TestServerConfiguration(serverPort, sslServerPort);
+
+        org.apache.bookkeeper.conf.ClientConfiguration bkClientConf =
+            new org.apache.bookkeeper.conf.ClientConfiguration();
+        bkClientConf.setNumWorkerThreads(1).setReadTimeout(9999)
+                    .setThrottleValue(3);
+        try {
+            serverConf.addConf(bkClientConf);
+        } catch (Exception e) {
+        }
+        return serverConf;
+    }
+
+    @Test
+    public void testDeadlock() throws Exception {
+        int numMessages = 5;
+
+        SynchronousQueue<Boolean> consumeQueue = new SynchronousQueue<Boolean>();
+
+        // subscribe to topic
+        logger.info("Setup subscriptions");
+        subscriber.subscribe(topic, subscriberId, CreateOrAttach.CREATE_OR_ATTACH);
+        subscriber.closeSubscription(topic, subscriberId);
+
+        // publish 5 messages to form first ledger
+        for (int i=0; i<numMessages; i++) {
+            logger.info("Start publishing message {}", i);
+            publisher.publish(topic, getMsg(i));
+        }
+
+        stopHubServers();
+        Thread.sleep(1000);
+        startHubServers();
+
+        logger.info("Start publishing messages");
+        // publish enough messages to the second ledger
+        // so that a scan request needs to scan over two ledgers, which
+        // causes readEntries to be executed inside the previous readEntries callback
+        for (int i=0; i<numMessages; i++) {
+            logger.info("Start publishing message {}", i+5);
+            publisher.publish(topic, getMsg(i));
+        }
+
+        logger.info("Start subscribe topics again and receive messages");
+        // subscribe to topic
+        subscriber.subscribe(topic, subscriberId, CreateOrAttach.CREATE_OR_ATTACH);
+        subscriber.startDelivery(topic, subscriberId,
+                                 new TestMessageHandler(consumeQueue));
+        for (int i=0; i<(2*numMessages+3); i++) {
+            assertTrue(consumeQueue.take());
+        }
+    }
+
+    protected void sleepBookie(final int seconds, final CountDownLatch l)
+    throws InterruptedException, IOException {
+        final String prefix = "Bookie-";
+        Thread[] allThreads = new Thread[Thread.activeCount()];
+        Thread.enumerate(allThreads);
+        for (final Thread t : allThreads) {
+            if (t.getName().startsWith(prefix)) {
+                Thread sleeper = new Thread() {
+                    public void run() {
+                        try {
+                            t.suspend();
+                            l.countDown();
+                            Thread.sleep(seconds * 1000);
+                            t.resume();
+                        } catch (Exception e) {
+                            logger.error("Error suspending thread", e);
+                        }
+                    }
+                };
+                sleeper.start();
+                return;
+            }
+        }
+        throw new IOException("Bookie thread not found");
+    }
+}