You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:15 UTC

[05/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java b/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java
deleted file mode 100644
index 46c0c17..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java
+++ /dev/null
@@ -1,375 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.delivery;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.filter.ClientMessageFilter;
-import org.apache.hedwig.filter.MessageFilterBase;
-import org.apache.hedwig.filter.ServerMessageFilter;
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageHeader;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import org.apache.hedwig.server.HedwigHubTestBase;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.util.Callback;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.google.protobuf.ByteString;
-
-import static org.junit.Assert.*;
-
-@RunWith(Parameterized.class)
-public class TestThrottlingDelivery extends HedwigHubTestBase {
-    // Tests delivery throttling by message window size; the class is run once per parameter set from configs().
-    private static final int DEFAULT_MESSAGE_WINDOW_SIZE = 10;
-    private static final String OPT_MOD = "MOD"; // key used both as subscription user option and as message header property
-
-    static class ModMessageFilter implements ServerMessageFilter, ClientMessageFilter {
-        // Filter that accepts a message only when its integer body is divisible by {@code mod}.
-        int mod; // modulus read from the MOD user option; set to 0 when the option is absent
-
-        @Override
-        public MessageFilterBase setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
-                SubscriptionPreferences preferences) {
-            Map<String, ByteString> userOptions = SubscriptionStateUtils.buildUserOptions(preferences);
-            ByteString modValue = userOptions.get(OPT_MOD);
-            if (null == modValue) {
-                mod = 0;
-            } else {
-                mod = Integer.valueOf(modValue.toStringUtf8()); // NumberFormatException if the option is not a decimal integer
-            }
-            return this;
-        }
-
-        @Override
-        public boolean testMessage(Message message) {
-            int value = Integer.valueOf(message.getBody().toStringUtf8());
-            return 0 == value % mod; // NOTE(review): throws ArithmeticException when mod == 0 (MOD option absent)
-        }
-
-        @Override
-        public ServerMessageFilter initialize(Configuration conf) throws ConfigurationException, IOException {
-            // no configuration is needed by this filter
-            return this;
-        }
-
-        @Override
-        public void uninitialize() {
-            // nothing to release
-        }
-
-    }
-
-
-    protected class ThrottleDeliveryServerConfiguration extends HubServerConfiguration {
-        // Hub server configuration whose default message window size is DEFAULT_MESSAGE_WINDOW_SIZE.
-        ThrottleDeliveryServerConfiguration(int serverPort, int sslServerPort) {
-            super(serverPort, sslServerPort);
-        }
-
-        @Override
-        public int getDefaultMessageWindowSize() {
-            return TestThrottlingDelivery.DEFAULT_MESSAGE_WINDOW_SIZE;
-        }
-    }
-
-    protected class ThrottleDeliveryClientConfiguration extends HubClientConfiguration {
-        // Client configuration with a settable outstanding-message limit and automatic consume turned off.
-        int messageWindowSize; // value returned by getMaximumOutstandingMessages()
-
-        ThrottleDeliveryClientConfiguration() {
-            this(TestThrottlingDelivery.DEFAULT_MESSAGE_WINDOW_SIZE);
-        }
-
-        ThrottleDeliveryClientConfiguration(int messageWindowSize) {
-            this.messageWindowSize = messageWindowSize;
-        }
-
-        @Override
-        public int getMaximumOutstandingMessages() {
-            return messageWindowSize;
-        }
-
-        void setMessageWindowSize(int messageWindowSize) {
-            this.messageWindowSize = messageWindowSize;
-        }
-
-        @Override
-        public boolean isAutoSendConsumeMessageEnabled() {
-            return false; // the tests send consume requests explicitly
-        }
-
-        @Override
-        public boolean isSubscriptionChannelSharingEnabled() {
-            return isSubscriptionChannelSharingEnabled; // presumably the enclosing test's parameter field - verify no inherited field shadows it
-        }
-    }
-    // Publishes the numbers start+1 .. start+num as message bodies, each tagged with header property MOD = value % M.
-    private void publishNums(Publisher pub, ByteString topic, int start, int num, int M) throws Exception {
-        for (int i = 1; i <= num; i++) {
-            PubSubProtocol.Map.Builder propsBuilder = PubSubProtocol.Map.newBuilder().addEntries(
-                    PubSubProtocol.Map.Entry.newBuilder().setKey(OPT_MOD)
-                            .setValue(ByteString.copyFromUtf8(String.valueOf((start + i) % M))));
-            MessageHeader.Builder headerBuilder = MessageHeader.newBuilder().setProperties(propsBuilder);
-            Message msg = Message.newBuilder().setBody(ByteString.copyFromUtf8(String.valueOf(start + i)))
-                    .setHeader(headerBuilder).build();
-            pub.publish(topic, msg);
-        }
-    }
-    // Publishes 1..3*X, attaches with ModMessageFilter (MOD = X) and expects exactly X, 2*X, 3*X to arrive in that order.
-    private void throttleWithFilter(Publisher pub, final Subscriber sub,
-                           ByteString topic, ByteString subid,
-                           final int X) throws Exception {
-        // publish 1..3*X with headers; with the MOD = X filter only X, 2*X and 3*X should be delivered
-        publishNums(pub, topic, 0, 3 * X, X);
-
-        // attach to the existing subscription, installing ModMessageFilter through the subscription options
-        PubSubProtocol.Map userOptions = PubSubProtocol.Map
-                .newBuilder()
-                .addEntries(
-                        PubSubProtocol.Map.Entry.newBuilder().setKey(OPT_MOD)
-                                .setValue(ByteString.copyFromUtf8(String.valueOf(X)))).build();
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder().setCreateOrAttach(CreateOrAttach.ATTACH)
-                .setOptions(userOptions).setMessageFilter(ModMessageFilter.class.getName()).build();
-        sub.subscribe(topic, subid, opts);
-
-        final AtomicInteger expected = new AtomicInteger(X); // next body value the handler should see
-        final CountDownLatch latch = new CountDownLatch(1); // released on the last message or on any error
-        sub.startDelivery(topic, subid, new MessageHandler() {
-            @Override
-            public synchronized void deliver(ByteString topic, ByteString subscriberId,
-                                             Message msg,
-                                             Callback<Void> callback, Object context) {
-                try {
-                    int value = Integer.valueOf(msg.getBody().toStringUtf8());
-                    logger.debug("Received message {},", value);
-
-                    if (value == expected.get()) {
-                        expected.addAndGet(X);
-                    } else {
-                        // unexpected value: reset 'expected' so the final assertEquals fails, and release the waiter
-                        logger.error("Did not receive expected value, expected {}, got {}",
-                                     expected.get(), value);
-                        expected.set(0);
-                        latch.countDown();
-                    }
-                    if (value == 3 * X) { // last value that matches the filter
-                        latch.countDown();
-                    }
-                    callback.operationFinished(context, null);
-                    sub.consume(topic, subscriberId, msg.getMsgId()); // explicit consume of every delivered message
-                } catch (Exception e) {
-                    logger.error("Received bad message", e);
-                    latch.countDown();
-                }
-            }
-        });
-
-        assertTrue("Timed out waiting for messages " + 3 * X, latch.await(10, TimeUnit.SECONDS));
-        assertEquals("Should be expected message with " + 4 * X, 4 * X, expected.get()); // X, 2X, 3X seen => expected advanced to 4X
-
-        sub.stopDelivery(topic, subid);
-        sub.closeSubscription(topic, subid);
-    }
-    // Publishes 3*X messages and checks that delivery stops after X unconsumed messages, then completes once they are consumed.
-    private void throttleX(Publisher pub, final Subscriber sub,
-                           ByteString topic, ByteString subid,
-                           final int X) throws Exception {
-        for (int i=1; i<=3*X; i++) {
-            pub.publish(topic, Message.newBuilder().setBody(
-                               ByteString.copyFromUtf8(String.valueOf(i))).build());
-        }
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.ATTACH).build();
-        sub.subscribe(topic, subid, opts);
-
-        final AtomicInteger expected = new AtomicInteger(1); // next body value the handler should see
-        final CountDownLatch throttleLatch = new CountDownLatch(1); // released if more than X messages arrive, or on error
-        final CountDownLatch nonThrottleLatch = new CountDownLatch(1); // released when all 3*X messages arrived, or on error
-        sub.startDelivery(topic, subid, new MessageHandler() {
-            @Override
-            public synchronized void deliver(ByteString topic, ByteString subscriberId,
-                                             Message msg,
-                                             Callback<Void> callback, Object context) {
-                try {
-                    int value = Integer.valueOf(msg.getBody().toStringUtf8());
-                    logger.debug("Received message {},", value);
-
-                    if (value == expected.get()) {
-                        expected.incrementAndGet();
-                    } else {
-                        // unexpected value: reset 'expected' so the assertEquals checks fail, and release both waiters
-                        logger.error("Did not receive expected value, expected {}, got {}",
-                                     expected.get(), value);
-                        expected.set(0);
-                        throttleLatch.countDown();
-                        nonThrottleLatch.countDown();
-                    }
-                    if (expected.get() > X+1) { // more than X messages have been received
-                        throttleLatch.countDown();
-                    }
-                    if (expected.get() == (3 * X + 1)) { // all 3*X messages have been received
-                        nonThrottleLatch.countDown();
-                    }
-                    callback.operationFinished(context, null);
-                    if (expected.get() > X + 1) { // the first X messages are left unconsumed here; later ones are consumed on arrival
-                        sub.consume(topic, subscriberId, msg.getMsgId());
-                    }
-                } catch (Exception e) {
-                    logger.error("Received bad message", e);
-                    throttleLatch.countDown();
-                    nonThrottleLatch.countDown();
-                }
-            }
-        });
-        assertFalse("Received more messages than throttle value " + X,
-                    throttleLatch.await(3, TimeUnit.SECONDS));
-        assertEquals("Should be expected messages with only " + (X+1), X+1, expected.get());
-
-        // consume the first X messages (seq ids with local component 1..X) so that the remaining ones can be delivered
-        for (int i=1; i<=X; i++) {
-            sub.consume(topic, subid,
-                        MessageSeqId.newBuilder().setLocalComponent(i).build());
-        }
-
-        assertTrue("Timed out waiting for messages " + (3*X + 1),
-                   nonThrottleLatch.await(10, TimeUnit.SECONDS));
-        assertEquals("Should be expected message with " + (3*X + 1),
-                     3*X + 1, expected.get());
-
-        sub.stopDelivery(topic, subid);
-        sub.closeSubscription(topic, subid);
-    }
-    // Parameter sets for the Parameterized runner: one run with the boolean constructor argument false, one with true.
-    @Parameters
-    public static Collection<Object[]> configs() {
-        return Arrays.asList(new Object[][] { { false }, { true } });
-    }
-
-    protected boolean isSubscriptionChannelSharingEnabled; // set from the test parameter in the constructor
-
-    public TestThrottlingDelivery(boolean isSubscriptionChannelSharingEnabled) {
-        super(1); // presumably the number of hub servers to start - confirm against HedwigHubTestBase
-        this.isSubscriptionChannelSharingEnabled = isSubscriptionChannelSharingEnabled;
-    }
-
-    @Override
-    @Before
-    public void setUp() throws Exception {
-        super.setUp();
-    }
-
-    @Override
-    @After
-    public void tearDown() throws Exception {
-        super.tearDown();
-    }
-
-    @Override
-    protected ServerConfiguration getServerConfiguration(int port, int sslPort) {
-        return new ThrottleDeliveryServerConfiguration(port, sslPort);
-    }
-    // Checks throttling with the hub's default window, then with per-subscription windows below and above that default.
-    @Test(timeout=60000)
-    public void testServerSideThrottle() throws Exception {
-        int messageWindowSize = DEFAULT_MESSAGE_WINDOW_SIZE;
-        ThrottleDeliveryClientConfiguration conf =
-            new ThrottleDeliveryClientConfiguration();
-        HedwigClient client = new HedwigClient(conf);
-        Publisher pub = client.getPublisher();
-        Subscriber sub = client.getSubscriber();
-
-        ByteString topic = ByteString.copyFromUtf8("testServerSideThrottle");
-        ByteString subid = ByteString.copyFromUtf8("serverThrottleSub");
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE).build();
-        sub.subscribe(topic, subid, opts);
-        sub.closeSubscription(topic, subid); // create the subscription, then close it; throttleX re-attaches with ATTACH
-
-        // throttle with hub server's setting
-        throttleX(pub, sub, topic, subid, DEFAULT_MESSAGE_WINDOW_SIZE);
-
-        messageWindowSize = DEFAULT_MESSAGE_WINDOW_SIZE / 2;
-        // throttle with a lower value than hub server's setting
-        SubscriptionOptions.Builder optionsBuilder = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE)
-            .setMessageWindowSize(messageWindowSize);
-        topic = ByteString.copyFromUtf8("testServerSideThrottleWithLowerValue");
-        sub.subscribe(topic, subid, optionsBuilder.build());
-        sub.closeSubscription(topic, subid);
-        throttleX(pub, sub, topic, subid, messageWindowSize);
-
-        messageWindowSize = DEFAULT_MESSAGE_WINDOW_SIZE + 5;
-        // throttle with a higher value than hub server's setting
-        optionsBuilder = SubscriptionOptions.newBuilder()
-                         .setCreateOrAttach(CreateOrAttach.CREATE)
-                         .setMessageWindowSize(messageWindowSize);
-        topic = ByteString.copyFromUtf8("testServerSideThrottleWithHigherValue");
-        sub.subscribe(topic, subid, optionsBuilder.build());
-        sub.closeSubscription(topic, subid);
-        throttleX(pub, sub, topic, subid, messageWindowSize);
-
-        client.close(); // NOTE(review): not in a finally block, so skipped when an assertion above fails
-    }
-    // Checks filtered delivery when the gap between matching messages is below, equal to and above the window size.
-    @Test(timeout = 60000)
-    public void testThrottleWithServerSideFilter() throws Exception {
-        int messageWindowSize = DEFAULT_MESSAGE_WINDOW_SIZE;
-        ThrottleDeliveryClientConfiguration conf = new ThrottleDeliveryClientConfiguration();
-        HedwigClient client = new HedwigClient(conf);
-        Publisher pub = client.getPublisher();
-        Subscriber sub = client.getSubscriber();
-
-        ByteString topic = ByteString.copyFromUtf8("testThrottleWithServerSideFilter");
-        ByteString subid = ByteString.copyFromUtf8("mysub");
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder().setCreateOrAttach(CreateOrAttach.CREATE).build();
-        sub.subscribe(topic, subid, opts);
-        sub.closeSubscription(topic, subid); // create the subscription, then close it; throttleWithFilter re-attaches with ATTACH
-
-        // message gap: half of the throttle threshold
-        throttleWithFilter(pub, sub, topic, subid, messageWindowSize / 2);
-        // message gap: equal to the throttle threshold
-        throttleWithFilter(pub, sub, topic, subid, messageWindowSize);
-        // message gap: larger than the throttle threshold
-        throttleWithFilter(pub, sub, topic, subid, messageWindowSize + messageWindowSize / 2);
-    } // NOTE(review): client is never closed in this test, unlike testServerSideThrottle
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java b/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java
deleted file mode 100644
index 8e9b8f6..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java
+++ /dev/null
@@ -1,415 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.filter;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.client.api.MessageHandler;
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageHeader;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protoextensions.MapUtils;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-
-import org.apache.hedwig.client.api.Client;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.filter.ClientMessageFilter;
-import org.apache.hedwig.filter.MessageFilterBase;
-import org.apache.hedwig.filter.ServerMessageFilter;
-import org.apache.hedwig.util.Callback;
-
-import org.apache.hedwig.server.HedwigHubTestBase;
-
-public class TestMessageFilter extends HedwigHubTestBase {
-
-    // Client side variables
-    protected ClientConfiguration conf;
-    protected HedwigClient client;
-    protected Publisher publisher;
-    protected Subscriber subscriber;
-
-    static final String OPT_MOD = "MOD";
-
-    static class ModMessageFilter implements ServerMessageFilter, ClientMessageFilter {
-        // Filter that accepts a message only when its integer body is divisible by {@code mod}.
-        int mod; // modulus read from the MOD user option; set to 0 when the option is absent
-
-        @Override
-        public ServerMessageFilter initialize(Configuration conf) {
-            // no configuration is needed by this filter
-            return this;
-        }
-
-        @Override
-        public void uninitialize() {
-            // nothing to release
-        }
-
-        @Override
-        public MessageFilterBase setSubscriptionPreferences(ByteString topic,
-                                                            ByteString subscriberId,
-                                                            SubscriptionPreferences preferences) {
-            Map<String, ByteString> userOptions = SubscriptionStateUtils.buildUserOptions(preferences);
-            ByteString modValue = userOptions.get(OPT_MOD);
-            if (null == modValue) {
-                mod = 0;
-            } else {
-                mod = Integer.valueOf(modValue.toStringUtf8()); // NumberFormatException if the option is not a decimal integer
-            }
-            return this;
-        }
-
-        @Override
-        public boolean testMessage(Message msg) {
-            int value = Integer.valueOf(msg.getBody().toStringUtf8());
-            return 0 == value % mod; // NOTE(review): throws ArithmeticException when mod == 0 (MOD option absent)
-        }
-    }
-
-    static class HeaderMessageFilter implements ServerMessageFilter, ClientMessageFilter { // accepts messages whose header property MOD parses to 0
-        int mod; // not read or written anywhere in this class
-        @Override
-        public ServerMessageFilter initialize(Configuration conf) {
-            // no configuration is needed by this filter
-            return this;
-        }
-
-        @Override
-        public void uninitialize() {
-            // nothing to release
-        }
-
-        @Override
-        public MessageFilterBase setSubscriptionPreferences(ByteString topic,
-                                                            ByteString subscriberId,
-                                                            SubscriptionPreferences preferences) {
-            // preferences are ignored; testMessage looks only at the message header
-            return this;
-        }
-
-        @Override
-        public boolean testMessage(Message msg) {
-            if (msg.hasHeader()) {
-                MessageHeader header = msg.getHeader();
-                if (header.hasProperties()) {
-                    Map<String, ByteString> props = MapUtils.buildMap(header.getProperties());
-                    ByteString value = props.get(OPT_MOD);
-                    if (null == value) {
-                        return false; // no MOD property: reject
-                    }
-                    int intValue = Integer.valueOf(value.toStringUtf8()); // NumberFormatException if the property is not a decimal integer
-                    if (0 != intValue) {
-                        return false; // MOD property is non-zero: reject
-                    }
-                    return true;
-                } else {
-                    return false; // header without properties: reject
-                }
-            } else {
-                return false; // message without header: reject
-            }
-        }
-    }
-
-    public TestMessageFilter() {
-        super(1); // presumably the number of hub servers to start - confirm against HedwigHubTestBase
-    }
-
-    @Override
-    @Before
-    public void setUp() throws Exception { // runs the base set-up, then creates the client, publisher and subscriber used by the tests
-        super.setUp();
-
-        conf = new HubClientConfiguration() {
-            @Override
-            public boolean isAutoSendConsumeMessageEnabled() {
-                return false; // consume requests are sent explicitly by the tests' message handlers
-            }
-        };
-        client = new HedwigClient(conf);
-        publisher = client.getPublisher();
-        subscriber = client.getSubscriber();
-    }
-
-    @Override
-    @After
-    public void tearDown() throws Exception {
-        client.close(); // close the client first; NOTE(review): if this throws, super.tearDown() is skipped
-        super.tearDown();
-    }
-
-    private void publishNums(ByteString topic, int start, int num, int M) throws Exception { // publishes start+1 .. start+num as message bodies
-        for (int i=1; i<=num; i++) {
-            PubSubProtocol.Map.Builder propsBuilder = PubSubProtocol.Map.newBuilder()
-                .addEntries(PubSubProtocol.Map.Entry.newBuilder().setKey(OPT_MOD)
-                            .setValue(ByteString.copyFromUtf8(String.valueOf((start + i) % M)))); // header property MOD = value % M
-            MessageHeader.Builder headerBuilder = MessageHeader.newBuilder().setProperties(propsBuilder);
-            Message msg = Message.newBuilder().setBody(
-                          ByteString.copyFromUtf8(String.valueOf((start + i))))
-                          .setHeader(headerBuilder).build();
-            publisher.publish(topic, msg);
-        }
-    }
-
-    private void receiveNumModM(final ByteString topic, final ByteString subid, // attaches with MOD = M and expects num consecutive multiples of M above start
-                                final String filterClassName, final ClientMessageFilter filter,
-                                final int start, final int num, final int M,
-                                final boolean consume)
-    throws Exception {
-        PubSubProtocol.Map userOptions = PubSubProtocol.Map.newBuilder()
-            .addEntries(PubSubProtocol.Map.Entry.newBuilder().setKey(OPT_MOD)
-                        .setValue(ByteString.copyFromUtf8(String.valueOf(M)))).build();
-        SubscriptionOptions.Builder optionsBuilder = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.ATTACH)
-            .setOptions(userOptions);
-        if (null != filterClassName) { // a filter class name is set on the subscription only when one is given
-            optionsBuilder.setMessageFilter(filterClassName);
-        }
-        subscriber.subscribe(topic, subid, optionsBuilder.build());
-
-        final int base = start + M - start % M; // for start >= 0 and M > 0: the smallest multiple of M strictly greater than start
-
-        final AtomicInteger expected = new AtomicInteger(base); // next body value the handler should see
-        final CountDownLatch latch = new CountDownLatch(1); // released after num expected values, or on any error
-        MessageHandler msgHandler = new MessageHandler() {
-            synchronized public void deliver(ByteString topic, ByteString subscriberId,
-                                             Message msg, Callback<Void> callback,
-                                             Object context) {
-                try {
-                    int value = Integer.valueOf(msg.getBody().toStringUtf8());
-                    // values <= start are treated as duplicates of earlier messages and are not validated
-                    if (value > start) {
-                        if (value == expected.get()) {
-                            expected.addAndGet(M);
-                        } else {
-                            logger.error("Did not receive expected value, expected {}, got {}",
-                                         expected.get(), value);
-                            expected.set(0); // makes the final assertEquals fail
-                            latch.countDown();
-                        }
-                        if (expected.get() == (base + num * M)) { // num expected values have been received
-                            latch.countDown();
-                        }
-                    }
-                    callback.operationFinished(context, null);
-                    if (consume) {
-                        subscriber.consume(topic, subid, msg.getMsgId());
-                    }
-                } catch (Exception e) {
-                    logger.error("Received bad message", e);
-                    latch.countDown();
-                }
-            }
-        };
-        if (null != filter) { // a non-null client-side filter selects startDeliveryWithFilter
-            subscriber.startDeliveryWithFilter(topic, subid, msgHandler, filter);
-        } else {
-            subscriber.startDelivery(topic, subid, msgHandler);
-        }
-        assertTrue("Timed out waiting for messages mod " + M + " expected is " + expected.get(),
-                   latch.await(10, TimeUnit.SECONDS));
-        assertEquals("Should be expected message with " + (base + num * M), (base + num*M), expected.get());
-        subscriber.stopDelivery(topic, subid);
-        subscriber.closeSubscription(topic, subid);
-    }
-
-    @Test(timeout=60000)
-    public void testServerSideMessageFilter() throws Exception { // ModMessageFilter with MOD = 2 should deliver the 50 even numbers out of 1..100
-        ByteString topic = ByteString.copyFromUtf8("TestMessageFilter");
-        ByteString subid = ByteString.copyFromUtf8("mysub");
-
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-        subscriber.subscribe(topic, subid, opts);
-        subscriber.closeSubscription(topic, subid); // create the subscription, then close it; receiveNumModM re-attaches with ATTACH
-        publishNums(topic, 0, 100, 2);
-        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 50, 2, true);
-    }
-
-    @Test(timeout=60000)
-    public void testInvalidServerSideMessageFilter() throws Exception { // subscribing with an unknown filter class name must fail
-        ByteString topic = ByteString.copyFromUtf8("TestInvalidMessageFilter");
-        ByteString subid = ByteString.copyFromUtf8("mysub");
-
-        SubscriptionOptions options = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH)
-            .setMessageFilter("Invalid_Message_Filter").build();
-        try {
-            subscriber.subscribe(topic, subid, options);
-            // should not reach here
-            fail("Should fail subscribe with invalid message filter");
-        } catch (PubSubException pse) {
-            assertTrue("Should respond with INVALID_MESSAGE_FILTER",
-                       pse.getMessage().contains("INVALID_MESSAGE_FILTER"));
-        }
-    }
-
-    @Test(timeout=60000)
-    public void testChangeSubscriptionPreferences() throws Exception { // re-attaches repeatedly with different MOD options on the same subscription
-        ByteString topic = ByteString.copyFromUtf8("TestChangeSubscriptionPreferences");
-        ByteString subid = ByteString.copyFromUtf8("mysub");
-
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-        subscriber.subscribe(topic, subid, opts);
-        subscriber.closeSubscription(topic, subid); // create the subscription, then close it; receiveNumModM re-attaches with ATTACH
-
-        publishNums(topic, 0, 100, 2);
-        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 50, 2, false); // consume=false; presumably 1..100 is delivered again on the next attach - the following calls rely on it
-        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 25, 4, false);
-        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 33, 3, true); // consume=true on the last pass over 1..100
-
-        // change mod to receive numbers mod 5
-        publishNums(topic, 100, 100, 5);
-        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 100, 20, 5, true);
-
-        // change mod to receive numbers mod 7
-        publishNums(topic, 200, 100, 7);
-        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 200, 14, 7, true);
-    }
-
-    @Test(timeout=60000)
-    public void testChangeServerSideMessageFilter() throws Exception { // switches the subscription's filter class from ModMessageFilter to HeaderMessageFilter
-        ByteString topic = ByteString.copyFromUtf8("TestChangeMessageFilter");
-        ByteString subid = ByteString.copyFromUtf8("mysub");
-
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-        subscriber.subscribe(topic, subid, opts);
-        subscriber.closeSubscription(topic, subid); // create the subscription, then close it; receiveNumModM re-attaches with ATTACH
-
-        publishNums(topic, 0, 100, 3); // header property MOD = value % 3, which HeaderMessageFilter tests against 0
-        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 50, 2, false); // ModMessageFilter uses the body and the MOD option, not the header
-        receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 25, 4, false);
-        receiveNumModM(topic, subid, HeaderMessageFilter.class.getName(), null, 0, 33, 3, true); // header MOD == 0 only for multiples of 3
-
-        publishNums(topic, 200, 100, 7);
-        receiveNumModM(topic, subid, HeaderMessageFilter.class.getName(), null, 200, 14, 7, true);
-    }
-
-    @Test(timeout=60000)
-    public void testFixInvalidServerSideMessageFilter() throws Exception { // an attach with an invalid filter fails; a later attach with a valid filter works
-        ByteString topic = ByteString.copyFromUtf8("TestFixMessageFilter");
-        ByteString subid = ByteString.copyFromUtf8("mysub");
-
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-        subscriber.subscribe(topic, subid, opts);
-        subscriber.closeSubscription(topic, subid); // create the subscription, then close it; receiveNumModM re-attaches with ATTACH
-
-        publishNums(topic, 0, 100, 3);
-        try {
-            receiveNumModM(topic, subid, "Invalid_Message_Filter", null, 0, 33, 3, true);
-            // should not reach here
-            fail("Should fail subscribe with invalid message filter");
-        } catch (Exception pse) { // fail() throws AssertionError, which is not an Exception, so it is not swallowed here
-            assertTrue("Should respond with INVALID_MESSAGE_FILTER",
-                       pse.getMessage().contains("INVALID_MESSAGE_FILTER"));
-        }
-        receiveNumModM(topic, subid, HeaderMessageFilter.class.getName(), null, 0, 33, 3, true);
-    }
-
-    @Test(timeout=60000)
-    public void testNullClientMessageFilter() throws Exception {
-        ByteString topic = ByteString.copyFromUtf8("TestNullClientMessageFilter");
-        ByteString subid = ByteString.copyFromUtf8("mysub");
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-        subscriber.subscribe(topic, subid, opts);
-        try {
-            subscriber.startDeliveryWithFilter(topic, subid, null, new ModMessageFilter());
-            fail("Should fail start delivery with filter using null message handler.");
-        } catch (NullPointerException npe) {
-        }
-
-        try {
-            subscriber.startDeliveryWithFilter(topic, subid, new MessageHandler() {
-                public void deliver(ByteString topic, ByteString subscriberId,
-                                    Message msg, Callback<Void> callback, Object context) {
-                    // do nothing
-                }
-            }, null);
-            fail("Should fail start delivery with filter using null message filter.");
-        } catch (NullPointerException npe) {
-        }
-    }
-
-    @Test(timeout=60000)
-    public void testClientSideMessageFilter() throws Exception {
-        ByteString topic = ByteString.copyFromUtf8("TestClientMessageFilter");
-        ByteString subid = ByteString.copyFromUtf8("mysub");
-
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-        subscriber.subscribe(topic, subid, opts);
-        subscriber.closeSubscription(topic, subid);
-        publishNums(topic, 0, 100, 2);
-        receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 50, 2, true);
-    }
-
-    @Test(timeout=60000)
-    public void testChangeSubscriptionPreferencesForClientFilter() throws Exception {
-        ByteString topic = ByteString.copyFromUtf8("TestChangeSubscriptionPreferencesForClientFilter");
-        ByteString subid = ByteString.copyFromUtf8("mysub");
-
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-        subscriber.subscribe(topic, subid, opts);
-        subscriber.closeSubscription(topic, subid);
-
-        publishNums(topic, 0, 100, 2);
-        receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 50, 2, false);
-        receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 25, 4, false);
-        receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 33, 3, true);
-    }
-
-    @Test(timeout=60000)
-    public void testChangeClientSideMessageFilter() throws Exception {
-        ByteString topic = ByteString.copyFromUtf8("TestChangeClientSideMessageFilter");
-        ByteString subid = ByteString.copyFromUtf8("mysub");
-
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-        subscriber.subscribe(topic, subid, opts);
-        subscriber.closeSubscription(topic, subid);
-
-        publishNums(topic, 0, 100, 3);
-        receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 50, 2, false);
-        receiveNumModM(topic, subid, null, new HeaderMessageFilter(), 0, 33, 3, true);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestBaseHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestBaseHandler.java b/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestBaseHandler.java
deleted file mode 100644
index 4a5c63d..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestBaseHandler.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.handlers;
-
-import java.util.List;
-
-import org.jboss.netty.channel.Channel;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.netty.WriteRecordingChannel;
-import org.apache.hedwig.server.topics.StubTopicManager;
-import org.apache.hedwig.server.topics.TopicManager;
-
-import static org.junit.Assert.*;
-
-public class TestBaseHandler {
-
-    MyBaseHandler handler;
-    StubTopicManager tm;
-    PubSubRequest request = PubSubRequest.getDefaultInstance();
-    WriteRecordingChannel channel = new WriteRecordingChannel();
-
-    protected class MyBaseHandler extends BaseHandler {
-
-        public MyBaseHandler(TopicManager tm, ServerConfiguration conf) {
-            super(tm, conf);
-        }
-
-        PubSubRequest request;
-
-        public PubSubRequest getRequest() {
-            return request;
-        }
-
-        @Override
-        public void handleRequestAtOwner(PubSubRequest request, Channel channel) {
-            this.request = request;
-        }
-
-    }
-
-    @Before
-    public void setUp() throws Exception {
-        ServerConfiguration conf = new ServerConfiguration();
-        tm = new StubTopicManager(conf);
-        handler = new MyBaseHandler(tm, conf);
-        request = PubSubRequest.getDefaultInstance();
-        channel = new WriteRecordingChannel();
-    }
-
-    public PubSubResponse getPubSubResponse(WriteRecordingChannel channel) {
-        List<Object> messages = channel.getMessagesWritten();
-        assertEquals(messages.size(), 1);
-
-        Object message = messages.get(0);
-        assertEquals(message.getClass(), PubSubResponse.class);
-
-        return (PubSubResponse) message;
-    }
-
-    @Test(timeout=60000)
-    public void testHandleRequestOnRedirect() throws Exception {
-        tm.setShouldOwnEveryNewTopic(false);
-        handler.handleRequest(request, channel);
-
-        PubSubResponse response = getPubSubResponse(channel);
-        assertEquals(response.getStatusCode(), StatusCode.NOT_RESPONSIBLE_FOR_TOPIC);
-        assertEquals(request.getTxnId(), response.getTxnId());
-        assertNull(handler.getRequest());
-
-    }
-
-    @Test(timeout=60000)
-    public void testHandleRequestOnOwner() throws Exception {
-
-        tm.setShouldOwnEveryNewTopic(true);
-        handler.handleRequest(request, channel);
-        assertEquals(0, channel.getMessagesWritten().size());
-        assertEquals(handler.getRequest(), request);
-
-    }
-
-    @Test(timeout=60000)
-    public void testHandleRequestOnError() throws Exception {
-
-        tm.setShouldError(true);
-        handler.handleRequest(request, channel);
-
-        PubSubResponse response = getPubSubResponse(channel);
-        assertEquals(response.getStatusCode(), StatusCode.SERVICE_DOWN);
-        assertEquals(request.getTxnId(), response.getTxnId());
-        assertNull(handler.getRequest());
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java b/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
deleted file mode 100644
index 93f5c2e..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.handlers;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.StubCallback;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.filter.PipelineFilter;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.delivery.ChannelEndPoint;
-import org.apache.hedwig.server.delivery.StubDeliveryManager;
-import org.apache.hedwig.server.delivery.StubDeliveryManager.StartServingRequest;
-import org.apache.hedwig.server.netty.WriteRecordingChannel;
-import org.apache.hedwig.server.persistence.LocalDBPersistenceManager;
-import org.apache.hedwig.server.persistence.PersistenceManager;
-import org.apache.hedwig.server.subscriptions.AllToAllTopologyFilter;
-import org.apache.hedwig.server.subscriptions.StubSubscriptionManager;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
-import org.apache.hedwig.util.ConcurrencyUtils;
-
-import static org.junit.Assert.*;
-
-public class TestSubUnsubHandler {
-
-    SubscribeHandler sh;
-    StubDeliveryManager dm;
-    StubSubscriptionManager sm;
-    SubscriptionChannelManager subChannelMgr;
-    ByteString topic = ByteString.copyFromUtf8("topic");
-    WriteRecordingChannel channel;
-
-    SubscribeRequest subRequestPrototype;
-    PubSubRequest pubSubRequestPrototype;
-    ByteString subscriberId;
-    UnsubscribeHandler ush;
-
-    @Before
-    public void setUp() throws Exception {
-        ServerConfiguration conf = new ServerConfiguration();
-        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
-
-        TopicManager tm = new TrivialOwnAllTopicManager(conf, executor);
-        dm = new StubDeliveryManager();
-        PersistenceManager pm = LocalDBPersistenceManager.instance();
-        sm = new StubSubscriptionManager(tm, pm, dm, conf, executor);
-        subChannelMgr = new SubscriptionChannelManager();
-        sh = new SubscribeHandler(conf, tm, dm, pm, sm, subChannelMgr);
-        channel = new WriteRecordingChannel();
-
-        subscriberId = ByteString.copyFromUtf8("subId");
-
-        subRequestPrototype = SubscribeRequest.newBuilder().setSubscriberId(subscriberId).build();
-        pubSubRequestPrototype = PubSubRequest.newBuilder().setProtocolVersion(ProtocolVersion.VERSION_ONE).setType(
-                                     OperationType.SUBSCRIBE).setTxnId(0).setTopic(topic).setSubscribeRequest(subRequestPrototype).build();
-
-        ush = new UnsubscribeHandler(conf, tm, sm, dm, subChannelMgr);
-    }
-
-    @Test(timeout=60000)
-    public void testNoSubscribeRequest() {
-        sh.handleRequestAtOwner(PubSubRequest.newBuilder(pubSubRequestPrototype).clearSubscribeRequest().build(),
-                                channel);
-        assertEquals(StatusCode.MALFORMED_REQUEST, ((PubSubResponse) channel.getMessagesWritten().get(0))
-                     .getStatusCode());
-    }
-
-    @Test(timeout=60000)
-    public void testSuccessCase() {
-        StubCallback<Void> callback = new StubCallback<Void>();
-        sm.acquiredTopic(topic, callback, null);
-        assertNull(ConcurrencyUtils.take(callback.queue).right());
-
-        sh.handleRequestAtOwner(pubSubRequestPrototype, channel);
-        assertEquals(StatusCode.SUCCESS, ((PubSubResponse) channel.getMessagesWritten().get(0)).getStatusCode());
-
-        // make sure the channel was put in the maps
-        Set<TopicSubscriber> topicSubs = new HashSet<TopicSubscriber>();
-        topicSubs.add(new TopicSubscriber(topic, subscriberId));
-        assertEquals(topicSubs,
-                     subChannelMgr.channel2sub.get(channel));
-        assertEquals(channel,
-                     subChannelMgr.sub2Channel.get(new TopicSubscriber(topic, subscriberId)));
-
-        // make sure delivery was started
-        StartServingRequest startRequest = (StartServingRequest) dm.lastRequest.poll();
-        assertEquals(channel, ((ChannelEndPoint) startRequest.endPoint).getChannel());
-        assertEquals(PipelineFilter.class, startRequest.filter.getClass());
-        PipelineFilter pfilter = (PipelineFilter)(startRequest.filter);
-        assertEquals(1, pfilter.size());
-        assertEquals(AllToAllTopologyFilter.class, pfilter.getFirst().getClass());
-        assertEquals(1, startRequest.seqIdToStartFrom.getLocalComponent());
-        assertEquals(subscriberId, startRequest.subscriberId);
-        assertEquals(topic, startRequest.topic);
-
-        // make sure subscription was registered
-        StubCallback<SubscriptionData> callback1 = new StubCallback<SubscriptionData>();
-        sm.serveSubscribeRequest(topic, SubscribeRequest.newBuilder(subRequestPrototype).setCreateOrAttach(
-                                     CreateOrAttach.CREATE).build(), MessageSeqId.newBuilder().setLocalComponent(10).build(), callback1,
-                                 null);
-
-        assertEquals(PubSubException.ClientAlreadySubscribedException.class, ConcurrencyUtils.take(callback1.queue)
-                     .right().getClass());
-
-        // trying to subscribe again should throw an error
-        WriteRecordingChannel dupChannel = new WriteRecordingChannel();
-        sh.handleRequestAtOwner(pubSubRequestPrototype, dupChannel);
-        assertEquals(StatusCode.TOPIC_BUSY, ((PubSubResponse) dupChannel.getMessagesWritten().get(0)).getStatusCode());
-
-        // after disconnecting the channel, subscribe should work again
-        subChannelMgr.channelDisconnected(channel);
-
-        dupChannel = new WriteRecordingChannel();
-        sh.handleRequestAtOwner(pubSubRequestPrototype, dupChannel);
-        assertEquals(StatusCode.SUCCESS, ((PubSubResponse) dupChannel.getMessagesWritten().get(0)).getStatusCode());
-
-        // test unsubscribe
-        channel = new WriteRecordingChannel();
-        ush.handleRequestAtOwner(pubSubRequestPrototype, channel);
-        assertEquals(StatusCode.MALFORMED_REQUEST, ((PubSubResponse) channel.getMessagesWritten().get(0))
-                     .getStatusCode());
-
-        PubSubRequest unsubRequest = PubSubRequest.newBuilder(pubSubRequestPrototype).setUnsubscribeRequest(
-                                         UnsubscribeRequest.newBuilder().setSubscriberId(subscriberId)).build();
-        channel = new WriteRecordingChannel();
-        dm.lastRequest.clear();
-
-        ush.handleRequestAtOwner(unsubRequest, channel);
-        assertEquals(StatusCode.SUCCESS, ((PubSubResponse) channel.getMessagesWritten().get(0)).getStatusCode());
-
-        // make sure delivery has been stopped
-        assertEquals(new TopicSubscriber(topic, subscriberId), dm.lastRequest.poll());
-
-        // make sure the info is gone from the sm
-        StubCallback<SubscriptionData> callback2 = new StubCallback<SubscriptionData>();
-        sm.serveSubscribeRequest(topic, SubscribeRequest.newBuilder(subRequestPrototype).setCreateOrAttach(
-                                     CreateOrAttach.ATTACH).build(), MessageSeqId.newBuilder().setLocalComponent(10).build(), callback2,
-                                 null);
-        assertEquals(PubSubException.ClientNotSubscribedException.class, ConcurrencyUtils.take(callback2.queue).right()
-                     .getClass());
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java b/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
deleted file mode 100644
index 1867f9c..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
+++ /dev/null
@@ -1,777 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.integration;
-
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.concurrent.SynchronousQueue;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.client.api.Client;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.StartDeliveryRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.StopDeliveryRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import org.apache.hedwig.server.HedwigHubTestBase;
-import org.apache.hedwig.server.netty.WriteRecordingChannel;
-import org.apache.hedwig.server.proxy.HedwigProxy;
-import org.apache.hedwig.server.proxy.ProxyConfiguration;
-import org.apache.hedwig.server.regions.HedwigHubClient;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.ConcurrencyUtils;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import org.apache.bookkeeper.test.PortManager;
-import org.apache.hedwig.server.LoggingExceptionHandler;
-
-import static org.junit.Assert.*;
-
-public abstract class TestHedwigHub extends HedwigHubTestBase {
-
-    // Client side variables
-    protected HedwigClient client;
-    protected Publisher publisher;
-    protected Subscriber subscriber;
-
-    // Common ByteStrings used in tests.
-    private final ByteString localSubscriberId = ByteString.copyFromUtf8("LocalSubscriber");
-    private final ByteString hubSubscriberId = ByteString.copyFromUtf8(SubscriptionStateUtils.HUB_SUBSCRIBER_PREFIX
-            + "HubSubcriber");
-
-    enum Mode {
-        REGULAR, PROXY, SSL
-    };
-
-    protected Mode mode;
-    protected boolean isSubscriptionChannelSharingEnabled;
-
-    public TestHedwigHub(Mode mode, boolean isSubscriptionChannelSharingEnabled) {
-        super(3);
-        this.mode = mode;
-        this.isSubscriptionChannelSharingEnabled = isSubscriptionChannelSharingEnabled;
-    }
-
-    protected HedwigProxy proxy;
-    protected ProxyConfiguration proxyConf = new ProxyConfiguration() {
-            final int proxyPort = PortManager.nextFreePort();
-
-            @Override
-            public HedwigSocketAddress getDefaultServerHedwigSocketAddress() {
-                return serverAddresses.get(0);
-            }
-
-            @Override
-            public int getProxyPort() {
-                return proxyPort;
-            }
-        };
-
-    // SynchronousQueues to verify async calls
-    private final SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>();
-    private final SynchronousQueue<Boolean> consumeQueue = new SynchronousQueue<Boolean>();
-
-    // Test implementation of Callback for async client actions.
-    static class TestCallback implements Callback<Void> {
-        private final SynchronousQueue<Boolean> queue;
-
-        public TestCallback(SynchronousQueue<Boolean> queue) {
-            this.queue = queue;
-        }
-
-        @Override
-        public void operationFinished(Object ctx, Void resultOfOperation) {
-            new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    if (logger.isDebugEnabled())
-                        logger.debug("Operation finished!");
-                    ConcurrencyUtils.put(queue, true);
-                }
-            }).start();
-        }
-
-        @Override
-        public void operationFailed(Object ctx, final PubSubException exception) {
-            new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    logger.error("Operation failed!", exception);
-                    ConcurrencyUtils.put(queue, false);
-                }
-            }).start();
-        }
-    }
-
-    // Test implementation of subscriber's message handler.
-    static class TestMessageHandler implements MessageHandler {
-        // For subscribe reconnect testing, the server could send us back
-        // messages we've already processed and consumed. We need to keep
-        // track of the ones we've encountered so we only signal back to the
-        // consumeQueue once.
-        private HashSet<MessageSeqId> consumedMessages = new HashSet<MessageSeqId>();
-        private long largestMsgSeqIdConsumed = -1;
-        private final SynchronousQueue<Boolean> consumeQueue;
-
-        public TestMessageHandler(SynchronousQueue<Boolean> consumeQueue) {
-            this.consumeQueue = consumeQueue;
-        }
-
-        public void deliver(ByteString topic, ByteString subscriberId, final Message msg, Callback<Void> callback,
-                            Object context) {
-            if (!consumedMessages.contains(msg.getMsgId())) {
-                // New message to consume. Add it to the Set of consumed
-                // messages.
-                consumedMessages.add(msg.getMsgId());
-                // Check that the msg seq ID is incrementing by 1 compared to
-                // the last consumed message. Don't do this check if this is the
-                // initial message being consumed.
-                if (largestMsgSeqIdConsumed >= 0 && msg.getMsgId().getLocalComponent() != largestMsgSeqIdConsumed + 1) {
-                    new Thread(new Runnable() {
-                        @Override
-                        public void run() {
-                            if (logger.isDebugEnabled())
-                                logger.debug("Consuming message that is out of order for msgId: "
-                                             + msg.getMsgId().getLocalComponent());
-                            ConcurrencyUtils.put(consumeQueue, false);
-                        }
-                    }).start();
-                } else {
-                    new Thread(new Runnable() {
-                        @Override
-                        public void run() {
-                            if (logger.isDebugEnabled())
-                                logger.debug("Consume operation finished successfully!");
-                            ConcurrencyUtils.put(consumeQueue, true);
-                        }
-                    }).start();
-                }
-                // Store the consumed message as the new last msg id consumed.
-                largestMsgSeqIdConsumed = msg.getMsgId().getLocalComponent();
-            } else {
-                if (logger.isDebugEnabled())
-                    logger.debug("Consumed a message that we've processed already: " + msg);
-            }
-            callback.operationFinished(context, null);
-        }
-    }
-
-    class TestClientConfiguration extends HubClientConfiguration {
-
-        @Override
-        public InetSocketAddress getDefaultServerHost() {
-            if (mode == Mode.PROXY) {
-                return new InetSocketAddress(proxyConf.getProxyPort());
-            } else {
-                return super.getDefaultServerHost();
-            }
-        }
-
-        @Override
-        public boolean isSSLEnabled() {
-            if (mode == Mode.SSL)
-                return true;
-            else
-                return false;
-        }
-
-        @Override
-        public boolean isSubscriptionChannelSharingEnabled() {
-            return isSubscriptionChannelSharingEnabled;
-        }
-    }
-
-    // ClientConfiguration to use for this test.
-    protected ClientConfiguration getClientConfiguration() {
-        return new TestClientConfiguration();
-    }
-
-    @Override
-    @Before
-    public void setUp() throws Exception {
-        super.setUp();
-        if (mode == Mode.PROXY) {
-            proxy = new HedwigProxy(proxyConf, new LoggingExceptionHandler());
-            proxy.start();
-        }
-        client = new HedwigClient(getClientConfiguration());
-        publisher = client.getPublisher();
-        subscriber = client.getSubscriber();
-    }
-
-    @Override
-    @After
-    public void tearDown() throws Exception {
-        client.close();
-        if (mode == Mode.PROXY) {
-            proxy.shutdown();
-        }
-        super.tearDown();
-
-    }
-
-    // Helper function to generate Messages
-    protected Message getMsg(int msgNum) {
-        return Message.newBuilder().setBody(ByteString.copyFromUtf8("Message" + msgNum)).build();
-    }
-
-    // Helper function to generate Topics
-    protected ByteString getTopic(int topicNum) {
-        return ByteString.copyFromUtf8("Topic" + topicNum);
-    }
-
-    protected void startDelivery(ByteString topic, ByteString subscriberId, MessageHandler handler) throws Exception {
-        startDelivery(subscriber, topic, subscriberId, handler);
-    }
-
-    protected void startDelivery(Subscriber subscriber, ByteString topic, ByteString subscriberId,
-                                 MessageHandler handler) throws Exception {
-        subscriber.startDelivery(topic, subscriberId, handler);
-        if (mode == Mode.PROXY) {
-            WriteRecordingChannel channel = new WriteRecordingChannel();
-            PubSubRequest request = PubSubRequest.newBuilder().setProtocolVersion(ProtocolVersion.VERSION_ONE)
-                                    .setTopic(topic).setTxnId(0).setType(OperationType.START_DELIVERY).setStartDeliveryRequest(
-                                        StartDeliveryRequest.newBuilder().setSubscriberId(subscriberId)).build();
-            proxy.getStartDeliveryHandler().handleRequest(request, channel);
-            assertEquals(StatusCode.SUCCESS, ((PubSubResponse) channel.getMessagesWritten().get(0)).getStatusCode());
-        }
-    }
-
-    protected void stopDelivery(ByteString topic, ByteString subscriberId) throws Exception {
-        stopDelivery(subscriber, topic, subscriberId);
-    }
-
-    protected void stopDelivery(Subscriber subscriber, ByteString topic, ByteString subscriberId) throws Exception {
-        subscriber.stopDelivery(topic, subscriberId);
-        if (mode == Mode.PROXY) {
-            PubSubRequest request = PubSubRequest.newBuilder().setProtocolVersion(ProtocolVersion.VERSION_ONE)
-                                    .setTopic(topic).setTxnId(1).setType(OperationType.STOP_DELIVERY).setStopDeliveryRequest(
-                                        StopDeliveryRequest.newBuilder().setSubscriberId(subscriberId)).build();
-            proxy.getStopDeliveryHandler().handleRequest(request, proxy.getChannelTracker().getChannel(topic, subscriberId));
-        }
-    }
-
-    protected void publishBatch(int batchSize, boolean expected, boolean messagesToBeConsumed, int loop) throws Exception {
-        if (logger.isDebugEnabled())
-            logger.debug("Publishing " + loop + " batch of messages.");
-        for (int i = 0; i < batchSize; i++) {
-            publisher.asyncPublish(getTopic(i), getMsg(i + loop * batchSize), new TestCallback(queue), null);
-            assertTrue(expected == queue.take());
-            if (messagesToBeConsumed)
-                assertTrue(consumeQueue.take());
-        }
-    }
-
-    protected void subscribeToTopics(int batchSize) throws Exception {
-        if (logger.isDebugEnabled())
-            logger.debug("Subscribing to topics and starting delivery.");
-        for (int i = 0; i < batchSize; i++) {
-            SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-                .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-            subscriber.asyncSubscribe(getTopic(i), localSubscriberId, opts,
-                                      new TestCallback(queue), null);
-            assertTrue(queue.take());
-        }
-
-        // Start delivery for the subscriber
-        for (int i = 0; i < batchSize; i++) {
-            startDelivery(getTopic(i), localSubscriberId, new TestMessageHandler(consumeQueue));
-        }
-    }
-
-    protected void shutDownLastServer() {
-        if (logger.isDebugEnabled())
-            logger.debug("Shutting down the last server in the Hedwig hub cluster.");
-        serversList.get(serversList.size() - 1).shutdown();
-        // Due to a possible race condition, after we've shutdown the server,
-        // the client could still be caching the channel connection to that
-        // server. It is possible for a publish request to go to the shutdown
-        // server using the closed/shutdown channel before the channel
-        // disconnect logic kicks in. What could happen is that the publish
-        // is done successfully on the channel but the server on the other end
-        // can't/won't read it. This publish request will time out and the
-        // Junit test will fail. Since that particular scenario is not what is
-        // tested here, use a workaround of sleeping in this thread (so the
-        // channel disconnect logic can complete) before we publish again.
-        try {
-            Thread.sleep(1000);
-        } catch (InterruptedException e) {
-            logger.error("Thread was interrupted while sleeping after shutting down last server!", e);
-        }
-    }
-
-    // This tests out the manual sending of consume messages to the server
-    // instead of relying on the automatic sending by the client lib for it.
-    @Test(timeout=10000)
-    public void testManualConsumeClient() throws Exception {
-        HedwigClient myClient = new HedwigClient(new TestClientConfiguration() {
-            @Override
-            public boolean isAutoSendConsumeMessageEnabled() {
-                return false;
-            }
-
-        });
-        Subscriber mySubscriber = myClient.getSubscriber();
-        Publisher myPublisher = myClient.getPublisher();
-        ByteString myTopic = getTopic(0);
-        // Subscribe to a topic and start delivery on it
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-        mySubscriber.asyncSubscribe(myTopic, localSubscriberId, opts,
-                                    new TestCallback(queue), null);
-        assertTrue(queue.take());
-        startDelivery(mySubscriber, myTopic, localSubscriberId, new TestMessageHandler(consumeQueue));
-        // Publish some messages
-        int batchSize = 10;
-        for (int i = 0; i < batchSize; i++) {
-            myPublisher.asyncPublish(myTopic, getMsg(i), new TestCallback(queue), null);
-            assertTrue(queue.take());
-            assertTrue(consumeQueue.take());
-        }
-        // Now manually send a consume message for each message received
-        for (int i = 0; i < batchSize; i++) {
-            boolean success = true;
-            try {
-                mySubscriber.consume(myTopic, localSubscriberId, MessageSeqId.newBuilder().setLocalComponent(i + 1)
-                                     .build());
-            } catch (ClientNotSubscribedException e) {
-                success = false;
-            }
-            assertTrue(success);
-        }
-        // Since the consume call eventually does an async write to the Netty
-        // channel, the writing of the consume requests may not have completed
-        // yet before we stop the client. Sleep a little before we stop the
-        // client just so error messages are not logged.
-        try {
-            Thread.sleep(1000);
-        } catch (InterruptedException e) {
-            logger.error("Thread was interrupted while waiting to stop client for manual consume test!!", e);
-        }
-        myClient.close();
-    }
-
-    @Test(timeout=10000)
-    public void testAttachToSubscriptionSuccess() throws Exception {
-        ByteString topic = getTopic(0);
-        SubscriptionOptions opts1 = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-
-        subscriber.asyncSubscribe(topic, localSubscriberId, opts1, new TestCallback(queue),
-                                  null);
-        assertTrue(queue.take());
-        // Close the subscription asynchronously
-        subscriber.asyncCloseSubscription(topic, localSubscriberId, new TestCallback(queue), null);
-        assertTrue(queue.take());
-
-        SubscriptionOptions opts2 = SubscriptionOptions.newBuilder()
-                .setCreateOrAttach(CreateOrAttach.ATTACH).build();
-        // Now try to attach to the subscription
-        subscriber.asyncSubscribe(topic, localSubscriberId, opts2, new TestCallback(queue), null);
-        assertTrue(queue.take());
-        // Start delivery and publish some messages. Make sure they are consumed
-        // correctly.
-        startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
-        int batchSize = 5;
-        for (int i = 0; i < batchSize; i++) {
-            publisher.asyncPublish(topic, getMsg(i), new TestCallback(queue), null);
-            assertTrue(queue.take());
-            assertTrue(consumeQueue.take());
-        }
-    }
-
-    @Test(timeout=10000)
-    public void testServerRedirect() throws Exception {
-        int batchSize = 10;
-        publishBatch(batchSize, true, false, 0);
-    }
-
-    @Test(timeout=10000)
-    public void testSubscribeAndConsume() throws Exception {
-        int batchSize = 10;
-        subscribeToTopics(batchSize);
-        publishBatch(batchSize, true, true, 0);
-    }
-
-    @Test(timeout=10000)
-    public void testServerFailoverPublishOnly() throws Exception {
-        int batchSize = 10;
-        publishBatch(batchSize, true, false, 0);
-        shutDownLastServer();
-        publishBatch(batchSize, true, false, 1);
-    }
-
-    @Test(timeout=10000)
-    public void testServerFailover() throws Exception {
-        int batchSize = 10;
-        subscribeToTopics(batchSize);
-        publishBatch(batchSize, true, true, 0);
-        shutDownLastServer();
-        publishBatch(batchSize, true, true, 1);
-    }
-
-    @Test(timeout=10000)
-    public void testUnsubscribe() throws Exception {
-        ByteString topic = getTopic(0);
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-
-        subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null);
-        assertTrue(queue.take());
-        startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
-        publisher.asyncPublish(topic, getMsg(0), new TestCallback(queue), null);
-        assertTrue(queue.take());
-        assertTrue(consumeQueue.take());
-        // Send an Unsubscribe request
-        subscriber.asyncUnsubscribe(topic, localSubscriberId, new TestCallback(queue), null);
-        assertTrue(queue.take());
-        // Now publish a message and make sure it is not consumed by the client
-        publisher.asyncPublish(topic, getMsg(1), new TestCallback(queue), null);
-        assertTrue(queue.take());
-        // Wait a little bit just in case the message handler is still active,
-        // consuming the message, and then putting a true value in the
-        // consumeQueue.
-        Thread.sleep(1000);
-        // Put a False value on the consumeQueue so we can verify that it
-        // is not blocked by a message consume action which already put a True
-        // value into the queue.
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                ConcurrencyUtils.put(consumeQueue, false);
-            }
-        }).start();
-        assertFalse(consumeQueue.take());
-    }
-
-    @Test(timeout=10000)
-    public void testSyncUnsubscribeWithoutSubscription() throws Exception {
-        boolean unsubscribeSuccess = false;
-        try {
-            subscriber.unsubscribe(getTopic(0), localSubscriberId);
-        } catch (ClientNotSubscribedException e) {
-            unsubscribeSuccess = true;
-        } catch (Exception ex) {
-            unsubscribeSuccess = false;
-        }
-        assertTrue(unsubscribeSuccess);
-    }
-
-    @Test(timeout=10000)
-    public void testAsyncUnsubscribeWithoutSubscription() throws Exception {
-        subscriber.asyncUnsubscribe(getTopic(0), localSubscriberId, new TestCallback(queue), null);
-        assertFalse(queue.take());
-    }
-
-    @Test(timeout=10000)
-    public void testCloseSubscription() throws Exception {
-        ByteString topic = getTopic(0);
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-        subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null);
-        assertTrue(queue.take());
-        startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
-        publisher.asyncPublish(topic, getMsg(0), new TestCallback(queue), null);
-        assertTrue(queue.take());
-        assertTrue(consumeQueue.take());
-        // Close the subscription asynchronously
-        subscriber.asyncCloseSubscription(topic, localSubscriberId, new TestCallback(queue), null);
-        assertTrue(queue.take());
-        // Now publish a message and make sure it is not consumed by the client
-        publisher.asyncPublish(topic, getMsg(1), new TestCallback(queue), null);
-        assertTrue(queue.take());
-        // Wait a little bit just in case the message handler is still active,
-        // consuming the message, and then putting a true value in the
-        // consumeQueue.
-        Thread.sleep(1000);
-        // Put a False value on the consumeQueue so we can verify that it
-        // is not blocked by a message consume action which already put a True
-        // value into the queue.
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                ConcurrencyUtils.put(consumeQueue, false);
-            }
-        }).start();
-        assertFalse(consumeQueue.take());
-    }
-
-    @Test(timeout=10000)
-    public void testStartDeliveryTwice() throws Exception {
-        ByteString topic = getTopic(0);
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-
-        subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null);
-        assertTrue(queue.take());
-        startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
-        try {
-            startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
-            fail("Should not reach here!");
-        } catch (AlreadyStartDeliveryException e) {
-        }
-    }
-
-    @Test(timeout=10000)
-    public void testStopDelivery() throws Exception {
-        ByteString topic = getTopic(0);
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-
-        subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null);
-        assertTrue(queue.take());
-        startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
-        publisher.asyncPublish(topic, getMsg(0), new TestCallback(queue), null);
-        assertTrue(queue.take());
-        assertTrue(consumeQueue.take());
-        // Stop the delivery for this subscription
-        stopDelivery(topic, localSubscriberId);
-        // Publish some more messages so they are queued up to be delivered to
-        // the client
-        int batchSize = 10;
-        for (int i = 0; i < batchSize; i++) {
-            publisher.asyncPublish(topic, getMsg(i + 1), new TestCallback(queue), null);
-            assertTrue(queue.take());
-        }
-        // Wait a little bit just in case the message handler is still active,
-        // consuming the message, and then putting a true value in the
-        // consumeQueue.
-        Thread.sleep(1000);
-        // Put a False value on the consumeQueue so we can verify that it
-        // is not blocked by a message consume action which already put a True
-        // value into the queue.
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                ConcurrencyUtils.put(consumeQueue, false);
-            }
-        }).start();
-        assertFalse(consumeQueue.take());
-        // Now start delivery again and verify that the queued up messages are
-        // consumed
-        startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
-        for (int i = 0; i < batchSize; i++) {
-            assertTrue(consumeQueue.take());
-        }
-    }
-
-    @Test(timeout=10000)
-    public void testConsumedMessagesInOrder() throws Exception {
-        ByteString topic = getTopic(0);
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-
-        subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null);
-        assertTrue(queue.take());
-        startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
-        // Now publish some messages and verify that they are delivered in order
-        // to the subscriber
-        int batchSize = 100;
-        for (int i = 0; i < batchSize; i++) {
-            publisher.asyncPublish(topic, getMsg(i), new TestCallback(queue), null);
-        }
-        // We've sent out all of the publish messages asynchronously,
-        // now verify that they are consumed in the correct order.
-        for (int i = 0; i < batchSize; i++) {
-            assertTrue(queue.take());
-            assertTrue(consumeQueue.take());
-        }
-    }
-
-    @Test(timeout=10000)
-    public void testCreateSubscriptionFailure() throws Exception {
-        ByteString topic = getTopic(0);
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-
-        subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null);
-        assertTrue(queue.take());
-        // Close the subscription asynchronously
-        subscriber.asyncCloseSubscription(topic, localSubscriberId, new TestCallback(queue), null);
-        assertTrue(queue.take());
-        // Now try to create the subscription when it already exists
-        SubscriptionOptions optsCreate = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE).build();
-
-        subscriber.asyncSubscribe(topic, localSubscriberId, optsCreate, new TestCallback(queue), null);
-        assertFalse(queue.take());
-    }
-
-    @Test(timeout=10000)
-    public void testCreateSubscriptionSuccess() throws Exception {
-        ByteString topic = getTopic(0);
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE).build();
-        subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null);
-        assertTrue(queue.take());
-        startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
-        int batchSize = 5;
-        for (int i = 0; i < batchSize; i++) {
-            publisher.asyncPublish(topic, getMsg(i), new TestCallback(queue), null);
-            assertTrue(queue.take());
-            assertTrue(consumeQueue.take());
-        }
-    }
-
-    @Test(timeout=10000)
-    public void testAttachToSubscriptionFailure() throws Exception {
-        ByteString topic = getTopic(0);
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.ATTACH).build();
-
-        subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null);
-        assertFalse(queue.take());
-    }
-
-    // The following 4 tests are to make sure that the subscriberId validation
-    // works when it is a local subscriber and we're expecting the subscriberId
-    // to be in the "local" specific format.
-    @Test(timeout=10000)
-    public void testSyncSubscribeWithInvalidSubscriberId() throws Exception {
-        boolean subscribeSuccess = false;
-        try {
-            SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-                .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-            subscriber.subscribe(getTopic(0), hubSubscriberId, opts);
-        } catch (InvalidSubscriberIdException e) {
-            subscribeSuccess = true;
-        } catch (Exception ex) {
-            subscribeSuccess = false;
-        }
-        assertTrue(subscribeSuccess);
-    }
-
-    @Test(timeout=10000)
-    public void testAsyncSubscribeWithInvalidSubscriberId() throws Exception {
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-        subscriber.asyncSubscribe(getTopic(0), hubSubscriberId, opts,
-                                  new TestCallback(queue), null);
-        assertFalse(queue.take());
-    }
-
-    @Test(timeout=10000)
-    public void testSyncUnsubscribeWithInvalidSubscriberId() throws Exception {
-        boolean unsubscribeSuccess = false;
-        try {
-            subscriber.unsubscribe(getTopic(0), hubSubscriberId);
-        } catch (InvalidSubscriberIdException e) {
-            unsubscribeSuccess = true;
-        } catch (Exception ex) {
-            unsubscribeSuccess = false;
-        }
-        assertTrue(unsubscribeSuccess);
-    }
-
-    @Test(timeout=10000)
-    public void testAsyncUnsubscribeWithInvalidSubscriberId() throws Exception {
-        subscriber.asyncUnsubscribe(getTopic(0), hubSubscriberId, new TestCallback(queue), null);
-        assertFalse(queue.take());
-    }
-
-    // The following 4 tests are to make sure that the subscriberId validation
-    // also works when it is a hub subscriber and we're expecting the
-    // subscriberId to be in the "hub" specific format.
-    @Test(timeout=10000)
-    public void testSyncHubSubscribeWithInvalidSubscriberId() throws Exception {
-        Client hubClient = new HedwigHubClient(new HubClientConfiguration());
-        Subscriber hubSubscriber = hubClient.getSubscriber();
-        boolean subscribeSuccess = false;
-        try {
-            SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-                .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-            hubSubscriber.subscribe(getTopic(0), localSubscriberId, opts);
-        } catch (InvalidSubscriberIdException e) {
-            subscribeSuccess = true;
-        } catch (Exception ex) {
-            subscribeSuccess = false;
-        }
-        assertTrue(subscribeSuccess);
-        hubClient.close();
-    }
-
-    @Test(timeout=10000)
-    public void testAsyncHubSubscribeWithInvalidSubscriberId() throws Exception {
-        Client hubClient = new HedwigHubClient(new HubClientConfiguration());
-        Subscriber hubSubscriber = hubClient.getSubscriber();
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-        hubSubscriber.asyncSubscribe(getTopic(0), localSubscriberId, opts, new TestCallback(
-                                         queue), null);
-        assertFalse(queue.take());
-        hubClient.close();
-    }
-
-    @Test(timeout=10000)
-    public void testSyncHubUnsubscribeWithInvalidSubscriberId() throws Exception {
-        Client hubClient = new HedwigHubClient(new HubClientConfiguration());
-        Subscriber hubSubscriber = hubClient.getSubscriber();
-        boolean unsubscribeSuccess = false;
-        try {
-            hubSubscriber.unsubscribe(getTopic(0), localSubscriberId);
-        } catch (InvalidSubscriberIdException e) {
-            unsubscribeSuccess = true;
-        } catch (Exception ex) {
-            unsubscribeSuccess = false;
-        }
-        assertTrue(unsubscribeSuccess);
-        hubClient.close();
-    }
-
-    @Test(timeout=10000)
-    public void testAsyncHubUnsubscribeWithInvalidSubscriberId() throws Exception {
-        Client hubClient = new HedwigHubClient(new HubClientConfiguration());
-        Subscriber hubSubscriber = hubClient.getSubscriber();
-        hubSubscriber.asyncUnsubscribe(getTopic(0), localSubscriberId, new TestCallback(queue), null);
-        assertFalse(queue.take());
-        hubClient.close();
-    }
-
-    @Test(timeout=10000)
-    public void testPublishWithBookKeeperError() throws Exception {
-        int batchSize = 10;
-        publishBatch(batchSize, true, false, 0);
-        // stop all bookie servers
-        bktb.stopAllBookieServers();
-        // following publish would failed with NotEnoughBookies
-        publishBatch(batchSize, false, false, 1);
-        // start all bookie servers
-        bktb.startAllBookieServers();
-        // following publish should succeed
-        publishBatch(batchSize, true, false, 1);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubProxy.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubProxy.java b/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubProxy.java
deleted file mode 100644
index 64e5b4e..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubProxy.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.integration;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-import java.util.Collection;
-import java.util.Arrays;
-
-@RunWith(Parameterized.class)
-public class TestHedwigHubProxy extends TestHedwigHub {
-    @Parameters
-    public static Collection<Object[]> configs() {
-        return Arrays.asList(new Object[][] { { true }, { false } });
-    }
-
-    public TestHedwigHubProxy(boolean isSubscriptionChannelSharingEnabled) {
-        super(Mode.PROXY, isSubscriptionChannelSharingEnabled);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubRegular.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubRegular.java b/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubRegular.java
deleted file mode 100644
index 2e370c0..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubRegular.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.integration;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-import java.util.Collection;
-import java.util.Arrays;
-
-@RunWith(Parameterized.class)
-public class TestHedwigHubRegular extends TestHedwigHub {
-    @Parameters
-    public static Collection<Object[]> configs() {
-        return Arrays.asList(new Object[][] { { true }, { false } });
-    }
-
-    public TestHedwigHubRegular(boolean isSubscriptionChannelSharingEnabled) {
-        super(Mode.REGULAR, isSubscriptionChannelSharingEnabled);
-    }
-}