You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:16 UTC
[06/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java b/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
deleted file mode 100644
index 0150a11..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
+++ /dev/null
@@ -1,1320 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server;
-
-import java.net.InetAddress;
-import java.io.File;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.google.protobuf.ByteString;
-
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-import org.apache.bookkeeper.test.ZooKeeperUtil;
-import org.apache.bookkeeper.test.PortManager;
-
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-import org.apache.commons.configuration.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test Backward Compatability between different versions
- */
-public class TestBackwardCompat {
-
- private static final Logger logger = LoggerFactory.getLogger(TestBackwardCompat.class);
-
- static final int CONSUMEINTERVAL = 5;
- static ZooKeeperUtil zkUtil = new ZooKeeperUtil();
-
- static class BookKeeperCluster400 {
-
- int numBookies;
- List<org.apache.hw_v4_0_0.bookkeeper.conf.ServerConfiguration> bkConfs;
- List<org.apache.hw_v4_0_0.bookkeeper.proto.BookieServer> bks;
-
- BookKeeperCluster400(int numBookies) {
- this.numBookies = numBookies;
- }
-
- public void start() throws Exception {
- zkUtil.startServer();
-
- bks = new LinkedList<org.apache.hw_v4_0_0.bookkeeper.proto.BookieServer>();
- bkConfs = new LinkedList<org.apache.hw_v4_0_0.bookkeeper.conf.ServerConfiguration>();
-
- for (int i=0; i<numBookies; i++) {
- startBookieServer();
- }
- }
-
- public void stop() throws Exception {
- for (org.apache.hw_v4_0_0.bookkeeper.proto.BookieServer bs : bks) {
- bs.shutdown();
- }
- bks.clear();
-
- zkUtil.killServer();
- }
-
- protected void startBookieServer() throws Exception {
- int port = PortManager.nextFreePort();
- File tmpDir = org.apache.hw_v4_0_0.hedwig.util.FileUtils.createTempDirectory(
- getClass().getName() + port, "test");
- org.apache.hw_v4_0_0.bookkeeper.conf.ServerConfiguration conf = newServerConfiguration(
- port, zkUtil.getZooKeeperConnectString(), tmpDir, new File[] { tmpDir });
- bks.add(startBookie(conf));
- bkConfs.add(conf);
- }
-
- protected org.apache.hw_v4_0_0.bookkeeper.conf.ServerConfiguration newServerConfiguration(
- int port, String zkServers, File journalDir, File[] ledgerDirs) {
- org.apache.hw_v4_0_0.bookkeeper.conf.ServerConfiguration conf =
- new org.apache.hw_v4_0_0.bookkeeper.conf.ServerConfiguration();
- conf.setBookiePort(port);
- conf.setZkServers(zkServers);
- conf.setJournalDirName(journalDir.getPath());
- String[] ledgerDirNames = new String[ledgerDirs.length];
- for (int i=0; i<ledgerDirs.length; i++) {
- ledgerDirNames[i] = ledgerDirs[i].getPath();
- }
- conf.setLedgerDirNames(ledgerDirNames);
- return conf;
- }
-
- protected org.apache.hw_v4_0_0.bookkeeper.proto.BookieServer startBookie(
- org.apache.hw_v4_0_0.bookkeeper.conf.ServerConfiguration conf) throws Exception {
- org.apache.hw_v4_0_0.bookkeeper.proto.BookieServer server
- = new org.apache.hw_v4_0_0.bookkeeper.proto.BookieServer(conf);
- server.start();
-
- int port = conf.getBookiePort();
- while (zkUtil.getZooKeeperClient().exists(
- "/ledgers/available/" + InetAddress.getLocalHost().getHostAddress() + ":" + port,
- false) == null) {
- Thread.sleep(500);
- }
- return server;
- }
- }
-
- /**
- * Version 4.0.0 classes
- */
- static class Server400 {
- org.apache.hw_v4_0_0.hedwig.server.common.ServerConfiguration conf;
- org.apache.hw_v4_0_0.hedwig.server.netty.PubSubServer server;
-
- Server400(final String zkHosts, final int port, final int sslPort) {
- conf = new org.apache.hw_v4_0_0.hedwig.server.common.ServerConfiguration() {
- @Override
- public String getZkHost() {
- return zkHosts;
- }
-
- @Override
- public int getServerPort() {
- return port;
- }
-
- @Override
- public int getSSLServerPort() {
- return sslPort;
- }
- };
- }
-
- void start() throws Exception {
- server = new org.apache.hw_v4_0_0.hedwig.server.netty.PubSubServer(conf);
- }
-
- void stop() throws Exception {
- if (null != server) {
- server.shutdown();
- }
- }
- }
-
- static class Client400 {
- org.apache.hw_v4_0_0.hedwig.client.conf.ClientConfiguration conf;
- org.apache.hw_v4_0_0.hedwig.client.api.Client client;
- org.apache.hw_v4_0_0.hedwig.client.api.Publisher publisher;
- org.apache.hw_v4_0_0.hedwig.client.api.Subscriber subscriber;
-
- Client400(final String connectString) {
- conf = new org.apache.hw_v4_0_0.hedwig.client.conf.ClientConfiguration() {
- @Override
- protected org.apache.hw_v4_0_0.hedwig.util.HedwigSocketAddress
- getDefaultServerHedwigSocketAddress() {
- return new org.apache.hw_v4_0_0.hedwig.util.HedwigSocketAddress(connectString);
- }
- };
- client = new org.apache.hw_v4_0_0.hedwig.client.HedwigClient(conf);
- publisher = client.getPublisher();
- subscriber = client.getSubscriber();
- }
-
- void close() throws Exception {
- if (null != client) {
- client.close();
- }
- }
-
- org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol.MessageSeqId publish(
- ByteString topic, ByteString data) throws Exception {
- org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol.Message message =
- org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol.Message.newBuilder()
- .setBody(data).build();
- publisher.publish(topic, message);
- return null;
- }
- }
-
- static class BookKeeperCluster410 {
-
- int numBookies;
- List<org.apache.hw_v4_1_0.bookkeeper.conf.ServerConfiguration> bkConfs;
- List<org.apache.hw_v4_1_0.bookkeeper.proto.BookieServer> bks;
-
- BookKeeperCluster410(int numBookies) {
- this.numBookies = numBookies;
- }
-
- public void start() throws Exception {
- zkUtil.startServer();
-
- bks = new LinkedList<org.apache.hw_v4_1_0.bookkeeper.proto.BookieServer>();
- bkConfs = new LinkedList<org.apache.hw_v4_1_0.bookkeeper.conf.ServerConfiguration>();
-
- for (int i=0; i<numBookies; i++) {
- startBookieServer();
- }
- }
-
- public void stop() throws Exception {
- for (org.apache.hw_v4_1_0.bookkeeper.proto.BookieServer bs : bks) {
- bs.shutdown();
- }
- bks.clear();
-
- zkUtil.killServer();
- }
-
- protected void startBookieServer() throws Exception {
- int port = PortManager.nextFreePort();
- File tmpDir = org.apache.hw_v4_1_0.hedwig.util.FileUtils.createTempDirectory(
- getClass().getName() + port, "test");
- org.apache.hw_v4_1_0.bookkeeper.conf.ServerConfiguration conf = newServerConfiguration(
- port, zkUtil.getZooKeeperConnectString(), tmpDir, new File[] { tmpDir });
- bks.add(startBookie(conf));
- bkConfs.add(conf);
- }
-
- protected org.apache.hw_v4_1_0.bookkeeper.conf.ServerConfiguration newServerConfiguration(
- int port, String zkServers, File journalDir, File[] ledgerDirs) {
- org.apache.hw_v4_1_0.bookkeeper.conf.ServerConfiguration conf =
- new org.apache.hw_v4_1_0.bookkeeper.conf.ServerConfiguration();
- conf.setBookiePort(port);
- conf.setZkServers(zkServers);
- conf.setJournalDirName(journalDir.getPath());
- String[] ledgerDirNames = new String[ledgerDirs.length];
- for (int i=0; i<ledgerDirs.length; i++) {
- ledgerDirNames[i] = ledgerDirs[i].getPath();
- }
- conf.setLedgerDirNames(ledgerDirNames);
- return conf;
- }
-
- protected org.apache.hw_v4_1_0.bookkeeper.proto.BookieServer startBookie(
- org.apache.hw_v4_1_0.bookkeeper.conf.ServerConfiguration conf) throws Exception {
- org.apache.hw_v4_1_0.bookkeeper.proto.BookieServer server
- = new org.apache.hw_v4_1_0.bookkeeper.proto.BookieServer(conf);
- server.start();
-
- int port = conf.getBookiePort();
- while (zkUtil.getZooKeeperClient().exists(
- "/ledgers/available/" + InetAddress.getLocalHost().getHostAddress() + ":" + port,
- false) == null) {
- Thread.sleep(500);
- }
- return server;
- }
- }
-
- /**
- * Version 4.1.0 classes
- */
- static class Server410 {
- org.apache.hw_v4_1_0.hedwig.server.common.ServerConfiguration conf;
- org.apache.hw_v4_1_0.hedwig.server.netty.PubSubServer server;
-
- Server410(final String zkHosts, final int port, final int sslPort) {
- conf = new org.apache.hw_v4_1_0.hedwig.server.common.ServerConfiguration() {
- @Override
- public int getConsumeInterval() {
- return CONSUMEINTERVAL;
- }
- @Override
- public String getZkHost() {
- return zkHosts;
- }
-
- @Override
- public int getServerPort() {
- return port;
- }
-
- @Override
- public int getSSLServerPort() {
- return sslPort;
- }
- };
- }
-
- void start() throws Exception {
- server = new org.apache.hw_v4_1_0.hedwig.server.netty.PubSubServer(conf);
- server.start();
- }
-
- void stop() throws Exception {
- if (null != server) {
- server.shutdown();
- }
- }
- }
-
- static class Client410 {
- org.apache.hw_v4_1_0.hedwig.client.conf.ClientConfiguration conf;
- org.apache.hw_v4_1_0.hedwig.client.api.Client client;
- org.apache.hw_v4_1_0.hedwig.client.api.Publisher publisher;
- org.apache.hw_v4_1_0.hedwig.client.api.Subscriber subscriber;
-
- class IntMessageHandler implements org.apache.hw_v4_1_0.hedwig.client.api.MessageHandler {
- ByteString topic;
- ByteString subId;
- int next;
-
- CountDownLatch latch;
-
- IntMessageHandler(ByteString t, ByteString s, int start, int num) {
- this.topic = t;
- this.subId = s;
- this.next = start;
- this.latch = new CountDownLatch(num);
- }
-
- @Override
- public void deliver(ByteString t, ByteString s,
- org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.Message msg,
- org.apache.hw_v4_1_0.hedwig.util.Callback<Void> callback, Object context) {
- if (!t.equals(topic) || !s.equals(subId)) {
- return;
- }
- int num = Integer.parseInt(msg.getBody().toStringUtf8());
- if (num == next) {
- latch.countDown();
- ++next;
- }
- callback.operationFinished(context, null);
- }
-
- public boolean await(long timeout, TimeUnit unit)
- throws InterruptedException {
- return latch.await(timeout, unit);
- }
- }
-
- Client410(final String connectString) {
- conf = new org.apache.hw_v4_1_0.hedwig.client.conf.ClientConfiguration() {
- @Override
- public boolean isAutoSendConsumeMessageEnabled() {
- return true;
- }
- @Override
- public int getConsumedMessagesBufferSize() {
- return 1;
- }
- @Override
- protected org.apache.hw_v4_1_0.hedwig.util.HedwigSocketAddress
- getDefaultServerHedwigSocketAddress() {
- return new org.apache.hw_v4_1_0.hedwig.util.HedwigSocketAddress(connectString);
- }
- };
- client = new org.apache.hw_v4_1_0.hedwig.client.HedwigClient(conf);
- publisher = client.getPublisher();
- subscriber = client.getSubscriber();
- }
-
- void close() throws Exception {
- if (null != client) {
- client.close();
- }
- }
-
- org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.MessageSeqId publish(
- ByteString topic, ByteString data) throws Exception {
- org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.Message message =
- org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.Message.newBuilder()
- .setBody(data).build();
- publisher.publish(topic, message);
- return null;
- }
-
- void publishInts(ByteString topic, int start, int num) throws Exception {
- for (int i=0; i<num; i++) {
- org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.Message msg =
- org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.Message.newBuilder().setBody(ByteString.copyFromUtf8("" + (start+i))).build();
- publisher.publish(topic, msg);
- }
- }
-
- void sendXExpectLastY(ByteString topic, ByteString subid, final int x, final int y)
- throws Exception {
- for (int i=0; i<x; i++) {
- publisher.publish(topic, org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.Message.newBuilder().setBody(
- ByteString.copyFromUtf8(String.valueOf(i))).build());
- }
- subscriber.subscribe(topic, subid, org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.ATTACH);
-
- final AtomicInteger expected = new AtomicInteger(x - y);
- final CountDownLatch latch = new CountDownLatch(1);
- subscriber.startDelivery(topic, subid, new org.apache.hw_v4_1_0.hedwig.client.api.MessageHandler() {
- @Override
- synchronized public void deliver(ByteString topic, ByteString subscriberId,
- org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.Message msg,
- org.apache.hw_v4_1_0.hedwig.util.Callback<Void> callback, Object context) {
- try {
- int value = Integer.valueOf(msg.getBody().toStringUtf8());
- if (value == expected.get()) {
- expected.incrementAndGet();
- } else {
- logger.error("Did not receive expected value, expected {}, got {}",
- expected.get(), value);
- expected.set(0);
- latch.countDown();
- }
- if (expected.get() == x) {
- latch.countDown();
- }
- callback.operationFinished(context, null);
- } catch (Exception e) {
- logger.error("Received bad message", e);
- latch.countDown();
- }
- }
- });
- assertTrue("Timed out waiting for messages Y is " + y + " expected is currently "
- + expected.get(), latch.await(10, TimeUnit.SECONDS));
- assertEquals("Should be expected message with " + x, x, expected.get());
- subscriber.stopDelivery(topic, subid);
- subscriber.closeSubscription(topic, subid);
- Thread.sleep(1000); // give server time to run disconnect logic (BOOKKEEPER-513)
- }
-
- void subscribe(ByteString topic, ByteString subscriberId) throws Exception {
- org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscriptionOptions options =
- org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
- .setCreateOrAttach(org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH).build();
- subscribe(topic, subscriberId, options);
- }
-
- void subscribe(ByteString topic, ByteString subscriberId,
- org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscriptionOptions options) throws Exception {
- subscriber.subscribe(topic, subscriberId, options);
- }
-
- void closeSubscription(ByteString topic, ByteString subscriberId) throws Exception {
- subscriber.closeSubscription(topic, subscriberId);
- Thread.sleep(1000); // give server time to run disconnect logic (BOOKKEEPER-513)
- }
-
- void receiveInts(ByteString topic, ByteString subscriberId, int start, int num) throws Exception {
- IntMessageHandler msgHandler = new IntMessageHandler(topic, subscriberId, start, num);
- subscriber.startDelivery(topic, subscriberId, msgHandler);
- msgHandler.await(num, TimeUnit.SECONDS);
- subscriber.stopDelivery(topic, subscriberId);
- }
- }
-
- /**
- * 4.2.0 Version
- */
- static class BookKeeperCluster420{
-
- int numBookies;
- List<org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration> bkConfs;
- List<org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer> bks;
-
-
- BookKeeperCluster420(int numBookies) {
- this.numBookies = numBookies;
- }
-
- public void start() throws Exception {
- zkUtil.startServer();
-
- bks = new LinkedList<org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer>();
- bkConfs = new LinkedList<org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration>();
-
- for (int i=0; i<numBookies; i++) {
- startBookieServer();
- }
- }
-
- public void stop() throws Exception {
- for (org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer bs : bks) {
- bs.shutdown();
- }
- bks.clear();
-
- zkUtil.killServer();
- }
-
- protected void startBookieServer() throws Exception {
- int port = PortManager.nextFreePort();
- File tmpDir = org.apache.hw_v4_2_0.hedwig.util.FileUtils.createTempDirectory(
- getClass().getName() + port, "test");
- org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration conf = newServerConfiguration(
- port, zkUtil.getZooKeeperConnectString(), tmpDir, new File[] { tmpDir });
- bks.add(startBookie(conf));
- bkConfs.add(conf);
- }
-
- protected org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration newServerConfiguration(
- int port, String zkServers, File journalDir, File[] ledgerDirs) {
- org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration conf =
- new org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration();
- conf.setBookiePort(port);
- conf.setZkServers(zkServers);
- conf.setJournalDirName(journalDir.getPath());
- String[] ledgerDirNames = new String[ledgerDirs.length];
- for (int i=0; i<ledgerDirs.length; i++) {
- ledgerDirNames[i] = ledgerDirs[i].getPath();
- }
- conf.setLedgerDirNames(ledgerDirNames);
- return conf;
- }
-
- protected org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer startBookie(
- org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration conf) throws Exception {
- org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer server
- = new org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer(conf);
- server.start();
-
- int port = conf.getBookiePort();
- while (zkUtil.getZooKeeperClient().exists(
- "/ledgers/available/" + InetAddress.getLocalHost().getHostAddress() + ":" + port,
- false) == null) {
- Thread.sleep(500);
- }
- return server;
- }
- }
-
- static class Server420 {
- org.apache.hw_v4_2_0.hedwig.server.common.ServerConfiguration conf;
- org.apache.hw_v4_2_0.hedwig.server.netty.PubSubServer server;
-
- Server420(final String zkHosts, final int port, final int sslPort) {
- conf = new org.apache.hw_v4_2_0.hedwig.server.common.ServerConfiguration() {
- @Override
- public int getConsumeInterval() {
- return CONSUMEINTERVAL;
- }
-
- @Override
- public String getZkHost() {
- return zkHosts;
- }
-
- @Override
- public int getServerPort() {
- return port;
- }
-
- @Override
- public int getSSLServerPort() {
- return sslPort;
- }
- };
- }
-
- void start() throws Exception {
- server = new org.apache.hw_v4_2_0.hedwig.server.netty.PubSubServer(conf);
- server.start();
- }
-
- void stop() throws Exception {
- if (null != server) {
- server.shutdown();
- }
- }
- }
-
- /**
- * Current Version
- */
- static class BookKeeperClusterCurrent {
-
- int numBookies;
- List<org.apache.bookkeeper.conf.ServerConfiguration> bkConfs;
- List<org.apache.bookkeeper.proto.BookieServer> bks;
-
-
- BookKeeperClusterCurrent(int numBookies) {
- this.numBookies = numBookies;
- }
-
- public void start() throws Exception {
- zkUtil.startServer();
-
- bks = new LinkedList<org.apache.bookkeeper.proto.BookieServer>();
- bkConfs = new LinkedList<org.apache.bookkeeper.conf.ServerConfiguration>();
-
- for (int i=0; i<numBookies; i++) {
- startBookieServer();
- }
- }
-
- public void stop() throws Exception {
- for (org.apache.bookkeeper.proto.BookieServer bs : bks) {
- bs.shutdown();
- }
- bks.clear();
-
- zkUtil.killServer();
- }
-
- protected void startBookieServer() throws Exception {
- int port = PortManager.nextFreePort();
- File tmpDir = org.apache.hedwig.util.FileUtils.createTempDirectory(
- getClass().getName() + port, "test");
- org.apache.bookkeeper.conf.ServerConfiguration conf = newServerConfiguration(
- port, zkUtil.getZooKeeperConnectString(), tmpDir, new File[] { tmpDir });
- conf.setAllowLoopback(true);
- bks.add(startBookie(conf));
- bkConfs.add(conf);
- }
-
- protected org.apache.bookkeeper.conf.ServerConfiguration newServerConfiguration(
- int port, String zkServers, File journalDir, File[] ledgerDirs) {
- org.apache.bookkeeper.conf.ServerConfiguration conf =
- new org.apache.bookkeeper.conf.ServerConfiguration();
- conf.setAllowLoopback(true);
- conf.setBookiePort(port);
- conf.setZkServers(zkServers);
- conf.setJournalDirName(journalDir.getPath());
- String[] ledgerDirNames = new String[ledgerDirs.length];
- for (int i=0; i<ledgerDirs.length; i++) {
- ledgerDirNames[i] = ledgerDirs[i].getPath();
- }
- conf.setLedgerDirNames(ledgerDirNames);
- return conf;
- }
-
- protected org.apache.bookkeeper.proto.BookieServer startBookie(
- org.apache.bookkeeper.conf.ServerConfiguration conf) throws Exception {
- org.apache.bookkeeper.proto.BookieServer server
- = new org.apache.bookkeeper.proto.BookieServer(conf);
- server.start();
-
- int port = conf.getBookiePort();
- while (zkUtil.getZooKeeperClient().exists(
- "/ledgers/available/" + InetAddress.getLocalHost().getHostAddress() + ":" + port,
- false) == null) {
- Thread.sleep(500);
- }
- return server;
- }
- }
-
- static class ServerCurrent {
- org.apache.hedwig.server.common.ServerConfiguration conf;
- org.apache.hedwig.server.netty.PubSubServer server;
-
- ServerCurrent(final String zkHosts, final int port, final int sslPort) {
- conf = new org.apache.hedwig.server.common.ServerConfiguration() {
- @Override
- public int getConsumeInterval() {
- return CONSUMEINTERVAL;
- }
-
- @Override
- public String getZkHost() {
- return zkHosts;
- }
-
- @Override
- public int getServerPort() {
- return port;
- }
-
- @Override
- public int getSSLServerPort() {
- return sslPort;
- }
- };
- }
-
- void start() throws Exception {
- server = new org.apache.hedwig.server.netty.PubSubServer(conf);
- server.start();
- }
-
- void stop() throws Exception {
- if (null != server) {
- server.shutdown();
- }
- }
- }
-
- static class ClientCurrent {
- org.apache.hedwig.client.conf.ClientConfiguration conf;
- org.apache.hedwig.client.api.Client client;
- org.apache.hedwig.client.api.Publisher publisher;
- org.apache.hedwig.client.api.Subscriber subscriber;
-
- class IntMessageHandler implements org.apache.hedwig.client.api.MessageHandler {
- ByteString topic;
- ByteString subId;
- int next;
-
- CountDownLatch latch;
-
- IntMessageHandler(ByteString t, ByteString s, int start, int num) {
- this.topic = t;
- this.subId = s;
- this.next = start;
- this.latch = new CountDownLatch(num);
- }
-
- @Override
- public void deliver(ByteString t, ByteString s,
- org.apache.hedwig.protocol.PubSubProtocol.Message msg,
- org.apache.hedwig.util.Callback<Void> callback, Object context) {
- if (!t.equals(topic) || !s.equals(subId)) {
- return;
- }
- int num = Integer.parseInt(msg.getBody().toStringUtf8());
- if (num == next) {
- latch.countDown();
- ++next;
- }
- callback.operationFinished(context, null);
- }
-
- public boolean await(long timeout, TimeUnit unit)
- throws InterruptedException {
- return latch.await(timeout, unit);
- }
- }
-
- ClientCurrent(final String connectString) {
- this(true, connectString);
- }
-
- ClientCurrent(final boolean autoConsumeEnabled, final String connectString) {
- conf = new org.apache.hedwig.client.conf.ClientConfiguration() {
- @Override
- public boolean isAutoSendConsumeMessageEnabled() {
- return autoConsumeEnabled;
- }
- @Override
- public int getConsumedMessagesBufferSize() {
- return 1;
- }
- @Override
- protected HedwigSocketAddress getDefaultServerHedwigSocketAddress() {
- return new HedwigSocketAddress(connectString);
- }
- };
- client = new org.apache.hedwig.client.HedwigClient(conf);
- publisher = client.getPublisher();
- subscriber = client.getSubscriber();
- }
-
- void close() throws Exception {
- if (null != client) {
- client.close();
- }
- }
-
- org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId publish(
- ByteString topic, ByteString data) throws Exception {
- org.apache.hedwig.protocol.PubSubProtocol.Message message =
- org.apache.hedwig.protocol.PubSubProtocol.Message.newBuilder()
- .setBody(data).build();
- org.apache.hedwig.protocol.PubSubProtocol.PublishResponse resp =
- publisher.publish(topic, message);
- if (null == resp) {
- return null;
- }
- return resp.getPublishedMsgId();
- }
-
- void publishInts(ByteString topic, int start, int num) throws Exception {
- for (int i=0; i<num; i++) {
- org.apache.hedwig.protocol.PubSubProtocol.Message msg =
- org.apache.hedwig.protocol.PubSubProtocol.Message.newBuilder().setBody(ByteString.copyFromUtf8("" + (start+i))).build();
- publisher.publish(topic, msg);
- }
- }
-
- void sendXExpectLastY(ByteString topic, ByteString subid, final int x, final int y)
- throws Exception {
- for (int i=0; i<x; i++) {
- publisher.publish(topic, org.apache.hedwig.protocol.PubSubProtocol.Message.newBuilder().setBody(
- ByteString.copyFromUtf8(String.valueOf(i))).build());
- }
- org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions opts
- = org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
- .setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.ATTACH)
- .build();
- subscriber.subscribe(topic, subid, opts);
-
- final AtomicInteger expected = new AtomicInteger(x - y);
- final CountDownLatch latch = new CountDownLatch(1);
- subscriber.startDelivery(topic, subid, new org.apache.hedwig.client.api.MessageHandler() {
- @Override
- synchronized public void deliver(ByteString topic, ByteString subscriberId,
- org.apache.hedwig.protocol.PubSubProtocol.Message msg,
- org.apache.hedwig.util.Callback<Void> callback, Object context) {
- try {
- int value = Integer.valueOf(msg.getBody().toStringUtf8());
- if (value == expected.get()) {
- expected.incrementAndGet();
- } else {
- logger.error("Did not receive expected value, expected {}, got {}",
- expected.get(), value);
- expected.set(0);
- latch.countDown();
- }
- if (expected.get() == x) {
- latch.countDown();
- }
- callback.operationFinished(context, null);
- } catch (Exception e) {
- logger.error("Received bad message", e);
- latch.countDown();
- }
- }
- });
- assertTrue("Timed out waiting for messages Y is " + y + " expected is currently "
- + expected.get(), latch.await(10, TimeUnit.SECONDS));
- assertEquals("Should be expected message with " + x, x, expected.get());
- subscriber.stopDelivery(topic, subid);
- subscriber.closeSubscription(topic, subid);
- }
-
- void receiveNumModM(final ByteString topic, final ByteString subid,
- final int start, final int num, final int M) throws Exception {
- org.apache.hedwig.filter.ServerMessageFilter filter =
- new org.apache.hedwig.filter.ServerMessageFilter() {
-
- @Override
- public org.apache.hedwig.filter.ServerMessageFilter
- initialize(Configuration conf) {
- // do nothing
- return this;
- }
-
- @Override
- public void uninitialize() {
- // do nothing;
- }
-
- @Override
- public org.apache.hedwig.filter.MessageFilterBase
- setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
- org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences preferences) {
- // do nothing;
- return this;
- }
-
- @Override
- public boolean testMessage(org.apache.hedwig.protocol.PubSubProtocol.Message msg) {
- int value = Integer.valueOf(msg.getBody().toStringUtf8());
- return 0 == value % M;
- }
- };
- filter.initialize(conf.getConf());
-
-
- org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions opts
- = org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
- .setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.ATTACH)
- .build();
- subscriber.subscribe(topic, subid, opts);
- final int base = start + M - start % M;
- final AtomicInteger expected = new AtomicInteger(base);
- final CountDownLatch latch = new CountDownLatch(1);
- subscriber.startDeliveryWithFilter(topic, subid, new org.apache.hedwig.client.api.MessageHandler() {
- synchronized public void deliver(ByteString topic, ByteString subscriberId,
- org.apache.hedwig.protocol.PubSubProtocol.Message msg,
- org.apache.hedwig.util.Callback<Void> callback, Object context) {
- try {
- int value = Integer.valueOf(msg.getBody().toStringUtf8());
- // duplicated messages received, ignore them
- if (value > start) {
- if (value == expected.get()) {
- expected.addAndGet(M);
- } else {
- logger.error("Did not receive expected value, expected {}, got {}",
- expected.get(), value);
- expected.set(0);
- latch.countDown();
- }
- if (expected.get() == (base + num * M)) {
- latch.countDown();
- }
- }
- callback.operationFinished(context, null);
- } catch (Exception e) {
- logger.error("Received bad message", e);
- latch.countDown();
- }
- }
- }, (org.apache.hedwig.filter.ClientMessageFilter) filter);
- assertTrue("Timed out waiting for messages mod " + M + " expected is " + expected.get(),
- latch.await(10, TimeUnit.SECONDS));
- assertEquals("Should be expected message with " + (base + num * M), (base + num*M), expected.get());
- subscriber.stopDelivery(topic, subid);
- filter.uninitialize();
- subscriber.closeSubscription(topic, subid);
- }
-
- void subscribe(ByteString topic, ByteString subscriberId) throws Exception {
- org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions options =
- org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
- .setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH).build();
- subscribe(topic, subscriberId, options);
- }
-
- void subscribe(ByteString topic, ByteString subscriberId,
- org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions options) throws Exception {
- subscriber.subscribe(topic, subscriberId, options);
- }
-
- void closeSubscription(ByteString topic, ByteString subscriberId) throws Exception {
- subscriber.closeSubscription(topic, subscriberId);
- }
-
- void receiveInts(ByteString topic, ByteString subscriberId, int start, int num) throws Exception {
- IntMessageHandler msgHandler = new IntMessageHandler(topic, subscriberId, start, num);
- subscriber.startDelivery(topic, subscriberId, msgHandler);
- msgHandler.await(num, TimeUnit.SECONDS);
- subscriber.stopDelivery(topic, subscriberId);
- }
-
- // throttle doesn't work talking with 41 server
- void throttleX41(ByteString topic, ByteString subid, final int X)
- throws Exception {
- org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions options =
- org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
- .setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH)
- .setMessageWindowSize(X) .build();
- subscribe(topic, subid, options);
- closeSubscription(topic, subid);
- publishInts(topic, 1, 3*X);
- subscribe(topic, subid);
-
- final AtomicInteger expected = new AtomicInteger(1);
- final CountDownLatch throttleLatch = new CountDownLatch(1);
- final CountDownLatch nonThrottleLatch = new CountDownLatch(1);
- subscriber.startDelivery(topic, subid, new org.apache.hedwig.client.api.MessageHandler() {
- @Override
- public synchronized void deliver(ByteString topic, ByteString subscriberId,
- org.apache.hedwig.protocol.PubSubProtocol.Message msg,
- org.apache.hedwig.util.Callback<Void> callback, Object context) {
- try {
- int value = Integer.valueOf(msg.getBody().toStringUtf8());
- logger.debug("Received message {},", value);
-
- if (value == expected.get()) {
- expected.incrementAndGet();
- } else {
- // error condition
- logger.error("Did not receive expected value, expected {}, got {}",
- expected.get(), value);
- expected.set(0);
- throttleLatch.countDown();
- nonThrottleLatch.countDown();
- }
- if (expected.get() > X+1) {
- throttleLatch.countDown();
- }
- if (expected.get() == (3 * X + 1)) {
- nonThrottleLatch.countDown();
- }
- callback.operationFinished(context, null);
- } catch (Exception e) {
- logger.error("Received bad message", e);
- throttleLatch.countDown();
- nonThrottleLatch.countDown();
- }
- }
- });
- assertTrue("Should Receive more messages than throttle value " + X,
- throttleLatch.await(10, TimeUnit.SECONDS));
-
- assertTrue("Timed out waiting for messages " + (3*X + 1),
- nonThrottleLatch.await(10, TimeUnit.SECONDS));
- assertEquals("Should be expected message with " + (3*X + 1),
- 3*X + 1, expected.get());
-
- subscriber.stopDelivery(topic, subid);
- closeSubscription(topic, subid);
- }
- }
-
- /**
- * Test compatability of message bound between version 4.0.0 and
- * current version.
- *
- * 1) message bound doesn't take effects on 4.0.0 server.
- * 2) message bound take effects on both 4.1.0 and current server
- */
- @Test(timeout=60000)
- public void testMessageBoundCompat() throws Exception {
- ByteString topic = ByteString.copyFromUtf8("testMessageBoundCompat");
- ByteString subid = ByteString.copyFromUtf8("mysub");
-
- int port = PortManager.nextFreePort();
- int sslPort = PortManager.nextFreePort();
-
- // start bookkeeper 400
- BookKeeperCluster400 bkc400 = new BookKeeperCluster400(3);
- bkc400.start();
-
- // start 400 server
- Server400 s400 = new Server400(zkUtil.getZooKeeperConnectString(), port, sslPort);
- s400.start();
-
- org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions options5cur =
- org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
- .setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH)
- .setMessageBound(5).build();
-
- ClientCurrent ccur = new ClientCurrent("localhost:" + port + ":" + sslPort);
- ccur.subscribe(topic, subid, options5cur);
- ccur.closeSubscription(topic, subid);
- ccur.sendXExpectLastY(topic, subid, 50, 50);
-
- // stop 400 servers
- s400.stop();
- bkc400.stop();
-
- // start bookkeeper 410
- BookKeeperCluster410 bkc410 = new BookKeeperCluster410(3);
- bkc410.start();
-
- // start 410 server
- Server410 s410 = new Server410(zkUtil.getZooKeeperConnectString(), port, sslPort);
- s410.start();
-
- ccur.subscribe(topic, subid, options5cur);
- ccur.closeSubscription(topic, subid);
- ccur.sendXExpectLastY(topic, subid, 50, 5);
-
- // stop 410 servers
- s410.stop();
- bkc410.stop();
-
- // start bookkeeper current
- BookKeeperCluster420 bkc420 = new BookKeeperCluster420(3);
- bkc420.start();
-
- // start 420 server
- Server420 s420 = new Server420(zkUtil.getZooKeeperConnectString(), port, sslPort);
- s420.start();
-
- ccur.subscribe(topic, subid, options5cur);
- ccur.closeSubscription(topic, subid);
- ccur.sendXExpectLastY(topic, subid, 50, 5);
-
- // stop 420 server
- s420.stop();
- bkc420.stop();
-
- ccur.close();
- }
-
- /**
- * Test compatability of publish interface between version 4.1.0
- * and current verison.
- *
- * 1) 4.1.0 client could talk with current server.
- * 2) current client could talk with 4.1.0 server,
- * but no message seq id would be returned
- */
- @Test(timeout=60000)
- public void testPublishCompat410() throws Exception {
- ByteString topic = ByteString.copyFromUtf8("TestPublishCompat410");
- ByteString data = ByteString.copyFromUtf8("testdata");
-
- // start bookkeeper 410
- BookKeeperCluster410 bkc410 = new BookKeeperCluster410(3);
- bkc410.start();
-
- int port = PortManager.nextFreePort();
- int sslPort = PortManager.nextFreePort();
-
- // start 410 server
- Server410 s410 = new Server410(zkUtil.getZooKeeperConnectString(), port, sslPort);
- s410.start();
-
- ClientCurrent ccur = new ClientCurrent("localhost:"+port+":"+sslPort);
- Client410 c410 = new Client410("localhost:"+port+":"+sslPort);
-
- // client c410 could publish message to 410 server
- assertNull(c410.publish(topic, data));
- // client ccur could publish message to 410 server
- // but no message seq id would be returned
- assertNull(ccur.publish(topic, data));
-
- // stop 410 server
- s410.stop();
-
- // start 420 server
- Server420 s420 = new Server420(zkUtil.getZooKeeperConnectString(), port, sslPort);
- s420.start();
-
- // client c410 could publish message to 410 server
- // but no message seq id would be returned
- assertNull(c410.publish(topic, data));
- // client ccur could publish message to current server
- assertNotNull(ccur.publish(topic, data));
-
- ccur.close();
- c410.close();
-
- // stop 420 server
- s420.stop();
- bkc410.stop();
- }
-
- /**
- * Test compatability between version 4.1.0 and the current version.
- *
- * A current server could read subscription data recorded by 4.1.0 server.
- */
- @Test(timeout=60000)
- public void testSubscriptionDataCompat410() throws Exception {
- ByteString topic = ByteString.copyFromUtf8("TestCompat410");
- ByteString sub410 = ByteString.copyFromUtf8("sub410");
- ByteString subcur = ByteString.copyFromUtf8("subcur");
-
- // start bookkeeper 410
- BookKeeperCluster410 bkc410 = new BookKeeperCluster410(3);
- bkc410.start();
-
- int port = PortManager.nextFreePort();
- int sslPort = PortManager.nextFreePort();
-
- // start 410 server
- Server410 s410 = new Server410(zkUtil.getZooKeeperConnectString(), port, sslPort);
- s410.start();
-
- Client410 c410 = new Client410("localhost:"+port+":"+sslPort);
- c410.subscribe(topic, sub410);
- c410.closeSubscription(topic, sub410);
- Thread.sleep(1000); // give server time to run disconnect logic (BOOKKEEPER-513)
-
- ClientCurrent ccur = new ClientCurrent("localhost:"+port+":"+sslPort);
- ccur.subscribe(topic, subcur);
- ccur.closeSubscription(topic, subcur);
-
- // publish messages using old client
- c410.publishInts(topic, 0, 10);
- // stop 410 server
- s410.stop();
-
- // start 420 server
- Server420 s420 = new Server420(zkUtil.getZooKeeperConnectString(),
- port, sslPort);
- s420.start();
-
- c410.subscribe(topic, sub410);
- c410.receiveInts(topic, sub410, 0, 10);
-
- ccur.subscribe(topic, subcur);
- ccur.receiveInts(topic, subcur, 0, 10);
-
- // publish messages using current client
- ccur.publishInts(topic, 10, 10);
-
- c410.receiveInts(topic, sub410, 10, 10);
- ccur.receiveInts(topic, subcur, 10, 10);
-
- // stop 420 server
- s420.stop();
-
- c410.close();
- ccur.close();
-
- // stop bookkeeper cluster
- bkc410.stop();
- }
-
- /**
- * Test compatability between version 4.1.0 and the current version.
- *
- * A 4.1.0 client could not update message bound, while current could do it.
- */
- @Test(timeout=60000)
- public void testUpdateMessageBoundCompat410() throws Exception {
- ByteString topic = ByteString.copyFromUtf8("TestUpdateMessageBoundCompat410");
- ByteString subid = ByteString.copyFromUtf8("mysub");
-
- // start bookkeeper
- BookKeeperCluster420 bkc420 = new BookKeeperCluster420(3);
- bkc420.start();
-
- int port = PortManager.nextFreePort();
- int sslPort = PortManager.nextFreePort();
-
- // start hub server
- Server420 s420 = new Server420(zkUtil.getZooKeeperConnectString(),
- port, sslPort);
- s420.start();
-
- org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions options5cur =
- org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
- .setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH)
- .setMessageBound(5).build();
- org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscriptionOptions options5v410 =
- org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
- .setCreateOrAttach(org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH)
- .setMessageBound(5).build();
- org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscriptionOptions options20v410 =
- org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
- .setCreateOrAttach(org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH)
- .setMessageBound(20).build();
-
- Client410 c410 = new Client410("localhost:"+port+":"+sslPort);
- c410.subscribe(topic, subid, options20v410);
- c410.closeSubscription(topic, subid);
- Thread.sleep(1000); // give server time to run disconnect logic (BOOKKEEPER-513)
-
- c410.sendXExpectLastY(topic, subid, 50, 20);
-
- c410.subscribe(topic, subid, options5v410);
- c410.closeSubscription(topic, subid);
- Thread.sleep(1000); // give server time to run disconnect logic (BOOKKEEPER-513)
-
- // the message bound isn't updated.
- c410.sendXExpectLastY(topic, subid, 50, 20);
-
- ClientCurrent ccur = new ClientCurrent("localhost:"+port+":"+sslPort);
- ccur.subscribe(topic, subid, options5cur);
- ccur.closeSubscription(topic, subid);
- Thread.sleep(1000); // give server time to run disconnect logic (BOOKKEEPER-513)
-
- // the message bound should be updated.
- c410.sendXExpectLastY(topic, subid, 50, 5);
-
- // stop 420 server
- s420.stop();
-
- c410.close();
- ccur.close();
-
- // stop bookkeeper cluster
- bkc420.stop();
- }
-
- /**
- * Test compatability between version 4.1.0 and the current version.
- *
- * A current client running message filter would fail on 4.1.0 hub servers.
- */
- @Test(timeout=60000)
- public void testClientMessageFilterCompat410() throws Exception {
- ByteString topic = ByteString.copyFromUtf8("TestUpdateMessageBoundCompat410");
- ByteString subid = ByteString.copyFromUtf8("mysub");
-
- // start bookkeeper
- BookKeeperCluster410 bkc410 = new BookKeeperCluster410(3);
- bkc410.start();
-
- int port = PortManager.nextFreePort();
- int sslPort = PortManager.nextFreePort();
-
- // start hub server 410
- Server410 s410 = new Server410(zkUtil.getZooKeeperConnectString(), port, sslPort);
- s410.start();
-
- ClientCurrent ccur = new ClientCurrent("localhost:"+port+":"+sslPort);
- ccur.subscribe(topic, subid);
- ccur.closeSubscription(topic, subid);
-
- ccur.publishInts(topic, 0, 100);
- try {
- ccur.receiveNumModM(topic, subid, 0, 50, 2);
- fail("client-side filter could not run on 4.1.0 hub server");
- } catch (Exception e) {
- logger.info("Should fail to run client-side message filter on 4.1.0 hub server.", e);
- ccur.closeSubscription(topic, subid);
- }
-
- // stop 410 server
- s410.stop();
- // stop bookkeeper cluster
- bkc410.stop();
- }
-
- /**
- * Test compatability between version 4.1.0 and the current version.
- *
- * Server side throttling does't work when current client connects to old version
- * server.
- */
- @Test(timeout=60000)
- public void testServerSideThrottleCompat410() throws Exception {
- ByteString topic = ByteString.copyFromUtf8("TestServerSideThrottleCompat410");
- ByteString subid = ByteString.copyFromUtf8("mysub");
-
- // start bookkeeper
- BookKeeperCluster410 bkc410 = new BookKeeperCluster410(3);
- bkc410.start();
-
- int port = PortManager.nextFreePort();
- int sslPort = PortManager.nextFreePort();
-
- // start hub server 410
- Server410 s410 = new Server410(zkUtil.getZooKeeperConnectString(), port, sslPort);
- s410.start();
-
- ClientCurrent ccur = new ClientCurrent(false, "localhost:"+port+":"+sslPort);
- ccur.throttleX41(topic, subid, 10);
-
- ccur.close();
-
- // stop 410 server
- s410.stop();
- // stop bookkeeper cluster
- bkc410.stop();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java b/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java
deleted file mode 100644
index 632ea43..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-
-import junit.framework.Assert;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.server.LoggingExceptionHandler;
-import org.apache.bookkeeper.test.PortManager;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.netty.PubSubServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.test.ClientBase;
-import org.junit.Test;
-import org.apache.hedwig.util.FileUtils;
-
-public class TestPubSubServerStartup {
-
- private static final Logger logger = LoggerFactory.getLogger(TestPubSubServerStartup.class);
-
- /**
- * Start-up zookeeper + pubsubserver reading from a config URL. Then stop
- * and cleanup.
- *
- * Loop over that.
- *
- * If the pubsub server does not wait for its zookeeper client to be
- * connected, the pubsub server will fail at startup.
- *
- */
- @Test(timeout=60000)
- public void testPubSubServerInstantiationWithConfig() throws Exception {
- for (int i = 0; i < 10; i++) {
- logger.info("iteration " + i);
- instantiateAndDestroyPubSubServer();
- }
- }
-
- private void instantiateAndDestroyPubSubServer() throws IOException, InterruptedException, ConfigurationException,
- MalformedURLException, Exception {
- int zkPort = PortManager.nextFreePort();
- int hwPort = PortManager.nextFreePort();
- int hwSSLPort = PortManager.nextFreePort();
- String hedwigParams = "default_server_host=localhost:" + hwPort + "\n"
- + "zk_host=localhost:" + zkPort + "\n"
- + "server_port=" + hwPort + "\n"
- + "ssl_server_port=" + hwSSLPort + "\n"
- + "zk_timeout=2000\n";
-
- File hedwigConfigFile = new File(System.getProperty("java.io.tmpdir") + "/hedwig.cfg");
- writeStringToFile(hedwigParams, hedwigConfigFile);
-
- ClientBase.setupTestEnv();
- File zkTmpDir = FileUtils.createTempDirectory("zookeeper", "test");
-
- ZooKeeperServer zks = new ZooKeeperServer(zkTmpDir, zkTmpDir, zkPort);
-
- NIOServerCnxnFactory serverFactory = new NIOServerCnxnFactory();
- serverFactory.configure(new InetSocketAddress(zkPort), 100);
- serverFactory.startup(zks);
-
- boolean b = ClientBase.waitForServerUp("127.0.0.1:" + zkPort, 5000);
- ServerConfiguration serverConf = new ServerConfiguration();
- serverConf.loadConf(hedwigConfigFile.toURI().toURL());
-
- logger.info("Zookeeper server up and running!");
-
- ZooKeeper zkc = new ZooKeeper("127.0.0.1:" + zkPort, 5000, null);
-
- // initialize the zk client with (fake) values
- zkc.create("/ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zkc.create("/ledgers/available", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
- zkc.close();
- PubSubServer hedwigServer = null;
- try {
- logger.info("starting hedwig broker!");
- hedwigServer = new PubSubServer(serverConf, new ClientConfiguration(), new LoggingExceptionHandler());
- hedwigServer.start();
- } catch (Exception e) {
- e.printStackTrace();
- }
- Assert.assertNotNull("failed to instantiate hedwig pub sub server", hedwigServer);
-
- hedwigServer.shutdown();
- serverFactory.shutdown();
-
- zks.shutdown();
-
- zkTmpDir.delete();
-
- ClientBase.waitForServerDown("localhost:" + zkPort, 10000);
-
- }
-
- public static void writeStringToFile(String string, File f) throws IOException {
- if (f.exists()) {
- if (!f.delete()) {
- throw new RuntimeException("cannot create file " + f.getAbsolutePath());
- }
- }
- if (!f.createNewFile()) {
- throw new RuntimeException("cannot create new file " + f.getAbsolutePath());
- }
-
- FileWriter fw = new FileWriter(f);
- fw.write(string);
- fw.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
deleted file mode 100644
index 978649a..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.delivery;
-
-import java.util.LinkedList;
-import java.util.Queue;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.filter.ServerMessageFilter;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.util.Callback;
-
-public class StubDeliveryManager implements DeliveryManager {
-
- public static class StartServingRequest {
- public ByteString topic;
- public ByteString subscriberId;
- public MessageSeqId seqIdToStartFrom;
- public DeliveryEndPoint endPoint;
- public ServerMessageFilter filter;
-
- public StartServingRequest(ByteString topic, ByteString subscriberId,
- SubscriptionPreferences preferences,
- MessageSeqId seqIdToStartFrom,
- DeliveryEndPoint endPoint,
- ServerMessageFilter filter) {
- this.topic = topic;
- this.subscriberId = subscriberId;
- this.seqIdToStartFrom = seqIdToStartFrom;
- this.endPoint = endPoint;
- this.filter = filter;
- }
-
- }
-
- public Queue<Object> lastRequest = new LinkedList<Object>();
-
- @Override
- public void startServingSubscription(ByteString topic, ByteString subscriberId,
- SubscriptionPreferences preferences,
- MessageSeqId seqIdToStartFrom,
- DeliveryEndPoint endPoint,
- ServerMessageFilter filter,
- Callback<Void> cb, Object ctx) {
- lastRequest.add(new StartServingRequest(topic, subscriberId, preferences,
- seqIdToStartFrom, endPoint, filter));
- cb.operationFinished(ctx, null);
- }
-
- @Override
- public void stopServingSubscriber(ByteString topic, ByteString subscriberId,
- SubscriptionEvent event,
- Callback<Void> cb, Object ctx) {
- lastRequest.add(new TopicSubscriber(topic, subscriberId));
- cb.operationFinished(ctx, null);
- }
-
- @Override
- public void messageConsumed(ByteString topic, ByteString subscriberId,
- MessageSeqId seqId) {
- // do nothing
- }
-
- @Override
- public void start() {
- }
-
- @Override
- public void stop() {
- // do nothing now
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java
deleted file mode 100644
index ebc26f1..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.delivery;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.filter.PipelineFilter;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.topics.StubTopicManager;
-import org.apache.hedwig.server.persistence.PersistRequest;
-import org.apache.hedwig.server.persistence.PersistenceManager;
-import org.apache.hedwig.server.persistence.StubPersistenceManager;
-import org.apache.hedwig.server.subscriptions.AllToAllTopologyFilter;
-import org.apache.hedwig.util.Callback;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-
-public class TestFIFODeliveryManager {
- private static final Logger logger = LoggerFactory.getLogger(TestFIFODeliveryManager.class);
-
- static class TestCallback implements Callback<MessageSeqId> {
- AtomicBoolean success = new AtomicBoolean(false);
- final CountDownLatch latch;
- MessageSeqId msgid = null;
-
- TestCallback(CountDownLatch l) {
- this.latch = l;
- }
- public void operationFailed(Object ctx, PubSubException exception) {
- logger.error("Persist operation failed", exception);
- latch.countDown();
- }
-
- public void operationFinished(Object ctx, MessageSeqId resultOfOperation) {
- msgid = resultOfOperation;
- success.set(true);
- latch.countDown();
- }
-
- MessageSeqId getId() {
- assertTrue("Persist operation failed", success.get());
- return msgid;
- }
- }
-
- /**
- * Delivery endpoint which puts all responses on a queue
- */
- static class ExecutorDeliveryEndPointWithQueue implements DeliveryEndPoint {
- ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
- AtomicInteger numResponses = new AtomicInteger(0);
- ConcurrentLinkedQueue<PubSubResponse> queue = new ConcurrentLinkedQueue<PubSubResponse>();
-
- public void send(final PubSubResponse response, final DeliveryCallback callback) {
- logger.info("Received response {}", response);
- queue.add(response);
- numResponses.incrementAndGet();
- executor.submit(new Runnable() {
- public void run() {
- callback.sendingFinished();
- }
- });
- }
-
- public void close() {
- executor.shutdown();
- }
-
- PubSubResponse getNextResponse() {
- return queue.poll();
- }
-
- int getNumResponses() {
- return numResponses.get();
- }
- }
-
- /**
- * Test that the FIFO delivery manager executes stopServing and startServing
- * in the correct order
- * {@link https://issues.apache.org/jira/browse/BOOKKEEPER-539}
- */
- @Test(timeout = 60000)
- public void testFIFODeliverySubCloseSubRace() throws Exception {
- ServerConfiguration conf = new ServerConfiguration();
- ByteString topic = ByteString.copyFromUtf8("subRaceTopic");
- ByteString subscriber = ByteString.copyFromUtf8("subRaceSubscriber");
-
- PersistenceManager pm = new StubPersistenceManager();
- FIFODeliveryManager fdm = new FIFODeliveryManager(new StubTopicManager(conf), pm, conf);
- ExecutorDeliveryEndPointWithQueue dep = new ExecutorDeliveryEndPointWithQueue();
- SubscriptionPreferences prefs = SubscriptionPreferences.newBuilder().build();
-
- PipelineFilter filter = new PipelineFilter();
- filter.addLast(new AllToAllTopologyFilter());
- filter.initialize(conf.getConf());
- filter.setSubscriptionPreferences(topic, subscriber, prefs);
- MessageSeqId startId = MessageSeqId.newBuilder().setLocalComponent(1).build();
-
- CountDownLatch l = new CountDownLatch(1);
- Message m = Message.newBuilder().setBody(ByteString.copyFromUtf8(String.valueOf(1))).build();
- TestCallback cb = new TestCallback(l);
- pm.persistMessage(new PersistRequest(topic, m, cb, null));
- assertTrue("Persistence never finished", l.await(10, TimeUnit.SECONDS));
-
- final CountDownLatch oplatch = new CountDownLatch(3);
- fdm.start();
- fdm.startServingSubscription(topic, subscriber, prefs, startId, dep, filter,
- new Callback<Void>() {
- @Override
- public void operationFinished(Object ctx, Void result) {
- oplatch.countDown();
- }
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- oplatch.countDown();
- }
- }, null);
- fdm.stopServingSubscriber(topic, subscriber, null,
- new Callback<Void>() {
- @Override
- public void operationFinished(Object ctx, Void result) {
- oplatch.countDown();
- }
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- oplatch.countDown();
- }
- }, null);
- fdm.startServingSubscription(topic, subscriber, prefs, startId, dep, filter,
- new Callback<Void>() {
- @Override
- public void operationFinished(Object ctx, Void result) {
- oplatch.countDown();
- }
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- oplatch.countDown();
- }
- }, null);
-
- assertTrue("Ops never finished", oplatch.await(10, TimeUnit.SECONDS));
- int seconds = 5;
- while (dep.getNumResponses() < 2) {
- if (seconds-- == 0) {
- break;
- }
- Thread.sleep(1000);
- }
- PubSubResponse r = dep.getNextResponse();
- assertNotNull("There should be a response", r);
- assertTrue("Response should contain a message", r.hasMessage());
- r = dep.getNextResponse();
- assertNotNull("There should be a response", r);
- assertTrue("Response should contain a message", r.hasMessage());
- r = dep.getNextResponse();
- assertNull("There should only be 2 responses", r);
- }
-
- static class ExecutorDeliveryEndPoint implements DeliveryEndPoint {
- ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
- AtomicInteger numDelivered = new AtomicInteger();
- final DeliveryManager dm;
-
- ExecutorDeliveryEndPoint(DeliveryManager dm) {
- this.dm = dm;
- }
-
- public void send(final PubSubResponse response, final DeliveryCallback callback) {
- executor.submit(new Runnable() {
- public void run() {
- if (response.hasMessage()) {
- MessageSeqId msgid = response.getMessage().getMsgId();
- if ((msgid.getLocalComponent() % 2) == 1) {
- dm.messageConsumed(response.getTopic(),
- response.getSubscriberId(),
- response.getMessage().getMsgId());
- } else {
- executor.schedule(new Runnable() {
- public void run() {
- dm.messageConsumed(response.getTopic(),
- response.getSubscriberId(),
- response.getMessage().getMsgId());
- }
- }, 1, TimeUnit.SECONDS);
- }
- }
- numDelivered.incrementAndGet();
- callback.sendingFinished();
- }
- });
- }
-
- public void close() {
- executor.shutdown();
- }
-
- int getNumDelivered() {
- return numDelivered.get();
- }
- }
-
- /**
- * Test throttle race issue cause by messageConsumed and doDeliverNextMessage
- * {@link https://issues.apache.org/jira/browse/BOOKKEEPER-503}
- */
- @Test(timeout = 60000)
- public void testFIFODeliveryThrottlingRace() throws Exception {
- final int numMessages = 20;
- final int throttleSize = 10;
- ServerConfiguration conf = new ServerConfiguration() {
- @Override
- public int getDefaultMessageWindowSize() {
- return throttleSize;
- }
- };
- ByteString topic = ByteString.copyFromUtf8("throttlingRaceTopic");
- ByteString subscriber = ByteString.copyFromUtf8("throttlingRaceSubscriber");
-
- PersistenceManager pm = new StubPersistenceManager();
- FIFODeliveryManager fdm = new FIFODeliveryManager(new StubTopicManager(conf), pm, conf);
- ExecutorDeliveryEndPoint dep = new ExecutorDeliveryEndPoint(fdm);
- SubscriptionPreferences prefs = SubscriptionPreferences.newBuilder().build();
-
- PipelineFilter filter = new PipelineFilter();
- filter.addLast(new AllToAllTopologyFilter());
- filter.initialize(conf.getConf());
- filter.setSubscriptionPreferences(topic, subscriber, prefs);
-
- CountDownLatch l = new CountDownLatch(numMessages);
-
- TestCallback firstCallback = null;
- for (int i = 0; i < numMessages; i++) {
- Message m = Message.newBuilder().setBody(ByteString.copyFromUtf8(String.valueOf(i))).build();
- TestCallback cb = new TestCallback(l);
- if (firstCallback == null) {
- firstCallback = cb;
- }
- pm.persistMessage(new PersistRequest(topic, m, cb, null));
- }
- fdm.start();
- assertTrue("Persistence never finished", l.await(10, TimeUnit.SECONDS));
- fdm.startServingSubscription(topic, subscriber, prefs, firstCallback.getId(), dep, filter,
- new Callback<Void>() {
- @Override
- public void operationFinished(Object ctx, Void result) {
- }
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- // would not happened
- }
- }, null);
-
- int count = 30; // wait for 30 seconds maximum
- while (dep.getNumDelivered() < numMessages) {
- Thread.sleep(1000);
- if (count-- == 0) {
- break;
- }
- }
- assertEquals("Should have delivered " + numMessages, numMessages, dep.getNumDelivered());
- }
-
-}