You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:13 UTC

[03/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java
deleted file mode 100644
index 08f287c..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.client.api.MessageHandler;
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
-
-import org.apache.hedwig.client.api.Client;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.util.Callback;
-
-import org.apache.hedwig.server.HedwigHubTestBase;
-import org.apache.hedwig.server.common.ServerConfiguration;
-
-public class MessageBoundedPersistenceTest extends HedwigHubTestBase {
-    protected static final Logger logger = LoggerFactory.getLogger(MessageBoundedPersistenceTest.class);
-
-    protected class SmallReadAheadServerConfiguration
-        extends HedwigHubTestBase.HubServerConfiguration {
-        SmallReadAheadServerConfiguration(int serverPort, int sslServerPort) {
-            super(serverPort, sslServerPort);
-        }
-        public long getMaximumCacheSize() {
-            return 1;
-        }
-
-        public int getReadAheadCount() {
-            return 1;
-        }
-
-        public int getMessagesConsumedThreadRunInterval() {
-            return 1000; // run every second
-        }
-    }
-
-    protected ServerConfiguration getServerConfiguration(int serverPort, int sslServerPort) {
-        return new SmallReadAheadServerConfiguration(serverPort, sslServerPort);
-    }
-
-    private class MessageBoundClientConfiguration extends HubClientConfiguration {
-        final int messageBound;
-
-        public MessageBoundClientConfiguration(int bound) {
-            this.messageBound = bound;
-        }
-
-        public MessageBoundClientConfiguration() {
-            this(5);
-        }
-
-        public int getSubscriptionMessageBound() {
-            return messageBound;
-        }
-    }
-
-    private void sendXExpectLastY(Publisher pub, Subscriber sub,
-                                  ByteString topic, ByteString subid,
-                                  final int X, final int Y) throws Exception {
-        for (int i = 0; i < X; i++) {
-            pub.publish(topic, Message.newBuilder().setBody(
-                                ByteString.copyFromUtf8(String.valueOf(i))).build());
-        }
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.ATTACH).build();
-        sub.subscribe(topic, subid, opts);
-
-        final AtomicInteger expected = new AtomicInteger(X - Y);
-        final CountDownLatch latch = new CountDownLatch(1);
-        sub.startDelivery(topic, subid, new MessageHandler () {
-                synchronized public void deliver(ByteString topic, ByteString subscriberId,
-                                    Message msg, Callback<Void> callback,
-                                    Object context) {
-                    try {
-                        int value = Integer.valueOf(msg.getBody().toStringUtf8());
-
-                        if (value == expected.get()) {
-                            expected.incrementAndGet();
-                        } else {
-                            // error condition
-                            logger.error("Did not receive expected value, expected {}, got {}",
-                                         expected.get(), value);
-                            expected.set(0);
-                            latch.countDown();
-                        }
-                        if (expected.get() == X) {
-                            latch.countDown();
-                        }
-                        callback.operationFinished(context, null);
-                    } catch (Exception e) {
-                        logger.error("Received bad message", e);
-                        latch.countDown();// will error on match
-                    }
-                }
-            });
-        assertTrue("Timed out waiting for messages Y is " + Y
-                + " expected is currently " + expected.get(), latch.await(10, TimeUnit.SECONDS));
-        assertEquals("Should be expected message with " + X, X, expected.get());
-
-        sub.stopDelivery(topic, subid);
-        sub.closeSubscription(topic, subid);
-    }
-
-    @Test(timeout=60000)
-    public void testBasicBounding() throws Exception {
-        Client client = new HedwigClient(new MessageBoundClientConfiguration(5));
-        Publisher pub = client.getPublisher();
-        Subscriber sub = client.getSubscriber();
-
-        ByteString topic = ByteString.copyFromUtf8("basicBoundingTopic");
-        ByteString subid = ByteString.copyFromUtf8("basicBoundingSubId");
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE).build();
-        sub.subscribe(topic, subid, opts);
-        sub.closeSubscription(topic, subid);
-
-        sendXExpectLastY(pub, sub, topic, subid, 1000, 5);
-
-        client.close();
-    }
-
-    @Test(timeout=60000)
-    public void testMultipleSubscribers() throws Exception {
-        ByteString topic = ByteString.copyFromUtf8("multiSubTopic");
-
-        Client client = new HedwigClient(new HubClientConfiguration());
-        Publisher pub = client.getPublisher();
-        Subscriber sub = client.getSubscriber();
-
-        SubscriptionOptions options5 = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE).setMessageBound(5).build();
-        SubscriptionOptions options20 = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE).setMessageBound(20).build();
-        SubscriptionOptions optionsUnbounded = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE).build();
-
-        ByteString subid5 = ByteString.copyFromUtf8("bound5SubId");
-        ByteString subid20 = ByteString.copyFromUtf8("bound20SubId");
-        ByteString subidUnbounded = ByteString.copyFromUtf8("noboundSubId");
-
-        sub.subscribe(topic, subid5, options5);
-        sub.closeSubscription(topic, subid5);
-        sendXExpectLastY(pub, sub, topic, subid5, 1000, 5);
-
-        sub.subscribe(topic, subid20, options20);
-        sub.closeSubscription(topic, subid20);
-        sendXExpectLastY(pub, sub, topic, subid20, 1000, 20);
-
-        sub.subscribe(topic, subidUnbounded, optionsUnbounded);
-        sub.closeSubscription(topic, subidUnbounded);
-
-        sendXExpectLastY(pub, sub, topic, subidUnbounded, 10000, 10000);
-        sub.unsubscribe(topic, subidUnbounded);
-
-        sendXExpectLastY(pub, sub, topic, subid20, 1000, 20);
-        sub.unsubscribe(topic, subid20);
-
-        sendXExpectLastY(pub, sub, topic, subid5, 1000, 5);
-        sub.unsubscribe(topic, subid5);
-
-        client.close();
-    }
-
-    @Test(timeout=60000)
-    public void testUpdateMessageBound() throws Exception {
-        ByteString topic = ByteString.copyFromUtf8("UpdateMessageBound");
-
-        Client client = new HedwigClient(new HubClientConfiguration());
-        Publisher pub = client.getPublisher();
-        Subscriber sub = client.getSubscriber();
-
-        SubscriptionOptions options5 = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).setMessageBound(5).build();
-        SubscriptionOptions options20 = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).setMessageBound(20).build();
-        SubscriptionOptions options10 = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).setMessageBound(10).build();
-
-        ByteString subid = ByteString.copyFromUtf8("updateSubId");
-
-        sub.subscribe(topic, subid, options5);
-        sub.closeSubscription(topic, subid);
-        sendXExpectLastY(pub, sub, topic, subid, 50, 5);
-
-        // update bound to 20
-        sub.subscribe(topic, subid, options20);
-        sub.closeSubscription(topic, subid);
-        sendXExpectLastY(pub, sub, topic, subid, 50, 20);
-
-        // update bound to 10
-        sub.subscribe(topic, subid, options10);
-        sub.closeSubscription(topic, subid);
-        sendXExpectLastY(pub, sub, topic, subid, 50, 10);
-
-        // message bound is not provided, no update
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-        sub.subscribe(topic, subid, opts);
-        sub.closeSubscription(topic, subid);
-        sendXExpectLastY(pub, sub, topic, subid, 50, 10);
-
-        client.close();
-    }
-
-    @Test(timeout=60000)
-    public void testLedgerGC() throws Exception {
-        Client client = new HedwigClient(new MessageBoundClientConfiguration());
-        Publisher pub = client.getPublisher();
-        Subscriber sub = client.getSubscriber();
-
-        String ledgersPath = "/hedwig/standalone/topics/testGCTopic/ledgers";
-        ByteString topic = ByteString.copyFromUtf8("testGCTopic");
-        ByteString subid = ByteString.copyFromUtf8("testGCSubId");
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-        sub.subscribe(topic, subid, opts);
-        sub.closeSubscription(topic, subid);
-
-        for (int i = 1; i <= 100; i++) {
-            pub.publish(topic, Message.newBuilder().setBody(
-                                ByteString.copyFromUtf8(String.valueOf(i))).build());
-        }
-        LedgerRanges r = LedgerRanges.parseFrom(bktb.getZooKeeperClient().getData(ledgersPath, false, null));
-        assertEquals("Should only have 1 ledger yet", 1, r.getRangesList().size());
-        long firstLedger = r.getRangesList().get(0).getLedgerId();
-
-        stopHubServers();
-        startHubServers();
-
-        pub.publish(topic, Message.newBuilder().setBody(
-                            ByteString.copyFromUtf8(String.valueOf(0xdeadbeef))).build());
-
-        r = LedgerRanges.parseFrom(bktb.getZooKeeperClient().getData(ledgersPath, false, null));
-        assertEquals("Should have 2 ledgers after restart", 2, r.getRangesList().size());
-
-        for (int i = 100; i <= 200; i++) {
-            pub.publish(topic, Message.newBuilder().setBody(
-                                ByteString.copyFromUtf8(String.valueOf(i))).build());
-        }
-        Thread.sleep(5000); // give GC a chance to happen
-
-        r = LedgerRanges.parseFrom(bktb.getZooKeeperClient().getData(ledgersPath, false, null));
-        long secondLedger = r.getRangesList().get(0).getLedgerId();
-
-        assertEquals("Should only have 1 ledger after GC", 1, r.getRangesList().size());
-
-        // ensure original ledger doesn't exist
-        String firstLedgerPath = String.format("/ledgers/L%010d", firstLedger);
-        String secondLedgerPath = String.format("/ledgers/L%010d", secondLedger);
-        assertNull("Ledger should not exist", bktb.getZooKeeperClient().exists(firstLedgerPath, false));
-        assertNotNull("Ledger should exist", bktb.getZooKeeperClient().exists(secondLedgerPath, false));
-
-        client.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java
deleted file mode 100644
index 827677f..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protoextensions.MessageIdUtils;
-import org.apache.hedwig.server.persistence.ScanCallback.ReasonForFinish;
-
-public class StubPersistenceManager implements PersistenceManagerWithRangeScan {
-    Map<ByteString, List<Message>> messages = new HashMap<ByteString, List<Message>>();
-    boolean failure = false;
-    ServiceDownException exception = new ServiceDownException("Asked to fail");
-
-    public void deliveredUntil(ByteString topic, Long seqId) {
-        // noop
-    }
-
-    public void consumedUntil(ByteString topic, Long seqId) {
-        // noop
-    }
-
-    public void setMessageBound(ByteString topic, Integer bound) {
-        // noop
-    }
-
-    public void clearMessageBound(ByteString topic) {
-        // noop
-    }
-
-    public void consumeToBound(ByteString topic) {
-        // noop
-    }
-
-    protected static class ArrayListMessageFactory implements Factory<List<Message>> {
-        static ArrayListMessageFactory instance = new ArrayListMessageFactory();
-
-        public List<Message> newInstance() {
-            return new ArrayList<Message>();
-        }
-    }
-
-    public MessageSeqId getCurrentSeqIdForTopic(ByteString topic) {
-        long seqId = MapMethods.getAfterInsertingIfAbsent(messages, topic, ArrayListMessageFactory.instance).size();
-        return MessageSeqId.newBuilder().setLocalComponent(seqId).build();
-    }
-
-    public long getSeqIdAfterSkipping(ByteString topic, long seqId, int skipAmount) {
-        return seqId + skipAmount;
-    }
-
-    public void persistMessage(PersistRequest request) {
-        if (failure) {
-            request.getCallback().operationFailed(request.getCtx(), exception);
-            return;
-        }
-
-        MapMethods.addToMultiMap(messages, request.getTopic(), request.getMessage(), ArrayListMessageFactory.instance);
-        request.getCallback().operationFinished(request.getCtx(), MessageIdUtils.mergeLocalSeqId(request.getMessage(),
-                (long) messages.get(request.getTopic()).size()).getMsgId());
-    }
-
-    public void scanSingleMessage(ScanRequest request) {
-        if (failure) {
-            request.getCallback().scanFailed(request.getCtx(), exception);
-            return;
-        }
-
-        long index = request.getStartSeqId() - 1;
-        List<Message> messageList = messages.get(request.getTopic());
-        if (index >= messageList.size()) {
-            request.getCallback().scanFinished(request.getCtx(), ReasonForFinish.NO_MORE_MESSAGES);
-            return;
-        }
-
-        Message msg = messageList.get((int) index);
-        Message toDeliver = MessageIdUtils.mergeLocalSeqId(msg, request.getStartSeqId());
-        request.getCallback().messageScanned(request.getCtx(), toDeliver);
-    }
-
-    public void scanMessages(RangeScanRequest request) {
-        if (failure) {
-            request.getCallback().scanFailed(request.getCtx(), exception);
-            return;
-        }
-
-        long totalSize = 0;
-        long startSeqId = request.getStartSeqId();
-        for (int i = 0; i < request.getMessageLimit(); i++) {
-            List<Message> messageList = MapMethods.getAfterInsertingIfAbsent(messages, request.getTopic(),
-                                        ArrayListMessageFactory.instance);
-            if (startSeqId + i > messageList.size()) {
-                request.getCallback().scanFinished(request.getCtx(), ReasonForFinish.NO_MORE_MESSAGES);
-                return;
-            }
-            Message msg = messageList.get((int) startSeqId + i - 1);
-            Message toDeliver = MessageIdUtils.mergeLocalSeqId(msg, startSeqId + i);
-            request.getCallback().messageScanned(request.getCtx(), toDeliver);
-
-            totalSize += toDeliver.getBody().size();
-
-            if (totalSize > request.getSizeLimit()) {
-                request.getCallback().scanFinished(request.getCtx(), ReasonForFinish.SIZE_LIMIT_EXCEEDED);
-                return;
-            }
-        }
-        request.getCallback().scanFinished(request.getCtx(), ReasonForFinish.NUM_MESSAGES_LIMIT_EXCEEDED);
-
-    }
-
-    @Override
-    public void stop() {
-        // do nothing
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java
deleted file mode 100644
index e9fbd08..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import java.util.concurrent.LinkedBlockingQueue;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.util.ConcurrencyUtils;
-import org.apache.hedwig.util.Either;
-
-public class StubScanCallback implements ScanCallback {
-
-    public static Message END_MESSAGE = Message.newBuilder().setBody(ByteString.EMPTY).build();
-
-    LinkedBlockingQueue<Either<Message, Exception>> queue = new LinkedBlockingQueue<Either<Message,Exception>>();
-
-    @Override
-    public void messageScanned(Object ctx, Message message) {
-        ConcurrencyUtils.put(queue, Either.of(message, (Exception) null));
-    }
-
-    @Override
-    public void scanFailed(Object ctx, Exception exception) {
-        ConcurrencyUtils.put(queue, Either.of((Message) null, exception));
-    }
-
-    @Override
-    public void scanFinished(Object ctx, ReasonForFinish reason) {
-        ConcurrencyUtils.put(queue, Either.of(END_MESSAGE, (Exception) null));
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java
deleted file mode 100644
index d65750b..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java
+++ /dev/null
@@ -1,798 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-
-import org.apache.hedwig.HelperMethods;
-import org.apache.hedwig.StubCallback;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.meta.MetadataManagerFactory;
-import org.apache.hedwig.server.meta.SubscriptionDataManager;
-import org.apache.hedwig.server.meta.TopicOwnershipManager;
-import org.apache.hedwig.server.meta.TopicPersistenceManager;
-import org.apache.hedwig.server.subscriptions.MMSubscriptionManager;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.ConcurrencyUtils;
-import org.apache.hedwig.util.Either;
-import org.apache.zookeeper.ZooKeeper;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-
-import static org.junit.Assert.*;
-
-@RunWith(Parameterized.class)
-public class TestBookKeeperPersistenceManager {
-    private static final Logger logger = LoggerFactory.getLogger(TestPersistenceManagerBlackBox.class);
-
-    BookKeeperTestBase bktb;
-    private final int numBookies = 3;
-    private final long readDelay = 2000L;
-    private final int maxEntriesPerLedger = 10;
-
-    ServerConfiguration conf;
-    ScheduledExecutorService scheduler;
-
-    TopicManager tm;
-    BookkeeperPersistenceManager manager;
-    PubSubException failureException = null;
-    TestMetadataManagerFactory metadataManagerFactory;
-    TopicPersistenceManager tpManager;
-    MMSubscriptionManager sm;
-
-    boolean removeStartSeqId;
-
-    static class TestMetadataManagerFactory extends MetadataManagerFactory {
-
-        final MetadataManagerFactory factory;
-        int serviceDownCount = 0;
-
-        TestMetadataManagerFactory(ServerConfiguration conf, ZooKeeper zk) throws Exception {
-            factory = MetadataManagerFactory.newMetadataManagerFactory(conf, zk);
-        }
-
-        public void setServiceDownCount(int count) {
-            this.serviceDownCount = count;
-        }
-
-        @Override
-        public int getCurrentVersion() {
-            return factory.getCurrentVersion();
-        }
-
-        @Override
-        protected MetadataManagerFactory initialize(
-            ServerConfiguration cfg, ZooKeeper zk, int version) throws IOException {
-            // do nothing
-            return factory;
-        }
-
-        @Override
-        public void shutdown() throws IOException {
-            factory.shutdown();
-        }
-
-        @Override
-        public Iterator<ByteString> getTopics() throws IOException {
-            return factory.getTopics();
-        }
-
-        @Override
-        public TopicPersistenceManager newTopicPersistenceManager() {
-            final TopicPersistenceManager manager = factory.newTopicPersistenceManager();
-            return new TopicPersistenceManager() {
-
-                @Override
-                public void close() throws IOException {
-                    manager.close();
-                }
-
-                @Override
-                public void readTopicPersistenceInfo(ByteString topic,
-                                                     Callback<Versioned<LedgerRanges>> callback, Object ctx) {
-                    if (serviceDownCount > 0) {
-                        --serviceDownCount;
-                        callback.operationFailed(ctx, new PubSubException.ServiceDownException("Metadata Store is down"));
-                        return;
-                    }
-                    manager.readTopicPersistenceInfo(topic, callback, ctx);
-                }
-                @Override
-                public void writeTopicPersistenceInfo(ByteString topic, LedgerRanges ranges, Version version,
-                                                      Callback<Version> callback, Object ctx) {
-                    if (serviceDownCount > 0) {
-                        --serviceDownCount;
-                        callback.operationFailed(ctx, new PubSubException.ServiceDownException("Metadata Store is down"));
-                        return;
-                    }
-                    manager.writeTopicPersistenceInfo(topic, ranges, version, callback, ctx);
-                }
-                @Override
-                public void deleteTopicPersistenceInfo(ByteString topic, Version version,
-                                                       Callback<Void> callback, Object ctx) {
-                    if (serviceDownCount > 0) {
-                        --serviceDownCount;
-                        callback.operationFailed(ctx, new PubSubException.ServiceDownException("Metadata Store is down"));
-                        return;
-                    }
-                    manager.deleteTopicPersistenceInfo(topic, version, callback, ctx);
-                }
-            };
-        }
-
-        @Override
-        public SubscriptionDataManager newSubscriptionDataManager() {
-            final SubscriptionDataManager sdm = factory.newSubscriptionDataManager();
-            return new SubscriptionDataManager() {
-                @Override
-                public void close() throws IOException {
-                    sdm.close();
-                }
-
-                @Override
-                public void createSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData data,
-                        Callback<Version> callback, Object ctx) {
-                    sdm.createSubscriptionData(topic, subscriberId, data, callback, ctx);
-                }
-
-                @Override
-                public boolean isPartialUpdateSupported() {
-                    return sdm.isPartialUpdateSupported();
-                }
-
-                @Override
-                public void updateSubscriptionData(ByteString topic, ByteString subscriberId,
-                        SubscriptionData dataToUpdate, Version version, Callback<Version> callback, Object ctx) {
-                    if (serviceDownCount > 0) {
-                        --serviceDownCount;
-                        callback.operationFailed(ctx,
-                                new PubSubException.ServiceDownException("Metadata Store is down"));
-                        return;
-                    }
-                    sdm.updateSubscriptionData(topic, subscriberId, dataToUpdate, version, callback, ctx);
-                }
-
-                @Override
-                public void replaceSubscriptionData(ByteString topic, ByteString subscriberId,
-                        SubscriptionData dataToReplace, Version version, Callback<Version> callback, Object ctx) {
-                    if (serviceDownCount > 0) {
-                        --serviceDownCount;
-                        callback.operationFailed(ctx,
-                                new PubSubException.ServiceDownException("Metadata Store is down"));
-                        return;
-                    }
-                    sdm.replaceSubscriptionData(topic, subscriberId, dataToReplace, version, callback, ctx);
-                }
-
-                @Override
-                public void deleteSubscriptionData(ByteString topic, ByteString subscriberId, Version version,
-                        Callback<Void> callback, Object ctx) {
-                    sdm.deleteSubscriptionData(topic, subscriberId, version, callback, ctx);
-                }
-
-                @Override
-                public void readSubscriptionData(ByteString topic, ByteString subscriberId,
-                        Callback<Versioned<SubscriptionData>> callback, Object ctx) {
-                    sdm.readSubscriptionData(topic, subscriberId, callback, ctx);
-                }
-
-                @Override
-                public void readSubscriptions(ByteString topic,
-                        Callback<Map<ByteString, Versioned<SubscriptionData>>> cb, Object ctx) {
-                    sdm.readSubscriptions(topic, cb, ctx);
-                }
-            };
-        }
-
-        @Override
-        public TopicOwnershipManager newTopicOwnershipManager() {
-            return factory.newTopicOwnershipManager();
-        }
-
-        @Override
-        public void format(ServerConfiguration cfg, ZooKeeper zk) throws IOException {
-            factory.format(cfg, zk);
-        }
-    }
-
-    public TestBookKeeperPersistenceManager(boolean removeStartSeqId) {
-        this.removeStartSeqId = removeStartSeqId;
-    }
-
-    @Parameters
-    public static Collection<Object[]> configs() {
-        return Arrays.asList(new Object[][] {
-            { true }, { false }
-        });
-    }
-
-    @SuppressWarnings("deprecation")
-    private void startCluster(long delay) throws Exception {
-        bktb = new BookKeeperTestBase(numBookies, 0L);
-        bktb.setUp();
-
-        conf = new ServerConfiguration() {
-            @Override
-            public int getMessagesConsumedThreadRunInterval() {
-                return 2000;
-            }
-            @Override
-            public int getConsumeInterval() {
-                return 0;
-            }
-            @Override
-            public long getMaxEntriesPerLedger() {
-                return maxEntriesPerLedger;
-            }
-        };
-        org.apache.bookkeeper.conf.ClientConfiguration bkClientConf =
-                new org.apache.bookkeeper.conf.ClientConfiguration();
-        bkClientConf.setNumWorkerThreads(1).setReadTimeout(9999)
-        .setThrottleValue(3);
-        conf.addConf(bkClientConf);
-
-        metadataManagerFactory = new TestMetadataManagerFactory(conf, bktb.getZooKeeperClient());
-        tpManager = metadataManagerFactory.newTopicPersistenceManager();
-
-        scheduler = Executors.newScheduledThreadPool(1);
-        tm = new TrivialOwnAllTopicManager(conf, scheduler);
-        manager = new BookkeeperPersistenceManager(bktb.bk, metadataManagerFactory,
-                                                   tm, conf, scheduler);
-        sm = new MMSubscriptionManager(conf, metadataManagerFactory, tm, manager, null, scheduler);
-    }
-
-    private void stopCluster() throws Exception {
-        tm.stop();
-        manager.stop();
-        sm.stop();
-        tpManager.close();
-        metadataManagerFactory.shutdown();
-        scheduler.shutdown();
-        bktb.tearDown();
-    }
-
-    @Before
-    public void setUp() throws Exception {
-        startCluster(0L);
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        stopCluster();
-    }
-
-    class RangeScanVerifier implements ScanCallback {
-        LinkedList<Message> pubMsgs;
-        boolean runNextScan = false;
-        RangeScanRequest nextScan = null;
-
-        public RangeScanVerifier(LinkedList<Message> pubMsgs, RangeScanRequest nextScan) {
-            this.pubMsgs = pubMsgs;
-            this.nextScan = nextScan;
-        }
-
-        @Override
-        public void messageScanned(Object ctx, Message recvMessage) {
-            logger.info("Scanned message : {}", recvMessage.getMsgId().getLocalComponent());
-            if (null != nextScan && !runNextScan) {
-                runNextScan = true;
-                manager.scanMessages(nextScan);
-            }
-
-            if (pubMsgs.size() == 0) {
-                return;
-            }
-
-            Message pubMsg = pubMsgs.removeFirst();
-            if (!HelperMethods.areEqual(recvMessage, pubMsg)) {
-                fail("Scanned message not equal to expected");
-            }
-        }
-
-        @Override
-        public void scanFailed(Object ctx, Exception exception) {
-            fail("Failed to scan messages.");
-        }
-
-        @Override
-        @SuppressWarnings("unchecked")
-        public void scanFinished(Object ctx, ReasonForFinish reason) {
-            LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx;
-            try {
-                statusQueue.put(pubMsgs.isEmpty());
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-    }
-
-    private LinkedList<Message> subMessages(List<Message> msgs, int start, int end) {
-        LinkedList<Message> result = new LinkedList<Message>();
-        for (int i=start; i<=end; i++) {
-            result.add(msgs.get(i));
-        }
-        return result;
-    }
-
-    @Test(timeout=60000)
-    public void testScanMessagesOnClosedLedgerAfterDeleteLedger() throws Exception {
-        scanMessagesAfterDeleteLedgerTest(2);
-    }
-
-    @Test(timeout=60000)
-    public void testScanMessagesOnUnclosedLedgerAfterDeleteLedger() throws Exception {
-        scanMessagesAfterDeleteLedgerTest(1);
-    }
-
-    private void scanMessagesAfterDeleteLedgerTest(int numLedgers) throws Exception {
-        ByteString topic = ByteString.copyFromUtf8("TestScanMessagesAfterDeleteLedger");
-
-        List<Message> msgs = new ArrayList<Message>();
-
-        acquireTopic(topic);
-        msgs.addAll(publishMessages(topic, 2));
-
-        for (int i=0; i<numLedgers; i++) {
-            releaseTopic(topic);
-            // acquire topic again to force a new ledger
-            acquireTopic(topic);
-            msgs.addAll(publishMessages(topic, 2));
-        }
-
-        consumedUntil(topic, 2L);
-        // Wait until ledger ranges is updated.
-        Thread.sleep(2000L);
-        releaseTopic(topic);
-
-        // acquire topic again
-        acquireTopic(topic);
-        // scan messages starting from 3
-        LinkedBlockingQueue<Boolean> statusQueue =
-            new LinkedBlockingQueue<Boolean>();
-        manager.scanMessages(new RangeScanRequest(topic, 3, 2, Long.MAX_VALUE,
-                             new RangeScanVerifier(subMessages(msgs, 2, 3), null), statusQueue));
-        Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS);
-        assertTrue("Should succeed to scan messages after deleted consumed ledger.", b);
-    }
-
-    @Test(timeout=60000)
-    public void testScanMessagesOnEmptyLedgerAfterDeleteLedger() throws Exception {
-        ByteString topic = ByteString.copyFromUtf8("TestScanMessagesOnEmptyLedgerAfterDeleteLedger");
-
-        List<Message> msgs = new ArrayList<Message>();
-
-        acquireTopic(topic);
-        msgs.addAll(publishMessages(topic, 2));
-        releaseTopic(topic);
-
-        // acquire topic again to force a new ledger
-        acquireTopic(topic);
-        logger.info("Consumed messages.");
-        consumedUntil(topic, 2L);
-        // Wait until ledger ranges is updated.
-        Thread.sleep(2000L);
-        logger.info("Released topic with an empty ledger.");
-        // release topic to force an empty ledger
-        releaseTopic(topic);
-
-        // publish 2 more messages, these message expected to be id 3 and 4
-        acquireTopic(topic);
-        logger.info("Published more messages.");
-        msgs.addAll(publishMessages(topic, 2));
-        releaseTopic(topic);
-
-        // acquire topic again
-        acquireTopic(topic);
-        // scan messages starting from 3
-        LinkedBlockingQueue<Boolean> statusQueue =
-            new LinkedBlockingQueue<Boolean>();
-        long startSeqId = removeStartSeqId ? 1 : 3;
-        manager.scanMessages(new RangeScanRequest(topic, startSeqId, 2, Long.MAX_VALUE,
-                             new RangeScanVerifier(subMessages(msgs, 2, 3), null), statusQueue));
-        Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS);
-        assertTrue("Should succeed to scan messages after deleted consumed ledger.", b);
-    }
-
-    @Test(timeout=60000)
-    public void testFailedToDeleteLedger1() throws Exception {
-        failedToDeleteLedgersTest(1);
-    }
-
-    @Test(timeout=60000)
-    public void testFailedToDeleteLedger2() throws Exception {
-        // succeed to delete second ledger
-        failedToDeleteLedgersTest(2);
-    }
-
-    private void failedToDeleteLedgersTest(int numLedgers) throws Exception {
-        final ByteString topic = ByteString.copyFromUtf8("TestFailedToDeleteLedger");
-        final int serviceDownCount = 1;
-
-        List<Message> msgs = new ArrayList<Message>();
-
-        for (int i=0; i<numLedgers; i++) {
-            acquireTopic(topic);
-            msgs.addAll(publishMessages(topic, 2));
-            releaseTopic(topic);
-        }
-
-        // acquire topic again to force a new ledger
-        acquireTopic(topic);
-        logger.info("Consumed messages.");
-        metadataManagerFactory.setServiceDownCount(serviceDownCount);
-        // failed consumed
-        consumedUntil(topic, 2L * numLedgers);
-        // Wait until ledger ranges is updated.
-        Thread.sleep(2000L);
-        logger.info("Released topic with an empty ledger.");
-        // release topic to force an empty ledger
-        releaseTopic(topic);
-
-        // publish 2 more messages, these message expected to be id 3 and 4
-        acquireTopic(topic);
-        logger.info("Published more messages.");
-        msgs.addAll(publishMessages(topic, 2));
-        releaseTopic(topic);
-
-        // acquire topic again
-        acquireTopic(topic);
-        LinkedBlockingQueue<Boolean> statusQueue =
-            new LinkedBlockingQueue<Boolean>();
-        manager.scanMessages(new RangeScanRequest(topic, numLedgers * 2 + 1, 2, Long.MAX_VALUE,
-                             new RangeScanVerifier(subMessages(msgs, numLedgers * 2, numLedgers * 2 + 1), null), statusQueue));
-        Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS);
-        assertTrue("Should succeed to scan messages after deleted consumed ledger.", b);
-
-        // consumed
-        consumedUntil(topic, (numLedgers + 1) * 2L);
-        // Wait until ledger ranges is updated.
-        Thread.sleep(2000L);
-
-        Semaphore latch = new Semaphore(1);
-        latch.acquire();
-        tpManager.readTopicPersistenceInfo(topic, new Callback<Versioned<LedgerRanges>>() {
-            @Override
-            public void operationFinished(Object ctx, Versioned<LedgerRanges> ranges) {
-                if (null == ranges || ranges.getValue().getRangesList().size() > 1) {
-                    failureException = new PubSubException.NoTopicPersistenceInfoException("Invalid persistence info found for topic " + topic.toStringUtf8());
-                    ((Semaphore)ctx).release();
-                    return;
-                }
-                failureException = null;
-                ((Semaphore)ctx).release();
-            }
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                failureException = exception;
-                ((Semaphore)ctx).release();
-            }
-        }, latch);
-        latch.acquire();
-        latch.release();
-        assertNull("Should not fail with exception.", failureException);
-    }
-
-    @Test(timeout=60000)
-    public void testScanMessagesOnTwoLedgers() throws Exception {
-        stopCluster();
-        startCluster(readDelay);
-
-        ByteString topic = ByteString.copyFromUtf8("TestScanMessagesOnTwoLedgers");
-
-        List<Message> msgs = new ArrayList<Message>();
-
-        acquireTopic(topic);
-        msgs.addAll(publishMessages(topic, 1));
-        releaseTopic(topic);
-
-        // acquire topic again to force a new ledger
-        acquireTopic(topic);
-        msgs.addAll(publishMessages(topic, 3));
-
-        // scan messages
-        LinkedBlockingQueue<Boolean> statusQueue = new LinkedBlockingQueue<Boolean>();
-        RangeScanRequest nextScan = new RangeScanRequest(topic, 3, 2, Long.MAX_VALUE,
-                new RangeScanVerifier(subMessages(msgs, 2, 3), null), statusQueue);
-        manager.scanMessages(new RangeScanRequest(topic, 1, 2, Long.MAX_VALUE,
-                new RangeScanVerifier(subMessages(msgs, 0, 1), nextScan), statusQueue));
-        Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS);
-        if (b == null) {
-            fail("One scan request doesn't finish");
-        }
-        b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS);
-        if (b == null) {
-            fail("One scan request doesn't finish");
-        }
-    }
-
-    @Test(timeout=60000)
-    public void testInconsistentSubscriptionStateAndLedgerRanges1() throws Exception {
-        // See the comment of inconsistentSubscriptionStateAndLedgerRanges.
-        // For this case, Step (2) failed to update subscription state metadata,
-        // but LedgerRanges is updated success.
-        // Result: scan messages from 1 to 4 take place on ledger L2.
-        inconsistentSubscriptionStateAndLedgerRanges(1);
-    }
-
-    @Test(timeout=60000)
-    public void testInconsistentSubscriptionStateAndLedgerRanges2() throws Exception {
-        // See the comment of inconsistentSubscriptionStateAndLedgerRanges.
-        // For this case, step (2) failed to update subscription state metadata,
-        // step (3) successfully delete L1 but failed to update LedgerRanges.
-        // Result: scan messages from 1 to 4 falls in L1 and L2,
-        //         but BookKeeper may complain L1 not found.
-        inconsistentSubscriptionStateAndLedgerRanges(2);
-    }
-
-    /**
-     * Since InMemorySubscriptionState and LedgerRanges is maintained
-     * separately, there may exist such inconsistent state:
-     * (1). Topic ledgers: L1 [1 ~ 2], L2 [3 ~ ]
-     * (2). Subscriber consumes to 2 and InMemorySubscriptionState is updated
-     *      successfully but failed when updating subscription state metadata
-     * (3). AbstractSubscriptionManager#MessagesConsumedTask use
-     *      InMemorySubscriptionState to do garbage collection
-     *      and L1 is delete
-     * (4). If Hub restarts at this time, old subscription state is read and
-     *      Hub will try to deliver message from 1
-     */
-    public void inconsistentSubscriptionStateAndLedgerRanges(int failedCount) throws Exception {
-        final ByteString topic = ByteString.copyFromUtf8("inconsistentSubscriptionStateAndLedgerRanges");
-        final ByteString subscriberId = ByteString.copyFromUtf8("subId");
-        LinkedList<Message> msgs = new LinkedList<Message>();
-
-        // make ledger L1 [1 ~ 2]
-        acquireTopic(topic);
-        msgs.addAll(publishMessages(topic, 2));
-        releaseTopic(topic);
-
-        // acquire topic again to force a new ledger L2 [3 ~ ]
-        acquireTopic(topic);
-        msgs.addAll(publishMessages(topic, 2));
-
-        StubCallback<Void> voidCb = new StubCallback<Void>();
-        StubCallback<SubscriptionData> subDataCb = new StubCallback<SubscriptionData>();
-        Either<Void, PubSubException> voidResult;
-        Either<SubscriptionData, PubSubException> subDataResult;
-
-        // prepare for subscription
-        sm.acquiredTopic(topic, voidCb, null);
-        voidResult = ConcurrencyUtils.take(voidCb.queue);
-        assertNull(voidResult.right()); // no exception
-
-        // Do subscription
-        SubscribeRequest subRequest = SubscribeRequest.newBuilder().setSubscriberId(subscriberId)
-                .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-        sm.serveSubscribeRequest(topic, subRequest, MessageSeqId.newBuilder().setLocalComponent(0).build(), subDataCb,
-                null);
-        subDataResult = ConcurrencyUtils.take(subDataCb.queue);
-        assertNotNull(subDataResult.left()); // serveSubscribeRequest success
-                                             // and return a SubscriptionData
-                                             // object
-        assertNull(subDataResult.right()); // no exception
-
-        // simulate inconsistent situation between InMemorySubscriptionState and
-        // LedgerRanges
-        metadataManagerFactory.setServiceDownCount(failedCount);
-        sm.setConsumeSeqIdForSubscriber(topic, subscriberId, MessageSeqId.newBuilder().setLocalComponent(2).build(),
-                voidCb, null);
-        voidResult = ConcurrencyUtils.take(voidCb.queue);
-        assertNotNull(voidResult.right()); // update subscription state failed
-                                           // and expect a exception
-
-        // wait AbstractSubscriptionManager#MessagesConsumedTask to garbage
-        // collect ledger L1
-        Thread.sleep(conf.getMessagesConsumedThreadRunInterval() * 2);
-
-        // simulate hub restart: read old subscription state metadata and deliver
-        // messages from 1
-        LinkedBlockingQueue<Boolean> statusQueue = new LinkedBlockingQueue<Boolean>();
-        RangeScanRequest scan = new RangeScanRequest(topic, 1, 4, Long.MAX_VALUE, new RangeScanVerifier(msgs, null),
-                statusQueue);
-        manager.scanMessages(scan);
-        Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS);
-        if (b == null) {
-            fail("Scan request doesn't finish");
-        }
-    }
-
-    @Test(timeout=60000)
-    // Add this test case for BOOKKEEPER-458
-    public void testReadWhenTopicChangeLedger() throws Exception {
-        final ByteString topic = ByteString.copyFromUtf8("testReadWhenTopicChangeLedger");
-        LinkedList<Message> msgs = new LinkedList<Message>();
-
-        // Write maxEntriesPerLedger entries to make topic change ledger
-        acquireTopic(topic);
-        msgs.addAll(publishMessages(topic, maxEntriesPerLedger));
-
-        // Notice, change ledger operation is asynchronous, so we should wait!!!
-        Thread.sleep(2000);
-
-        // Issue a scan request right start from the new ledger
-        LinkedBlockingQueue<Boolean> statusQueue = new LinkedBlockingQueue<Boolean>();
-        RangeScanRequest scan = new RangeScanRequest(topic, maxEntriesPerLedger + 1, 1, Long.MAX_VALUE,
-                new RangeScanVerifier(msgs, null), statusQueue);
-        manager.scanMessages(scan);
-        Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS);
-        if (b == null) {
-            fail("Scan request timeout");
-        }
-        assertFalse("Expect none message is scanned on the new created ledger", b);
-    }
-
-    class TestCallback implements Callback<PubSubProtocol.MessageSeqId> {
-
-        @Override
-        @SuppressWarnings("unchecked")
-        public void operationFailed(Object ctx, PubSubException exception) {
-            LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx;
-            try {
-                statusQueue.put(false);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        @Override
-        @SuppressWarnings("unchecked")
-        public void operationFinished(Object ctx, PubSubProtocol.MessageSeqId resultOfOperation) {
-            LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx;
-            try {
-                statusQueue.put(true);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    protected List<Message> publishMessages(ByteString topic, int numMsgs) throws Exception {
-        List<Message> msgs = HelperMethods.getRandomPublishedMessages(numMsgs, 1024);
-        LinkedBlockingQueue<Boolean> statusQueue = new LinkedBlockingQueue<Boolean>();
-        for (Message msg : msgs) {
-
-            try {
-                manager.persistMessage(new PersistRequest(topic, msg, new TestCallback(), statusQueue));
-                // wait a maximum of a minute
-                Boolean b = statusQueue.poll(60, TimeUnit.SECONDS);
-                if (b == null) {
-                    throw new RuntimeException("Publish timed out");
-                }
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-        }
-        return msgs;
-    }
-
-    protected void acquireTopic(ByteString topic) throws Exception {
-        Semaphore latch = new Semaphore(1);
-        latch.acquire();
-        manager.acquiredTopic(topic, new Callback<Void>() {
-            @Override
-            public void operationFinished(Object ctx, Void resultOfOperation) {
-                failureException = null;
-                ((Semaphore)ctx).release();
-            }
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                failureException = exception;
-                ((Semaphore)ctx).release();
-            }
-        }, latch);
-        latch.acquire();
-        latch.release();
-        if (null != failureException) {
-            throw failureException;
-        }
-    }
-
-    protected void releaseTopic(final ByteString topic) throws Exception {
-        manager.lostTopic(topic);
-        // backward testing ledger ranges without start seq id
-        if (removeStartSeqId) {
-            Semaphore latch = new Semaphore(1);
-            latch.acquire();
-            tpManager.readTopicPersistenceInfo(topic, new Callback<Versioned<LedgerRanges>>() {
-                @Override
-                public void operationFinished(Object ctx, Versioned<LedgerRanges> ranges) {
-                    if (null == ranges) {
-                        failureException = new PubSubException.NoTopicPersistenceInfoException("No persistence info found for topic " + topic.toStringUtf8());
-                        ((Semaphore)ctx).release();
-                        return;
-                    }
-
-                    // build a new ledger ranges w/o start seq id.
-                    LedgerRanges.Builder builder = LedgerRanges.newBuilder();
-                    final List<LedgerRange> rangesList = ranges.getValue().getRangesList();
-                    for (LedgerRange range : rangesList) {
-                        LedgerRange.Builder newRangeBuilder = LedgerRange.newBuilder();
-                        newRangeBuilder.setLedgerId(range.getLedgerId());
-                        if (range.hasEndSeqIdIncluded()) {
-                            newRangeBuilder.setEndSeqIdIncluded(range.getEndSeqIdIncluded());
-                        }
-                        builder.addRanges(newRangeBuilder.build());
-                    }
-                    tpManager.writeTopicPersistenceInfo(topic, builder.build(), ranges.getVersion(),
-                    new Callback<Version>() {
-                        @Override
-                        public void operationFinished(Object ctx, Version newVersion) {
-                            failureException = null;
-                            ((Semaphore)ctx).release();
-                        }
-                        @Override
-                        public void operationFailed(Object ctx, PubSubException exception) {
-                            failureException = exception;
-                            ((Semaphore)ctx).release();
-                        }
-                    }, ctx);
-                }
-                @Override
-                public void operationFailed(Object ctx, PubSubException exception) {
-                    failureException = exception;
-                    ((Semaphore)ctx).release();
-                }
-            }, latch);
-            latch.acquire();
-            latch.release();
-            if (null != failureException) {
-                throw failureException;
-            }
-        }
-    }
-
-    protected void consumedUntil(ByteString topic, long seqId) throws Exception {
-        manager.consumedUntil(topic, seqId);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java
deleted file mode 100644
index 1f30b5b..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import org.junit.After;
-import org.junit.Before;
-
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.meta.MetadataManagerFactory;
-import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
-
-public class TestBookKeeperPersistenceManagerBlackBox extends TestPersistenceManagerBlackBox {
-    BookKeeperTestBase bktb;
-    private final int numBookies = 3;
-
-    MetadataManagerFactory metadataManagerFactory = null;
-
-    @Before
-    @Override
-    public void setUp() throws Exception {
-        // We need to setUp this class first since the super.setUp() method will
-        // need the BookKeeperTestBase to be instantiated.
-        bktb = new BookKeeperTestBase(numBookies);
-        bktb.setUp();
-        super.setUp();
-    }
-
-
-    @After
-    @Override
-    public void tearDown() throws Exception {
-        bktb.tearDown();
-        super.tearDown();
-        if (null != metadataManagerFactory) {
-            metadataManagerFactory.shutdown();
-        }
-    }
-
-    @Override
-    long getLowestSeqId() {
-        return 1;
-    }
-
-    @Override
-    PersistenceManager instantiatePersistenceManager() throws Exception {
-        ServerConfiguration conf = new ServerConfiguration();
-        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-
-        metadataManagerFactory =
-            MetadataManagerFactory.newMetadataManagerFactory(conf, bktb.getZooKeeperClient());
-
-        return new BookkeeperPersistenceManager(bktb.bk, metadataManagerFactory,
-                                                new TrivialOwnAllTopicManager(conf, scheduler),
-                                                conf, scheduler);
-    }
-
-    @Override
-    public long getExpectedSeqId(int numPublished) {
-        return numPublished;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java
deleted file mode 100644
index da2b06c..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.apache.hedwig.util.Either;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.HelperMethods;
-import org.apache.hedwig.StubCallback;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.meta.MetadataManagerFactory;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
-import org.apache.hedwig.util.ConcurrencyUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.*;
-
-public class TestBookkeeperPersistenceManagerWhiteBox {
-
-    protected static final Logger logger =
-        LoggerFactory.getLogger(TestBookkeeperPersistenceManagerWhiteBox.class);
-
-    BookKeeperTestBase bktb;
-    private final int numBookies = 3;
-    BookkeeperPersistenceManager bkpm;
-    MetadataManagerFactory mm;
-    ServerConfiguration conf;
-    ScheduledExecutorService scheduler;
-    TopicManager tm;
-    ByteString topic = ByteString.copyFromUtf8("topic0");
-
-    @Before
-    public void setUp() throws Exception {
-        bktb = new BookKeeperTestBase(numBookies);
-        bktb.setUp();
-
-        conf = new ServerConfiguration();
-        scheduler = Executors.newScheduledThreadPool(1);
-        tm = new TrivialOwnAllTopicManager(conf, scheduler);
-
-        mm = MetadataManagerFactory.newMetadataManagerFactory(conf, bktb.getZooKeeperClient());
-
-        bkpm = new BookkeeperPersistenceManager(bktb.bk, mm, tm, conf, scheduler);
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        mm.shutdown();
-        bktb.tearDown();
-    }
-
-    @Test(timeout=60000)
-    public void testEmptyDirtyLedger() throws Exception {
-
-        StubCallback<Void> stubCallback = new StubCallback<Void>();
-        bkpm.acquiredTopic(topic, stubCallback, null);
-        assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
-        // now abandon, and try another time, the prev ledger should be dirty
-
-        bkpm = new BookkeeperPersistenceManager(new BookKeeper(bktb.getZkHostPort()), mm, tm,
-                                                conf, scheduler);
-        bkpm.acquiredTopic(topic, stubCallback, null);
-        assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
-        assertEquals(0, bkpm.topicInfos.get(topic).ledgerRanges.size());
-    }
-
-    @Test(timeout=60000)
-    public void testNonEmptyDirtyLedger() throws Exception {
-
-        Random r = new Random();
-        int NUM_MESSAGES_TO_TEST = 100;
-        int SIZE_OF_MESSAGES_TO_TEST = 100;
-        int index = 0;
-        int numPrevLedgers = 0;
-        List<Message> messages = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES_TO_TEST,
-                                 SIZE_OF_MESSAGES_TO_TEST);
-
-        while (index < messages.size()) {
-
-            StubCallback<Void> stubCallback = new StubCallback<Void>();
-            bkpm.acquiredTopic(topic, stubCallback, null);
-            assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
-            assertEquals(numPrevLedgers, bkpm.topicInfos.get(topic).ledgerRanges.size());
-
-            StubCallback<PubSubProtocol.MessageSeqId> persistCallback = new StubCallback<PubSubProtocol.MessageSeqId>();
-            bkpm.persistMessage(new PersistRequest(topic, messages.get(index), persistCallback, null));
-            assertEquals(index + 1, ConcurrencyUtils.take(persistCallback.queue).left().getLocalComponent());
-
-            index++;
-            // once in every 10 times, give up ledger
-            if (r.nextInt(10) == 9) {
-                // should not release topic when the message is last message
-                // otherwise when we call scan, bookkeeper persistence manager doesn't own the topic
-                if (index < messages.size()) {
-                    // Make the bkpm lose its memory
-                    bkpm.topicInfos.clear();
-                    numPrevLedgers++;
-                }
-            }
-        }
-
-        // Lets scan now
-        StubScanCallback scanCallback = new StubScanCallback();
-        bkpm.scanMessages(new RangeScanRequest(topic, 1, NUM_MESSAGES_TO_TEST, Long.MAX_VALUE, scanCallback, null));
-        for (int i = 0; i < messages.size(); i++) {
-            Message scannedMessage = ConcurrencyUtils.take(scanCallback.queue).left();
-            assertTrue(messages.get(i).getBody().equals(scannedMessage.getBody()));
-            assertEquals(i + 1, scannedMessage.getMsgId().getLocalComponent());
-        }
-        assertTrue(StubScanCallback.END_MESSAGE == ConcurrencyUtils.take(scanCallback.queue).left());
-
-    }
-
-    static final long maxEntriesPerLedger = 10;
-
-    class ChangeLedgerServerConfiguration extends ServerConfiguration {
-        @Override
-        public long getMaxEntriesPerLedger() {
-            return maxEntriesPerLedger;
-        }
-    }
-
-    @Test(timeout=60000)
-    public void testSyncChangeLedgers() throws Exception {
-        int NUM_MESSAGES_TO_TEST = 101;
-        int SIZE_OF_MESSAGES_TO_TEST = 100;
-        int index = 0;
-        List<Message> messages = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES_TO_TEST,
-                                 SIZE_OF_MESSAGES_TO_TEST);
-
-        bkpm = new BookkeeperPersistenceManager(bktb.bk, mm, tm,
-                                                new ChangeLedgerServerConfiguration(), scheduler);
-
-        // acquire the topic
-        StubCallback<Void> stubCallback = new StubCallback<Void>();
-        bkpm.acquiredTopic(topic, stubCallback, null);
-        assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
-        assertEquals(0, bkpm.topicInfos.get(topic).ledgerRanges.size());
-
-        while (index < messages.size()) {
-            logger.debug("Persist message {}", (index + 1));
-            StubCallback<MessageSeqId> persistCallback = new StubCallback<MessageSeqId>();
-            bkpm.persistMessage(new PersistRequest(topic, messages.get(index), persistCallback, null));
-            assertEquals(index + 1, ConcurrencyUtils.take(persistCallback.queue).left().getLocalComponent());
-
-            index++;
-            if (index % maxEntriesPerLedger == 1) {
-                assertEquals(index / maxEntriesPerLedger, bkpm.topicInfos.get(topic).ledgerRanges.size());
-            }
-        }
-        assertEquals(NUM_MESSAGES_TO_TEST / maxEntriesPerLedger, bkpm.topicInfos.get(topic).ledgerRanges.size());
-
-        // Lets scan now
-        StubScanCallback scanCallback = new StubScanCallback();
-        bkpm.scanMessages(new RangeScanRequest(topic, 1, NUM_MESSAGES_TO_TEST, Long.MAX_VALUE, scanCallback, null));
-        for (int i = 0; i < messages.size(); i++) {
-            Message scannedMessage = ConcurrencyUtils.take(scanCallback.queue).left();
-            assertTrue(messages.get(i).getBody().equals(scannedMessage.getBody()));
-            assertEquals(i + 1, scannedMessage.getMsgId().getLocalComponent());
-        }
-        assertTrue(StubScanCallback.END_MESSAGE == ConcurrencyUtils.take(scanCallback.queue).left());
-
-        // Make the bkpm lose its memory
-        bkpm.topicInfos.clear();
-
-        // acquire the topic again
-        stubCallback = new StubCallback<Void>();
-        bkpm.acquiredTopic(topic, stubCallback, null);
-        assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
-        assertEquals(NUM_MESSAGES_TO_TEST / maxEntriesPerLedger + 1, bkpm.topicInfos.get(topic).ledgerRanges.size());
-    }
-
-    class OrderCheckingCallback extends StubCallback<MessageSeqId> {
-        long curMsgId;
-        int numMessages;
-        int numProcessed;
-        int numSuccess;
-        int numFailed;
-
-        OrderCheckingCallback(long startMsgId, int numMessages) {
-            this.curMsgId = startMsgId;
-            this.numMessages = numMessages;
-            numProcessed = numSuccess = numFailed = 0;
-        }
-
-        @Override
-        public void operationFailed(Object ctx, final PubSubException exception) {
-            synchronized (this) {
-                ++numFailed;
-                ++numProcessed;
-                if (numProcessed == numMessages) {
-                    MessageSeqId.Builder seqIdBuilder =
-                        MessageSeqId.newBuilder().setLocalComponent(curMsgId);
-                    super.operationFinished(ctx, seqIdBuilder.build());
-                }
-            }
-        }
-
-        @Override
-        public void operationFinished(Object ctx, final MessageSeqId seqId) {
-            synchronized(this) {
-                long msgId = seqId.getLocalComponent();
-                if (msgId == curMsgId) {
-                    ++curMsgId;
-                }
-                ++numSuccess;
-                ++numProcessed;
-                if (numProcessed == numMessages) {
-                    MessageSeqId.Builder seqIdBuilder =
-                        MessageSeqId.newBuilder().setLocalComponent(curMsgId);
-                    super.operationFinished(ctx, seqIdBuilder.build());
-                }
-            }
-        }
-    }
-
-    @Test(timeout=60000)
-    public void testAsyncChangeLedgers() throws Exception {
-        int NUM_MESSAGES_TO_TEST = 101;
-        int SIZE_OF_MESSAGES_TO_TEST = 100;
-        List<Message> messages = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES_TO_TEST,
-                                 SIZE_OF_MESSAGES_TO_TEST);
-
-        bkpm = new BookkeeperPersistenceManager(bktb.bk, mm, tm,
-                                                new ChangeLedgerServerConfiguration(), scheduler);
-
-        // acquire the topic
-        StubCallback<Void> stubCallback = new StubCallback<Void>();
-        bkpm.acquiredTopic(topic, stubCallback, null);
-        assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
-        assertEquals(0, bkpm.topicInfos.get(topic).ledgerRanges.size());
-
-        OrderCheckingCallback persistCallback =
-            new OrderCheckingCallback(1, NUM_MESSAGES_TO_TEST);
-        for (Message message : messages) {
-            bkpm.persistMessage(new PersistRequest(topic, message, persistCallback, null));
-        }
-        assertEquals(NUM_MESSAGES_TO_TEST + 1,
-                     ConcurrencyUtils.take(persistCallback.queue).left().getLocalComponent());
-        assertEquals(NUM_MESSAGES_TO_TEST, persistCallback.numSuccess);
-        assertEquals(0, persistCallback.numFailed);
-        assertEquals(NUM_MESSAGES_TO_TEST / maxEntriesPerLedger,
-                     bkpm.topicInfos.get(topic).ledgerRanges.size());
-
-        // ensure the bkpm has the topic before scanning
-        stubCallback = new StubCallback<Void>();
-        bkpm.acquiredTopic(topic, stubCallback, null);
-
-        // Lets scan now
-        StubScanCallback scanCallback = new StubScanCallback();
-        bkpm.scanMessages(new RangeScanRequest(topic, 1, NUM_MESSAGES_TO_TEST, Long.MAX_VALUE, scanCallback, null));
-        for (int i = 0; i < messages.size(); i++) {
-            Either<Message,Exception> e = ConcurrencyUtils.take(scanCallback.queue);
-            Message scannedMessage = e.left();
-            if (scannedMessage == null) {
-                throw e.right();
-            }
-
-            assertTrue(messages.get(i).getBody().equals(scannedMessage.getBody()));
-            assertEquals(i + 1, scannedMessage.getMsgId().getLocalComponent());
-        }
-        assertTrue(StubScanCallback.END_MESSAGE == ConcurrencyUtils.take(scanCallback.queue).left());
-
-        // Make the bkpm lose its memory
-        bkpm.topicInfos.clear();
-
-        // acquire the topic again
-        stubCallback = new StubCallback<Void>();
-        bkpm.acquiredTopic(topic, stubCallback, null);
-        assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
-        assertEquals(NUM_MESSAGES_TO_TEST / maxEntriesPerLedger + 1,
-                     bkpm.topicInfos.get(topic).ledgerRanges.size());
-    }
-
-    class ChangeLedgerCallback extends OrderCheckingCallback {
-        boolean tearDown = false;
-        ChangeLedgerCallback(long startMsgId, int numMessages) {
-            super(startMsgId, numMessages);
-        }
-
-        @Override
-        public void operationFinished(Object ctx, final MessageSeqId msgId) {
-            super.operationFinished(ctx, msgId);
-            // shutdown bookie server when changing ledger
-            // so following requests should fail
-            if (msgId.getLocalComponent() >= maxEntriesPerLedger && !tearDown) {
-                try {
-                    bktb.tearDownOneBookieServer();
-                    bktb.tearDownOneBookieServer();
-                } catch (Exception e) {
-                    logger.error("Failed to tear down bookie server.");
-                }
-                tearDown = true;
-            }
-        }
-    }
-
-    @Test(timeout=60000)
-    public void testChangeLedgerFailure() throws Exception {
-        int NUM_MESSAGES_TO_TEST = 101;
-        int SIZE_OF_MESSAGES_TO_TEST = 100;
-        List<Message> messages = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES_TO_TEST,
-                                 SIZE_OF_MESSAGES_TO_TEST);
-
-        bkpm = new BookkeeperPersistenceManager(bktb.bk, mm, tm,
-                                                new ChangeLedgerServerConfiguration(), scheduler);
-
-        // acquire the topic
-        StubCallback<Void> stubCallback = new StubCallback<Void>();
-        bkpm.acquiredTopic(topic, stubCallback, null);
-        assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
-        assertEquals(0, bkpm.topicInfos.get(topic).ledgerRanges.size());
-
-        ChangeLedgerCallback persistCallback =
-            new ChangeLedgerCallback(1, NUM_MESSAGES_TO_TEST);
-        for (Message message : messages) {
-            bkpm.persistMessage(new PersistRequest(topic, message, persistCallback, null));
-        }
-        assertEquals(maxEntriesPerLedger + 1,
-                     ConcurrencyUtils.take(persistCallback.queue).left().getLocalComponent());
-        assertEquals(maxEntriesPerLedger, persistCallback.numSuccess);
-        assertEquals(NUM_MESSAGES_TO_TEST - maxEntriesPerLedger, persistCallback.numFailed);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestDeadlock.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestDeadlock.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestDeadlock.java
deleted file mode 100644
index 90c1817..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestDeadlock.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.SynchronousQueue;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.server.HedwigHubTestBase;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.ConcurrencyUtils;
-
-public class TestDeadlock extends HedwigHubTestBase {
-
-    protected static final Logger logger = LoggerFactory.getLogger(TestDeadlock.class);
-
-    // Client side variables
-    protected HedwigClient client;
-    protected Publisher publisher;
-    protected Subscriber subscriber;
-
-    ByteString topic = ByteString.copyFromUtf8("DeadLockTopic");
-    ByteString subscriberId = ByteString.copyFromUtf8("dl");
-
-    public TestDeadlock() {
-        super(1);
-    }
-
-    @Override
-    @Before
-    public void setUp() throws Exception {
-        numBookies = 1;
-        readDelay = 1000L; // 1s
-        super.setUp();
-        client = new HedwigClient(new HubClientConfiguration());
-        publisher = client.getPublisher();
-        subscriber = client.getSubscriber();
-    }
-
-    @Override
-    @After
-    public void tearDown() throws Exception {
-        client.close();
-        super.tearDown();
-    }
-
-    // Test implementation of Callback for async client actions.
-    static class TestCallback implements Callback<Void> {
-        private final SynchronousQueue<Boolean> queue;
-
-        public TestCallback(SynchronousQueue<Boolean> queue) {
-            this.queue = queue;
-        }
-
-        @Override
-        public void operationFinished(Object ctx, Void resultOfOperation) {
-            new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    if (logger.isDebugEnabled())
-                        logger.debug("Operation finished!");
-                    ConcurrencyUtils.put(queue, true);
-                }
-            }).start();
-        }
-
-        @Override
-        public void operationFailed(Object ctx, final PubSubException exception) {
-            new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    logger.error("Operation failed!", exception);
-                    ConcurrencyUtils.put(queue, false);
-                }
-            }).start();
-        }
-    }
-
-    // Test implementation of subscriber's message handler.
-    class TestMessageHandler implements MessageHandler {
-        private final SynchronousQueue<Boolean> consumeQueue;
-        boolean doAdd = false;
-
-        public TestMessageHandler(SynchronousQueue<Boolean> consumeQueue) {
-            this.consumeQueue = consumeQueue;
-        }
-
-        public void deliver(ByteString t, ByteString sub, final Message msg, Callback<Void> callback,
-                            Object context) {
-            if (!doAdd) {
-                // after receiving first message, we send a publish
-                // to obtain permit of second ledger
-                doAdd = true;
-                new Thread(new Runnable() {
-                    @Override
-                    public void run() {
-                        // publish messages again to obtain permits
-                        logger.info("Start publishing message to obtain permit");
-                        // it obtains the permit and wait for a response,
-                        // but the response is delayed and readEntries is called
-                        // in the readComplete callback to read entries of the
-                        // same ledger. since there is no permit, it blocks
-                        try {
-                            CountDownLatch latch = new CountDownLatch(1);
-                            sleepBookies(8, latch);
-                            latch.await();
-                            SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>();
-                            for (int i=0; i<3; i++) {
-                                publisher.asyncPublish(topic, getMsg(9999), new TestCallback(queue), null);
-                            }
-                            for (int i=0; i<3; i++) {
-                                if (!queue.take()) {
-                                    logger.error("Error publishing to topic {}", topic);
-                                    ConcurrencyUtils.put(consumeQueue, false);
-                                }
-                            }
-                        } catch (Exception e) {
-                            logger.error("Failed to publish message to obtain permit.");
-                        }
-                    }
-                }).start();
-            }
-
-            new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    ConcurrencyUtils.put(consumeQueue, true);
-                }
-            }).start();
-            callback.operationFinished(context, null);
-        }
-    }
-
-    // Helper function to generate Messages
-    protected Message getMsg(int msgNum) {
-        return Message.newBuilder().setBody(ByteString.copyFromUtf8("Message" + msgNum)).build();
-    }
-
-    // Helper function to generate Topics
-    protected ByteString getTopic(int topicNum) {
-        return ByteString.copyFromUtf8("DeadLockTopic" + topicNum);
-    }
-
-    class TestServerConfiguration extends HubServerConfiguration {
-        public TestServerConfiguration(int serverPort, int sslServerPort) {
-            super(serverPort, sslServerPort);
-        }
-        @Override
-        public int getBkEnsembleSize() {
-            return 1;
-        }
-        @Override
-        public int getBkWriteQuorumSize() {
-            return 1;
-        }
-        @Override
-        public int getBkAckQuorumSize() {
-            return 1;
-        }
-        @Override
-        public int getReadAheadCount() {
-            return 4;
-        }
-        @Override
-        public long getMaximumCacheSize() {
-            return 32;
-        }
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    protected ServerConfiguration getServerConfiguration(int serverPort, int sslServerPort) {
-        ServerConfiguration serverConf = new TestServerConfiguration(serverPort, sslServerPort);
-
-        org.apache.bookkeeper.conf.ClientConfiguration bkClientConf =
-            new org.apache.bookkeeper.conf.ClientConfiguration();
-        bkClientConf.setNumWorkerThreads(1).setReadTimeout(9999)
-                    .setThrottleValue(3);
-        try {
-            serverConf.addConf(bkClientConf);
-        } catch (Exception e) {
-        }
-        return serverConf;
-    }
-
-    @Test(timeout=60000)
-    public void testDeadlock() throws Exception {
-        int numMessages = 5;
-
-        SynchronousQueue<Boolean> consumeQueue = new SynchronousQueue<Boolean>();
-
-        // subscribe to topic
-        logger.info("Setup subscriptions");
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-        subscriber.subscribe(topic, subscriberId, opts);
-        subscriber.closeSubscription(topic, subscriberId);
-
-        // publish 5 messages to form first ledger
-        for (int i=0; i<numMessages; i++) {
-            logger.info("Start publishing message {}", i);
-            publisher.publish(topic, getMsg(i));
-        }
-
-        stopHubServers();
-        Thread.sleep(1000);
-        startHubServers();
-
-        logger.info("Start publishing messages");
-        // publish enough messages to second ledger
-        // so a scan request need to scan over two ledgers, which
-        // cause readEntries executed in the previous readEntries
-        for (int i=0; i<numMessages; i++) {
-            logger.info("Start publishing message {}", i+5);
-            publisher.publish(topic, getMsg(i));
-        }
-
-        logger.info("Start subscribe topics again and receive messages");
-        // subscribe to topic
-        subscriber.subscribe(topic, subscriberId, opts);
-        subscriber.startDelivery(topic, subscriberId,
-                                 new TestMessageHandler(consumeQueue));
-        for (int i=0; i<(2*numMessages+3); i++) {
-            assertTrue(consumeQueue.take());
-        }
-    }
-
-    protected void sleepBookies(final int seconds, final CountDownLatch l)
-            throws InterruptedException, IOException {
-        Thread sleeper = new Thread() {
-                public void run() {
-                    try {
-                        bktb.suspendAllBookieServers();
-                        l.countDown();
-                        Thread.sleep(seconds * 1000);
-                        bktb.resumeAllBookieServers();
-                    } catch (Exception e) {
-                        logger.error("Error suspending thread", e);
-                    }
-                }
-            };
-        sleeper.start();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestLocalDBPersistenceManagerBlackBox.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestLocalDBPersistenceManagerBlackBox.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestLocalDBPersistenceManagerBlackBox.java
deleted file mode 100644
index 856eab4..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestLocalDBPersistenceManagerBlackBox.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import org.junit.After;
-import org.junit.Before;
-
-public class TestLocalDBPersistenceManagerBlackBox extends TestPersistenceManagerBlackBox {
-
-    @After
-    @Override
-    public void tearDown() throws Exception {
-        super.tearDown();
-        ((LocalDBPersistenceManager) persistenceManager).reset();
-    }
-
-    @Before
-    @Override
-    public void setUp() throws Exception {
-        super.setUp();
-    }
-
-    @Override
-    long getLowestSeqId() {
-        return 1;
-    }
-
-    @Override
-    PersistenceManager instantiatePersistenceManager() {
-        return LocalDBPersistenceManager.instance();
-    }
-
-    @Override
-    public long getExpectedSeqId(int numPublished) {
-        return numPublished;
-    }
-
-}