You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:15 UTC
[05/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java b/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java
deleted file mode 100644
index 46c0c17..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java
+++ /dev/null
@@ -1,375 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.delivery;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.filter.ClientMessageFilter;
-import org.apache.hedwig.filter.MessageFilterBase;
-import org.apache.hedwig.filter.ServerMessageFilter;
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageHeader;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import org.apache.hedwig.server.HedwigHubTestBase;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.util.Callback;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.google.protobuf.ByteString;
-
-import static org.junit.Assert.*;
-
-@RunWith(Parameterized.class)
-public class TestThrottlingDelivery extends HedwigHubTestBase {
-
- private static final int DEFAULT_MESSAGE_WINDOW_SIZE = 10;
- private static final String OPT_MOD = "MOD";
-
- static class ModMessageFilter implements ServerMessageFilter, ClientMessageFilter {
-
- int mod;
-
- @Override
- public MessageFilterBase setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
- SubscriptionPreferences preferences) {
- Map<String, ByteString> userOptions = SubscriptionStateUtils.buildUserOptions(preferences);
- ByteString modValue = userOptions.get(OPT_MOD);
- if (null == modValue) {
- mod = 0;
- } else {
- mod = Integer.valueOf(modValue.toStringUtf8());
- }
- return this;
- }
-
- @Override
- public boolean testMessage(Message message) {
- int value = Integer.valueOf(message.getBody().toStringUtf8());
- return 0 == value % mod;
- }
-
- @Override
- public ServerMessageFilter initialize(Configuration conf) throws ConfigurationException, IOException {
- // do nothing
- return this;
- }
-
- @Override
- public void uninitialize() {
- // do nothing
- }
-
- }
-
-
- protected class ThrottleDeliveryServerConfiguration extends HubServerConfiguration {
-
- ThrottleDeliveryServerConfiguration(int serverPort, int sslServerPort) {
- super(serverPort, sslServerPort);
- }
-
- @Override
- public int getDefaultMessageWindowSize() {
- return TestThrottlingDelivery.DEFAULT_MESSAGE_WINDOW_SIZE;
- }
- }
-
- protected class ThrottleDeliveryClientConfiguration extends HubClientConfiguration {
-
- int messageWindowSize;
-
- ThrottleDeliveryClientConfiguration() {
- this(TestThrottlingDelivery.DEFAULT_MESSAGE_WINDOW_SIZE);
- }
-
- ThrottleDeliveryClientConfiguration(int messageWindowSize) {
- this.messageWindowSize = messageWindowSize;
- }
-
- @Override
- public int getMaximumOutstandingMessages() {
- return messageWindowSize;
- }
-
- void setMessageWindowSize(int messageWindowSize) {
- this.messageWindowSize = messageWindowSize;
- }
-
- @Override
- public boolean isAutoSendConsumeMessageEnabled() {
- return false;
- }
-
- @Override
- public boolean isSubscriptionChannelSharingEnabled() {
- return isSubscriptionChannelSharingEnabled;
- }
- }
-
- private void publishNums(Publisher pub, ByteString topic, int start, int num, int M) throws Exception {
- for (int i = 1; i <= num; i++) {
- PubSubProtocol.Map.Builder propsBuilder = PubSubProtocol.Map.newBuilder().addEntries(
- PubSubProtocol.Map.Entry.newBuilder().setKey(OPT_MOD)
- .setValue(ByteString.copyFromUtf8(String.valueOf((start + i) % M))));
- MessageHeader.Builder headerBuilder = MessageHeader.newBuilder().setProperties(propsBuilder);
- Message msg = Message.newBuilder().setBody(ByteString.copyFromUtf8(String.valueOf(start + i)))
- .setHeader(headerBuilder).build();
- pub.publish(topic, msg);
- }
- }
-
- private void throttleWithFilter(Publisher pub, final Subscriber sub,
- ByteString topic, ByteString subid,
- final int X) throws Exception {
- // publish numbers with header (so only 3 messages would be delivered)
- publishNums(pub, topic, 0, 3 * X, X);
-
- // subscribe the topic with filter
- PubSubProtocol.Map userOptions = PubSubProtocol.Map
- .newBuilder()
- .addEntries(
- PubSubProtocol.Map.Entry.newBuilder().setKey(OPT_MOD)
- .setValue(ByteString.copyFromUtf8(String.valueOf(X)))).build();
- SubscriptionOptions opts = SubscriptionOptions.newBuilder().setCreateOrAttach(CreateOrAttach.ATTACH)
- .setOptions(userOptions).setMessageFilter(ModMessageFilter.class.getName()).build();
- sub.subscribe(topic, subid, opts);
-
- final AtomicInteger expected = new AtomicInteger(X);
- final CountDownLatch latch = new CountDownLatch(1);
- sub.startDelivery(topic, subid, new MessageHandler() {
- @Override
- public synchronized void deliver(ByteString topic, ByteString subscriberId,
- Message msg,
- Callback<Void> callback, Object context) {
- try {
- int value = Integer.valueOf(msg.getBody().toStringUtf8());
- logger.debug("Received message {},", value);
-
- if (value == expected.get()) {
- expected.addAndGet(X);
- } else {
- // error condition
- logger.error("Did not receive expected value, expected {}, got {}",
- expected.get(), value);
- expected.set(0);
- latch.countDown();
- }
- if (value == 3 * X) {
- latch.countDown();
- }
- callback.operationFinished(context, null);
- sub.consume(topic, subscriberId, msg.getMsgId());
- } catch (Exception e) {
- logger.error("Received bad message", e);
- latch.countDown();
- }
- }
- });
-
- assertTrue("Timed out waiting for messages " + 3 * X, latch.await(10, TimeUnit.SECONDS));
- assertEquals("Should be expected message with " + 4 * X, 4 * X, expected.get());
-
- sub.stopDelivery(topic, subid);
- sub.closeSubscription(topic, subid);
- }
-
- private void throttleX(Publisher pub, final Subscriber sub,
- ByteString topic, ByteString subid,
- final int X) throws Exception {
- for (int i=1; i<=3*X; i++) {
- pub.publish(topic, Message.newBuilder().setBody(
- ByteString.copyFromUtf8(String.valueOf(i))).build());
- }
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.ATTACH).build();
- sub.subscribe(topic, subid, opts);
-
- final AtomicInteger expected = new AtomicInteger(1);
- final CountDownLatch throttleLatch = new CountDownLatch(1);
- final CountDownLatch nonThrottleLatch = new CountDownLatch(1);
- sub.startDelivery(topic, subid, new MessageHandler() {
- @Override
- public synchronized void deliver(ByteString topic, ByteString subscriberId,
- Message msg,
- Callback<Void> callback, Object context) {
- try {
- int value = Integer.valueOf(msg.getBody().toStringUtf8());
- logger.debug("Received message {},", value);
-
- if (value == expected.get()) {
- expected.incrementAndGet();
- } else {
- // error condition
- logger.error("Did not receive expected value, expected {}, got {}",
- expected.get(), value);
- expected.set(0);
- throttleLatch.countDown();
- nonThrottleLatch.countDown();
- }
- if (expected.get() > X+1) {
- throttleLatch.countDown();
- }
- if (expected.get() == (3 * X + 1)) {
- nonThrottleLatch.countDown();
- }
- callback.operationFinished(context, null);
- if (expected.get() > X + 1) {
- sub.consume(topic, subscriberId, msg.getMsgId());
- }
- } catch (Exception e) {
- logger.error("Received bad message", e);
- throttleLatch.countDown();
- nonThrottleLatch.countDown();
- }
- }
- });
- assertFalse("Received more messages than throttle value " + X,
- throttleLatch.await(3, TimeUnit.SECONDS));
- assertEquals("Should be expected messages with only " + (X+1), X+1, expected.get());
-
- // consume messages to not throttle it
- for (int i=1; i<=X; i++) {
- sub.consume(topic, subid,
- MessageSeqId.newBuilder().setLocalComponent(i).build());
- }
-
- assertTrue("Timed out waiting for messages " + (3*X + 1),
- nonThrottleLatch.await(10, TimeUnit.SECONDS));
- assertEquals("Should be expected message with " + (3*X + 1),
- 3*X + 1, expected.get());
-
- sub.stopDelivery(topic, subid);
- sub.closeSubscription(topic, subid);
- }
-
- @Parameters
- public static Collection<Object[]> configs() {
- return Arrays.asList(new Object[][] { { false }, { true } });
- }
-
- protected boolean isSubscriptionChannelSharingEnabled;
-
- public TestThrottlingDelivery(boolean isSubscriptionChannelSharingEnabled) {
- super(1);
- this.isSubscriptionChannelSharingEnabled = isSubscriptionChannelSharingEnabled;
- }
-
- @Override
- @Before
- public void setUp() throws Exception {
- super.setUp();
- }
-
- @Override
- @After
- public void tearDown() throws Exception {
- super.tearDown();
- }
-
- @Override
- protected ServerConfiguration getServerConfiguration(int port, int sslPort) {
- return new ThrottleDeliveryServerConfiguration(port, sslPort);
- }
-
- @Test(timeout=60000)
- public void testServerSideThrottle() throws Exception {
- int messageWindowSize = DEFAULT_MESSAGE_WINDOW_SIZE;
- ThrottleDeliveryClientConfiguration conf =
- new ThrottleDeliveryClientConfiguration();
- HedwigClient client = new HedwigClient(conf);
- Publisher pub = client.getPublisher();
- Subscriber sub = client.getSubscriber();
-
- ByteString topic = ByteString.copyFromUtf8("testServerSideThrottle");
- ByteString subid = ByteString.copyFromUtf8("serverThrottleSub");
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE).build();
- sub.subscribe(topic, subid, opts);
- sub.closeSubscription(topic, subid);
-
- // throttle with hub server's setting
- throttleX(pub, sub, topic, subid, DEFAULT_MESSAGE_WINDOW_SIZE);
-
- messageWindowSize = DEFAULT_MESSAGE_WINDOW_SIZE / 2;
- // throttle with a lower value than hub server's setting
- SubscriptionOptions.Builder optionsBuilder = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE)
- .setMessageWindowSize(messageWindowSize);
- topic = ByteString.copyFromUtf8("testServerSideThrottleWithLowerValue");
- sub.subscribe(topic, subid, optionsBuilder.build());
- sub.closeSubscription(topic, subid);
- throttleX(pub, sub, topic, subid, messageWindowSize);
-
- messageWindowSize = DEFAULT_MESSAGE_WINDOW_SIZE + 5;
- // throttle with a higher value than hub server's setting
- optionsBuilder = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE)
- .setMessageWindowSize(messageWindowSize);
- topic = ByteString.copyFromUtf8("testServerSideThrottleWithHigherValue");
- sub.subscribe(topic, subid, optionsBuilder.build());
- sub.closeSubscription(topic, subid);
- throttleX(pub, sub, topic, subid, messageWindowSize);
-
- client.close();
- }
-
- @Test(timeout = 60000)
- public void testThrottleWithServerSideFilter() throws Exception {
- int messageWindowSize = DEFAULT_MESSAGE_WINDOW_SIZE;
- ThrottleDeliveryClientConfiguration conf = new ThrottleDeliveryClientConfiguration();
- HedwigClient client = new HedwigClient(conf);
- Publisher pub = client.getPublisher();
- Subscriber sub = client.getSubscriber();
-
- ByteString topic = ByteString.copyFromUtf8("testThrottleWithServerSideFilter");
- ByteString subid = ByteString.copyFromUtf8("mysub");
- SubscriptionOptions opts = SubscriptionOptions.newBuilder().setCreateOrAttach(CreateOrAttach.CREATE).build();
- sub.subscribe(topic, subid, opts);
- sub.closeSubscription(topic, subid);
-
- // message gap: half of the throttle threshold
- throttleWithFilter(pub, sub, topic, subid, messageWindowSize / 2);
- // message gap: equals to the throttle threshold
- throttleWithFilter(pub, sub, topic, subid, messageWindowSize);
- // message gap: larger than the throttle threshold
- throttleWithFilter(pub, sub, topic, subid, messageWindowSize + messageWindowSize / 2);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java b/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java
deleted file mode 100644
index 8e9b8f6..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java
+++ /dev/null
@@ -1,415 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.filter;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.client.api.MessageHandler;
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageHeader;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protoextensions.MapUtils;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-
-import org.apache.hedwig.client.api.Client;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.filter.ClientMessageFilter;
-import org.apache.hedwig.filter.MessageFilterBase;
-import org.apache.hedwig.filter.ServerMessageFilter;
-import org.apache.hedwig.util.Callback;
-
-import org.apache.hedwig.server.HedwigHubTestBase;
-
-public class TestMessageFilter extends HedwigHubTestBase {
-
- // Client side variables
- protected ClientConfiguration conf;
- protected HedwigClient client;
- protected Publisher publisher;
- protected Subscriber subscriber;
-
- static final String OPT_MOD = "MOD";
-
- static class ModMessageFilter implements ServerMessageFilter, ClientMessageFilter {
-
- int mod;
-
- @Override
- public ServerMessageFilter initialize(Configuration conf) {
- // do nothing
- return this;
- }
-
- @Override
- public void uninitialize() {
- // do nothing;
- }
-
- @Override
- public MessageFilterBase setSubscriptionPreferences(ByteString topic,
- ByteString subscriberId,
- SubscriptionPreferences preferences) {
- Map<String, ByteString> userOptions = SubscriptionStateUtils.buildUserOptions(preferences);
- ByteString modValue = userOptions.get(OPT_MOD);
- if (null == modValue) {
- mod = 0;
- } else {
- mod = Integer.valueOf(modValue.toStringUtf8());
- }
- return this;
- }
-
- @Override
- public boolean testMessage(Message msg) {
- int value = Integer.valueOf(msg.getBody().toStringUtf8());
- return 0 == value % mod;
- }
- }
-
- static class HeaderMessageFilter implements ServerMessageFilter, ClientMessageFilter {
- int mod;
- @Override
- public ServerMessageFilter initialize(Configuration conf) {
- // do nothing
- return this;
- }
-
- @Override
- public void uninitialize() {
- // do nothing
- }
-
- @Override
- public MessageFilterBase setSubscriptionPreferences(ByteString topic,
- ByteString subscriberId,
- SubscriptionPreferences preferences) {
- // do nothing now
- return this;
- }
-
- @Override
- public boolean testMessage(Message msg) {
- if (msg.hasHeader()) {
- MessageHeader header = msg.getHeader();
- if (header.hasProperties()) {
- Map<String, ByteString> props = MapUtils.buildMap(header.getProperties());
- ByteString value = props.get(OPT_MOD);
- if (null == value) {
- return false;
- }
- int intValue = Integer.valueOf(value.toStringUtf8());
- if (0 != intValue) {
- return false;
- }
- return true;
- } else {
- return false;
- }
- } else {
- return false;
- }
- }
- }
-
- public TestMessageFilter() {
- super(1);
- }
-
- @Override
- @Before
- public void setUp() throws Exception {
- super.setUp();
-
- conf = new HubClientConfiguration() {
- @Override
- public boolean isAutoSendConsumeMessageEnabled() {
- return false;
- }
- };
- client = new HedwigClient(conf);
- publisher = client.getPublisher();
- subscriber = client.getSubscriber();
- }
-
- @Override
- @After
- public void tearDown() throws Exception {
- client.close();
- super.tearDown();
- }
-
- private void publishNums(ByteString topic, int start, int num, int M) throws Exception {
- for (int i=1; i<=num; i++) {
- PubSubProtocol.Map.Builder propsBuilder = PubSubProtocol.Map.newBuilder()
- .addEntries(PubSubProtocol.Map.Entry.newBuilder().setKey(OPT_MOD)
- .setValue(ByteString.copyFromUtf8(String.valueOf((start + i) % M))));
- MessageHeader.Builder headerBuilder = MessageHeader.newBuilder().setProperties(propsBuilder);
- Message msg = Message.newBuilder().setBody(
- ByteString.copyFromUtf8(String.valueOf((start + i))))
- .setHeader(headerBuilder).build();
- publisher.publish(topic, msg);
- }
- }
-
- private void receiveNumModM(final ByteString topic, final ByteString subid,
- final String filterClassName, final ClientMessageFilter filter,
- final int start, final int num, final int M,
- final boolean consume)
- throws Exception {
- PubSubProtocol.Map userOptions = PubSubProtocol.Map.newBuilder()
- .addEntries(PubSubProtocol.Map.Entry.newBuilder().setKey(OPT_MOD)
- .setValue(ByteString.copyFromUtf8(String.valueOf(M)))).build();
- SubscriptionOptions.Builder optionsBuilder = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.ATTACH)
- .setOptions(userOptions);
- if (null != filterClassName) {
- optionsBuilder.setMessageFilter(filterClassName);
- }
- subscriber.subscribe(topic, subid, optionsBuilder.build());
-
- final int base = start + M - start % M;
-
- final AtomicInteger expected = new AtomicInteger(base);
- final CountDownLatch latch = new CountDownLatch(1);
- MessageHandler msgHandler = new MessageHandler() {
- synchronized public void deliver(ByteString topic, ByteString subscriberId,
- Message msg, Callback<Void> callback,
- Object context) {
- try {
- int value = Integer.valueOf(msg.getBody().toStringUtf8());
- // duplicated messages received, ignore them
- if (value > start) {
- if (value == expected.get()) {
- expected.addAndGet(M);
- } else {
- logger.error("Did not receive expected value, expected {}, got {}",
- expected.get(), value);
- expected.set(0);
- latch.countDown();
- }
- if (expected.get() == (base + num * M)) {
- latch.countDown();
- }
- }
- callback.operationFinished(context, null);
- if (consume) {
- subscriber.consume(topic, subid, msg.getMsgId());
- }
- } catch (Exception e) {
- logger.error("Received bad message", e);
- latch.countDown();
- }
- }
- };
- if (null != filter) {
- subscriber.startDeliveryWithFilter(topic, subid, msgHandler, filter);
- } else {
- subscriber.startDelivery(topic, subid, msgHandler);
- }
- assertTrue("Timed out waiting for messages mod " + M + " expected is " + expected.get(),
- latch.await(10, TimeUnit.SECONDS));
- assertEquals("Should be expected message with " + (base + num * M), (base + num*M), expected.get());
- subscriber.stopDelivery(topic, subid);
- subscriber.closeSubscription(topic, subid);
- }
-
- @Test(timeout=60000)
- public void testServerSideMessageFilter() throws Exception {
- ByteString topic = ByteString.copyFromUtf8("TestMessageFilter");
- ByteString subid = ByteString.copyFromUtf8("mysub");
-
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
- subscriber.subscribe(topic, subid, opts);
- subscriber.closeSubscription(topic, subid);
- publishNums(topic, 0, 100, 2);
- receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 50, 2, true);
- }
-
- @Test(timeout=60000)
- public void testInvalidServerSideMessageFilter() throws Exception {
- ByteString topic = ByteString.copyFromUtf8("TestInvalidMessageFilter");
- ByteString subid = ByteString.copyFromUtf8("mysub");
-
- SubscriptionOptions options = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH)
- .setMessageFilter("Invalid_Message_Filter").build();
- try {
- subscriber.subscribe(topic, subid, options);
- // coun't reach here
- fail("Should fail subscribe with invalid message filter");
- } catch (PubSubException pse) {
- assertTrue("Should respond with INVALID_MESSAGE_FILTER",
- pse.getMessage().contains("INVALID_MESSAGE_FILTER"));
- }
- }
-
- @Test(timeout=60000)
- public void testChangeSubscriptionPreferences() throws Exception {
- ByteString topic = ByteString.copyFromUtf8("TestChangeSubscriptionPreferences");
- ByteString subid = ByteString.copyFromUtf8("mysub");
-
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
- subscriber.subscribe(topic, subid, opts);
- subscriber.closeSubscription(topic, subid);
-
- publishNums(topic, 0, 100, 2);
- receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 50, 2, false);
- receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 25, 4, false);
- receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 33, 3, true);
-
- // change mod to receive numbers mod 5
- publishNums(topic, 100, 100, 5);
- receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 100, 20, 5, true);
-
- // change mod to receive numbers mod 7
- publishNums(topic, 200, 100, 7);
- receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 200, 14, 7, true);
- }
-
- @Test(timeout=60000)
- public void testChangeServerSideMessageFilter() throws Exception {
- ByteString topic = ByteString.copyFromUtf8("TestChangeMessageFilter");
- ByteString subid = ByteString.copyFromUtf8("mysub");
-
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
- subscriber.subscribe(topic, subid, opts);
- subscriber.closeSubscription(topic, subid);
-
- publishNums(topic, 0, 100, 3);
- receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 50, 2, false);
- receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 25, 4, false);
- receiveNumModM(topic, subid, HeaderMessageFilter.class.getName(), null, 0, 33, 3, true);
-
- publishNums(topic, 200, 100, 7);
- receiveNumModM(topic, subid, HeaderMessageFilter.class.getName(), null, 200, 14, 7, true);
- }
-
- @Test(timeout=60000)
- public void testFixInvalidServerSideMessageFilter() throws Exception {
- ByteString topic = ByteString.copyFromUtf8("TestFixMessageFilter");
- ByteString subid = ByteString.copyFromUtf8("mysub");
-
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
- subscriber.subscribe(topic, subid, opts);
- subscriber.closeSubscription(topic, subid);
-
- publishNums(topic, 0, 100, 3);
- try {
- receiveNumModM(topic, subid, "Invalid_Message_Filter", null, 0, 33, 3, true);
- // coun't reach here
- fail("Should fail subscribe with invalid message filter");
- } catch (Exception pse) {
- assertTrue("Should respond with INVALID_MESSAGE_FILTER",
- pse.getMessage().contains("INVALID_MESSAGE_FILTER"));
- }
- receiveNumModM(topic, subid, HeaderMessageFilter.class.getName(), null, 0, 33, 3, true);
- }
-
- @Test(timeout=60000)
- public void testNullClientMessageFilter() throws Exception {
- ByteString topic = ByteString.copyFromUtf8("TestNullClientMessageFilter");
- ByteString subid = ByteString.copyFromUtf8("mysub");
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
- subscriber.subscribe(topic, subid, opts);
- try {
- subscriber.startDeliveryWithFilter(topic, subid, null, new ModMessageFilter());
- fail("Should fail start delivery with filter using null message handler.");
- } catch (NullPointerException npe) {
- }
-
- try {
- subscriber.startDeliveryWithFilter(topic, subid, new MessageHandler() {
- public void deliver(ByteString topic, ByteString subscriberId,
- Message msg, Callback<Void> callback, Object context) {
- // do nothing
- }
- }, null);
- fail("Should fail start delivery with filter using null message filter.");
- } catch (NullPointerException npe) {
- }
- }
-
- @Test(timeout=60000)
- public void testClientSideMessageFilter() throws Exception {
- ByteString topic = ByteString.copyFromUtf8("TestClientMessageFilter");
- ByteString subid = ByteString.copyFromUtf8("mysub");
-
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
- subscriber.subscribe(topic, subid, opts);
- subscriber.closeSubscription(topic, subid);
- publishNums(topic, 0, 100, 2);
- receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 50, 2, true);
- }
-
- @Test(timeout=60000)
- public void testChangeSubscriptionPreferencesForClientFilter() throws Exception {
- ByteString topic = ByteString.copyFromUtf8("TestChangeSubscriptionPreferencesForClientFilter");
- ByteString subid = ByteString.copyFromUtf8("mysub");
-
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
- subscriber.subscribe(topic, subid, opts);
- subscriber.closeSubscription(topic, subid);
-
- publishNums(topic, 0, 100, 2);
- receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 50, 2, false);
- receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 25, 4, false);
- receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 33, 3, true);
- }
-
- @Test(timeout=60000)
- public void testChangeClientSideMessageFilter() throws Exception {
- ByteString topic = ByteString.copyFromUtf8("TestChangeClientSideMessageFilter");
- ByteString subid = ByteString.copyFromUtf8("mysub");
-
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
- subscriber.subscribe(topic, subid, opts);
- subscriber.closeSubscription(topic, subid);
-
- publishNums(topic, 0, 100, 3);
- receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 50, 2, false);
- receiveNumModM(topic, subid, null, new HeaderMessageFilter(), 0, 33, 3, true);
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestBaseHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestBaseHandler.java b/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestBaseHandler.java
deleted file mode 100644
index 4a5c63d..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestBaseHandler.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.handlers;
-
-import java.util.List;
-
-import org.jboss.netty.channel.Channel;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.netty.WriteRecordingChannel;
-import org.apache.hedwig.server.topics.StubTopicManager;
-import org.apache.hedwig.server.topics.TopicManager;
-
-import static org.junit.Assert.*;
-
-public class TestBaseHandler {
-
- MyBaseHandler handler;
- StubTopicManager tm;
- PubSubRequest request = PubSubRequest.getDefaultInstance();
- WriteRecordingChannel channel = new WriteRecordingChannel();
-
- protected class MyBaseHandler extends BaseHandler {
-
- public MyBaseHandler(TopicManager tm, ServerConfiguration conf) {
- super(tm, conf);
- }
-
- PubSubRequest request;
-
- public PubSubRequest getRequest() {
- return request;
- }
-
- @Override
- public void handleRequestAtOwner(PubSubRequest request, Channel channel) {
- this.request = request;
- }
-
- }
-
- @Before
- public void setUp() throws Exception {
- ServerConfiguration conf = new ServerConfiguration();
- tm = new StubTopicManager(conf);
- handler = new MyBaseHandler(tm, conf);
- request = PubSubRequest.getDefaultInstance();
- channel = new WriteRecordingChannel();
- }
-
- public PubSubResponse getPubSubResponse(WriteRecordingChannel channel) {
- List<Object> messages = channel.getMessagesWritten();
- assertEquals(messages.size(), 1);
-
- Object message = messages.get(0);
- assertEquals(message.getClass(), PubSubResponse.class);
-
- return (PubSubResponse) message;
- }
-
- @Test(timeout=60000)
- public void testHandleRequestOnRedirect() throws Exception {
- tm.setShouldOwnEveryNewTopic(false);
- handler.handleRequest(request, channel);
-
- PubSubResponse response = getPubSubResponse(channel);
- assertEquals(response.getStatusCode(), StatusCode.NOT_RESPONSIBLE_FOR_TOPIC);
- assertEquals(request.getTxnId(), response.getTxnId());
- assertNull(handler.getRequest());
-
- }
-
- @Test(timeout=60000)
- public void testHandleRequestOnOwner() throws Exception {
-
- tm.setShouldOwnEveryNewTopic(true);
- handler.handleRequest(request, channel);
- assertEquals(0, channel.getMessagesWritten().size());
- assertEquals(handler.getRequest(), request);
-
- }
-
- @Test(timeout=60000)
- public void testHandleRequestOnError() throws Exception {
-
- tm.setShouldError(true);
- handler.handleRequest(request, channel);
-
- PubSubResponse response = getPubSubResponse(channel);
- assertEquals(response.getStatusCode(), StatusCode.SERVICE_DOWN);
- assertEquals(request.getTxnId(), response.getTxnId());
- assertNull(handler.getRequest());
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java b/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
deleted file mode 100644
index 93f5c2e..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.handlers;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.StubCallback;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.filter.PipelineFilter;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.delivery.ChannelEndPoint;
-import org.apache.hedwig.server.delivery.StubDeliveryManager;
-import org.apache.hedwig.server.delivery.StubDeliveryManager.StartServingRequest;
-import org.apache.hedwig.server.netty.WriteRecordingChannel;
-import org.apache.hedwig.server.persistence.LocalDBPersistenceManager;
-import org.apache.hedwig.server.persistence.PersistenceManager;
-import org.apache.hedwig.server.subscriptions.AllToAllTopologyFilter;
-import org.apache.hedwig.server.subscriptions.StubSubscriptionManager;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
-import org.apache.hedwig.util.ConcurrencyUtils;
-
-import static org.junit.Assert.*;
-
-public class TestSubUnsubHandler {
-
- SubscribeHandler sh;
- StubDeliveryManager dm;
- StubSubscriptionManager sm;
- SubscriptionChannelManager subChannelMgr;
- ByteString topic = ByteString.copyFromUtf8("topic");
- WriteRecordingChannel channel;
-
- SubscribeRequest subRequestPrototype;
- PubSubRequest pubSubRequestPrototype;
- ByteString subscriberId;
- UnsubscribeHandler ush;
-
- @Before
- public void setUp() throws Exception {
- ServerConfiguration conf = new ServerConfiguration();
- ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
-
- TopicManager tm = new TrivialOwnAllTopicManager(conf, executor);
- dm = new StubDeliveryManager();
- PersistenceManager pm = LocalDBPersistenceManager.instance();
- sm = new StubSubscriptionManager(tm, pm, dm, conf, executor);
- subChannelMgr = new SubscriptionChannelManager();
- sh = new SubscribeHandler(conf, tm, dm, pm, sm, subChannelMgr);
- channel = new WriteRecordingChannel();
-
- subscriberId = ByteString.copyFromUtf8("subId");
-
- subRequestPrototype = SubscribeRequest.newBuilder().setSubscriberId(subscriberId).build();
- pubSubRequestPrototype = PubSubRequest.newBuilder().setProtocolVersion(ProtocolVersion.VERSION_ONE).setType(
- OperationType.SUBSCRIBE).setTxnId(0).setTopic(topic).setSubscribeRequest(subRequestPrototype).build();
-
- ush = new UnsubscribeHandler(conf, tm, sm, dm, subChannelMgr);
- }
-
- @Test(timeout=60000)
- public void testNoSubscribeRequest() {
- sh.handleRequestAtOwner(PubSubRequest.newBuilder(pubSubRequestPrototype).clearSubscribeRequest().build(),
- channel);
- assertEquals(StatusCode.MALFORMED_REQUEST, ((PubSubResponse) channel.getMessagesWritten().get(0))
- .getStatusCode());
- }
-
- @Test(timeout=60000)
- public void testSuccessCase() {
- StubCallback<Void> callback = new StubCallback<Void>();
- sm.acquiredTopic(topic, callback, null);
- assertNull(ConcurrencyUtils.take(callback.queue).right());
-
- sh.handleRequestAtOwner(pubSubRequestPrototype, channel);
- assertEquals(StatusCode.SUCCESS, ((PubSubResponse) channel.getMessagesWritten().get(0)).getStatusCode());
-
- // make sure the channel was put in the maps
- Set<TopicSubscriber> topicSubs = new HashSet<TopicSubscriber>();
- topicSubs.add(new TopicSubscriber(topic, subscriberId));
- assertEquals(topicSubs,
- subChannelMgr.channel2sub.get(channel));
- assertEquals(channel,
- subChannelMgr.sub2Channel.get(new TopicSubscriber(topic, subscriberId)));
-
- // make sure delivery was started
- StartServingRequest startRequest = (StartServingRequest) dm.lastRequest.poll();
- assertEquals(channel, ((ChannelEndPoint) startRequest.endPoint).getChannel());
- assertEquals(PipelineFilter.class, startRequest.filter.getClass());
- PipelineFilter pfilter = (PipelineFilter)(startRequest.filter);
- assertEquals(1, pfilter.size());
- assertEquals(AllToAllTopologyFilter.class, pfilter.getFirst().getClass());
- assertEquals(1, startRequest.seqIdToStartFrom.getLocalComponent());
- assertEquals(subscriberId, startRequest.subscriberId);
- assertEquals(topic, startRequest.topic);
-
- // make sure subscription was registered
- StubCallback<SubscriptionData> callback1 = new StubCallback<SubscriptionData>();
- sm.serveSubscribeRequest(topic, SubscribeRequest.newBuilder(subRequestPrototype).setCreateOrAttach(
- CreateOrAttach.CREATE).build(), MessageSeqId.newBuilder().setLocalComponent(10).build(), callback1,
- null);
-
- assertEquals(PubSubException.ClientAlreadySubscribedException.class, ConcurrencyUtils.take(callback1.queue)
- .right().getClass());
-
- // trying to subscribe again should throw an error
- WriteRecordingChannel dupChannel = new WriteRecordingChannel();
- sh.handleRequestAtOwner(pubSubRequestPrototype, dupChannel);
- assertEquals(StatusCode.TOPIC_BUSY, ((PubSubResponse) dupChannel.getMessagesWritten().get(0)).getStatusCode());
-
- // after disconnecting the channel, subscribe should work again
- subChannelMgr.channelDisconnected(channel);
-
- dupChannel = new WriteRecordingChannel();
- sh.handleRequestAtOwner(pubSubRequestPrototype, dupChannel);
- assertEquals(StatusCode.SUCCESS, ((PubSubResponse) dupChannel.getMessagesWritten().get(0)).getStatusCode());
-
- // test unsubscribe
- channel = new WriteRecordingChannel();
- ush.handleRequestAtOwner(pubSubRequestPrototype, channel);
- assertEquals(StatusCode.MALFORMED_REQUEST, ((PubSubResponse) channel.getMessagesWritten().get(0))
- .getStatusCode());
-
- PubSubRequest unsubRequest = PubSubRequest.newBuilder(pubSubRequestPrototype).setUnsubscribeRequest(
- UnsubscribeRequest.newBuilder().setSubscriberId(subscriberId)).build();
- channel = new WriteRecordingChannel();
- dm.lastRequest.clear();
-
- ush.handleRequestAtOwner(unsubRequest, channel);
- assertEquals(StatusCode.SUCCESS, ((PubSubResponse) channel.getMessagesWritten().get(0)).getStatusCode());
-
- // make sure delivery has been stopped
- assertEquals(new TopicSubscriber(topic, subscriberId), dm.lastRequest.poll());
-
- // make sure the info is gone from the sm
- StubCallback<SubscriptionData> callback2 = new StubCallback<SubscriptionData>();
- sm.serveSubscribeRequest(topic, SubscribeRequest.newBuilder(subRequestPrototype).setCreateOrAttach(
- CreateOrAttach.ATTACH).build(), MessageSeqId.newBuilder().setLocalComponent(10).build(), callback2,
- null);
- assertEquals(PubSubException.ClientNotSubscribedException.class, ConcurrencyUtils.take(callback2.queue).right()
- .getClass());
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java b/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
deleted file mode 100644
index 1867f9c..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
+++ /dev/null
@@ -1,777 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.integration;
-
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.concurrent.SynchronousQueue;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.client.api.Client;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.StartDeliveryRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.StopDeliveryRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import org.apache.hedwig.server.HedwigHubTestBase;
-import org.apache.hedwig.server.netty.WriteRecordingChannel;
-import org.apache.hedwig.server.proxy.HedwigProxy;
-import org.apache.hedwig.server.proxy.ProxyConfiguration;
-import org.apache.hedwig.server.regions.HedwigHubClient;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.ConcurrencyUtils;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import org.apache.bookkeeper.test.PortManager;
-import org.apache.hedwig.server.LoggingExceptionHandler;
-
-import static org.junit.Assert.*;
-
-public abstract class TestHedwigHub extends HedwigHubTestBase {
-
- // Client side variables
- protected HedwigClient client;
- protected Publisher publisher;
- protected Subscriber subscriber;
-
- // Common ByteStrings used in tests.
- private final ByteString localSubscriberId = ByteString.copyFromUtf8("LocalSubscriber");
- private final ByteString hubSubscriberId = ByteString.copyFromUtf8(SubscriptionStateUtils.HUB_SUBSCRIBER_PREFIX
- + "HubSubcriber");
-
- enum Mode {
- REGULAR, PROXY, SSL
- };
-
- protected Mode mode;
- protected boolean isSubscriptionChannelSharingEnabled;
-
- public TestHedwigHub(Mode mode, boolean isSubscriptionChannelSharingEnabled) {
- super(3);
- this.mode = mode;
- this.isSubscriptionChannelSharingEnabled = isSubscriptionChannelSharingEnabled;
- }
-
- protected HedwigProxy proxy;
- protected ProxyConfiguration proxyConf = new ProxyConfiguration() {
- final int proxyPort = PortManager.nextFreePort();
-
- @Override
- public HedwigSocketAddress getDefaultServerHedwigSocketAddress() {
- return serverAddresses.get(0);
- }
-
- @Override
- public int getProxyPort() {
- return proxyPort;
- }
- };
-
- // SynchronousQueues to verify async calls
- private final SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>();
- private final SynchronousQueue<Boolean> consumeQueue = new SynchronousQueue<Boolean>();
-
- // Test implementation of Callback for async client actions.
- static class TestCallback implements Callback<Void> {
- private final SynchronousQueue<Boolean> queue;
-
- public TestCallback(SynchronousQueue<Boolean> queue) {
- this.queue = queue;
- }
-
- @Override
- public void operationFinished(Object ctx, Void resultOfOperation) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- if (logger.isDebugEnabled())
- logger.debug("Operation finished!");
- ConcurrencyUtils.put(queue, true);
- }
- }).start();
- }
-
- @Override
- public void operationFailed(Object ctx, final PubSubException exception) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- logger.error("Operation failed!", exception);
- ConcurrencyUtils.put(queue, false);
- }
- }).start();
- }
- }
-
- // Test implementation of subscriber's message handler.
- static class TestMessageHandler implements MessageHandler {
- // For subscribe reconnect testing, the server could send us back
- // messages we've already processed and consumed. We need to keep
- // track of the ones we've encountered so we only signal back to the
- // consumeQueue once.
- private HashSet<MessageSeqId> consumedMessages = new HashSet<MessageSeqId>();
- private long largestMsgSeqIdConsumed = -1;
- private final SynchronousQueue<Boolean> consumeQueue;
-
- public TestMessageHandler(SynchronousQueue<Boolean> consumeQueue) {
- this.consumeQueue = consumeQueue;
- }
-
- public void deliver(ByteString topic, ByteString subscriberId, final Message msg, Callback<Void> callback,
- Object context) {
- if (!consumedMessages.contains(msg.getMsgId())) {
- // New message to consume. Add it to the Set of consumed
- // messages.
- consumedMessages.add(msg.getMsgId());
- // Check that the msg seq ID is incrementing by 1 compared to
- // the last consumed message. Don't do this check if this is the
- // initial message being consumed.
- if (largestMsgSeqIdConsumed >= 0 && msg.getMsgId().getLocalComponent() != largestMsgSeqIdConsumed + 1) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- if (logger.isDebugEnabled())
- logger.debug("Consuming message that is out of order for msgId: "
- + msg.getMsgId().getLocalComponent());
- ConcurrencyUtils.put(consumeQueue, false);
- }
- }).start();
- } else {
- new Thread(new Runnable() {
- @Override
- public void run() {
- if (logger.isDebugEnabled())
- logger.debug("Consume operation finished successfully!");
- ConcurrencyUtils.put(consumeQueue, true);
- }
- }).start();
- }
- // Store the consumed message as the new last msg id consumed.
- largestMsgSeqIdConsumed = msg.getMsgId().getLocalComponent();
- } else {
- if (logger.isDebugEnabled())
- logger.debug("Consumed a message that we've processed already: " + msg);
- }
- callback.operationFinished(context, null);
- }
- }
-
- class TestClientConfiguration extends HubClientConfiguration {
-
- @Override
- public InetSocketAddress getDefaultServerHost() {
- if (mode == Mode.PROXY) {
- return new InetSocketAddress(proxyConf.getProxyPort());
- } else {
- return super.getDefaultServerHost();
- }
- }
-
- @Override
- public boolean isSSLEnabled() {
- if (mode == Mode.SSL)
- return true;
- else
- return false;
- }
-
- @Override
- public boolean isSubscriptionChannelSharingEnabled() {
- return isSubscriptionChannelSharingEnabled;
- }
- }
-
- // ClientConfiguration to use for this test.
- protected ClientConfiguration getClientConfiguration() {
- return new TestClientConfiguration();
- }
-
- @Override
- @Before
- public void setUp() throws Exception {
- super.setUp();
- if (mode == Mode.PROXY) {
- proxy = new HedwigProxy(proxyConf, new LoggingExceptionHandler());
- proxy.start();
- }
- client = new HedwigClient(getClientConfiguration());
- publisher = client.getPublisher();
- subscriber = client.getSubscriber();
- }
-
- @Override
- @After
- public void tearDown() throws Exception {
- client.close();
- if (mode == Mode.PROXY) {
- proxy.shutdown();
- }
- super.tearDown();
-
- }
-
- // Helper function to generate Messages
- protected Message getMsg(int msgNum) {
- return Message.newBuilder().setBody(ByteString.copyFromUtf8("Message" + msgNum)).build();
- }
-
- // Helper function to generate Topics
- protected ByteString getTopic(int topicNum) {
- return ByteString.copyFromUtf8("Topic" + topicNum);
- }
-
- protected void startDelivery(ByteString topic, ByteString subscriberId, MessageHandler handler) throws Exception {
- startDelivery(subscriber, topic, subscriberId, handler);
- }
-
- protected void startDelivery(Subscriber subscriber, ByteString topic, ByteString subscriberId,
- MessageHandler handler) throws Exception {
- subscriber.startDelivery(topic, subscriberId, handler);
- if (mode == Mode.PROXY) {
- WriteRecordingChannel channel = new WriteRecordingChannel();
- PubSubRequest request = PubSubRequest.newBuilder().setProtocolVersion(ProtocolVersion.VERSION_ONE)
- .setTopic(topic).setTxnId(0).setType(OperationType.START_DELIVERY).setStartDeliveryRequest(
- StartDeliveryRequest.newBuilder().setSubscriberId(subscriberId)).build();
- proxy.getStartDeliveryHandler().handleRequest(request, channel);
- assertEquals(StatusCode.SUCCESS, ((PubSubResponse) channel.getMessagesWritten().get(0)).getStatusCode());
- }
- }
-
- protected void stopDelivery(ByteString topic, ByteString subscriberId) throws Exception {
- stopDelivery(subscriber, topic, subscriberId);
- }
-
- protected void stopDelivery(Subscriber subscriber, ByteString topic, ByteString subscriberId) throws Exception {
- subscriber.stopDelivery(topic, subscriberId);
- if (mode == Mode.PROXY) {
- PubSubRequest request = PubSubRequest.newBuilder().setProtocolVersion(ProtocolVersion.VERSION_ONE)
- .setTopic(topic).setTxnId(1).setType(OperationType.STOP_DELIVERY).setStopDeliveryRequest(
- StopDeliveryRequest.newBuilder().setSubscriberId(subscriberId)).build();
- proxy.getStopDeliveryHandler().handleRequest(request, proxy.getChannelTracker().getChannel(topic, subscriberId));
- }
- }
-
- protected void publishBatch(int batchSize, boolean expected, boolean messagesToBeConsumed, int loop) throws Exception {
- if (logger.isDebugEnabled())
- logger.debug("Publishing " + loop + " batch of messages.");
- for (int i = 0; i < batchSize; i++) {
- publisher.asyncPublish(getTopic(i), getMsg(i + loop * batchSize), new TestCallback(queue), null);
- assertTrue(expected == queue.take());
- if (messagesToBeConsumed)
- assertTrue(consumeQueue.take());
- }
- }
-
- protected void subscribeToTopics(int batchSize) throws Exception {
- if (logger.isDebugEnabled())
- logger.debug("Subscribing to topics and starting delivery.");
- for (int i = 0; i < batchSize; i++) {
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
- subscriber.asyncSubscribe(getTopic(i), localSubscriberId, opts,
- new TestCallback(queue), null);
- assertTrue(queue.take());
- }
-
- // Start delivery for the subscriber
- for (int i = 0; i < batchSize; i++) {
- startDelivery(getTopic(i), localSubscriberId, new TestMessageHandler(consumeQueue));
- }
- }
-
- protected void shutDownLastServer() {
- if (logger.isDebugEnabled())
- logger.debug("Shutting down the last server in the Hedwig hub cluster.");
- serversList.get(serversList.size() - 1).shutdown();
- // Due to a possible race condition, after we've shutdown the server,
- // the client could still be caching the channel connection to that
- // server. It is possible for a publish request to go to the shutdown
- // server using the closed/shutdown channel before the channel
- // disconnect logic kicks in. What could happen is that the publish
- // is done successfully on the channel but the server on the other end
- // can't/won't read it. This publish request will time out and the
- // Junit test will fail. Since that particular scenario is not what is
- // tested here, use a workaround of sleeping in this thread (so the
- // channel disconnect logic can complete) before we publish again.
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- logger.error("Thread was interrupted while sleeping after shutting down last server!", e);
- }
- }
-
- // This tests out the manual sending of consume messages to the server
- // instead of relying on the automatic sending by the client lib for it.
- @Test(timeout=10000)
- public void testManualConsumeClient() throws Exception {
- HedwigClient myClient = new HedwigClient(new TestClientConfiguration() {
- @Override
- public boolean isAutoSendConsumeMessageEnabled() {
- return false;
- }
-
- });
- Subscriber mySubscriber = myClient.getSubscriber();
- Publisher myPublisher = myClient.getPublisher();
- ByteString myTopic = getTopic(0);
- // Subscribe to a topic and start delivery on it
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
- mySubscriber.asyncSubscribe(myTopic, localSubscriberId, opts,
- new TestCallback(queue), null);
- assertTrue(queue.take());
- startDelivery(mySubscriber, myTopic, localSubscriberId, new TestMessageHandler(consumeQueue));
- // Publish some messages
- int batchSize = 10;
- for (int i = 0; i < batchSize; i++) {
- myPublisher.asyncPublish(myTopic, getMsg(i), new TestCallback(queue), null);
- assertTrue(queue.take());
- assertTrue(consumeQueue.take());
- }
- // Now manually send a consume message for each message received
- for (int i = 0; i < batchSize; i++) {
- boolean success = true;
- try {
- mySubscriber.consume(myTopic, localSubscriberId, MessageSeqId.newBuilder().setLocalComponent(i + 1)
- .build());
- } catch (ClientNotSubscribedException e) {
- success = false;
- }
- assertTrue(success);
- }
- // Since the consume call eventually does an async write to the Netty
- // channel, the writing of the consume requests may not have completed
- // yet before we stop the client. Sleep a little before we stop the
- // client just so error messages are not logged.
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- logger.error("Thread was interrupted while waiting to stop client for manual consume test!!", e);
- }
- myClient.close();
- }
-
- @Test(timeout=10000)
- public void testAttachToSubscriptionSuccess() throws Exception {
- ByteString topic = getTopic(0);
- SubscriptionOptions opts1 = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-
- subscriber.asyncSubscribe(topic, localSubscriberId, opts1, new TestCallback(queue),
- null);
- assertTrue(queue.take());
- // Close the subscription asynchronously
- subscriber.asyncCloseSubscription(topic, localSubscriberId, new TestCallback(queue), null);
- assertTrue(queue.take());
-
- SubscriptionOptions opts2 = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.ATTACH).build();
- // Now try to attach to the subscription
- subscriber.asyncSubscribe(topic, localSubscriberId, opts2, new TestCallback(queue), null);
- assertTrue(queue.take());
- // Start delivery and publish some messages. Make sure they are consumed
- // correctly.
- startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
- int batchSize = 5;
- for (int i = 0; i < batchSize; i++) {
- publisher.asyncPublish(topic, getMsg(i), new TestCallback(queue), null);
- assertTrue(queue.take());
- assertTrue(consumeQueue.take());
- }
- }
-
- @Test(timeout=10000)
- public void testServerRedirect() throws Exception {
- int batchSize = 10;
- publishBatch(batchSize, true, false, 0);
- }
-
- @Test(timeout=10000)
- public void testSubscribeAndConsume() throws Exception {
- int batchSize = 10;
- subscribeToTopics(batchSize);
- publishBatch(batchSize, true, true, 0);
- }
-
- @Test(timeout=10000)
- public void testServerFailoverPublishOnly() throws Exception {
- int batchSize = 10;
- publishBatch(batchSize, true, false, 0);
- shutDownLastServer();
- publishBatch(batchSize, true, false, 1);
- }
-
- @Test(timeout=10000)
- public void testServerFailover() throws Exception {
- int batchSize = 10;
- subscribeToTopics(batchSize);
- publishBatch(batchSize, true, true, 0);
- shutDownLastServer();
- publishBatch(batchSize, true, true, 1);
- }
-
- @Test(timeout=10000)
- public void testUnsubscribe() throws Exception {
- ByteString topic = getTopic(0);
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-
- subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null);
- assertTrue(queue.take());
- startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
- publisher.asyncPublish(topic, getMsg(0), new TestCallback(queue), null);
- assertTrue(queue.take());
- assertTrue(consumeQueue.take());
- // Send an Unsubscribe request
- subscriber.asyncUnsubscribe(topic, localSubscriberId, new TestCallback(queue), null);
- assertTrue(queue.take());
- // Now publish a message and make sure it is not consumed by the client
- publisher.asyncPublish(topic, getMsg(1), new TestCallback(queue), null);
- assertTrue(queue.take());
- // Wait a little bit just in case the message handler is still active,
- // consuming the message, and then putting a true value in the
- // consumeQueue.
- Thread.sleep(1000);
- // Put a False value on the consumeQueue so we can verify that it
- // is not blocked by a message consume action which already put a True
- // value into the queue.
- new Thread(new Runnable() {
- @Override
- public void run() {
- ConcurrencyUtils.put(consumeQueue, false);
- }
- }).start();
- assertFalse(consumeQueue.take());
- }
-
- @Test(timeout=10000)
- public void testSyncUnsubscribeWithoutSubscription() throws Exception {
- boolean unsubscribeSuccess = false;
- try {
- subscriber.unsubscribe(getTopic(0), localSubscriberId);
- } catch (ClientNotSubscribedException e) {
- unsubscribeSuccess = true;
- } catch (Exception ex) {
- unsubscribeSuccess = false;
- }
- assertTrue(unsubscribeSuccess);
- }
-
- @Test(timeout=10000)
- public void testAsyncUnsubscribeWithoutSubscription() throws Exception {
- subscriber.asyncUnsubscribe(getTopic(0), localSubscriberId, new TestCallback(queue), null);
- assertFalse(queue.take());
- }
-
-    /**
-     * Subscribes, confirms that one published message reaches the handler,
-     * closes the subscription, and then checks that a message published
-     * afterwards does not show up on {@code consumeQueue}.
-     */
-    @Test(timeout=10000)
-    public void testCloseSubscription() throws Exception {
-        ByteString topic = getTopic(0);
-        SubscriptionOptions createOrAttach = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH)
-            .build();
-
-        // Subscribe and wait for the subscribe callback to report its result.
-        subscriber.asyncSubscribe(topic, localSubscriberId, createOrAttach, new TestCallback(queue), null);
-        assertTrue(queue.take());
-
-        // While the subscription is open, a published message must be consumed.
-        TestMessageHandler handler = new TestMessageHandler(consumeQueue);
-        startDelivery(topic, localSubscriberId, handler);
-        publisher.asyncPublish(topic, getMsg(0), new TestCallback(queue), null);
-        assertTrue(queue.take());
-        assertTrue(consumeQueue.take());
-
-        // Close the subscription asynchronously and wait for the acknowledgement.
-        subscriber.asyncCloseSubscription(topic, localSubscriberId, new TestCallback(queue), null);
-        assertTrue(queue.take());
-
-        // Publish once more; this message must not be consumed by the client.
-        publisher.asyncPublish(topic, getMsg(1), new TestCallback(queue), null);
-        assertTrue(queue.take());
-
-        // Give a still-active message handler time to (wrongly) consume the
-        // message and put a true value on consumeQueue.
-        Thread.sleep(1000);
-
-        // Offer a false value from a separate thread. Taking false below shows
-        // that no consume action had already placed a true value ahead of it.
-        Runnable enqueueFalse = new Runnable() {
-            @Override
-            public void run() {
-                ConcurrencyUtils.put(consumeQueue, false);
-            }
-        };
-        new Thread(enqueueFalse).start();
-        assertFalse(consumeQueue.take());
-    }
-
- @Test(timeout=10000)
- public void testStartDeliveryTwice() throws Exception {
- ByteString topic = getTopic(0);
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-
- subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null);
- assertTrue(queue.take());
- startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
- try {
- startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
- fail("Should not reach here!");
- } catch (AlreadyStartDeliveryException e) {
- }
- }
-
-    /**
-     * Checks that messages published while delivery is stopped do not reach the
-     * handler, and that they are consumed once delivery is started again.
-     */
-    @Test(timeout=10000)
-    public void testStopDelivery() throws Exception {
-        ByteString topic = getTopic(0);
-        SubscriptionOptions createOrAttach = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH)
-            .build();
-        subscriber.asyncSubscribe(topic, localSubscriberId, createOrAttach, new TestCallback(queue), null);
-        assertTrue(queue.take());
-
-        // Sanity check: with delivery running, a published message is consumed.
-        startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
-        publisher.asyncPublish(topic, getMsg(0), new TestCallback(queue), null);
-        assertTrue(queue.take());
-        assertTrue(consumeQueue.take());
-
-        // Stop delivery, then publish messages 1..pendingCount so they queue up
-        // for the client; each publish is confirmed before the next is sent.
-        stopDelivery(topic, localSubscriberId);
-        int pendingCount = 10;
-        for (int msgId = 1; msgId <= pendingCount; msgId++) {
-            publisher.asyncPublish(topic, getMsg(msgId), new TestCallback(queue), null);
-            assertTrue(queue.take());
-        }
-
-        // Give a still-active message handler time to (wrongly) consume a
-        // message and put a true value on consumeQueue.
-        Thread.sleep(1000);
-
-        // Offer a false value from a separate thread. Taking false below shows
-        // that no consume action had already placed a true value ahead of it.
-        Runnable enqueueFalse = new Runnable() {
-            @Override
-            public void run() {
-                ConcurrencyUtils.put(consumeQueue, false);
-            }
-        };
-        new Thread(enqueueFalse).start();
-        assertFalse(consumeQueue.take());
-
-        // Resume delivery; every queued message must now be consumed.
-        startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
-        for (int remaining = pendingCount; remaining > 0; remaining--) {
-            assertTrue(consumeQueue.take());
-        }
-    }
-
-    /**
-     * Publishes a batch of messages without waiting between publishes and then
-     * expects one successful publish callback and one successful consume
-     * signal per message.
-     *
-     * NOTE(review): the ordering check implied by the test name is not visible
-     * in this method; presumably TestMessageHandler validates the sequence and
-     * reports the outcome through consumeQueue. Confirm against its definition.
-     */
-    @Test(timeout=10000)
-    public void testConsumedMessagesInOrder() throws Exception {
-        ByteString topic = getTopic(0);
-        SubscriptionOptions createOrAttach = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH)
-            .build();
-        subscriber.asyncSubscribe(topic, localSubscriberId, createOrAttach, new TestCallback(queue), null);
-        assertTrue(queue.take());
-        startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
-
-        // Fire off every publish request up front, without waiting for results.
-        final int messageCount = 100;
-        int msgId = 0;
-        while (msgId < messageCount) {
-            publisher.asyncPublish(topic, getMsg(msgId), new TestCallback(queue), null);
-            msgId++;
-        }
-
-        // Now collect, per message, the publish result followed by the consume
-        // result; both must be true.
-        for (int received = 0; received < messageCount; received++) {
-            assertTrue(queue.take());
-            assertTrue(consumeQueue.take());
-        }
-    }
-
-    /**
-     * Creates a subscription, closes it, and then expects a subscribe request
-     * in strict CREATE mode for the same topic and subscriber to be reported as
-     * a failure, since the subscription already exists.
-     */
-    @Test(timeout=10000)
-    public void testCreateSubscriptionFailure() throws Exception {
-        ByteString topic = getTopic(0);
-
-        // First subscribe with CREATE_OR_ATTACH so that the subscription exists.
-        SubscriptionOptions createOrAttach = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH)
-            .build();
-        subscriber.asyncSubscribe(topic, localSubscriberId, createOrAttach, new TestCallback(queue), null);
-        assertTrue(queue.take());
-
-        // Close the subscription asynchronously and wait for the acknowledgement.
-        subscriber.asyncCloseSubscription(topic, localSubscriberId, new TestCallback(queue), null);
-        assertTrue(queue.take());
-
-        // A CREATE-only subscribe must now be reported as failed.
-        SubscriptionOptions createOnly = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE)
-            .build();
-        subscriber.asyncSubscribe(topic, localSubscriberId, createOnly, new TestCallback(queue), null);
-        assertFalse(queue.take());
-    }
-
-    /**
-     * Subscribes in strict CREATE mode and then checks, message by message,
-     * that each publish succeeds and each message is consumed.
-     */
-    @Test(timeout=10000)
-    public void testCreateSubscriptionSuccess() throws Exception {
-        ByteString topic = getTopic(0);
-        SubscriptionOptions createOnly = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE)
-            .build();
-        subscriber.asyncSubscribe(topic, localSubscriberId, createOnly, new TestCallback(queue), null);
-        assertTrue(queue.take());
-        startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
-
-        // Publish one message at a time and wait for both the publish result
-        // and the consume signal before sending the next one.
-        final int messageCount = 5;
-        int msgId = 0;
-        while (msgId < messageCount) {
-            publisher.asyncPublish(topic, getMsg(msgId), new TestCallback(queue), null);
-            assertTrue(queue.take());
-            assertTrue(consumeQueue.take());
-            msgId++;
-        }
-    }
-
- @Test(timeout=10000)
- public void testAttachToSubscriptionFailure() throws Exception {
- ByteString topic = getTopic(0);
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.ATTACH).build();
-
- subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null);
- assertFalse(queue.take());
- }
-
- // The following 4 tests are to make sure that the subscriberId validation
- // works when it is a local subscriber and we're expecting the subscriberId
- // to be in the "local" specific format.
-    /**
-     * Verifies that a synchronous subscribe through the local subscriber using
-     * {@code hubSubscriberId} is rejected with
-     * {@link InvalidSubscriberIdException}.
-     *
-     * @throws Exception any other exception raised by the subscribe call; it is
-     *         propagated so the test report shows the real cause instead of a
-     *         bare assertion failure
-     */
-    @Test(timeout=10000)
-    public void testSyncSubscribeWithInvalidSubscriberId() throws Exception {
-        SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-        try {
-            subscriber.subscribe(getTopic(0), hubSubscriberId, opts);
-            // fail() throws an AssertionError, which the catch below does not intercept.
-            fail("Expected InvalidSubscriberIdException for a hub-format subscriberId on a local subscriber");
-        } catch (InvalidSubscriberIdException expected) {
-            // This is the rejection the test is looking for.
-        }
-    }
-
- @Test(timeout=10000)
- public void testAsyncSubscribeWithInvalidSubscriberId() throws Exception {
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
- subscriber.asyncSubscribe(getTopic(0), hubSubscriberId, opts,
- new TestCallback(queue), null);
- assertFalse(queue.take());
- }
-
-    /**
-     * Verifies that a synchronous unsubscribe through the local subscriber
-     * using {@code hubSubscriberId} is rejected with
-     * {@link InvalidSubscriberIdException}.
-     *
-     * @throws Exception any other exception raised by the unsubscribe call; it
-     *         is propagated so the test report shows the real cause instead of
-     *         a bare assertion failure
-     */
-    @Test(timeout=10000)
-    public void testSyncUnsubscribeWithInvalidSubscriberId() throws Exception {
-        try {
-            subscriber.unsubscribe(getTopic(0), hubSubscriberId);
-            // fail() throws an AssertionError, which the catch below does not intercept.
-            fail("Expected InvalidSubscriberIdException for a hub-format subscriberId on a local subscriber");
-        } catch (InvalidSubscriberIdException expected) {
-            // This is the rejection the test is looking for.
-        }
-    }
-
- @Test(timeout=10000)
- public void testAsyncUnsubscribeWithInvalidSubscriberId() throws Exception {
- subscriber.asyncUnsubscribe(getTopic(0), hubSubscriberId, new TestCallback(queue), null);
- assertFalse(queue.take());
- }
-
- // The following 4 tests are to make sure that the subscriberId validation
- // also works when it is a hub subscriber and we're expecting the
- // subscriberId to be in the "hub" specific format.
-    /**
-     * Verifies that a synchronous subscribe through a hub client's subscriber
-     * using {@code localSubscriberId} is rejected with
-     * {@link InvalidSubscriberIdException}.
-     *
-     * The hub client is closed in a finally block so that it is released even
-     * when the expectation is not met.
-     *
-     * @throws Exception any other exception raised by the subscribe call or by
-     *         closing the client; it is propagated so the test report shows
-     *         the real cause
-     */
-    @Test(timeout=10000)
-    public void testSyncHubSubscribeWithInvalidSubscriberId() throws Exception {
-        Client hubClient = new HedwigHubClient(new HubClientConfiguration());
-        try {
-            Subscriber hubSubscriber = hubClient.getSubscriber();
-            SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-                .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-            try {
-                hubSubscriber.subscribe(getTopic(0), localSubscriberId, opts);
-                // fail() throws an AssertionError, which the catch below does not intercept.
-                fail("Expected InvalidSubscriberIdException for a local-format subscriberId on a hub subscriber");
-            } catch (InvalidSubscriberIdException expected) {
-                // This is the rejection the test is looking for.
-            }
-        } finally {
-            hubClient.close();
-        }
-    }
-
-    /**
-     * Issues an asynchronous subscribe through a hub client's subscriber using
-     * {@code localSubscriberId} and expects the callback to report failure.
-     *
-     * The hub client is closed in a finally block so that it is released even
-     * when the assertion fails or waiting on the queue is interrupted.
-     */
-    @Test(timeout=10000)
-    public void testAsyncHubSubscribeWithInvalidSubscriberId() throws Exception {
-        Client hubClient = new HedwigHubClient(new HubClientConfiguration());
-        try {
-            Subscriber hubSubscriber = hubClient.getSubscriber();
-            SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-                .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-            hubSubscriber.asyncSubscribe(getTopic(0), localSubscriberId, opts,
-                new TestCallback(queue), null);
-            assertFalse(queue.take());
-        } finally {
-            hubClient.close();
-        }
-    }
-
-    /**
-     * Verifies that a synchronous unsubscribe through a hub client's subscriber
-     * using {@code localSubscriberId} is rejected with
-     * {@link InvalidSubscriberIdException}.
-     *
-     * The hub client is closed in a finally block so that it is released even
-     * when the expectation is not met.
-     *
-     * @throws Exception any other exception raised by the unsubscribe call or
-     *         by closing the client; it is propagated so the test report shows
-     *         the real cause
-     */
-    @Test(timeout=10000)
-    public void testSyncHubUnsubscribeWithInvalidSubscriberId() throws Exception {
-        Client hubClient = new HedwigHubClient(new HubClientConfiguration());
-        try {
-            Subscriber hubSubscriber = hubClient.getSubscriber();
-            try {
-                hubSubscriber.unsubscribe(getTopic(0), localSubscriberId);
-                // fail() throws an AssertionError, which the catch below does not intercept.
-                fail("Expected InvalidSubscriberIdException for a local-format subscriberId on a hub subscriber");
-            } catch (InvalidSubscriberIdException expected) {
-                // This is the rejection the test is looking for.
-            }
-        } finally {
-            hubClient.close();
-        }
-    }
-
-    /**
-     * Issues an asynchronous unsubscribe through a hub client's subscriber
-     * using {@code localSubscriberId} and expects the callback to report
-     * failure.
-     *
-     * The hub client is closed in a finally block so that it is released even
-     * when the assertion fails or waiting on the queue is interrupted.
-     */
-    @Test(timeout=10000)
-    public void testAsyncHubUnsubscribeWithInvalidSubscriberId() throws Exception {
-        Client hubClient = new HedwigHubClient(new HubClientConfiguration());
-        try {
-            Subscriber hubSubscriber = hubClient.getSubscriber();
-            hubSubscriber.asyncUnsubscribe(getTopic(0), localSubscriberId, new TestCallback(queue), null);
-            assertFalse(queue.take());
-        } finally {
-            hubClient.close();
-        }
-    }
-
-    /**
-     * Publishes three batches around a bookie outage: one before the bookie
-     * servers are stopped, one while they are down, and one after they are
-     * started again.
-     *
-     * NOTE(review): publishBatch is defined outside this excerpt; its second
-     * argument presumably states whether the publishes are expected to
-     * succeed. Confirm against its definition.
-     */
-    @Test(timeout=10000)
-    public void testPublishWithBookKeeperError() throws Exception {
-        final int messagesPerBatch = 10;
-
-        // Baseline batch while the bookies are still up.
-        publishBatch(messagesPerBatch, true, false, 0);
-
-        // Stop all bookie servers; the following publishes should fail with
-        // NotEnoughBookies.
-        bktb.stopAllBookieServers();
-        publishBatch(messagesPerBatch, false, false, 1);
-
-        // Start all bookie servers again; the following publishes should succeed.
-        bktb.startAllBookieServers();
-        publishBatch(messagesPerBatch, true, false, 1);
-    }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubProxy.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubProxy.java b/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubProxy.java
deleted file mode 100644
index 64e5b4e..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubProxy.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.integration;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-import java.util.Collection;
-import java.util.Arrays;
-
-@RunWith(Parameterized.class)
-public class TestHedwigHubProxy extends TestHedwigHub {
- @Parameters
- public static Collection<Object[]> configs() {
- return Arrays.asList(new Object[][] { { true }, { false } });
- }
-
- public TestHedwigHubProxy(boolean isSubscriptionChannelSharingEnabled) {
- super(Mode.PROXY, isSubscriptionChannelSharingEnabled);
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubRegular.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubRegular.java b/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubRegular.java
deleted file mode 100644
index 2e370c0..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHubRegular.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.integration;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-import java.util.Collection;
-import java.util.Arrays;
-
-@RunWith(Parameterized.class)
-public class TestHedwigHubRegular extends TestHedwigHub {
- @Parameters
- public static Collection<Object[]> configs() {
- return Arrays.asList(new Object[][] { { true }, { false } });
- }
-
- public TestHedwigHubRegular(boolean isSubscriptionChannelSharingEnabled) {
- super(Mode.REGULAR, isSubscriptionChannelSharingEnabled);
- }
-}