You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:17 UTC

[07/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestMultiplexing.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestMultiplexing.java b/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestMultiplexing.java
deleted file mode 100644
index e93ecae..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestMultiplexing.java
+++ /dev/null
@@ -1,439 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.server.HedwigHubTestBase;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.util.Callback;
-
-import static org.junit.Assert.*;
-
-public class TestMultiplexing extends HedwigHubTestBase {
-
-    private static final int DEFAULT_MSG_WINDOW_SIZE = 10;
-
-    protected class TestServerConfiguration extends HubServerConfiguration {
-        TestServerConfiguration(int serverPort, int sslServerPort) {
-            super(serverPort, sslServerPort);
-        }
-        @Override
-        public int getDefaultMessageWindowSize() {
-            return DEFAULT_MSG_WINDOW_SIZE;
-        }
-    }
-
-    class TestMessageHandler implements MessageHandler {
-
-        int expected;
-        final int numMsgsAtFirstRun;
-        final int numMsgsAtSecondRun;
-        final CountDownLatch firstLatch;
-        final CountDownLatch secondLatch;
-        final boolean receiveSecondRun;
-
-        public TestMessageHandler(int start, int numMsgsAtFirstRun,
-                                  boolean receiveSecondRun,
-                                  int numMsgsAtSecondRun) {
-            expected = start;
-            this.numMsgsAtFirstRun = numMsgsAtFirstRun;
-            this.numMsgsAtSecondRun = numMsgsAtSecondRun;
-            this.receiveSecondRun = receiveSecondRun;
-            firstLatch = new CountDownLatch(1);
-            secondLatch = new CountDownLatch(1);
-        }
-
-        @Override
-        public synchronized void deliver(ByteString topic, ByteString subscriberId,
-                                         Message msg,
-                                         Callback<Void> callback, Object context) {
-            try {
-                int value = Integer.valueOf(msg.getBody().toStringUtf8());
-                logger.debug("Received message {}.", value);
-
-                if (value == expected) {
-                    ++expected;
-                } else {
-                    // error condition
-                    logger.error("Did not receive expected value, expected {}, got {}",
-                                 expected, value);
-                    expected = 0;
-                    firstLatch.countDown();
-                    secondLatch.countDown();
-                }
-                if (numMsgsAtFirstRun + 1 == expected) {
-                    firstLatch.countDown();
-                }
-                if (receiveSecondRun) {
-                    if (numMsgsAtFirstRun + numMsgsAtSecondRun + 1 == expected) {
-                        secondLatch.countDown();
-                    }
-                } else {
-                    if (numMsgsAtFirstRun + 1 < expected) {
-                        secondLatch.countDown();
-                    }
-                }
-                callback.operationFinished(context, null);
-                subscriber.consume(topic, subscriberId, msg.getMsgId());
-            } catch (Throwable t) {
-                logger.error("Received bad message.", t);
-                firstLatch.countDown();
-                secondLatch.countDown();
-            }
-        }
-
-        public void checkFirstRun() throws Exception {
-            assertTrue("Timed out waiting for messages " + (numMsgsAtFirstRun + 1),
-                       firstLatch.await(10, TimeUnit.SECONDS));
-            assertEquals("Should be expected messages with " + (numMsgsAtFirstRun + 1),
-                         numMsgsAtFirstRun + 1, expected);
-        }
-
-        public void checkSecondRun() throws Exception {
-            if (receiveSecondRun) {
-                assertTrue("Timed out waiting for messages "
-                           + (numMsgsAtFirstRun + numMsgsAtSecondRun + 1),
-                           secondLatch.await(10, TimeUnit.SECONDS));
-                assertEquals("Should be expected messages with "
-                             + (numMsgsAtFirstRun + numMsgsAtSecondRun + 1),
-                             numMsgsAtFirstRun + numMsgsAtSecondRun + 1, expected);
-            } else {
-                assertFalse("Receive more messages than " + numMsgsAtFirstRun,
-                            secondLatch.await(3, TimeUnit.SECONDS));
-                assertEquals("Should be expected messages with ony " + (numMsgsAtFirstRun + 1),
-                             numMsgsAtFirstRun + 1, expected);
-            }
-        }
-    }
-
-    class ThrottleMessageHandler implements MessageHandler {
-
-        int expected;
-        final int numMsgs;
-        final int numMsgsThrottle;
-        final CountDownLatch throttleLatch;
-        final CountDownLatch nonThrottleLatch;
-        final boolean enableThrottle;
-
-        public ThrottleMessageHandler(int start, int numMsgs,
-                                      boolean enableThrottle,
-                                      int numMsgsThrottle) {
-            expected = start;
-            this.numMsgs = numMsgs;
-            this.enableThrottle = enableThrottle;
-            this.numMsgsThrottle = numMsgsThrottle;
-            throttleLatch = new CountDownLatch(1);
-            nonThrottleLatch = new CountDownLatch(1);
-        }
-
-        @Override
-        public synchronized void deliver(ByteString topic, ByteString subscriberId,
-                                         Message msg,
-                                         Callback<Void> callback, Object context) {
-            try {
-                int value = Integer.valueOf(msg.getBody().toStringUtf8());
-                logger.debug("Received message {}.", value);
-
-                if (value == expected) {
-                    ++expected;
-                } else {
-                    // error condition
-                    logger.error("Did not receive expected value, expected {}, got {}",
-                                 expected, value);
-                    expected = 0;
-                    throttleLatch.countDown();
-                    nonThrottleLatch.countDown();
-                }
-                if (expected == numMsgsThrottle + 2) {
-                    throttleLatch.countDown();
-                }
-                if (expected == numMsgs + 1) {
-                    nonThrottleLatch.countDown();
-                }
-                callback.operationFinished(context, null);
-                if (enableThrottle) {
-                    if (expected > numMsgsThrottle + 1) {
-                        subscriber.consume(topic, subscriberId, msg.getMsgId());
-                    }
-                } else {
-                    subscriber.consume(topic, subscriberId, msg.getMsgId());
-                }
-            } catch (Throwable t) {
-                logger.error("Received bad message.", t);
-                throttleLatch.countDown();
-                nonThrottleLatch.countDown();
-            }
-        }
-
-        public void checkThrottle() throws Exception {
-            if (enableThrottle) {
-                assertFalse("Received more messages than throttle value " + numMsgsThrottle,
-                            throttleLatch.await(3, TimeUnit.SECONDS));
-                assertEquals("Should be expected messages with only " + (numMsgsThrottle + 1),
-                             numMsgsThrottle + 1, expected);
-            } else {
-                assertTrue("Should not be throttled.", throttleLatch.await(10, TimeUnit.SECONDS));
-                assertTrue("Timed out waiting for messages " + (numMsgs + 1),
-                           nonThrottleLatch.await(10, TimeUnit.SECONDS));
-                assertEquals("Should be expected messages with " + (numMsgs + 1),
-                             numMsgs + 1, expected);
-            }
-        }
-
-        public void checkAfterThrottle() throws Exception {
-            if (enableThrottle) {
-                assertTrue("Timed out waiting for messages " + (numMsgs + 1),
-                           nonThrottleLatch.await(10, TimeUnit.SECONDS));
-                assertEquals("Should be expected messages with " + (numMsgs + 1),
-                             numMsgs + 1, expected);
-            }
-        }
-    }
-
-    HedwigClient client;
-    Publisher publisher;
-    Subscriber subscriber;
-
-    @Override
-    @Before
-    public void setUp() throws Exception {
-        super.setUp();
-        client = new HedwigClient(new HubClientConfiguration() {
-            @Override
-            public boolean isSubscriptionChannelSharingEnabled() {
-                return true;
-            }
-            @Override
-            public boolean isAutoSendConsumeMessageEnabled() {
-                return false;
-            }
-        });
-        publisher = client.getPublisher();
-        subscriber = client.getSubscriber();
-    }
-
-    @Override
-    @After
-    public void tearDown() throws Exception {
-        client.close();
-        super.tearDown();
-    }
-
-    @Override
-    protected ServerConfiguration getServerConfiguration(int port, int sslPort) {
-        return new TestServerConfiguration(port, sslPort);
-    }
-
-    @Test(timeout=60000)
-    public void testStopDelivery() throws Exception {
-        ByteString topic1 = ByteString.copyFromUtf8("testStopDelivery-1");
-        ByteString topic2 = ByteString.copyFromUtf8("testStopDelivery-2");
-        ByteString subid1 = ByteString.copyFromUtf8("mysubid-1");
-        ByteString subid2 = ByteString.copyFromUtf8("mysubid-2");
-
-        final int X = 20;
-
-        TestMessageHandler csHandler11 =
-            new TestMessageHandler(1, X, true, X);
-        TestMessageHandler csHandler12 =
-            new TestMessageHandler(1, X, false, 0);
-        TestMessageHandler csHandler21 =
-            new TestMessageHandler(1, X, false, 0);
-        TestMessageHandler csHandler22 =
-            new TestMessageHandler(1, X, true, X);
-
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE).build();
-
-        subscriber.subscribe(topic1, subid1, opts);
-        subscriber.subscribe(topic1, subid2, opts);
-        subscriber.subscribe(topic2, subid1, opts);
-        subscriber.subscribe(topic2, subid2, opts);
-
-        // start deliveries
-        subscriber.startDelivery(topic1, subid1, csHandler11);
-        subscriber.startDelivery(topic1, subid2, csHandler12);
-        subscriber.startDelivery(topic2, subid1, csHandler21);
-        subscriber.startDelivery(topic2, subid2, csHandler22);
-
-        // first publish
-        for (int i = 1; i<=X; i++) {
-            publisher.publish(topic1, Message.newBuilder().setBody(
-                                      ByteString.copyFromUtf8(String.valueOf(i))).build());
-            publisher.publish(topic2, Message.newBuilder().setBody(
-                                      ByteString.copyFromUtf8(String.valueOf(i))).build());
-        }
-
-        csHandler11.checkFirstRun();
-        csHandler12.checkFirstRun();
-        csHandler21.checkFirstRun();
-        csHandler22.checkFirstRun();
-
-        // stop delivery for <topic1, subscriber2> and <topic2, subscriber1>
-        subscriber.stopDelivery(topic1, subid2);
-        subscriber.stopDelivery(topic2, subid1);
-
-        // second publish
-        for (int i = X+1; i<=2*X; i++) {
-            publisher.publish(topic1, Message.newBuilder().setBody(
-                                      ByteString.copyFromUtf8(String.valueOf(i))).build());
-            publisher.publish(topic2, Message.newBuilder().setBody(
-                                      ByteString.copyFromUtf8(String.valueOf(i))).build());
-        }
-
-        csHandler11.checkSecondRun();
-        csHandler22.checkSecondRun();
-        csHandler12.checkSecondRun();
-        csHandler21.checkSecondRun();
-    }
-
-    @Test(timeout=60000)
-    public void testCloseSubscription() throws Exception {
-        ByteString topic1 = ByteString.copyFromUtf8("testCloseSubscription-1");
-        ByteString topic2 = ByteString.copyFromUtf8("testCloseSubscription-2");
-        ByteString subid1 = ByteString.copyFromUtf8("mysubid-1");
-        ByteString subid2 = ByteString.copyFromUtf8("mysubid-2");
-
-        final int X = 20;
-
-        TestMessageHandler csHandler11 =
-            new TestMessageHandler(1, X, true, X);
-        TestMessageHandler csHandler12 =
-            new TestMessageHandler(1, X, false, 0);
-        TestMessageHandler csHandler21 =
-            new TestMessageHandler(1, X, false, 0);
-        TestMessageHandler csHandler22 =
-            new TestMessageHandler(1, X, true, X);
-
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE).build();
-        subscriber.subscribe(topic1, subid1, opts);
-        subscriber.subscribe(topic1, subid2, opts);
-        subscriber.subscribe(topic2, subid1, opts);
-        subscriber.subscribe(topic2, subid2, opts);
-
-        // start deliveries
-        subscriber.startDelivery(topic1, subid1, csHandler11);
-        subscriber.startDelivery(topic1, subid2, csHandler12);
-        subscriber.startDelivery(topic2, subid1, csHandler21);
-        subscriber.startDelivery(topic2, subid2, csHandler22);
-
-        // first publish
-        for (int i = 1; i<=X; i++) {
-            publisher.publish(topic1, Message.newBuilder().setBody(
-                                      ByteString.copyFromUtf8(String.valueOf(i))).build());
-            publisher.publish(topic2, Message.newBuilder().setBody(
-                                      ByteString.copyFromUtf8(String.valueOf(i))).build());
-        }
-
-        csHandler11.checkFirstRun();
-        csHandler12.checkFirstRun();
-        csHandler21.checkFirstRun();
-        csHandler22.checkFirstRun();
-
-        // close subscription for <topic1, subscriber2> and <topic2, subscriber1>
-        subscriber.closeSubscription(topic1, subid2);
-        subscriber.closeSubscription(topic2, subid1);
-
-        // second publish
-        for (int i = X+1; i<=2*X; i++) {
-            publisher.publish(topic1, Message.newBuilder().setBody(
-                                      ByteString.copyFromUtf8(String.valueOf(i))).build());
-            publisher.publish(topic2, Message.newBuilder().setBody(
-                                      ByteString.copyFromUtf8(String.valueOf(i))).build());
-        }
-
-        csHandler11.checkSecondRun();
-        csHandler22.checkSecondRun();
-        csHandler12.checkSecondRun();
-        csHandler21.checkSecondRun();
-    }
-
-    @Test(timeout=60000)
-    public void testThrottle() throws Exception {
-        ByteString topic1 = ByteString.copyFromUtf8("testThrottle-1");
-        ByteString topic2 = ByteString.copyFromUtf8("testThrottle-2");
-        ByteString subid1 = ByteString.copyFromUtf8("mysubid-1");
-        ByteString subid2 = ByteString.copyFromUtf8("mysubid-2");
-
-        final int X = DEFAULT_MSG_WINDOW_SIZE;
-
-        ThrottleMessageHandler csHandler11 =
-            new ThrottleMessageHandler(1, 3*X, false, X);
-        ThrottleMessageHandler csHandler12 =
-            new ThrottleMessageHandler(1, 3*X, true, X);
-        ThrottleMessageHandler csHandler21 =
-            new ThrottleMessageHandler(1, 3*X, true, X);
-        ThrottleMessageHandler csHandler22 =
-            new ThrottleMessageHandler(1, 3*X, false, X);
-
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE).build();
-        subscriber.subscribe(topic1, subid1, opts);
-        subscriber.subscribe(topic1, subid2, opts);
-        subscriber.subscribe(topic2, subid1, opts);
-        subscriber.subscribe(topic2, subid2, opts);
-
-        // start deliveries
-        subscriber.startDelivery(topic1, subid1, csHandler11);
-        subscriber.startDelivery(topic1, subid2, csHandler12);
-        subscriber.startDelivery(topic2, subid1, csHandler21);
-        subscriber.startDelivery(topic2, subid2, csHandler22);
-
-        // publish
-        for (int i = 1; i<=3*X; i++) {
-            publisher.publish(topic1, Message.newBuilder().setBody(
-                                      ByteString.copyFromUtf8(String.valueOf(i))).build());
-            publisher.publish(topic2, Message.newBuilder().setBody(
-                                      ByteString.copyFromUtf8(String.valueOf(i))).build());
-        }
-
-        csHandler11.checkThrottle();
-        csHandler12.checkThrottle();
-        csHandler21.checkThrottle();
-        csHandler22.checkThrottle();
-
-        // consume messages to not throttle them
-        for (int i=1; i<=X; i++) {
-            MessageSeqId seqId =
-                MessageSeqId.newBuilder().setLocalComponent(i).build();
-            subscriber.consume(topic1, subid2, seqId);
-            subscriber.consume(topic2, subid1, seqId);
-        }
-
-        csHandler11.checkAfterThrottle();
-        csHandler22.checkAfterThrottle();
-        csHandler12.checkAfterThrottle();
-        csHandler21.checkAfterThrottle();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java b/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java
deleted file mode 100644
index 81e0314..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import junit.framework.TestCase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.After;
-import org.junit.Before;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.netty.PubSubServer;
-import org.apache.hedwig.server.persistence.BookKeeperTestBase;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-import org.apache.bookkeeper.test.PortManager;
-
-/**
- * This is a base class for any tests that need a Hedwig Hub(s) setup with an
- * associated BookKeeper and ZooKeeper instance.
- *
- */
-public abstract class HedwigHubTestBase extends TestCase {
-
-    protected static final Logger logger = LoggerFactory.getLogger(HedwigHubTestBase.class);
-
-    // BookKeeper variables
-    // Default number of bookie servers to setup. Extending classes can
-    // override this.
-    protected int numBookies = 3;
-    protected long readDelay = 0L;
-    protected BookKeeperTestBase bktb;
-
-    // PubSubServer variables
-    // Default number of PubSubServer hubs to setup. Extending classes can
-    // override this.
-    protected final int numServers;
-    protected List<PubSubServer> serversList;
-    protected List<HedwigSocketAddress> serverAddresses;
-
-    protected boolean sslEnabled = true;
-    protected boolean standalone = false;
-
-    protected static final String HOST = "localhost";
-
-    public HedwigHubTestBase() {
-        this(1);
-    }
-
-    protected HedwigHubTestBase(int numServers) {
-        this.numServers = numServers;
-
-        init();
-    }
-
-    public HedwigHubTestBase(String name, int numServers) {
-        this.numServers = numServers;
-        init();
-    }
-
-    private void init() {
-
-        serverAddresses = new LinkedList<HedwigSocketAddress>();
-        for (int i = 0; i < numServers; i++) {
-            serverAddresses.add(new HedwigSocketAddress(HOST,
-                                        PortManager.nextFreePort(), PortManager.nextFreePort()));
-        }
-    }
-
-    // Default child class of the ServerConfiguration to be used here.
-    // Extending classes can define their own (possibly extending from this) and
-    // override the getServerConfiguration method below to return their own
-    // configuration.
-    protected class HubServerConfiguration extends ServerConfiguration {
-        private final int serverPort, sslServerPort;
-
-        public HubServerConfiguration(int serverPort, int sslServerPort) {
-            this.serverPort = serverPort;
-            this.sslServerPort = sslServerPort;
-        }
-
-        @Override
-        public boolean isStandalone() {
-            return standalone;
-        }
-
-        @Override
-        public int getServerPort() {
-            return serverPort;
-        }
-
-        @Override
-        public int getSSLServerPort() {
-            return sslServerPort;
-        }
-
-        @Override
-        public String getZkHost() {
-            return null != bktb ? bktb.getZkHostPort() : null;
-        }
-
-        @Override
-        public boolean isSSLEnabled() {
-            return sslEnabled;
-        }
-
-        @Override
-        public String getCertName() {
-            return isSSLEnabled() ? "/server.p12" : null;
-        }
-
-        @Override
-        public String getPassword() {
-            return isSSLEnabled() ? "eUySvp2phM2Wk" : null;
-        }
-    }
-
-    public class HubClientConfiguration extends ClientConfiguration {
-        @Override
-        public HedwigSocketAddress getDefaultServerHedwigSocketAddress() {
-            return serverAddresses.get(0);
-        }
-    }
-
-    // Method to get a ServerConfiguration for the PubSubServers created using
-    // the specified ports. Extending child classes can override this. This
-    // default implementation will return the HubServerConfiguration object
-    // defined above.
-    protected ServerConfiguration getServerConfiguration(int serverPort, int sslServerPort) {
-        return new HubServerConfiguration(serverPort, sslServerPort);
-    }
-
-    protected void startHubServers() throws Exception {
-        // Now create the PubSubServer Hubs
-        serversList = new LinkedList<PubSubServer>();
-
-        for (int i = 0; i < numServers; i++) {
-            ServerConfiguration conf = getServerConfiguration(serverAddresses.get(i).getPort(),
-                                                              sslEnabled ? serverAddresses.get(i).getSSLPort() : -1);
-            PubSubServer s = new PubSubServer(conf, new ClientConfiguration(), new LoggingExceptionHandler());
-            serversList.add(s);
-            s.start();
-        }
-    }
-
-    protected void stopHubServers() throws Exception {
-        // Shutdown all of the PubSubServers
-        for (PubSubServer server : serversList) {
-            server.shutdown();
-        }
-        serversList.clear();
-    }
-
-    @Before
-    protected void setUp() throws Exception {
-        logger.info("STARTING " + getClass());
-        if (! standalone) {
-            bktb = new BookKeeperTestBase(numBookies, readDelay);
-            bktb.setUp();
-        }
-        startHubServers();
-        logger.info("HedwigHub test setup finished");
-    }
-
-    @After
-    protected void tearDown() throws Exception {
-        logger.info("tearDown starting");
-        stopHubServers();
-        if (null != bktb) bktb.tearDown();
-        logger.info("FINISHED " + getClass());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java b/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java
deleted file mode 100644
index 0a574b6..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.After;
-import org.junit.Before;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.netty.PubSubServer;
-import org.apache.hedwig.server.persistence.BookKeeperTestBase;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-import org.apache.bookkeeper.test.PortManager;
-
-/**
- * This is a base class for any tests that need a Hedwig Region(s) setup with a
- * number of Hedwig hubs per region, an associated HedwigClient per region and
- * the required BookKeeper and ZooKeeper instances.
- *
- */
-public abstract class HedwigRegionTestBase {
-
-    protected static final Logger logger = LoggerFactory.getLogger(HedwigRegionTestBase.class);
-
-    // BookKeeper variables
-    // Default number of bookie servers to setup. Extending classes
-    // can override this. We should be able to reuse the same BookKeeper
-    // ensemble among all of the regions, at least for unit testing purposes.
-    protected int numBookies = 3;
-    protected BookKeeperTestBase bktb;
-
-    // Hedwig Region variables
-    // Default number of Hedwig Regions to setup. Extending classes can
-    // override this.
-    protected int numRegions = 2;
-    protected int numServersPerRegion = 1;
-
-    // Map with keys being Region names and values being the list of Hedwig
-    // Hubs (PubSubServers) for that particular region.
-    protected Map<String, List<PubSubServer>> regionServersMap;
-    // Map with keys being Region names and values being the Hedwig Client
-    // instance.
-    protected Map<String, HedwigClient> regionClientsMap;
-
-    protected Map<String, Integer> regionNameToIndexMap;
-    protected Map<Integer, List<HedwigSocketAddress>> regionHubAddresses;
-
-    // String constant used as the prefix for the region names.
-    protected static final String REGION_PREFIX = "region";
-
-    // Default child class of the ServerConfiguration to be used here.
-    // Extending classes can define their own (possibly extending from this) and
-    // override the getServerConfiguration method below to return their own
-    // configuration.
-    protected class RegionServerConfiguration extends ServerConfiguration {
-        private final int serverPort, sslServerPort;
-        private final String regionName;
-
-        public RegionServerConfiguration(int serverPort, int sslServerPort, String regionName) {
-            this.serverPort = serverPort;
-            this.sslServerPort = sslServerPort;
-            this.regionName = regionName;
-            conf.setProperty(REGION, regionName);
-            setRegionList();
-        }
-
-        protected void setRegionList() {
-            List<String> myRegionList = new LinkedList<String>();
-            for (int i = 0; i < numRegions; i++) {
-                int curDefaultServerPort = regionHubAddresses.get(i).get(0).getPort();
-                int curDefaultSSLServerPort = regionHubAddresses.get(i).get(0).getSSLPort();
-                // Add this region default server port if it is for a region
-                // other than its own.
-                if (regionNameToIndexMap.get(regionName) != i) {
-                    myRegionList.add("localhost:" + curDefaultServerPort + ":" + curDefaultSSLServerPort);
-                }
-            }
-
-            regionList = myRegionList;
-        }
-
-        @Override
-        public int getServerPort() {
-            return serverPort;
-        }
-
-        @Override
-        public int getSSLServerPort() {
-            return sslServerPort;
-        }
-
-        @Override
-        public String getZkHost() {
-            return bktb.getZkHostPort();
-        }
-
-        @Override
-        public String getMyRegion() {
-            return regionName;
-        }
-
-        @Override
-        public boolean isSSLEnabled() {
-            return true;
-        }
-
-        @Override
-        public boolean isInterRegionSSLEnabled() {
-            return true;
-        }
-
-        @Override
-        public String getCertName() {
-            return "/server.p12";
-        }
-
-        @Override
-        public String getPassword() {
-            return "eUySvp2phM2Wk";
-        }
-    }
-
-    // Method to get a ServerConfiguration for the PubSubServers created using
-    // the specified ports and region name. Extending child classes can override
-    // this. This default implementation will return the
-    // RegionServerConfiguration object defined above.
-    protected ServerConfiguration getServerConfiguration(int serverPort, int sslServerPort, String regionName) {
-        return new RegionServerConfiguration(serverPort, sslServerPort, regionName);
-    }
-
-    // Default ClientConfiguration to use. This just points to the first
-    // Hedwig hub server in each region as the "default server host" to connect
-    // to.
-    protected class RegionClientConfiguration extends ClientConfiguration {
-        public RegionClientConfiguration(int serverPort, int sslServerPort) {
-            myDefaultServerAddress = new HedwigSocketAddress("localhost:" + serverPort + ":" + sslServerPort);
-        }
-        // Below you can override any of the default ClientConfiguration
-        // parameters if needed.
-    }
-
-    // Method to get a ClientConfiguration for the HedwigClients created.
-    // Inputs are the default Hedwig hub server's ports to point to.
-    protected ClientConfiguration getClientConfiguration(int serverPort, int sslServerPort) {
-        return new RegionClientConfiguration(serverPort, sslServerPort);
-    }
-
-    // Method to get a ClientConfiguration for the Cross Region Hedwig Client.
-    protected ClientConfiguration getRegionClientConfiguration() {
-        return new ClientConfiguration() {
-            @Override
-            public HedwigSocketAddress getDefaultServerHedwigSocketAddress() {
-                return regionHubAddresses.get(0).get(0);
-            }
-        };
-    }
-
-    @Before
-    public void setUp() throws Exception {
-        logger.info("STARTING " + getClass());
-        bktb = new BookKeeperTestBase(numBookies);
-        bktb.setUp();
-
-        // Create the Hedwig PubSubServer Hubs for all of the regions
-        regionServersMap = new HashMap<String, List<PubSubServer>>(numRegions, 1.0f);
-        regionClientsMap = new HashMap<String, HedwigClient>(numRegions, 1.0f);
-
-        regionHubAddresses = new HashMap<Integer, List<HedwigSocketAddress>>(numRegions, 1.0f);
-        for (int i = 0; i < numRegions; i++) {
-            List<HedwigSocketAddress> addresses = new LinkedList<HedwigSocketAddress>();
-            for (int j = 0; j < numServersPerRegion; j++) {
-                HedwigSocketAddress a = new HedwigSocketAddress("localhost",
-                        PortManager.nextFreePort(), PortManager.nextFreePort());
-                addresses.add(a);
-            }
-            regionHubAddresses.put(i, addresses);
-        }
-        regionNameToIndexMap = new HashMap<String, Integer>();
-
-        for (int i = 0; i < numRegions; i++) {
-            startRegion(i);
-        }
-        logger.info("HedwigRegion test setup finished");
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        logger.info("tearDown starting");
-        // Stop all of the HedwigClients for all regions
-        for (HedwigClient client : regionClientsMap.values()) {
-            client.close();
-        }
-        regionClientsMap.clear();
-        // Shutdown all of the PubSubServers in all regions
-        for (List<PubSubServer> serversList : regionServersMap.values()) {
-            for (PubSubServer server : serversList) {
-                server.shutdown();
-            }
-        }
-        logger.info("Finished shutting down all of the hub servers!");
-        regionServersMap.clear();
-        // Shutdown the BookKeeper and ZooKeeper stuff
-        bktb.tearDown();
-        logger.info("FINISHED " + getClass());
-    }
-
-    protected void stopRegion(int regionIdx) throws Exception {
-        String regionName = REGION_PREFIX + regionIdx;
-        if (logger.isDebugEnabled()) {
-            logger.debug("Stop region : " + regionName);
-        }
-        HedwigClient regionClient = regionClientsMap.remove(regionName);
-        if (null != regionClient) {
-            regionClient.close();
-        }
-        List<PubSubServer> serversList = regionServersMap.remove(regionName);
-        if (null == serversList) {
-            return;
-        }
-        for (PubSubServer server : serversList) {
-            server.shutdown();
-        }
-        logger.info("Finished shutting down all of the hub servers in region " + regionName);
-    }
-
-    protected void startRegion(int i) throws Exception {
-        String regionName = REGION_PREFIX + i;
-        regionNameToIndexMap.put(regionName, i);
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("Start region : " + regionName);
-        }
-
-        List<PubSubServer> serversList = new LinkedList<PubSubServer>();
-        // For the current region, create the necessary amount of hub
-        // servers. We will basically increment through the port numbers
-        // starting from the initial ones defined.
-        for (int j = 0; j < numServersPerRegion; j++) {
-            HedwigSocketAddress a = regionHubAddresses.get(i).get(j);
-            PubSubServer s = new PubSubServer(
-                    getServerConfiguration(a.getPort(),
-                                           a.getSSLPort(),
-                                           regionName),
-                    getRegionClientConfiguration(),
-                    new LoggingExceptionHandler());
-            serversList.add(s);
-            s.start();
-        }
-        // Store this list of servers created for the current region
-        regionServersMap.put(regionName, serversList);
-        // Create a Hedwig Client that points to the first Hub server
-        // created in the loop above for the current region.
-        HedwigClient regionClient = new HedwigClient(
-                getClientConfiguration(regionHubAddresses.get(i).get(0).getPort(),
-                                       regionHubAddresses.get(i).get(0).getSSLPort()));
-        regionClientsMap.put(regionName, regionClient);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/LoggingExceptionHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/LoggingExceptionHandler.java b/hedwig-server/src/test/java/org/apache/hedwig/server/LoggingExceptionHandler.java
deleted file mode 100644
index 5ea0990..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/LoggingExceptionHandler.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Exception handler that simply logs the exception and
- * does nothing more. To be used in tests instead of TerminateJVMExceptionHandler
- */
-public class LoggingExceptionHandler implements Thread.UncaughtExceptionHandler {
-    private static final Logger logger = LoggerFactory.getLogger(LoggingExceptionHandler.class);
-
-    @Override
-    public void uncaughtException(Thread t, Throwable e) {
-        logger.error("Uncaught exception in thread " + t.getName(), e);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java b/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java
deleted file mode 100644
index 02d1f46..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.After;
-import org.junit.Before;
-
-import org.apache.bookkeeper.test.PortManager;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.netty.PubSubServer;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-/**
- * This is a base class for any tests that need a StandAlone PubSubServer setup.
- */
-public abstract class PubSubServerStandAloneTestBase {
-
-    protected static final Logger logger = LoggerFactory.getLogger(PubSubServerStandAloneTestBase.class);
-
-    protected class StandAloneServerConfiguration extends ServerConfiguration {
-        final int port = PortManager.nextFreePort();
-        final int sslPort = PortManager.nextFreePort();
-
-        @Override
-        public boolean isStandalone() {
-            return true;
-        }
-
-        @Override
-        public int getServerPort() {
-            return port;
-        }
-
-        @Override
-        public int getSSLServerPort() {
-            return sslPort;
-        }
-    }
-
-    public ServerConfiguration getStandAloneServerConfiguration() {
-        return new StandAloneServerConfiguration();
-    }
-
-    protected PubSubServer server;
-    protected ServerConfiguration conf;
-    protected HedwigSocketAddress defaultAddress;
-
-    @Before
-    public void setUp() throws Exception {
-        logger.info("STARTING " + getClass());
-        conf = getStandAloneServerConfiguration();
-        startHubServer(conf);
-        logger.info("Standalone PubSubServer test setup finished");
-    }
-
-
-    @After
-    public void tearDown() throws Exception {
-        logger.info("tearDown starting");
-        tearDownHubServer();
-        logger.info("FINISHED " + getClass());
-    }
-
-    protected HedwigSocketAddress getDefaultHedwigAddress() {
-        return defaultAddress;
-    }
-
-    protected void startHubServer(ServerConfiguration conf) throws Exception {
-        defaultAddress = new HedwigSocketAddress("localhost", conf.getServerPort(),
-                                                 conf.getSSLServerPort());
-        server = new PubSubServer(conf, new ClientConfiguration(), new LoggingExceptionHandler());
-        server.start();
-    }
-
-    protected void tearDownHubServer() throws Exception {
-        server.shutdown();
-    }
-
-}