You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:17 UTC
[07/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestMultiplexing.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestMultiplexing.java b/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestMultiplexing.java
deleted file mode 100644
index e93ecae..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestMultiplexing.java
+++ /dev/null
@@ -1,439 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.netty;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.server.HedwigHubTestBase;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.util.Callback;
-
-import static org.junit.Assert.*;
-
-public class TestMultiplexing extends HedwigHubTestBase {
-
- private static final int DEFAULT_MSG_WINDOW_SIZE = 10;
-
- protected class TestServerConfiguration extends HubServerConfiguration {
- TestServerConfiguration(int serverPort, int sslServerPort) {
- super(serverPort, sslServerPort);
- }
- @Override
- public int getDefaultMessageWindowSize() {
- return DEFAULT_MSG_WINDOW_SIZE;
- }
- }
-
- class TestMessageHandler implements MessageHandler {
-
- int expected;
- final int numMsgsAtFirstRun;
- final int numMsgsAtSecondRun;
- final CountDownLatch firstLatch;
- final CountDownLatch secondLatch;
- final boolean receiveSecondRun;
-
- public TestMessageHandler(int start, int numMsgsAtFirstRun,
- boolean receiveSecondRun,
- int numMsgsAtSecondRun) {
- expected = start;
- this.numMsgsAtFirstRun = numMsgsAtFirstRun;
- this.numMsgsAtSecondRun = numMsgsAtSecondRun;
- this.receiveSecondRun = receiveSecondRun;
- firstLatch = new CountDownLatch(1);
- secondLatch = new CountDownLatch(1);
- }
-
- @Override
- public synchronized void deliver(ByteString topic, ByteString subscriberId,
- Message msg,
- Callback<Void> callback, Object context) {
- try {
- int value = Integer.valueOf(msg.getBody().toStringUtf8());
- logger.debug("Received message {}.", value);
-
- if (value == expected) {
- ++expected;
- } else {
- // error condition
- logger.error("Did not receive expected value, expected {}, got {}",
- expected, value);
- expected = 0;
- firstLatch.countDown();
- secondLatch.countDown();
- }
- if (numMsgsAtFirstRun + 1 == expected) {
- firstLatch.countDown();
- }
- if (receiveSecondRun) {
- if (numMsgsAtFirstRun + numMsgsAtSecondRun + 1 == expected) {
- secondLatch.countDown();
- }
- } else {
- if (numMsgsAtFirstRun + 1 < expected) {
- secondLatch.countDown();
- }
- }
- callback.operationFinished(context, null);
- subscriber.consume(topic, subscriberId, msg.getMsgId());
- } catch (Throwable t) {
- logger.error("Received bad message.", t);
- firstLatch.countDown();
- secondLatch.countDown();
- }
- }
-
- public void checkFirstRun() throws Exception {
- assertTrue("Timed out waiting for messages " + (numMsgsAtFirstRun + 1),
- firstLatch.await(10, TimeUnit.SECONDS));
- assertEquals("Should be expected messages with " + (numMsgsAtFirstRun + 1),
- numMsgsAtFirstRun + 1, expected);
- }
-
- public void checkSecondRun() throws Exception {
- if (receiveSecondRun) {
- assertTrue("Timed out waiting for messages "
- + (numMsgsAtFirstRun + numMsgsAtSecondRun + 1),
- secondLatch.await(10, TimeUnit.SECONDS));
- assertEquals("Should be expected messages with "
- + (numMsgsAtFirstRun + numMsgsAtSecondRun + 1),
- numMsgsAtFirstRun + numMsgsAtSecondRun + 1, expected);
- } else {
- assertFalse("Receive more messages than " + numMsgsAtFirstRun,
- secondLatch.await(3, TimeUnit.SECONDS));
- assertEquals("Should be expected messages with ony " + (numMsgsAtFirstRun + 1),
- numMsgsAtFirstRun + 1, expected);
- }
- }
- }
-
- class ThrottleMessageHandler implements MessageHandler {
-
- int expected;
- final int numMsgs;
- final int numMsgsThrottle;
- final CountDownLatch throttleLatch;
- final CountDownLatch nonThrottleLatch;
- final boolean enableThrottle;
-
- public ThrottleMessageHandler(int start, int numMsgs,
- boolean enableThrottle,
- int numMsgsThrottle) {
- expected = start;
- this.numMsgs = numMsgs;
- this.enableThrottle = enableThrottle;
- this.numMsgsThrottle = numMsgsThrottle;
- throttleLatch = new CountDownLatch(1);
- nonThrottleLatch = new CountDownLatch(1);
- }
-
- @Override
- public synchronized void deliver(ByteString topic, ByteString subscriberId,
- Message msg,
- Callback<Void> callback, Object context) {
- try {
- int value = Integer.valueOf(msg.getBody().toStringUtf8());
- logger.debug("Received message {}.", value);
-
- if (value == expected) {
- ++expected;
- } else {
- // error condition
- logger.error("Did not receive expected value, expected {}, got {}",
- expected, value);
- expected = 0;
- throttleLatch.countDown();
- nonThrottleLatch.countDown();
- }
- if (expected == numMsgsThrottle + 2) {
- throttleLatch.countDown();
- }
- if (expected == numMsgs + 1) {
- nonThrottleLatch.countDown();
- }
- callback.operationFinished(context, null);
- if (enableThrottle) {
- if (expected > numMsgsThrottle + 1) {
- subscriber.consume(topic, subscriberId, msg.getMsgId());
- }
- } else {
- subscriber.consume(topic, subscriberId, msg.getMsgId());
- }
- } catch (Throwable t) {
- logger.error("Received bad message.", t);
- throttleLatch.countDown();
- nonThrottleLatch.countDown();
- }
- }
-
- public void checkThrottle() throws Exception {
- if (enableThrottle) {
- assertFalse("Received more messages than throttle value " + numMsgsThrottle,
- throttleLatch.await(3, TimeUnit.SECONDS));
- assertEquals("Should be expected messages with only " + (numMsgsThrottle + 1),
- numMsgsThrottle + 1, expected);
- } else {
- assertTrue("Should not be throttled.", throttleLatch.await(10, TimeUnit.SECONDS));
- assertTrue("Timed out waiting for messages " + (numMsgs + 1),
- nonThrottleLatch.await(10, TimeUnit.SECONDS));
- assertEquals("Should be expected messages with " + (numMsgs + 1),
- numMsgs + 1, expected);
- }
- }
-
- public void checkAfterThrottle() throws Exception {
- if (enableThrottle) {
- assertTrue("Timed out waiting for messages " + (numMsgs + 1),
- nonThrottleLatch.await(10, TimeUnit.SECONDS));
- assertEquals("Should be expected messages with " + (numMsgs + 1),
- numMsgs + 1, expected);
- }
- }
- }
-
- HedwigClient client;
- Publisher publisher;
- Subscriber subscriber;
-
- @Override
- @Before
- public void setUp() throws Exception {
- super.setUp();
- client = new HedwigClient(new HubClientConfiguration() {
- @Override
- public boolean isSubscriptionChannelSharingEnabled() {
- return true;
- }
- @Override
- public boolean isAutoSendConsumeMessageEnabled() {
- return false;
- }
- });
- publisher = client.getPublisher();
- subscriber = client.getSubscriber();
- }
-
- @Override
- @After
- public void tearDown() throws Exception {
- client.close();
- super.tearDown();
- }
-
- @Override
- protected ServerConfiguration getServerConfiguration(int port, int sslPort) {
- return new TestServerConfiguration(port, sslPort);
- }
-
- @Test(timeout=60000)
- public void testStopDelivery() throws Exception {
- ByteString topic1 = ByteString.copyFromUtf8("testStopDelivery-1");
- ByteString topic2 = ByteString.copyFromUtf8("testStopDelivery-2");
- ByteString subid1 = ByteString.copyFromUtf8("mysubid-1");
- ByteString subid2 = ByteString.copyFromUtf8("mysubid-2");
-
- final int X = 20;
-
- TestMessageHandler csHandler11 =
- new TestMessageHandler(1, X, true, X);
- TestMessageHandler csHandler12 =
- new TestMessageHandler(1, X, false, 0);
- TestMessageHandler csHandler21 =
- new TestMessageHandler(1, X, false, 0);
- TestMessageHandler csHandler22 =
- new TestMessageHandler(1, X, true, X);
-
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE).build();
-
- subscriber.subscribe(topic1, subid1, opts);
- subscriber.subscribe(topic1, subid2, opts);
- subscriber.subscribe(topic2, subid1, opts);
- subscriber.subscribe(topic2, subid2, opts);
-
- // start deliveries
- subscriber.startDelivery(topic1, subid1, csHandler11);
- subscriber.startDelivery(topic1, subid2, csHandler12);
- subscriber.startDelivery(topic2, subid1, csHandler21);
- subscriber.startDelivery(topic2, subid2, csHandler22);
-
- // first publish
- for (int i = 1; i<=X; i++) {
- publisher.publish(topic1, Message.newBuilder().setBody(
- ByteString.copyFromUtf8(String.valueOf(i))).build());
- publisher.publish(topic2, Message.newBuilder().setBody(
- ByteString.copyFromUtf8(String.valueOf(i))).build());
- }
-
- csHandler11.checkFirstRun();
- csHandler12.checkFirstRun();
- csHandler21.checkFirstRun();
- csHandler22.checkFirstRun();
-
- // stop delivery for <topic1, subscriber2> and <topic2, subscriber1>
- subscriber.stopDelivery(topic1, subid2);
- subscriber.stopDelivery(topic2, subid1);
-
- // second publish
- for (int i = X+1; i<=2*X; i++) {
- publisher.publish(topic1, Message.newBuilder().setBody(
- ByteString.copyFromUtf8(String.valueOf(i))).build());
- publisher.publish(topic2, Message.newBuilder().setBody(
- ByteString.copyFromUtf8(String.valueOf(i))).build());
- }
-
- csHandler11.checkSecondRun();
- csHandler22.checkSecondRun();
- csHandler12.checkSecondRun();
- csHandler21.checkSecondRun();
- }
-
- @Test(timeout=60000)
- public void testCloseSubscription() throws Exception {
- ByteString topic1 = ByteString.copyFromUtf8("testCloseSubscription-1");
- ByteString topic2 = ByteString.copyFromUtf8("testCloseSubscription-2");
- ByteString subid1 = ByteString.copyFromUtf8("mysubid-1");
- ByteString subid2 = ByteString.copyFromUtf8("mysubid-2");
-
- final int X = 20;
-
- TestMessageHandler csHandler11 =
- new TestMessageHandler(1, X, true, X);
- TestMessageHandler csHandler12 =
- new TestMessageHandler(1, X, false, 0);
- TestMessageHandler csHandler21 =
- new TestMessageHandler(1, X, false, 0);
- TestMessageHandler csHandler22 =
- new TestMessageHandler(1, X, true, X);
-
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE).build();
- subscriber.subscribe(topic1, subid1, opts);
- subscriber.subscribe(topic1, subid2, opts);
- subscriber.subscribe(topic2, subid1, opts);
- subscriber.subscribe(topic2, subid2, opts);
-
- // start deliveries
- subscriber.startDelivery(topic1, subid1, csHandler11);
- subscriber.startDelivery(topic1, subid2, csHandler12);
- subscriber.startDelivery(topic2, subid1, csHandler21);
- subscriber.startDelivery(topic2, subid2, csHandler22);
-
- // first publish
- for (int i = 1; i<=X; i++) {
- publisher.publish(topic1, Message.newBuilder().setBody(
- ByteString.copyFromUtf8(String.valueOf(i))).build());
- publisher.publish(topic2, Message.newBuilder().setBody(
- ByteString.copyFromUtf8(String.valueOf(i))).build());
- }
-
- csHandler11.checkFirstRun();
- csHandler12.checkFirstRun();
- csHandler21.checkFirstRun();
- csHandler22.checkFirstRun();
-
- // close subscription for <topic1, subscriber2> and <topic2, subscriber1>
- subscriber.closeSubscription(topic1, subid2);
- subscriber.closeSubscription(topic2, subid1);
-
- // second publish
- for (int i = X+1; i<=2*X; i++) {
- publisher.publish(topic1, Message.newBuilder().setBody(
- ByteString.copyFromUtf8(String.valueOf(i))).build());
- publisher.publish(topic2, Message.newBuilder().setBody(
- ByteString.copyFromUtf8(String.valueOf(i))).build());
- }
-
- csHandler11.checkSecondRun();
- csHandler22.checkSecondRun();
- csHandler12.checkSecondRun();
- csHandler21.checkSecondRun();
- }
-
- @Test(timeout=60000)
- public void testThrottle() throws Exception {
- ByteString topic1 = ByteString.copyFromUtf8("testThrottle-1");
- ByteString topic2 = ByteString.copyFromUtf8("testThrottle-2");
- ByteString subid1 = ByteString.copyFromUtf8("mysubid-1");
- ByteString subid2 = ByteString.copyFromUtf8("mysubid-2");
-
- final int X = DEFAULT_MSG_WINDOW_SIZE;
-
- ThrottleMessageHandler csHandler11 =
- new ThrottleMessageHandler(1, 3*X, false, X);
- ThrottleMessageHandler csHandler12 =
- new ThrottleMessageHandler(1, 3*X, true, X);
- ThrottleMessageHandler csHandler21 =
- new ThrottleMessageHandler(1, 3*X, true, X);
- ThrottleMessageHandler csHandler22 =
- new ThrottleMessageHandler(1, 3*X, false, X);
-
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE).build();
- subscriber.subscribe(topic1, subid1, opts);
- subscriber.subscribe(topic1, subid2, opts);
- subscriber.subscribe(topic2, subid1, opts);
- subscriber.subscribe(topic2, subid2, opts);
-
- // start deliveries
- subscriber.startDelivery(topic1, subid1, csHandler11);
- subscriber.startDelivery(topic1, subid2, csHandler12);
- subscriber.startDelivery(topic2, subid1, csHandler21);
- subscriber.startDelivery(topic2, subid2, csHandler22);
-
- // publish
- for (int i = 1; i<=3*X; i++) {
- publisher.publish(topic1, Message.newBuilder().setBody(
- ByteString.copyFromUtf8(String.valueOf(i))).build());
- publisher.publish(topic2, Message.newBuilder().setBody(
- ByteString.copyFromUtf8(String.valueOf(i))).build());
- }
-
- csHandler11.checkThrottle();
- csHandler12.checkThrottle();
- csHandler21.checkThrottle();
- csHandler22.checkThrottle();
-
- // consume messages to not throttle them
- for (int i=1; i<=X; i++) {
- MessageSeqId seqId =
- MessageSeqId.newBuilder().setLocalComponent(i).build();
- subscriber.consume(topic1, subid2, seqId);
- subscriber.consume(topic2, subid1, seqId);
- }
-
- csHandler11.checkAfterThrottle();
- csHandler22.checkAfterThrottle();
- csHandler12.checkAfterThrottle();
- csHandler21.checkAfterThrottle();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java b/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java
deleted file mode 100644
index 81e0314..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import junit.framework.TestCase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.After;
-import org.junit.Before;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.netty.PubSubServer;
-import org.apache.hedwig.server.persistence.BookKeeperTestBase;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-import org.apache.bookkeeper.test.PortManager;
-
-/**
- * This is a base class for any tests that need a Hedwig Hub(s) setup with an
- * associated BookKeeper and ZooKeeper instance.
- *
- */
-public abstract class HedwigHubTestBase extends TestCase {
-
- protected static final Logger logger = LoggerFactory.getLogger(HedwigHubTestBase.class);
-
- // BookKeeper variables
- // Default number of bookie servers to setup. Extending classes can
- // override this.
- protected int numBookies = 3;
- protected long readDelay = 0L;
- protected BookKeeperTestBase bktb;
-
- // PubSubServer variables
- // Default number of PubSubServer hubs to setup. Extending classes can
- // override this.
- protected final int numServers;
- protected List<PubSubServer> serversList;
- protected List<HedwigSocketAddress> serverAddresses;
-
- protected boolean sslEnabled = true;
- protected boolean standalone = false;
-
- protected static final String HOST = "localhost";
-
- public HedwigHubTestBase() {
- this(1);
- }
-
- protected HedwigHubTestBase(int numServers) {
- this.numServers = numServers;
-
- init();
- }
-
- public HedwigHubTestBase(String name, int numServers) {
- this.numServers = numServers;
- init();
- }
-
- private void init() {
-
- serverAddresses = new LinkedList<HedwigSocketAddress>();
- for (int i = 0; i < numServers; i++) {
- serverAddresses.add(new HedwigSocketAddress(HOST,
- PortManager.nextFreePort(), PortManager.nextFreePort()));
- }
- }
-
- // Default child class of the ServerConfiguration to be used here.
- // Extending classes can define their own (possibly extending from this) and
- // override the getServerConfiguration method below to return their own
- // configuration.
- protected class HubServerConfiguration extends ServerConfiguration {
- private final int serverPort, sslServerPort;
-
- public HubServerConfiguration(int serverPort, int sslServerPort) {
- this.serverPort = serverPort;
- this.sslServerPort = sslServerPort;
- }
-
- @Override
- public boolean isStandalone() {
- return standalone;
- }
-
- @Override
- public int getServerPort() {
- return serverPort;
- }
-
- @Override
- public int getSSLServerPort() {
- return sslServerPort;
- }
-
- @Override
- public String getZkHost() {
- return null != bktb ? bktb.getZkHostPort() : null;
- }
-
- @Override
- public boolean isSSLEnabled() {
- return sslEnabled;
- }
-
- @Override
- public String getCertName() {
- return isSSLEnabled() ? "/server.p12" : null;
- }
-
- @Override
- public String getPassword() {
- return isSSLEnabled() ? "eUySvp2phM2Wk" : null;
- }
- }
-
- public class HubClientConfiguration extends ClientConfiguration {
- @Override
- public HedwigSocketAddress getDefaultServerHedwigSocketAddress() {
- return serverAddresses.get(0);
- }
- }
-
- // Method to get a ServerConfiguration for the PubSubServers created using
- // the specified ports. Extending child classes can override this. This
- // default implementation will return the HubServerConfiguration object
- // defined above.
- protected ServerConfiguration getServerConfiguration(int serverPort, int sslServerPort) {
- return new HubServerConfiguration(serverPort, sslServerPort);
- }
-
- protected void startHubServers() throws Exception {
- // Now create the PubSubServer Hubs
- serversList = new LinkedList<PubSubServer>();
-
- for (int i = 0; i < numServers; i++) {
- ServerConfiguration conf = getServerConfiguration(serverAddresses.get(i).getPort(),
- sslEnabled ? serverAddresses.get(i).getSSLPort() : -1);
- PubSubServer s = new PubSubServer(conf, new ClientConfiguration(), new LoggingExceptionHandler());
- serversList.add(s);
- s.start();
- }
- }
-
- protected void stopHubServers() throws Exception {
- // Shutdown all of the PubSubServers
- for (PubSubServer server : serversList) {
- server.shutdown();
- }
- serversList.clear();
- }
-
- @Before
- protected void setUp() throws Exception {
- logger.info("STARTING " + getClass());
- if (! standalone) {
- bktb = new BookKeeperTestBase(numBookies, readDelay);
- bktb.setUp();
- }
- startHubServers();
- logger.info("HedwigHub test setup finished");
- }
-
- @After
- protected void tearDown() throws Exception {
- logger.info("tearDown starting");
- stopHubServers();
- if (null != bktb) bktb.tearDown();
- logger.info("FINISHED " + getClass());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java b/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java
deleted file mode 100644
index 0a574b6..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.After;
-import org.junit.Before;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.netty.PubSubServer;
-import org.apache.hedwig.server.persistence.BookKeeperTestBase;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-import org.apache.bookkeeper.test.PortManager;
-
-/**
- * This is a base class for any tests that need a Hedwig Region(s) setup with a
- * number of Hedwig hubs per region, an associated HedwigClient per region and
- * the required BookKeeper and ZooKeeper instances.
- *
- */
-public abstract class HedwigRegionTestBase {
-
- protected static final Logger logger = LoggerFactory.getLogger(HedwigRegionTestBase.class);
-
- // BookKeeper variables
- // Default number of bookie servers to setup. Extending classes
- // can override this. We should be able to reuse the same BookKeeper
- // ensemble among all of the regions, at least for unit testing purposes.
- protected int numBookies = 3;
- protected BookKeeperTestBase bktb;
-
- // Hedwig Region variables
- // Default number of Hedwig Regions to setup. Extending classes can
- // override this.
- protected int numRegions = 2;
- protected int numServersPerRegion = 1;
-
- // Map with keys being Region names and values being the list of Hedwig
- // Hubs (PubSubServers) for that particular region.
- protected Map<String, List<PubSubServer>> regionServersMap;
- // Map with keys being Region names and values being the Hedwig Client
- // instance.
- protected Map<String, HedwigClient> regionClientsMap;
-
- protected Map<String, Integer> regionNameToIndexMap;
- protected Map<Integer, List<HedwigSocketAddress>> regionHubAddresses;
-
- // String constant used as the prefix for the region names.
- protected static final String REGION_PREFIX = "region";
-
- // Default child class of the ServerConfiguration to be used here.
- // Extending classes can define their own (possibly extending from this) and
- // override the getServerConfiguration method below to return their own
- // configuration.
- protected class RegionServerConfiguration extends ServerConfiguration {
- private final int serverPort, sslServerPort;
- private final String regionName;
-
- public RegionServerConfiguration(int serverPort, int sslServerPort, String regionName) {
- this.serverPort = serverPort;
- this.sslServerPort = sslServerPort;
- this.regionName = regionName;
- conf.setProperty(REGION, regionName);
- setRegionList();
- }
-
- protected void setRegionList() {
- List<String> myRegionList = new LinkedList<String>();
- for (int i = 0; i < numRegions; i++) {
- int curDefaultServerPort = regionHubAddresses.get(i).get(0).getPort();
- int curDefaultSSLServerPort = regionHubAddresses.get(i).get(0).getSSLPort();
- // Add this region default server port if it is for a region
- // other than its own.
- if (regionNameToIndexMap.get(regionName) != i) {
- myRegionList.add("localhost:" + curDefaultServerPort + ":" + curDefaultSSLServerPort);
- }
- }
-
- regionList = myRegionList;
- }
-
- @Override
- public int getServerPort() {
- return serverPort;
- }
-
- @Override
- public int getSSLServerPort() {
- return sslServerPort;
- }
-
- @Override
- public String getZkHost() {
- return bktb.getZkHostPort();
- }
-
- @Override
- public String getMyRegion() {
- return regionName;
- }
-
- @Override
- public boolean isSSLEnabled() {
- return true;
- }
-
- @Override
- public boolean isInterRegionSSLEnabled() {
- return true;
- }
-
- @Override
- public String getCertName() {
- return "/server.p12";
- }
-
- @Override
- public String getPassword() {
- return "eUySvp2phM2Wk";
- }
- }
-
- // Method to get a ServerConfiguration for the PubSubServers created using
- // the specified ports and region name. Extending child classes can override
- // this. This default implementation will return the
- // RegionServerConfiguration object defined above.
- protected ServerConfiguration getServerConfiguration(int serverPort, int sslServerPort, String regionName) {
- return new RegionServerConfiguration(serverPort, sslServerPort, regionName);
- }
-
- // Default ClientConfiguration to use. This just points to the first
- // Hedwig hub server in each region as the "default server host" to connect
- // to.
- protected class RegionClientConfiguration extends ClientConfiguration {
- public RegionClientConfiguration(int serverPort, int sslServerPort) {
- myDefaultServerAddress = new HedwigSocketAddress("localhost:" + serverPort + ":" + sslServerPort);
- }
- // Below you can override any of the default ClientConfiguration
- // parameters if needed.
- }
-
- // Method to get a ClientConfiguration for the HedwigClients created.
- // Inputs are the default Hedwig hub server's ports to point to.
- protected ClientConfiguration getClientConfiguration(int serverPort, int sslServerPort) {
- return new RegionClientConfiguration(serverPort, sslServerPort);
- }
-
- // Method to get a ClientConfiguration for the Cross Region Hedwig Client.
- protected ClientConfiguration getRegionClientConfiguration() {
- return new ClientConfiguration() {
- @Override
- public HedwigSocketAddress getDefaultServerHedwigSocketAddress() {
- return regionHubAddresses.get(0).get(0);
- }
- };
- }
-
- @Before
- public void setUp() throws Exception {
- logger.info("STARTING " + getClass());
- bktb = new BookKeeperTestBase(numBookies);
- bktb.setUp();
-
- // Create the Hedwig PubSubServer Hubs for all of the regions
- regionServersMap = new HashMap<String, List<PubSubServer>>(numRegions, 1.0f);
- regionClientsMap = new HashMap<String, HedwigClient>(numRegions, 1.0f);
-
- regionHubAddresses = new HashMap<Integer, List<HedwigSocketAddress>>(numRegions, 1.0f);
- for (int i = 0; i < numRegions; i++) {
- List<HedwigSocketAddress> addresses = new LinkedList<HedwigSocketAddress>();
- for (int j = 0; j < numServersPerRegion; j++) {
- HedwigSocketAddress a = new HedwigSocketAddress("localhost",
- PortManager.nextFreePort(), PortManager.nextFreePort());
- addresses.add(a);
- }
- regionHubAddresses.put(i, addresses);
- }
- regionNameToIndexMap = new HashMap<String, Integer>();
-
- for (int i = 0; i < numRegions; i++) {
- startRegion(i);
- }
- logger.info("HedwigRegion test setup finished");
- }
-
- @After
- public void tearDown() throws Exception {
- logger.info("tearDown starting");
- // Stop all of the HedwigClients for all regions
- for (HedwigClient client : regionClientsMap.values()) {
- client.close();
- }
- regionClientsMap.clear();
- // Shutdown all of the PubSubServers in all regions
- for (List<PubSubServer> serversList : regionServersMap.values()) {
- for (PubSubServer server : serversList) {
- server.shutdown();
- }
- }
- logger.info("Finished shutting down all of the hub servers!");
- regionServersMap.clear();
- // Shutdown the BookKeeper and ZooKeeper stuff
- bktb.tearDown();
- logger.info("FINISHED " + getClass());
- }
-
- protected void stopRegion(int regionIdx) throws Exception {
- String regionName = REGION_PREFIX + regionIdx;
- if (logger.isDebugEnabled()) {
- logger.debug("Stop region : " + regionName);
- }
- HedwigClient regionClient = regionClientsMap.remove(regionName);
- if (null != regionClient) {
- regionClient.close();
- }
- List<PubSubServer> serversList = regionServersMap.remove(regionName);
- if (null == serversList) {
- return;
- }
- for (PubSubServer server : serversList) {
- server.shutdown();
- }
- logger.info("Finished shutting down all of the hub servers in region " + regionName);
- }
-
- protected void startRegion(int i) throws Exception {
- String regionName = REGION_PREFIX + i;
- regionNameToIndexMap.put(regionName, i);
-
- if (logger.isDebugEnabled()) {
- logger.debug("Start region : " + regionName);
- }
-
- List<PubSubServer> serversList = new LinkedList<PubSubServer>();
- // For the current region, create the necessary amount of hub
- // servers. We will basically increment through the port numbers
- // starting from the initial ones defined.
- for (int j = 0; j < numServersPerRegion; j++) {
- HedwigSocketAddress a = regionHubAddresses.get(i).get(j);
- PubSubServer s = new PubSubServer(
- getServerConfiguration(a.getPort(),
- a.getSSLPort(),
- regionName),
- getRegionClientConfiguration(),
- new LoggingExceptionHandler());
- serversList.add(s);
- s.start();
- }
- // Store this list of servers created for the current region
- regionServersMap.put(regionName, serversList);
- // Create a Hedwig Client that points to the first Hub server
- // created in the loop above for the current region.
- HedwigClient regionClient = new HedwigClient(
- getClientConfiguration(regionHubAddresses.get(i).get(0).getPort(),
- regionHubAddresses.get(i).get(0).getSSLPort()));
- regionClientsMap.put(regionName, regionClient);
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/LoggingExceptionHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/LoggingExceptionHandler.java b/hedwig-server/src/test/java/org/apache/hedwig/server/LoggingExceptionHandler.java
deleted file mode 100644
index 5ea0990..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/LoggingExceptionHandler.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Uncaught-exception handler that only reports the failure through the
- * logger and takes no further action. Intended for tests, as a stand-in
- * for TerminateJVMExceptionHandler.
- */
-public class LoggingExceptionHandler implements Thread.UncaughtExceptionHandler {
-    private static final Logger logger = LoggerFactory.getLogger(LoggingExceptionHandler.class);
-
-    /**
-     * Logs the name of the failing thread together with the throwable.
-     *
-     * @param t thread in which the exception went uncaught
-     * @param e the uncaught throwable, logged with its stack trace
-     */
-    @Override
-    public void uncaughtException(Thread t, Throwable e) {
-        final String threadName = t.getName();
-        logger.error("Uncaught exception in thread " + threadName, e);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java b/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java
deleted file mode 100644
index 02d1f46..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.After;
-import org.junit.Before;
-
-import org.apache.bookkeeper.test.PortManager;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.netty.PubSubServer;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-/**
- * Base class for tests that need a single standalone PubSubServer: a hub is
- * started before each test and shut down after it.
- */
-public abstract class PubSubServerStandAloneTestBase {
-
-    protected static final Logger logger = LoggerFactory.getLogger(PubSubServerStandAloneTestBase.class);
-
-    /**
-     * Standalone server configuration whose plain and SSL ports are taken
-     * from {@code PortManager.nextFreePort()} when the instance is created.
-     */
-    protected class StandAloneServerConfiguration extends ServerConfiguration {
-        final int port = PortManager.nextFreePort();
-        final int sslPort = PortManager.nextFreePort();
-
-        @Override
-        public boolean isStandalone() {
-            return true;
-        }
-
-        @Override
-        public int getServerPort() {
-            return port;
-        }
-
-        @Override
-        public int getSSLServerPort() {
-            return sslPort;
-        }
-    }
-
-    /**
-     * Creates a new standalone configuration.
-     *
-     * @return a fresh StandAloneServerConfiguration; every call obtains a new
-     *         pair of ports from PortManager
-     */
-    public ServerConfiguration getStandAloneServerConfiguration() {
-        return new StandAloneServerConfiguration();
-    }
-
-    // Assigned by startHubServer(); remains null if the hub was never constructed.
-    protected PubSubServer server;
-    protected ServerConfiguration conf;
-    protected HedwigSocketAddress defaultAddress;
-
-    /**
-     * Builds the standalone configuration and starts the hub before each test.
-     *
-     * @throws Exception propagated from hub server startup
-     */
-    @Before
-    public void setUp() throws Exception {
-        logger.info("STARTING {}", getClass());
-        conf = getStandAloneServerConfiguration();
-        startHubServer(conf);
-        logger.info("Standalone PubSubServer test setup finished");
-    }
-
-    /**
-     * Shuts the hub down after each test.
-     *
-     * @throws Exception propagated from hub server shutdown
-     */
-    @After
-    public void tearDown() throws Exception {
-        logger.info("tearDown starting");
-        tearDownHubServer();
-        logger.info("FINISHED {}", getClass());
-    }
-
-    /**
-     * @return the localhost address recorded by the most recent
-     *         {@link #startHubServer(ServerConfiguration)} call
-     */
-    protected HedwigSocketAddress getDefaultHedwigAddress() {
-        return defaultAddress;
-    }
-
-    /**
-     * Records the hub's localhost address from {@code conf}, then creates and
-     * starts the server.
-     *
-     * @param conf configuration supplying the plain and SSL server ports
-     * @throws Exception propagated from server construction or startup
-     */
-    protected void startHubServer(ServerConfiguration conf) throws Exception {
-        defaultAddress = new HedwigSocketAddress("localhost", conf.getServerPort(),
-                                                 conf.getSSLServerPort());
-        server = new PubSubServer(conf, new ClientConfiguration(), new LoggingExceptionHandler());
-        server.start();
-    }
-
-    /**
-     * Shuts the hub server down. Does nothing when no server was created:
-     * JUnit still runs {@code @After} methods when {@code @Before} fails, and
-     * an unguarded call would then add a NullPointerException on top of the
-     * original setup failure.
-     *
-     * @throws Exception propagated from server shutdown
-     */
-    protected void tearDownHubServer() throws Exception {
-        if (null == server) {
-            return;
-        }
-        server.shutdown();
-    }
-
-}