You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2014/07/25 00:34:20 UTC
svn commit: r1613315 [4/4] - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/
bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/
bookkeeper-server/src/main/java/org/apache...
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java Thu Jul 24 22:34:19 2014
@@ -459,6 +459,121 @@ public class TestBackwardCompat {
}
/**
+ * 4.2.0 Version
+ */
+ static class BookKeeperCluster420{
+
+ int numBookies;
+ List<org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration> bkConfs;
+ List<org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer> bks;
+
+
+ BookKeeperCluster420(int numBookies) {
+ this.numBookies = numBookies;
+ }
+
+ public void start() throws Exception {
+ zkUtil.startServer();
+
+ bks = new LinkedList<org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer>();
+ bkConfs = new LinkedList<org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration>();
+
+ for (int i=0; i<numBookies; i++) {
+ startBookieServer();
+ }
+ }
+
+ public void stop() throws Exception {
+ for (org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer bs : bks) {
+ bs.shutdown();
+ }
+ bks.clear();
+
+ zkUtil.killServer();
+ }
+
+ protected void startBookieServer() throws Exception {
+ int port = PortManager.nextFreePort();
+ File tmpDir = org.apache.hw_v4_2_0.hedwig.util.FileUtils.createTempDirectory(
+ getClass().getName() + port, "test");
+ org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration conf = newServerConfiguration(
+ port, zkUtil.getZooKeeperConnectString(), tmpDir, new File[] { tmpDir });
+ bks.add(startBookie(conf));
+ bkConfs.add(conf);
+ }
+
+ protected org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration newServerConfiguration(
+ int port, String zkServers, File journalDir, File[] ledgerDirs) {
+ org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration conf =
+ new org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration();
+ conf.setBookiePort(port);
+ conf.setZkServers(zkServers);
+ conf.setJournalDirName(journalDir.getPath());
+ String[] ledgerDirNames = new String[ledgerDirs.length];
+ for (int i=0; i<ledgerDirs.length; i++) {
+ ledgerDirNames[i] = ledgerDirs[i].getPath();
+ }
+ conf.setLedgerDirNames(ledgerDirNames);
+ return conf;
+ }
+
+ protected org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer startBookie(
+ org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration conf) throws Exception {
+ org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer server
+ = new org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer(conf);
+ server.start();
+
+ int port = conf.getBookiePort();
+ while (zkUtil.getZooKeeperClient().exists(
+ "/ledgers/available/" + InetAddress.getLocalHost().getHostAddress() + ":" + port,
+ false) == null) {
+ Thread.sleep(500);
+ }
+ return server;
+ }
+ }
+
+ static class Server420 {
+ org.apache.hw_v4_2_0.hedwig.server.common.ServerConfiguration conf;
+ org.apache.hw_v4_2_0.hedwig.server.netty.PubSubServer server;
+
+ Server420(final String zkHosts, final int port, final int sslPort) {
+ conf = new org.apache.hw_v4_2_0.hedwig.server.common.ServerConfiguration() {
+ @Override
+ public int getConsumeInterval() {
+ return CONSUMEINTERVAL;
+ }
+
+ @Override
+ public String getZkHost() {
+ return zkHosts;
+ }
+
+ @Override
+ public int getServerPort() {
+ return port;
+ }
+
+ @Override
+ public int getSSLServerPort() {
+ return sslPort;
+ }
+ };
+ }
+
+ void start() throws Exception {
+ server = new org.apache.hw_v4_2_0.hedwig.server.netty.PubSubServer(conf);
+ server.start();
+ }
+
+ void stop() throws Exception {
+ if (null != server) {
+ server.shutdown();
+ }
+ }
+ }
+
+ /**
* Current Version
*/
static class BookKeeperClusterCurrent {
@@ -928,20 +1043,20 @@ public class TestBackwardCompat {
bkc410.stop();
// start bookkeeper current
- BookKeeperClusterCurrent bkccur = new BookKeeperClusterCurrent(3);
- bkccur.start();
+ BookKeeperCluster420 bkc420 = new BookKeeperCluster420(3);
+ bkc420.start();
- // start current server
- ServerCurrent scur = new ServerCurrent(zkUtil.getZooKeeperConnectString(), port, sslPort);
- scur.start();
+ // start 420 server
+ Server420 s420 = new Server420(zkUtil.getZooKeeperConnectString(), port, sslPort);
+ s420.start();
ccur.subscribe(topic, subid, options5cur);
ccur.closeSubscription(topic, subid);
ccur.sendXExpectLastY(topic, subid, 50, 5);
- // stop current servers
- scur.stop();
- bkccur.stop();
+ // stop 420 server
+ s420.stop();
+ bkc420.stop();
ccur.close();
}
@@ -982,9 +1097,9 @@ public class TestBackwardCompat {
// stop 410 server
s410.stop();
- // start current server
- ServerCurrent scur = new ServerCurrent(zkUtil.getZooKeeperConnectString(), port, sslPort);
- scur.start();
+ // start 420 server
+ Server420 s420 = new Server420(zkUtil.getZooKeeperConnectString(), port, sslPort);
+ s420.start();
// client c410 could publish message to 410 server
// but no message seq id would be returned
@@ -995,8 +1110,8 @@ public class TestBackwardCompat {
ccur.close();
c410.close();
- // stop current server
- scur.stop();
+ // stop 420 server
+ s420.stop();
bkc410.stop();
}
@@ -1036,10 +1151,10 @@ public class TestBackwardCompat {
// stop 410 server
s410.stop();
- // start current server
- ServerCurrent scur = new ServerCurrent(zkUtil.getZooKeeperConnectString(),
+ // start 420 server
+ Server420 s420 = new Server420(zkUtil.getZooKeeperConnectString(),
port, sslPort);
- scur.start();
+ s420.start();
c410.subscribe(topic, sub410);
c410.receiveInts(topic, sub410, 0, 10);
@@ -1053,8 +1168,8 @@ public class TestBackwardCompat {
c410.receiveInts(topic, sub410, 10, 10);
ccur.receiveInts(topic, subcur, 10, 10);
- // stop current server
- scur.stop();
+ // stop 420 server
+ s420.stop();
c410.close();
ccur.close();
@@ -1074,16 +1189,16 @@ public class TestBackwardCompat {
ByteString subid = ByteString.copyFromUtf8("mysub");
// start bookkeeper
- BookKeeperClusterCurrent bkccur= new BookKeeperClusterCurrent(3);
- bkccur.start();
+ BookKeeperCluster420 bkc420 = new BookKeeperCluster420(3);
+ bkc420.start();
int port = PortManager.nextFreePort();
int sslPort = PortManager.nextFreePort();
// start hub server
- ServerCurrent scur = new ServerCurrent(zkUtil.getZooKeeperConnectString(),
+ Server420 s420 = new Server420(zkUtil.getZooKeeperConnectString(),
port, sslPort);
- scur.start();
+ s420.start();
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions options5cur =
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
@@ -1120,14 +1235,14 @@ public class TestBackwardCompat {
// the message bound should be updated.
c410.sendXExpectLastY(topic, subid, 50, 5);
- // stop current server
- scur.stop();
+ // stop 420 server
+ s420.stop();
c410.close();
ccur.close();
// stop bookkeeper cluster
- bkccur.stop();
+ bkc420.stop();
}
/**