You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:12 UTC

[02/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java
deleted file mode 100644
index 52a5874..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.Test;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.HelperMethods;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.server.topics.TopicOwnershipChangeListener;
-import org.apache.hedwig.util.Callback;
-
-import static org.junit.Assert.*;
-
-public abstract class TestPersistenceManagerBlackBox {
-    protected PersistenceManager persistenceManager;
-    protected int NUM_MESSAGES_TO_TEST = 5;
-    protected int NUM_TOPICS_TO_TEST = 5;
-    private static final Logger logger = LoggerFactory.getLogger(TestPersistenceManagerBlackBox.class);
-    TestCallback testCallback = new TestCallback();
-
-    RuntimeException failureException;
-
-    class TestCallback implements Callback<PubSubProtocol.MessageSeqId> {
-
-        public void operationFailed(Object ctx, PubSubException exception) {
-            throw (failureException = new RuntimeException(exception));
-        }
-
-        @SuppressWarnings("unchecked")
-        public void operationFinished(Object ctx, PubSubProtocol.MessageSeqId resultOfOperation) {
-            LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx;
-            try {
-                statusQueue.put(true);
-            } catch (InterruptedException e) {
-                throw (failureException = new RuntimeException(e));
-            }
-        }
-    }
-
-    class RangeScanVerifierListener implements ScanCallback {
-        List<Message> pubMsgs;
-
-        public RangeScanVerifierListener(List<Message> pubMsgs) {
-            this.pubMsgs = pubMsgs;
-        }
-
-        public void messageScanned(Object ctx, Message recvMessage) {
-            if (pubMsgs.isEmpty()) {
-                throw (failureException = new RuntimeException("Message received when none expected"));
-            }
-
-            Message pubMsg = pubMsgs.get(0);
-            if (!HelperMethods.areEqual(recvMessage, pubMsg)) {
-                throw (failureException = new RuntimeException("Scanned message not equal to expected"));
-            }
-            pubMsgs.remove(0);
-        }
-
-        public void scanFailed(Object ctx, Exception exception) {
-            throw (failureException = new RuntimeException(exception));
-        }
-
-        @SuppressWarnings("unchecked")
-        public void scanFinished(Object ctx, ReasonForFinish reason) {
-            if (reason != ReasonForFinish.NO_MORE_MESSAGES) {
-                throw (failureException = new RuntimeException("Scan finished prematurely " + reason));
-            }
-            LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx;
-            try {
-                statusQueue.put(true);
-            } catch (InterruptedException e) {
-                throw (failureException = new RuntimeException(e));
-            }
-        }
-
-    }
-
-    class PointScanVerifierListener implements ScanCallback {
-        List<Message> pubMsgs;
-        ByteString topic;
-
-        public PointScanVerifierListener(List<Message> pubMsgs, ByteString topic) {
-            this.topic = topic;
-            this.pubMsgs = pubMsgs;
-        }
-
-        @SuppressWarnings("unchecked")
-        public void messageScanned(Object ctx, Message recvMessage) {
-
-            Message pubMsg = pubMsgs.get(0);
-            if (!HelperMethods.areEqual(recvMessage, pubMsg)) {
-                throw (failureException = new RuntimeException("Scanned message not equal to expected"));
-            }
-            pubMsgs.remove(0);
-
-            if (pubMsgs.isEmpty()) {
-                LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx;
-                try {
-                    statusQueue.put(true);
-                } catch (InterruptedException e) {
-                    throw (failureException = new RuntimeException(e));
-                }
-            } else {
-                long seqId = recvMessage.getMsgId().getLocalComponent();
-                seqId = persistenceManager.getSeqIdAfterSkipping(topic, seqId, 1);
-                ScanRequest request = new ScanRequest(topic, seqId, new PointScanVerifierListener(pubMsgs, topic), ctx);
-                persistenceManager.scanSingleMessage(request);
-            }
-
-        }
-
-        public void scanFailed(Object ctx, Exception exception) {
-            throw (failureException = new RuntimeException(exception));
-        }
-
-        public void scanFinished(Object ctx, ReasonForFinish reason) {
-
-        }
-
-    }
-
-    class ScanVerifier implements Runnable {
-        List<Message> pubMsgs;
-        ByteString topic;
-        LinkedBlockingQueue<Boolean> statusQueue = new LinkedBlockingQueue<Boolean>();
-
-        public ScanVerifier(ByteString topic, List<Message> pubMsgs) {
-            this.topic = topic;
-            this.pubMsgs = pubMsgs;
-        }
-
-        public void run() {
-            // start the scan
-            try {
-                if (persistenceManager instanceof PersistenceManagerWithRangeScan) {
-
-                    ScanCallback listener = new RangeScanVerifierListener(pubMsgs);
-
-                    PersistenceManagerWithRangeScan rangePersistenceManager = (PersistenceManagerWithRangeScan) persistenceManager;
-
-                    rangePersistenceManager.scanMessages(new RangeScanRequest(topic, getLowestSeqId(),
-                                                         NUM_MESSAGES_TO_TEST + 1, Long.MAX_VALUE, listener, statusQueue));
-
-                } else {
-
-                    ScanCallback listener = new PointScanVerifierListener(pubMsgs, topic);
-                    persistenceManager
-                    .scanSingleMessage(new ScanRequest(topic, getLowestSeqId(), listener, statusQueue));
-
-                }
-                // now listen for it to finish
-                // wait a maximum of a minute
-                Boolean b = statusQueue.poll(60, TimeUnit.SECONDS);
-                if (b == null) {
-                    throw (failureException = new RuntimeException("Scanning timed out"));
-                }
-            } catch (InterruptedException e) {
-                throw (failureException = new RuntimeException(e));
-            }
-        }
-    }
-
-    class Publisher implements Runnable {
-        List<Message> pubMsgs;
-        ByteString topic;
-
-        public Publisher(ByteString topic, List<Message> pubMsgs) {
-            this.pubMsgs = pubMsgs;
-            this.topic = topic;
-        }
-
-        public void run() {
-            LinkedBlockingQueue<Boolean> statusQueue = new LinkedBlockingQueue<Boolean>();
-
-            for (Message msg : pubMsgs) {
-
-                try {
-                    persistenceManager.persistMessage(new PersistRequest(topic, msg, testCallback, statusQueue));
-                    // wait a maximum of a minute
-                    Boolean b = statusQueue.poll(60, TimeUnit.SECONDS);
-                    if (b == null) {
-                        throw (failureException = new RuntimeException("Scanning timed out"));
-                    }
-                } catch (InterruptedException e) {
-                    throw (failureException = new RuntimeException(e));
-                }
-            }
-        }
-
-    }
-
-    public void setUp() throws Exception {
-        logger.info("STARTING " + getClass());
-        persistenceManager = instantiatePersistenceManager();
-        failureException = null;
-        logger.info("Persistence Manager test setup finished");
-    }
-
-    abstract long getLowestSeqId();
-
-    abstract PersistenceManager instantiatePersistenceManager() throws Exception;
-
-    public void tearDown() throws Exception {
-        logger.info("tearDown starting");
-        persistenceManager.stop();
-        logger.info("FINISHED " + getClass());
-    }
-
-    protected ByteString getTopicName(int number) {
-        return ByteString.copyFromUtf8("topic" + number);
-    }
-
-    @Test(timeout=60000)
-    public void testPersistenceManager() throws Exception {
-        List<Thread> publisherThreads = new LinkedList<Thread>();
-        List<Thread> scannerThreads = new LinkedList<Thread>();
-        Thread thread;
-        Semaphore latch = new Semaphore(1);
-
-        for (int i = 0; i < NUM_TOPICS_TO_TEST; i++) {
-            ByteString topic = getTopicName(i);
-
-            if (persistenceManager instanceof TopicOwnershipChangeListener) {
-
-                TopicOwnershipChangeListener tocl = (TopicOwnershipChangeListener) persistenceManager;
-
-                latch.acquire();
-
-                tocl.acquiredTopic(topic, new Callback<Void>() {
-                    @Override
-                    public void operationFailed(Object ctx, PubSubException exception) {
-                        failureException = new RuntimeException(exception);
-                        ((Semaphore) ctx).release();
-                    }
-
-                    @Override
-                    public void operationFinished(Object ctx, Void res) {
-                        ((Semaphore) ctx).release();
-                    }
-                }, latch);
-
-                latch.acquire();
-                latch.release();
-                if (failureException != null) {
-                    throw (Exception) failureException.getCause();
-                }
-            }
-            List<Message> msgs = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES_TO_TEST, 1024);
-
-            thread = new Thread(new Publisher(topic, msgs));
-            publisherThreads.add(thread);
-            thread.start();
-
-            thread = new Thread(new ScanVerifier(topic, msgs));
-            scannerThreads.add(thread);
-        }
-        for (Thread t : publisherThreads) {
-            t.join();
-        }
-
-        for (Thread t : scannerThreads) {
-            t.start();
-        }
-
-        for (Thread t : scannerThreads) {
-            t.join();
-        }
-
-        assertEquals(null, failureException);
-        for (int i = 0; i < NUM_TOPICS_TO_TEST; i++) {
-            assertEquals(persistenceManager.getCurrentSeqIdForTopic(getTopicName(i)).getLocalComponent(),
-                         getExpectedSeqId(NUM_MESSAGES_TO_TEST));
-        }
-
-    }
-
-    abstract long getExpectedSeqId(int numPublished);
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheBlackBox.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheBlackBox.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheBlackBox.java
deleted file mode 100644
index 2e59a8a..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheBlackBox.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import junit.framework.Test;
-import junit.framework.TestSuite;
-
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.junit.After;
-import org.junit.Before;
-
-public class TestReadAheadCacheBlackBox extends TestPersistenceManagerBlackBox {
-
-    @After
-    @Override
-    public void tearDown() throws Exception {
-        super.tearDown();
-        LocalDBPersistenceManager.instance().reset();
-    }
-
-    @Before
-    @Override
-    public void setUp() throws Exception {
-        super.setUp();
-    }
-
-    @Override
-    long getExpectedSeqId(int numPublished) {
-        return numPublished;
-    }
-
-    @Override
-    long getLowestSeqId() {
-        return 1;
-    }
-
-    @Override
-    PersistenceManager instantiatePersistenceManager() {
-        return new ReadAheadCache(LocalDBPersistenceManager.instance(), new ServerConfiguration()).start();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java
deleted file mode 100644
index ae08005..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import static org.junit.Assert.*;
-
-import java.util.List;
-
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.protobuf.ByteString;
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.hedwig.HelperMethods;
-import org.apache.hedwig.StubCallback;
-import org.apache.hedwig.StubScanCallback;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.util.ConcurrencyUtils;
-
-public class TestReadAheadCacheWhiteBox {
-    ByteString topic = ByteString.copyFromUtf8("testTopic");
-    final static int NUM_MESSAGES = 10;
-    final static int MSG_SIZE = 50;
-    List<Message> messages = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES, MSG_SIZE);
-    StubPersistenceManager stubPersistenceManager;
-    ReadAheadCache cacheBasedPersistenceManager;
-    MyServerConfiguration myConf = new MyServerConfiguration();
-
-    class MyReadAheadCache extends ReadAheadCache {
-        public MyReadAheadCache(PersistenceManagerWithRangeScan persistenceManger, ServerConfiguration cfg) {
-            super(persistenceManger, cfg);
-        }
-
-        @Override
-        protected void enqueueWithoutFailureByTopic(ByteString topic, final CacheRequest obj) {
-            // make it perform in the same thread
-            obj.performRequest();
-        }
-
-    }
-
-    class MyServerConfiguration extends ServerConfiguration {
-
-        // Note these are set up, so that the size limit will be reached before
-        // the count limit
-        int readAheadCount = NUM_MESSAGES / 2;
-        long readAheadSize = (long) (MSG_SIZE * 2.5);
-        long maxCacheSize = Integer.MAX_VALUE;
-        long cacheEntryTTL = 0L;
-
-        @Override
-        public int getReadAheadCount() {
-            return readAheadCount;
-        }
-
-        @Override
-        public long getReadAheadSizeBytes() {
-            return readAheadSize;
-        }
-
-        @Override
-        public long getMaximumCacheSize() {
-            return maxCacheSize;
-        }
-
-        @Override
-        public long getCacheEntryTTL() {
-            return cacheEntryTTL;
-        }
-
-    }
-
-    @Before
-    public void setUp() throws Exception {
-        stubPersistenceManager = new StubPersistenceManager();
-        cacheBasedPersistenceManager = new MyReadAheadCache(stubPersistenceManager, myConf).start();
-    }
-
-    @Test(timeout=60000)
-    public void testPersistMessage() throws Exception {
-        StubCallback<PubSubProtocol.MessageSeqId> callback = new StubCallback<PubSubProtocol.MessageSeqId>();
-        PersistRequest request = new PersistRequest(topic, messages.get(0), callback, null);
-
-        stubPersistenceManager.failure = true;
-        cacheBasedPersistenceManager.persistMessage(request);
-        assertNotNull(ConcurrencyUtils.take(callback.queue).right());
-
-        CacheKey key = new CacheKey(topic, cacheBasedPersistenceManager.getCurrentSeqIdForTopic(topic)
-                                    .getLocalComponent());
-        assertFalse(cacheBasedPersistenceManager.cache.containsKey(key));
-
-        stubPersistenceManager.failure = false;
-        persistMessage(messages.get(0));
-    }
-
-    private void persistMessage(Message msg) throws Exception {
-        StubCallback<PubSubProtocol.MessageSeqId> callback = new StubCallback<PubSubProtocol.MessageSeqId>();
-        PersistRequest request = new PersistRequest(topic, msg, callback, null);
-        cacheBasedPersistenceManager.persistMessage(request);
-        assertNotNull(ConcurrencyUtils.take(callback.queue).left());
-        CacheKey key = new CacheKey(topic, cacheBasedPersistenceManager.getCurrentSeqIdForTopic(topic)
-                                    .getLocalComponent());
-        CacheValue cacheValue = cacheBasedPersistenceManager.cache.get(key);
-        assertNotNull(cacheValue);
-        assertFalse(cacheValue.isStub());
-        assertTrue(HelperMethods.areEqual(cacheValue.getMessage(), msg));
-
-    }
-
-    @Test(timeout=60000)
-    public void testScanSingleMessage() throws Exception {
-        StubScanCallback callback = new StubScanCallback();
-        ScanRequest request = new ScanRequest(topic, 1, callback, null);
-        stubPersistenceManager.failure = true;
-
-        cacheBasedPersistenceManager.scanSingleMessage(request);
-        assertTrue(callback.isFailed());
-        assertTrue(0 == cacheBasedPersistenceManager.cache.size());
-
-        stubPersistenceManager.failure = false;
-        cacheBasedPersistenceManager.scanSingleMessage(request);
-        assertTrue(myConf.readAheadCount == cacheBasedPersistenceManager.cache.size());
-
-        persistMessage(messages.get(0));
-        assertTrue(callback.isSuccess());
-
-    }
-
-    @Test(timeout=60000)
-    public void testDeliveredUntil() throws Exception {
-        for (Message m : messages) {
-            persistMessage(m);
-        }
-        assertEquals((long) NUM_MESSAGES * MSG_SIZE, cacheBasedPersistenceManager.presentCacheSize.get());
-        long middle = messages.size() / 2;
-        cacheBasedPersistenceManager.deliveredUntil(topic, middle);
-
-        assertEquals(messages.size() - middle, cacheBasedPersistenceManager.cache.size());
-
-        long middle2 = middle - 1;
-        cacheBasedPersistenceManager.deliveredUntil(topic, middle2);
-        // should have no effect
-        assertEquals(messages.size() - middle, cacheBasedPersistenceManager.cache.size());
-
-        // delivered all messages
-        cacheBasedPersistenceManager.deliveredUntil(topic, (long) messages.size());
-        // should have no effect
-        assertTrue(cacheBasedPersistenceManager.cache.isEmpty());
-        assertTrue(cacheBasedPersistenceManager.cacheSegment.get()
-                   .timeIndexOfAddition.isEmpty());
-        assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.isEmpty());
-        assertTrue(0 == cacheBasedPersistenceManager.presentCacheSize.get());
-
-    }
-
-    @Test(timeout=60000)
-    public void testDoReadAhead() {
-        StubScanCallback callback = new StubScanCallback();
-        ScanRequest request = new ScanRequest(topic, 1, callback, null);
-        cacheBasedPersistenceManager.doReadAhead(request);
-
-        assertEquals(myConf.readAheadCount, cacheBasedPersistenceManager.cache.size());
-
-        request = new ScanRequest(topic, myConf.readAheadCount / 2 - 1, callback, null);
-        cacheBasedPersistenceManager.doReadAhead(request);
-        assertEquals(myConf.readAheadCount, cacheBasedPersistenceManager.cache.size());
-
-        request = new ScanRequest(topic, myConf.readAheadCount / 2 + 2, callback, null);
-        cacheBasedPersistenceManager.doReadAhead(request);
-        assertEquals((int) (1.5 * myConf.readAheadCount), cacheBasedPersistenceManager.cache.size());
-
-    }
-
-    @Test(timeout=60000)
-    public void testReadAheadSizeLimit() throws Exception {
-        for (Message m : messages) {
-            persistMessage(m);
-        }
-        cacheBasedPersistenceManager.cache.clear();
-        StubScanCallback callback = new StubScanCallback();
-        ScanRequest request = new ScanRequest(topic, 1, callback, null);
-        cacheBasedPersistenceManager.scanSingleMessage(request);
-
-        assertTrue(callback.isSuccess());
-        assertEquals((int) Math.ceil(myConf.readAheadSize / (MSG_SIZE + 0.0)), cacheBasedPersistenceManager.cache
-                     .size());
-
-    }
-
-    @Test(timeout=60000)
-    public void testDoReadAheadStartingFrom() throws Exception {
-        persistMessage(messages.get(0));
-        int readAheadCount = 5;
-        int start = 1;
-        RangeScanRequest readAheadRequest = cacheBasedPersistenceManager.doReadAheadStartingFrom(topic, start,
-                                            readAheadCount);
-        assertNull(readAheadRequest);
-
-        StubScanCallback callback = new StubScanCallback();
-        int end = 100;
-        ScanRequest request = new ScanRequest(topic, end, callback, null);
-        cacheBasedPersistenceManager.doReadAhead(request);
-
-        int pos = 98;
-        readAheadRequest = cacheBasedPersistenceManager.doReadAheadStartingFrom(topic, pos, readAheadCount);
-        assertEquals(readAheadRequest.messageLimit, end - pos);
-
-        end = 200;
-        request = new ScanRequest(topic, end, callback, null);
-        cacheBasedPersistenceManager.doReadAhead(request);
-
-        // too far back
-        pos = 150;
-        readAheadRequest = cacheBasedPersistenceManager.doReadAheadStartingFrom(topic, pos, readAheadCount);
-        assertEquals(readAheadRequest.messageLimit, readAheadCount);
-    }
-
-    @Test(timeout=60000)
-    public void testAddMessageToCache() {
-        CacheKey key = new CacheKey(topic, 1);
-        cacheBasedPersistenceManager.addMessageToCache(key, messages.get(0), MathUtils.now());
-        assertEquals(1, cacheBasedPersistenceManager.cache.size());
-        assertEquals(MSG_SIZE, cacheBasedPersistenceManager.presentCacheSize.get());
-        assertEquals(1, cacheBasedPersistenceManager.orderedIndexOnSeqId.get(topic).size());
-        assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.get(topic).contains(1L));
-
-        CacheValue value = cacheBasedPersistenceManager.cache.get(key);
-        assertTrue(cacheBasedPersistenceManager.cacheSegment.get()
-                   .timeIndexOfAddition.get(value.timeOfAddition).contains(key));
-    }
-
-    @Test(timeout=60000)
-    public void testRemoveMessageFromCache() {
-        CacheKey key = new CacheKey(topic, 1);
-        cacheBasedPersistenceManager.addMessageToCache(key, messages.get(0), MathUtils.now());
-        cacheBasedPersistenceManager.removeMessageFromCache(key, new Exception(), true, true);
-        assertTrue(cacheBasedPersistenceManager.cache.isEmpty());
-        assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.isEmpty());
-        assertTrue(cacheBasedPersistenceManager.cacheSegment.get()
-                   .timeIndexOfAddition.isEmpty());
-    }
-
-    @Test(timeout=60000)
-    public void testCollectOldCacheEntries() {
-        int i = 1;
-        for (Message m : messages) {
-            CacheKey key = new CacheKey(topic, i);
-            cacheBasedPersistenceManager.addMessageToCache(key, m, i);
-            i++;
-        }
-
-        int n = 2;
-        myConf.maxCacheSize = n * MSG_SIZE * myConf.getNumReadAheadCacheThreads();
-        cacheBasedPersistenceManager.reloadConf(myConf);
-        cacheBasedPersistenceManager.collectOldOrExpiredCacheEntries(
-                cacheBasedPersistenceManager.cacheSegment.get());
-        assertEquals(n, cacheBasedPersistenceManager.cache.size());
-        assertEquals(n, cacheBasedPersistenceManager.cacheSegment.get()
-                     .timeIndexOfAddition.size());
-    }
-
-    @Test(timeout=60000)
-    public void testCollectExpiredCacheEntries() throws Exception {
-        int i = 1;
-        int n = 2;
-        long ttl = 5000L;
-        myConf.cacheEntryTTL = ttl;
-        long curTime = MathUtils.now();
-        cacheBasedPersistenceManager.reloadConf(myConf);
-        for (Message m : messages) {
-            CacheKey key = new CacheKey(topic, i);
-            cacheBasedPersistenceManager.addMessageToCache(key, m, curTime++);
-            if (i == NUM_MESSAGES - n) {
-                Thread.sleep(2 * ttl);
-                curTime += 2 * ttl;
-            }
-            i++;
-        }
-
-        assertEquals(n, cacheBasedPersistenceManager.cache.size());
-        assertEquals(n, cacheBasedPersistenceManager.cacheSegment.get()
-                     .timeIndexOfAddition.size());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java
deleted file mode 100644
index 26b2ce3..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.subscriptions;
-
-import java.util.concurrent.ScheduledExecutorService;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.delivery.DeliveryManager;
-import org.apache.hedwig.server.persistence.PersistenceManager;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.util.Callback;
-
-public class StubSubscriptionManager extends InMemorySubscriptionManager {
-    boolean fail = false;
-
-    public void setFail(boolean fail) {
-        this.fail = fail;
-    }
-
-    public StubSubscriptionManager(TopicManager tm, PersistenceManager pm, DeliveryManager dm,
-                                   ServerConfiguration conf, ScheduledExecutorService scheduler) {
-        super(conf, tm, pm, dm, scheduler);
-    }
-
-    @Override
-    public void serveSubscribeRequest(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
-                                      Callback<SubscriptionData> callback, Object ctx) {
-        if (fail) {
-            callback.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail"));
-            return;
-        }
-        super.serveSubscribeRequest(topic, subRequest, consumeSeqId, callback, ctx);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java
deleted file mode 100644
index 0e0f670..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.subscriptions;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.SynchronousQueue;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.Before;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.persistence.LocalDBPersistenceManager;
-import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
-import org.apache.hedwig.server.meta.MetadataManagerFactory;
-import org.apache.hedwig.util.ConcurrencyUtils;
-import org.apache.hedwig.util.Either;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.zookeeper.ZooKeeperTestBase;
-
-public class TestMMSubscriptionManager extends ZooKeeperTestBase {
-    MetadataManagerFactory mm;
-    MMSubscriptionManager sm;
-    ServerConfiguration cfg = new ServerConfiguration();
-    SynchronousQueue<Either<SubscriptionData, PubSubException>> subDataCallbackQueue = new SynchronousQueue<Either<SubscriptionData, PubSubException>>();
-    SynchronousQueue<Either<Boolean, PubSubException>> BooleanCallbackQueue = new SynchronousQueue<Either<Boolean, PubSubException>>();
-
-    Callback<Void> voidCallback;
-    Callback<SubscriptionData> subDataCallback;
-
-    @Before
-    @Override
-    public void setUp() throws Exception {
-        super.setUp();
-        cfg = new ServerConfiguration();
-        final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-        mm = MetadataManagerFactory.newMetadataManagerFactory(cfg, zk);
-        sm = new MMSubscriptionManager(cfg, mm, new TrivialOwnAllTopicManager(cfg, scheduler),
-                                       LocalDBPersistenceManager.instance(), null, scheduler);
-        subDataCallback = new Callback<SubscriptionData>() {
-            @Override
-            public void operationFailed(Object ctx, final PubSubException exception) {
-                scheduler.execute(new Runnable() {
-                    public void run() {
-                        ConcurrencyUtils.put(subDataCallbackQueue, Either.of((SubscriptionData) null, exception));
-                    }
-                });
-            }
-
-            @Override
-            public void operationFinished(Object ctx, final SubscriptionData resultOfOperation) {
-                scheduler.execute(new Runnable() {
-                    public void run() {
-                        ConcurrencyUtils.put(subDataCallbackQueue, Either.of(resultOfOperation, (PubSubException) null));
-                    }
-                });
-            }
-        };
-
-        voidCallback = new Callback<Void>() {
-            @Override
-            public void operationFailed(Object ctx, final PubSubException exception) {
-                scheduler.execute(new Runnable() {
-                    public void run() {
-                        ConcurrencyUtils.put(BooleanCallbackQueue, Either.of((Boolean) null, exception));
-                    }
-                });
-            }
-
-            @Override
-            public void operationFinished(Object ctx, Void resultOfOperation) {
-                scheduler.execute(new Runnable() {
-                    public void run() {
-                        ConcurrencyUtils.put(BooleanCallbackQueue, Either.of(true, (PubSubException) null));
-                    }
-                });
-            }
-        };
-
-    }
-
-    @Test(timeout=60000)
-    public void testBasics() throws Exception {
-
-        ByteString topic1 = ByteString.copyFromUtf8("topic1");
-        ByteString sub1 = ByteString.copyFromUtf8("sub1");
-
-        //
-        // No topics acquired.
-        //
-        SubscribeRequest subRequest = SubscribeRequest.newBuilder().setSubscriberId(sub1).build();
-        MessageSeqId msgId = MessageSeqId.newBuilder().setLocalComponent(100).build();
-
-        sm.serveSubscribeRequest(topic1, subRequest, msgId, subDataCallback, null);
-
-        Assert.assertEquals(ConcurrencyUtils.take(subDataCallbackQueue).right().getClass(),
-                            PubSubException.ServerNotResponsibleForTopicException.class);
-
-        sm.unsubscribe(topic1, sub1, voidCallback, null);
-
-        Assert.assertEquals(ConcurrencyUtils.take(BooleanCallbackQueue).right().getClass(),
-                            PubSubException.ServerNotResponsibleForTopicException.class);
-
-        //
-        // Acquire topic.
-        //
-
-        sm.acquiredTopic(topic1, voidCallback, null);
-        Assert.assertTrue(BooleanCallbackQueue.take().left());
-
-        Assert.assertTrue(sm.top2sub2seq.containsKey(topic1));
-        Assert.assertEquals(0, sm.top2sub2seq.get(topic1).size());
-
-        sm.unsubscribe(topic1, sub1, voidCallback, null);
-        Assert.assertEquals(ConcurrencyUtils.take(BooleanCallbackQueue).right().getClass(),
-                            PubSubException.ClientNotSubscribedException.class);
-
-        //
-        // Try to attach to a subscription.
-        subRequest = SubscribeRequest.newBuilder().setCreateOrAttach(CreateOrAttach.ATTACH).setSubscriberId(sub1)
-                     .build();
-
-        sm.serveSubscribeRequest(topic1, subRequest, msgId, subDataCallback, null);
-        Assert.assertEquals(ConcurrencyUtils.take(subDataCallbackQueue).right().getClass(),
-                            PubSubException.ClientNotSubscribedException.class);
-
-        // now create
-        subRequest = SubscribeRequest.newBuilder().setCreateOrAttach(CreateOrAttach.CREATE).setSubscriberId(sub1)
-                     .build();
-        sm.serveSubscribeRequest(topic1, subRequest, msgId, subDataCallback, null);
-        Assert.assertEquals(msgId.getLocalComponent(), ConcurrencyUtils.take(subDataCallbackQueue).left().getState().getMsgId().getLocalComponent());
-        Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
-                            .getLocalComponent());
-
-        // try to create again
-        sm.serveSubscribeRequest(topic1, subRequest, msgId, subDataCallback, null);
-        Assert.assertEquals(ConcurrencyUtils.take(subDataCallbackQueue).right().getClass(),
-                            PubSubException.ClientAlreadySubscribedException.class);
-        Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
-                            .getLocalComponent());
-
-        sm.lostTopic(topic1);
-        sm.acquiredTopic(topic1, voidCallback, null);
-        Assert.assertTrue(BooleanCallbackQueue.take().left());
-
-        // try to attach
-        subRequest = SubscribeRequest.newBuilder().setCreateOrAttach(CreateOrAttach.ATTACH).setSubscriberId(sub1)
-                     .build();
-        MessageSeqId msgId1 = MessageSeqId.newBuilder().setLocalComponent(msgId.getLocalComponent() + 10).build();
-        sm.serveSubscribeRequest(topic1, subRequest, msgId1, subDataCallback, null);
-        Assert.assertEquals(msgId.getLocalComponent(), subDataCallbackQueue.take().left().getState().getMsgId().getLocalComponent());
-        Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
-                            .getLocalComponent());
-
-        // now manipulate the consume ptrs
-        // dont give it enough to have it persist to ZK
-        MessageSeqId msgId2 = MessageSeqId.newBuilder().setLocalComponent(
-                                  msgId.getLocalComponent() + cfg.getConsumeInterval() - 1).build();
-        sm.setConsumeSeqIdForSubscriber(topic1, sub1, msgId2, voidCallback, null);
-        Assert.assertTrue(BooleanCallbackQueue.take().left());
-        Assert.assertEquals(msgId2.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
-                            .getLocalComponent());
-        Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getSubscriptionState().getMsgId()
-                            .getLocalComponent());
-
-        // give it more so that it will write to ZK
-        MessageSeqId msgId3 = MessageSeqId.newBuilder().setLocalComponent(
-                                  msgId.getLocalComponent() + cfg.getConsumeInterval() + 1).build();
-        sm.setConsumeSeqIdForSubscriber(topic1, sub1, msgId3, voidCallback, null);
-        Assert.assertTrue(BooleanCallbackQueue.take().left());
-
-        sm.lostTopic(topic1);
-        sm.acquiredTopic(topic1, voidCallback, null);
-        Assert.assertTrue(BooleanCallbackQueue.take().left());
-
-        Assert.assertEquals(msgId3.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
-                            .getLocalComponent());
-        Assert.assertEquals(msgId3.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getSubscriptionState().getMsgId()
-                            .getLocalComponent());
-
-        // finally unsubscribe
-        sm.unsubscribe(topic1, sub1, voidCallback, null);
-        Assert.assertTrue(BooleanCallbackQueue.take().left());
-
-        sm.lostTopic(topic1);
-        sm.acquiredTopic(topic1, voidCallback, null);
-        Assert.assertTrue(BooleanCallbackQueue.take().left());
-        Assert.assertFalse(sm.top2sub2seq.get(topic1).containsKey(sub1));
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestUpdateSubscriptionState.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestUpdateSubscriptionState.java b/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestUpdateSubscriptionState.java
deleted file mode 100644
index d5569de..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestUpdateSubscriptionState.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.subscriptions;
-
-import java.util.concurrent.SynchronousQueue;
-
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.server.HedwigHubTestBase;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.ConcurrencyUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.protobuf.ByteString;
-
-public class TestUpdateSubscriptionState extends HedwigHubTestBase {
-
-    private static final int RETENTION_SECS_VALUE = 100;
-
-    // Client side variables
-    protected HedwigClient client;
-    protected Publisher publisher;
-    protected Subscriber subscriber;
-
-    // SynchronousQueues to verify async calls
-    private final SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>();
-
-    // Test implementation of subscriber's message handler
-    class OrderCheckingMessageHandler implements MessageHandler {
-
-        ByteString topic;
-        ByteString subscriberId;
-        int startMsgId;
-        int numMsgs;
-        int endMsgId;
-        boolean inOrder = true;
-
-        OrderCheckingMessageHandler(ByteString topic, ByteString subscriberId,
-                                    int startMsgId, int numMsgs) {
-            this.topic = topic;
-            this.subscriberId = subscriberId;
-            this.startMsgId = startMsgId;
-            this.numMsgs = numMsgs;
-            this.endMsgId = startMsgId + numMsgs - 1;
-        }
-
-        @Override
-        public void deliver(ByteString thisTopic, ByteString thisSubscriberId,
-                            Message msg, Callback<Void> callback, Object context) {
-            if (!topic.equals(thisTopic) ||
-                !subscriberId.equals(thisSubscriberId)) {
-                return;
-            }
-            // check order
-            int msgId = Integer.parseInt(msg.getBody().toStringUtf8());
-            if (logger.isDebugEnabled()) {
-                logger.debug("Received message : " + msgId);
-            }
-
-            if (inOrder) {
-                if (startMsgId != msgId) {
-                    logger.error("Expected message " + startMsgId + ", but received message " + msgId);
-                    inOrder = false;
-                } else {
-                    ++startMsgId;
-                }
-            }
-            callback.operationFinished(context, null);
-            if (msgId == endMsgId) {
-                new Thread(new Runnable() {
-                    @Override
-                    public void run() {
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("Deliver finished!");
-                        }
-                        ConcurrencyUtils.put(queue, true);
-                    }
-                }).start();
-            }
-        }
-
-        public boolean isInOrder() {
-            return inOrder;
-        }
-    }
-
-    public TestUpdateSubscriptionState() {
-        super(1);
-    }
-
-    protected class NewHubServerConfiguration extends HubServerConfiguration {
-
-        public NewHubServerConfiguration(int serverPort, int sslServerPort) {
-            super(serverPort, sslServerPort);
-        }
-
-        @Override
-        public int getRetentionSecs() {
-            return RETENTION_SECS_VALUE;
-        }
-
-    }
-
-    @Override
-    protected ServerConfiguration getServerConfiguration(int serverPort, int sslServerPort) {
-        return new NewHubServerConfiguration(serverPort, sslServerPort);
-    }
-
-    protected class TestClientConfiguration extends HubClientConfiguration {
-        @Override
-        public boolean isAutoSendConsumeMessageEnabled() {
-            return true;
-        }
-    }
-
-    @Override
-    @Before
-    public void setUp() throws Exception {
-        super.setUp();
-        client = new HedwigClient(new TestClientConfiguration());
-        publisher = client.getPublisher();
-        subscriber = client.getSubscriber();
-    }
-
-    @Override
-    @After
-    public void tearDown() throws Exception {
-        client.close();
-        super.tearDown();
-    }
-
-    @Test(timeout=60000)
-    public void testConsumeWhenTopicRelease() throws Exception {
-        ByteString topic = ByteString.copyFromUtf8("TestConsumeWhenTopicRelease");
-        ByteString subId = ByteString.copyFromUtf8("mysub");
-
-        int startMsgId = 0;
-        int numMsgs = 10;
-        // subscriber in client
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-        subscriber.subscribe(topic, subId, opts);
-        // start delivery
-        OrderCheckingMessageHandler ocm = new OrderCheckingMessageHandler(
-                topic, subId, startMsgId, numMsgs);
-        subscriber.startDelivery(topic, subId, ocm);
-        for (int i=0; i<numMsgs; i++) {
-            Message msg = Message.newBuilder().setBody(
-                    ByteString.copyFromUtf8(Integer.toString(startMsgId + i))).build();
-            publisher.publish(topic, msg);
-        }
-        logger.info("Publish finished.");
-        queue.take();
-        logger.info("Deliver finished.");
-        // check messages received in order
-        assertTrue(ocm.isInOrder());
-
-        // wait for retention secs
-        Thread.sleep((RETENTION_SECS_VALUE + 2) * 1000);
-
-        subscriber.stopDelivery(topic, subId);
-        subscriber.closeSubscription(topic, subId);
-
-        startMsgId = 20;
-        // reconnect it again
-        subscriber.subscribe(topic, subId, opts);
-        ocm = new OrderCheckingMessageHandler(topic, subId, startMsgId, numMsgs);
-        subscriber.startDelivery(topic, subId, ocm);
-        for (int i=0; i<numMsgs; i++) {
-            Message msg = Message.newBuilder().setBody(
-                    ByteString.copyFromUtf8(Integer.toString(startMsgId + i))).build();
-            publisher.publish(topic, msg);
-        }
-        queue.take();
-        // check messages received in order
-        assertTrue(ocm.isInOrder());
-    }
-
-    @Test(timeout=60000)
-    public void testConsumeWhenHubShutdown() throws Exception {
-        ByteString topic = ByteString.copyFromUtf8("TestConsumeWhenHubShutdown");
-        ByteString subId = ByteString.copyFromUtf8("mysub");
-
-        int startMsgId = 0;
-        int numMsgs = 10;
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-        // subscriber in client
-        subscriber.subscribe(topic, subId, opts);
-        // start delivery
-        OrderCheckingMessageHandler ocm = new OrderCheckingMessageHandler(
-                topic, subId, startMsgId, numMsgs);
-        subscriber.startDelivery(topic, subId, ocm);
-        for (int i=0; i<numMsgs; i++) {
-            Message msg = Message.newBuilder().setBody(
-                    ByteString.copyFromUtf8(Integer.toString(startMsgId + i))).build();
-            publisher.publish(topic, msg);
-        }
-        logger.info("Publish finished.");
-        queue.take();
-        logger.info("Deliver finished.");
-        // check messages received in order
-        assertTrue(ocm.isInOrder());
-        // make sure consume request sent to hub server before shut down
-        Thread.sleep(2000);
-        subscriber.stopDelivery(topic, subId);
-        subscriber.closeSubscription(topic, subId);
-
-        stopHubServers();
-        Thread.sleep(1000);
-        startHubServers();
-
-        startMsgId = 20;
-        // reconnect it again
-        subscriber.subscribe(topic, subId, opts);
-        ocm = new OrderCheckingMessageHandler(topic, subId, startMsgId, numMsgs);
-        subscriber.startDelivery(topic, subId, ocm);
-        for (int i=0; i<numMsgs; i++) {
-            Message msg = Message.newBuilder().setBody(
-                    ByteString.copyFromUtf8(Integer.toString(startMsgId + i))).build();
-            publisher.publish(topic, msg);
-        }
-        queue.take();
-        // check messages received in order
-        assertTrue(ocm.isInOrder());
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java
deleted file mode 100644
index b66196e..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import java.net.UnknownHostException;
-import java.util.concurrent.Executors;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-public class StubTopicManager extends TrivialOwnAllTopicManager {
-
-    boolean shouldOwnEveryNewTopic = false;
-    boolean shouldError = false;
-
-    public void setShouldOwnEveryNewTopic(boolean shouldOwnEveryNewTopic) {
-        this.shouldOwnEveryNewTopic = shouldOwnEveryNewTopic;
-    }
-
-    public void setShouldError(boolean shouldError) {
-        this.shouldError = shouldError;
-    }
-
-    public StubTopicManager(ServerConfiguration conf) throws UnknownHostException {
-        super(conf, Executors.newSingleThreadScheduledExecutor());
-    }
-
-    @Override
-    protected void realGetOwner(ByteString topic, boolean shouldClaim,
-                                Callback<HedwigSocketAddress> cb, Object ctx) {
-
-        if (shouldError) {
-            cb.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail"));
-            return;
-        }
-        if (null != topics.getIfPresent(topic) // already own it
-                || shouldOwnEveryNewTopic) {
-            super.realGetOwner(topic, shouldClaim, cb, ctx);
-            return;
-        } else {
-            // return some other address
-            cb.operationFinished(ctx, new HedwigSocketAddress("124.31.0.1:80"));
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestConcurrentTopicAcquisition.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestConcurrentTopicAcquisition.java b/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestConcurrentTopicAcquisition.java
deleted file mode 100644
index 04fb451..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestConcurrentTopicAcquisition.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.server.HedwigHubTestBase;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.ConcurrencyUtils;
-import org.junit.Test;
-
-import com.google.protobuf.ByteString;
-
-public class TestConcurrentTopicAcquisition extends HedwigHubTestBase {
-
-    // Client variables
-    protected HedwigClient client;
-    protected Publisher publisher;
-    protected Subscriber subscriber;
-    
-    final LinkedBlockingQueue<ByteString> subscribers =
-            new LinkedBlockingQueue<ByteString>();
-    final ByteString topic = ByteString.copyFromUtf8("concurrent-topic");
-    final int numSubscribers = 300;
-    final AtomicInteger numDone = new AtomicInteger(0);
-
-    // SynchronousQueues to verify async calls
-    private final SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>();
-    
-    class SubCallback implements Callback<Void> {
-        
-        ByteString subId;
-        
-        public SubCallback(ByteString subId) {
-            this.subId = subId;
-        }
-
-        @Override
-        public void operationFinished(Object ctx,
-                Void resultOfOperation) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("subscriber " + subId.toStringUtf8() + " succeed.");
-            }
-            int done = numDone.incrementAndGet();
-            if (done == numSubscribers) {
-                ConcurrencyUtils.put(queue, false);
-            }
-        }
-
-        @Override
-        public void operationFailed(Object ctx,
-                PubSubException exception) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("subscriber " + subId.toStringUtf8() + " failed : ", exception);
-            }
-            ConcurrencyUtils.put(subscribers, subId);
-            // ConcurrencyUtils.put(queue, false);
-        }
-    }
-    
-    @Override
-    public void setUp() throws Exception {
-        super.setUp();
-        client = new HedwigClient(new HubClientConfiguration());
-
-        publisher = client.getPublisher();
-        subscriber = client.getSubscriber();
-    }
-
-    @Override
-    public void tearDown() throws Exception {
-        // sub.interrupt();
-        // sub.join();
-        
-        client.close();
-        super.tearDown();
-    }
-    
-    @Test(timeout=60000)
-    public void testTopicAcquistion() throws Exception {
-        logger.info("Start concurrent topic acquistion test.");
-        
-        // let one bookie down to cause not enough bookie exception
-        logger.info("Tear down one bookie server.");
-        bktb.tearDownOneBookieServer();
-        
-        // In current implementation, the first several subscriptions will succeed to put topic in topic manager set,
-        // because the tear down bookie server's zk node need time to disappear
-        // some subscriptions will create ledger successfully, then other subscriptions will fail.
-        // the race condition will be: topic manager own topic but persistence manager doesn't
-        
-        // 300 subscribers subscribe to a same topic
-        final AtomicBoolean inRedirectLoop = new AtomicBoolean(false);
-        numDone.set(0);
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-        for (int i=0; i<numSubscribers; i++) {
-            ByteString subId = ByteString.copyFromUtf8("sub-" + i);
-            if (logger.isDebugEnabled()) {
-                logger.debug("subscriber " + subId.toStringUtf8() + " subscribes topic " + topic.toStringUtf8());
-            }
-            subscriber.asyncSubscribe(topic, subId, opts,
-                new Callback<Void>() {
-                
-                    private void tick() {
-                        if (numDone.incrementAndGet() == numSubscribers) {
-                            ConcurrencyUtils.put(queue, true);
-                        }
-                    }
-
-                    @Override
-                    public void operationFinished(Object ctx,
-                            Void resultOfOperation) {
-                        tick();
-                    }
-
-                    @Override
-                    public void operationFailed(Object ctx,
-                            PubSubException exception) {
-                        if (exception instanceof PubSubException.ServiceDownException) {
-                            String msg = exception.getMessage();
-                            if (msg.indexOf("ServerRedirectLoopException") > 0) {
-                                inRedirectLoop.set(true);
-                            }
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("Operation failed : ", exception);
-                            }
-                        }
-                        tick(); 
-                    }
-                
-                },
-            null);
-        }
-        
-        queue.take();
-        
-        // TODO: remove comment after we fix the issue
-        // Assert.assertEquals(false, inRedirectLoop.get());
-        
-        // start a thread to send subscriptions
-        numDone.set(0);
-        Thread sub = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                logger.info("sub thread started");
-                try {
-                    // 100 subscribers subscribe to a same topic
-                    for (int i=0; i<numSubscribers; i++) {
-                        ByteString subscriberId = ByteString.copyFromUtf8("sub-" + i);
-                        subscribers.put(subscriberId);
-                    }
-                    
-                    ByteString subId;
-                    SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-                        .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-                    while (true) {
-                        subId = subscribers.take();
-                        
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("subscriber " + subId.toStringUtf8() + " subscribes topic " + topic.toStringUtf8());
-                        }
-                        subscriber.asyncSubscribe(topic, subId, opts, new SubCallback(subId), null);
-                    }
-                    // subscriber.asyncSubscribe(topic, subscriberId, mode, callback, context)
-                } catch (InterruptedException ie) {
-                    // break
-                    logger.warn("Interrupted : ", ie);
-                }
-            }
-
-        });
-        sub.start();
-        Thread.sleep(2000);
-        
-        // start a new bookie server
-        logger.info("start new bookie server");
-        bktb.startUpNewBookieServer();
-        
-        // hope that all the subscriptions will be OK
-        queue.take();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubInfo.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubInfo.java b/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubInfo.java
deleted file mode 100644
index 77c6fad..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubInfo.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.server.topics;
-
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-import org.junit.Test;
-import org.junit.Assert;
-
-public class TestHubInfo {
-
-    @Test(timeout=60000)
-    public void testParseHubInfo() throws Exception {
-        HedwigSocketAddress addr = new HedwigSocketAddress("localhost", 9086, 9087);
-        HubInfo hubInfo1 = new HubInfo(addr, 9999);
-
-        String strHubInfo1 = hubInfo1.toString();
-        HubInfo parsedHubInfo1 = HubInfo.parse(strHubInfo1);
-        Assert.assertEquals("Hub infos should be same", hubInfo1, parsedHubInfo1);
-
-        HubInfo hubInfo2 = new HubInfo(addr, 0);
-        HubInfo parsedHubInfo2 = HubInfo.parse("localhost:9086:9087");
-        Assert.assertEquals("Hub infos w/o zxid should be same", hubInfo2, parsedHubInfo2);
-
-        // parse empty string
-        try {
-            HubInfo.parse("");
-            Assert.fail("Should throw InvalidHubInfoException parsing empty string.");
-        } catch (HubInfo.InvalidHubInfoException ihie) {
-        }
-
-        // parse corrupted hostname
-        try {
-            HubInfo.parse("localhost,a,b,c");
-            Assert.fail("Should throw InvalidHubInfoException parsing corrupted hostname.");
-        } catch (HubInfo.InvalidHubInfoException ihie) {
-        }
-
-        // parse corrupted string
-        try {
-            HubInfo.parse("hostname: localhost:9086:9087");
-            Assert.fail("Should throw InvalidHubInfoException parsing corrupted string.");
-        } catch (HubInfo.InvalidHubInfoException ihie) {
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubLoad.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubLoad.java b/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubLoad.java
deleted file mode 100644
index f14d601..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubLoad.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.server.topics;
-
-import org.junit.Test;
-import org.junit.Assert;
-
-public class TestHubLoad {
-
-    @Test(timeout=60000)
-    public void testParseHubLoad() throws Exception {
-        HubLoad hubLoad1 = new HubLoad(9999);
-
-        String strHubLoad1 = hubLoad1.toString();
-        HubLoad parsedHubLoad1 = HubLoad.parse(strHubLoad1);
-        Assert.assertEquals("Hub load data should be same", hubLoad1, parsedHubLoad1);
-
-        final int numTopics = 9998;
-        HubLoad hubLoad2 = new HubLoad(numTopics);
-        HubLoad parsedHubLoad2 = HubLoad.parse(numTopics + "");
-        Assert.assertEquals("Hub load data not protobuf encoded should be same", hubLoad2, parsedHubLoad2);
-
-        // parse empty string
-        try {
-            HubLoad.parse("");
-            Assert.fail("Should throw InvalidHubLoadException parsing empty string.");
-        } catch (HubLoad.InvalidHubLoadException ihie) {
-        }
-
-        // parse corrupted numTopics
-        try {
-            HubLoad.parse("9998_x");
-            Assert.fail("Should throw InvalidHubLoadException parsing corrupted hub load data.");
-        } catch (HubLoad.InvalidHubLoadException ihie) {
-        }
-
-        // parse corrupted string
-        try {
-            HubLoad.parse("hostname: 9998_x");
-            Assert.fail("Should throw InvalidHubLoadException parsing corrupted hub load data.");
-        } catch (HubLoad.InvalidHubLoadException ihie) {
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java
deleted file mode 100644
index c75ff05..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java
+++ /dev/null
@@ -1,354 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.SynchronousQueue;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.StubCallback;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.CompositeException;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.meta.MetadataManagerFactoryTestCase;
-import org.apache.hedwig.server.meta.TopicOwnershipManager;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.ConcurrencyUtils;
-import org.apache.hedwig.util.Either;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import org.apache.hedwig.util.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestMMTopicManager extends MetadataManagerFactoryTestCase {
-
-    private static final Logger LOG = LoggerFactory.getLogger(TestMMTopicManager.class);
-
-    protected MMTopicManager tm;
-    protected TopicOwnershipManager tom;
-
-    protected class CallbackQueue<T> implements Callback<T> {
-        SynchronousQueue<Either<T, Exception>> q = new SynchronousQueue<Either<T, Exception>>();
-
-        public SynchronousQueue<Either<T, Exception>> getQueue() {
-            return q;
-        }
-
-        public Either<T, Exception> take() throws InterruptedException {
-            return q.take();
-        }
-
-        @Override
-        public void operationFailed(Object ctx, final PubSubException exception) {
-            LOG.error("got exception: " + exception);
-            new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    ConcurrencyUtils.put(q, Either.of((T) null, (Exception) exception));
-                }
-            }).start();
-        }
-
-        @Override
-        public void operationFinished(Object ctx, final T resultOfOperation) {
-            new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    ConcurrencyUtils.put(q, Either.of(resultOfOperation, (Exception) null));
-                }
-            }).start();
-        }
-    }
-
-    protected CallbackQueue<HedwigSocketAddress> addrCbq = new CallbackQueue<HedwigSocketAddress>();
-    protected CallbackQueue<ByteString> bsCbq = new CallbackQueue<ByteString>();
-    protected CallbackQueue<Void> voidCbq = new CallbackQueue<Void>();
-
-    protected ByteString topic = ByteString.copyFromUtf8("topic");
-    protected HedwigSocketAddress me;
-    protected ScheduledExecutorService scheduler;
-
-    public TestMMTopicManager(String metaManagerCls) {
-        super(metaManagerCls);
-    }
-
-    @Override
-    @Before
-    public void setUp() throws Exception {
-        super.setUp();
-        me = conf.getServerAddr();
-        scheduler = Executors.newSingleThreadScheduledExecutor();
-        tom = metadataManagerFactory.newTopicOwnershipManager();
-        tm = new MMTopicManager(conf, zk, metadataManagerFactory, scheduler);
-    }
-
-    @Override
-    @After
-    public void tearDown() throws Exception {
-        tom.close();
-        tm.stop();
-        super.tearDown();
-    }
-
-    @Test(timeout=60000)
-    public void testGetOwnerSingle() throws Exception {
-        tm.getOwner(topic, false, addrCbq, null);
-        Assert.assertEquals(me, check(addrCbq.take()));
-    }
-
-    protected ByteString mkTopic(int i) {
-        return ByteString.copyFromUtf8(topic.toStringUtf8() + i);
-    }
-
-    protected <T> T check(Either<T, Exception> ex) throws Exception {
-        if (ex.left() == null)
-            throw ex.right();
-        else
-            return ex.left();
-    }
-
-    public static class CustomServerConfiguration extends ServerConfiguration {
-        int port;
-
-        public CustomServerConfiguration(int port) {
-            this.port = port;
-        }
-
-        @Override
-        public int getServerPort() {
-            return port;
-        }
-    }
-
-    @Test(timeout=60000)
-    public void testGetOwnerMulti() throws Exception {
-        ServerConfiguration conf1 = new CustomServerConfiguration(conf.getServerPort() + 1),
-                            conf2 = new CustomServerConfiguration(conf.getServerPort() + 2);
-        MMTopicManager tm1 = new MMTopicManager(conf1, zk, metadataManagerFactory, scheduler),
-                       tm2 = new MMTopicManager(conf2, zk, metadataManagerFactory, scheduler);
-
-        tm.getOwner(topic, false, addrCbq, null);
-        HedwigSocketAddress owner = check(addrCbq.take());
-
-        for (int i = 0; i < 100; ++i) {
-            tm.getOwner(topic, false, addrCbq, null);
-            Assert.assertEquals(owner, check(addrCbq.take()));
-
-            tm1.getOwner(topic, false, addrCbq, null);
-            Assert.assertEquals(owner, check(addrCbq.take()));
-
-            tm2.getOwner(topic, false, addrCbq, null);
-            Assert.assertEquals(owner, check(addrCbq.take()));
-        }
-
-        for (int i = 0; i < 100; ++i) {
-            if (!owner.equals(me))
-                break;
-            tm.getOwner(mkTopic(i), false, addrCbq, null);
-            owner = check(addrCbq.take());
-            if (i == 99)
-                Assert.fail("Never chose another owner");
-        }
-
-        tm1.stop();
-        tm2.stop();
-    }
-
-    @Test(timeout=60000)
-    public void testLoadBalancing() throws Exception {
-        tm.getOwner(topic, false, addrCbq, null);
-
-        Assert.assertEquals(me, check(addrCbq.take()));
-
-        ServerConfiguration conf1 = new CustomServerConfiguration(conf.getServerPort() + 1);
-        TopicManager tm1 = new MMTopicManager(conf1, zk, metadataManagerFactory, scheduler);
-
-        ByteString topic1 = mkTopic(1);
-        tm.getOwner(topic1, false, addrCbq, null);
-        Assert.assertEquals(conf1.getServerAddr(), check(addrCbq.take()));
-
-        tm1.stop();
-    }
-
-    class StubOwnershipChangeListener implements TopicOwnershipChangeListener {
-        boolean failure;
-        SynchronousQueue<Pair<ByteString, Boolean>> bsQueue;
-
-        public StubOwnershipChangeListener(SynchronousQueue<Pair<ByteString, Boolean>> bsQueue) {
-            this.bsQueue = bsQueue;
-        }
-
-        public void setFailure(boolean failure) {
-            this.failure = failure;
-        }
-
-        @Override
-        public void lostTopic(final ByteString topic) {
-            new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    ConcurrencyUtils.put(bsQueue, Pair.of(topic, false));
-                }
-            }).start();
-        }
-
-        public void acquiredTopic(final ByteString topic, final Callback<Void> callback, final Object ctx) {
-            new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    ConcurrencyUtils.put(bsQueue, Pair.of(topic, true));
-                    if (failure) {
-                        callback.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail"));
-                    } else {
-                        callback.operationFinished(ctx, null);
-                    }
-                }
-            }).start();
-        }
-    }
-
-    @Test(timeout=60000)
-    public void testOwnershipChange() throws Exception {
-        SynchronousQueue<Pair<ByteString, Boolean>> bsQueue = new SynchronousQueue<Pair<ByteString, Boolean>>();
-
-        StubOwnershipChangeListener listener = new StubOwnershipChangeListener(bsQueue);
-
-        tm.addTopicOwnershipChangeListener(listener);
-
-        // regular acquire
-        tm.getOwner(topic, true, addrCbq, null);
-        Pair<ByteString, Boolean> pair = bsQueue.take();
-        Assert.assertEquals(topic, pair.first());
-        Assert.assertTrue(pair.second());
-        Assert.assertEquals(me, check(addrCbq.take()));
-        assertOwnershipNodeExists();
-
-        // topic that I already own
-        tm.getOwner(topic, true, addrCbq, null);
-        Assert.assertEquals(me, check(addrCbq.take()));
-        Assert.assertTrue(bsQueue.isEmpty());
-        assertOwnershipNodeExists();
-
-        // regular release
-        tm.releaseTopic(topic, cb, null);
-        pair = bsQueue.take();
-        Assert.assertEquals(topic, pair.first());
-        Assert.assertFalse(pair.second());
-        Assert.assertTrue(queue.take());
-        assertOwnershipNodeDoesntExist();
-
-        // releasing topic that I don't own
-        tm.releaseTopic(mkTopic(0), cb, null);
-        Assert.assertTrue(queue.take());
-        Assert.assertTrue(bsQueue.isEmpty());
-
-        // set listener to return error
-        listener.setFailure(true);
-
-        tm.getOwner(topic, true, addrCbq, null);
-        pair = bsQueue.take();
-        Assert.assertEquals(topic, pair.first());
-        Assert.assertTrue(pair.second());
-        Assert.assertEquals(PubSubException.ServiceDownException.class, ((CompositeException) addrCbq.take().right())
-                            .getExceptions().iterator().next().getClass());
-        Assert.assertFalse(null != tm.topics.getIfPresent(topic));
-        Thread.sleep(100);
-        assertOwnershipNodeDoesntExist();
-
-    }
-
-    public void assertOwnershipNodeExists() throws Exception {
-        StubCallback<Versioned<HubInfo>> callback = new StubCallback<Versioned<HubInfo>>();
-        tom.readOwnerInfo(topic, callback, null);
-        Versioned<HubInfo> hubInfo = callback.queue.take().left();
-        Assert.assertEquals(tm.addr, hubInfo.getValue().getAddress());
-    }
-
-    public void assertOwnershipNodeDoesntExist() throws Exception {
-        StubCallback<Versioned<HubInfo>> callback = new StubCallback<Versioned<HubInfo>>();
-        tom.readOwnerInfo(topic, callback, null);
-        Versioned<HubInfo> hubInfo = callback.queue.take().left();
-        Assert.assertEquals(null, hubInfo);
-    }
-
-    @Test(timeout=60000)
-    public void testZKClientDisconnected() throws Exception {
-        // First assert ownership of the topic
-        tm.getOwner(topic, true, addrCbq, null);
-        Assert.assertEquals(me, check(addrCbq.take()));
-
-        // Suspend the ZKTopicManager and make sure calls to getOwner error out
-        tm.isSuspended = true;
-        tm.getOwner(topic, true, addrCbq, null);
-        Assert.assertEquals(PubSubException.ServiceDownException.class, addrCbq.take().right().getClass());
-        // Release the topic. This should not error out even if suspended.
-        tm.releaseTopic(topic, cb, null);
-        Assert.assertTrue(queue.take());
-        assertOwnershipNodeDoesntExist();
-
-        // Restart the ZKTopicManager and make sure calls to getOwner are okay
-        tm.isSuspended = false;
-        tm.getOwner(topic, true, addrCbq, null);
-        Assert.assertEquals(me, check(addrCbq.take()));
-        assertOwnershipNodeExists();
-    }
-
-    @Test(timeout=60000)
-    public void testRetentionAfterAccess() throws Exception {
-        conf.getConf().setProperty("retention_secs_after_access", "5");
-        MMTopicManager tm1 = new MMTopicManager(conf, zk, metadataManagerFactory, scheduler);
-        tm1.getOwner(topic, true, addrCbq, null);
-        Assert.assertEquals(me, check(addrCbq.take()));
-        Thread.sleep(6000L);
-        tm1.topics.cleanUp();
-        Thread.sleep(2000L);
-        assertOwnershipNodeDoesntExist();
-        tm1.getOwner(topic, true, addrCbq, null);
-        Assert.assertEquals(me, check(addrCbq.take()));
-        Thread.sleep(1000L);
-        tm1.topics.cleanUp();
-        Thread.sleep(2000L);
-        assertOwnershipNodeExists();
-
-        tm1.stop();
-    }
-
-    @Test(timeout=60000)
-    public void testMaxNumTopics() throws Exception {
-        conf.getConf().setProperty("max_num_topics", "1");
-        MMTopicManager tm1 = new MMTopicManager(conf, zk, metadataManagerFactory, scheduler);
-        tm1.getOwner(topic, true, addrCbq, null);
-        Assert.assertEquals(me, check(addrCbq.take()));
-        assertOwnershipNodeExists();
-        tm1.getOwner(ByteString.copyFromUtf8("MaxNumTopic"),
-                     true, addrCbq, null);
-        Assert.assertEquals(me, check(addrCbq.take()));
-        Thread.sleep(2000L);
-        assertOwnershipNodeDoesntExist();
-        tm1.stop();
-    }
-
-
-}