You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2014/07/25 00:34:20 UTC

svn commit: r1613315 [4/4] - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/ bookkeeper-server/src/main/java/org/apache...

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java Thu Jul 24 22:34:19 2014
@@ -459,6 +459,121 @@ public class TestBackwardCompat {
     }
 
     /**
+     * 4.2.0 Version
+     */
+    static class BookKeeperCluster420{ // test helper: ZooKeeper (via zkUtil) plus numBookies bookies built from the hw_v4_2_0 classes
+
+        int numBookies; // how many bookies start() launches
+        List<org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration> bkConfs; // configuration of each started bookie, in start order
+        List<org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer> bks; // started bookie servers, in start order
+
+
+        BookKeeperCluster420(int numBookies) {
+            this.numBookies = numBookies; // bks/bkConfs stay null until start() is called
+        }
+
+        public void start() throws Exception { // starts ZooKeeper, then numBookies bookies one at a time
+            zkUtil.startServer(); // ZooKeeper must be up first: startBookie() polls it for each bookie's znode
+
+            bks = new LinkedList<org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer>();
+            bkConfs = new LinkedList<org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration>();
+
+            for (int i=0; i<numBookies; i++) {
+                startBookieServer(); // blocks until this bookie's znode exists (see startBookie)
+            }
+        }
+
+        public void stop() throws Exception { // shuts down every bookie, then kills ZooKeeper
+            for (org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer bs : bks) {
+                bs.shutdown();
+            }
+            bks.clear(); // note: bkConfs is not cleared here
+
+            zkUtil.killServer(); // ZooKeeper goes down only after all bookies were shut down
+        }
+
+        protected void startBookieServer() throws Exception { // starts one bookie on a fresh port and records it in bks/bkConfs
+            int port = PortManager.nextFreePort();
+            File tmpDir = org.apache.hw_v4_2_0.hedwig.util.FileUtils.createTempDirectory(
+                getClass().getName() + port, "test"); // directory name is derived from class name + port
+            org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration conf = newServerConfiguration(
+                port, zkUtil.getZooKeeperConnectString(), tmpDir, new File[] { tmpDir }); // same tmpDir is both journal dir and the only ledger dir
+            bks.add(startBookie(conf));
+            bkConfs.add(conf);
+        }
+
+        protected org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration newServerConfiguration(
+            int port, String zkServers, File journalDir, File[] ledgerDirs) { // builds a config from port, ZK connect string, journal dir and ledger dirs
+            org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration conf =
+                new org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration();
+            conf.setBookiePort(port);
+            conf.setZkServers(zkServers);
+            conf.setJournalDirName(journalDir.getPath());
+            String[] ledgerDirNames = new String[ledgerDirs.length]; // the setter takes path strings, so convert File[] -> String[]
+            for (int i=0; i<ledgerDirs.length; i++) {
+                ledgerDirNames[i] = ledgerDirs[i].getPath();
+            }
+            conf.setLedgerDirNames(ledgerDirNames);
+            return conf;
+        }
+
+        protected org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer startBookie(
+            org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration conf) throws Exception { // starts a server for conf and returns it once its znode is visible
+            org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer server
+                = new org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer(conf);
+            server.start();
+
+            int port = conf.getBookiePort();
+            while (zkUtil.getZooKeeperClient().exists(
+                    "/ledgers/available/" + InetAddress.getLocalHost().getHostAddress() + ":" + port, // NOTE(review): presumably the bookie's registration znode - confirm
+                    false) == null) { // polls every 500 ms; there is no timeout, so this loops until the znode appears
+                Thread.sleep(500);
+            }
+            return server;
+        }
+    }
+
+    static class Server420 { // test helper wrapping a hw_v4_2_0 hedwig PubSubServer and its configuration
+        org.apache.hw_v4_2_0.hedwig.server.common.ServerConfiguration conf; // built once in the constructor
+        org.apache.hw_v4_2_0.hedwig.server.netty.PubSubServer server; // null until start() is called
+
+        Server420(final String zkHosts, final int port, final int sslPort) { // only builds the configuration; nothing is started here
+            conf = new org.apache.hw_v4_2_0.hedwig.server.common.ServerConfiguration() { // anonymous subclass: the getters below return fixed test values
+                @Override
+                public int getConsumeInterval() {
+                    return CONSUMEINTERVAL; // constant declared outside this class (not visible in this hunk)
+                }
+
+                @Override
+                public String getZkHost() {
+                    return zkHosts;
+                }
+
+                @Override
+                public int getServerPort() {
+                    return port;
+                }
+
+                @Override
+                public int getSSLServerPort() {
+                    return sslPort;
+                }
+            };
+        }
+
+        void start() throws Exception { // creates a new PubSubServer from conf and starts it
+            server = new org.apache.hw_v4_2_0.hedwig.server.netty.PubSubServer(conf); // a fresh instance on every call; any previous reference is overwritten
+            server.start();
+        }
+
+        void stop() throws Exception { // shuts the server down; does nothing if server was never assigned
+            if (null != server) {
+                server.shutdown();
+            }
+        }
+    }
+
+    /**
      * Current Version
      */
     static class BookKeeperClusterCurrent {
@@ -928,20 +1043,20 @@ public class TestBackwardCompat {
         bkc410.stop();
 
         // start bookkeeper current
-        BookKeeperClusterCurrent bkccur = new BookKeeperClusterCurrent(3);
-        bkccur.start();
+        BookKeeperCluster420 bkc420 = new BookKeeperCluster420(3);
+        bkc420.start();
 
-        // start current server
-        ServerCurrent scur = new ServerCurrent(zkUtil.getZooKeeperConnectString(), port, sslPort);
-        scur.start();
+        // start 420 server
+        Server420 s420 = new Server420(zkUtil.getZooKeeperConnectString(), port, sslPort);
+        s420.start();
 
         ccur.subscribe(topic, subid, options5cur);
         ccur.closeSubscription(topic, subid);
         ccur.sendXExpectLastY(topic, subid, 50, 5);
 
-        // stop current servers
-        scur.stop();
-        bkccur.stop();
+        // stop 420 server
+        s420.stop();
+        bkc420.stop();
 
         ccur.close();
     }
@@ -982,9 +1097,9 @@ public class TestBackwardCompat {
         // stop 410 server
         s410.stop();
 
-        // start current server
-        ServerCurrent scur = new ServerCurrent(zkUtil.getZooKeeperConnectString(), port, sslPort);
-        scur.start();
+        // start 420 server
+        Server420 s420 = new Server420(zkUtil.getZooKeeperConnectString(), port, sslPort);
+        s420.start();
 
         // client c410 could publish message to 410 server
         // but no message seq id would be returned
@@ -995,8 +1110,8 @@ public class TestBackwardCompat {
         ccur.close();
         c410.close();
 
-        // stop current server
-        scur.stop();
+        // stop 420 server
+        s420.stop();
         bkc410.stop();
     }
 
@@ -1036,10 +1151,10 @@ public class TestBackwardCompat {
         // stop 410 server
         s410.stop();
 
-        // start current server
-        ServerCurrent scur = new ServerCurrent(zkUtil.getZooKeeperConnectString(),
+        // start 420 server
+        Server420 s420 = new Server420(zkUtil.getZooKeeperConnectString(),
                                                port, sslPort);
-        scur.start();
+        s420.start();
 
         c410.subscribe(topic, sub410);
         c410.receiveInts(topic, sub410, 0, 10);
@@ -1053,8 +1168,8 @@ public class TestBackwardCompat {
         c410.receiveInts(topic, sub410, 10, 10);
         ccur.receiveInts(topic, subcur, 10, 10);
 
-        // stop current server
-        scur.stop();
+        // stop 420 server
+        s420.stop();
 
         c410.close();
         ccur.close();
@@ -1074,16 +1189,16 @@ public class TestBackwardCompat {
         ByteString subid = ByteString.copyFromUtf8("mysub");
 
         // start bookkeeper
-        BookKeeperClusterCurrent bkccur= new BookKeeperClusterCurrent(3);
-        bkccur.start();
+        BookKeeperCluster420 bkc420 = new BookKeeperCluster420(3);
+        bkc420.start();
 
         int port = PortManager.nextFreePort();
         int sslPort = PortManager.nextFreePort();
 
         // start hub server
-        ServerCurrent scur = new ServerCurrent(zkUtil.getZooKeeperConnectString(),
+        Server420 s420 = new Server420(zkUtil.getZooKeeperConnectString(),
                                                port, sslPort);
-        scur.start();
+        s420.start();
 
         org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions options5cur =
             org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
@@ -1120,14 +1235,14 @@ public class TestBackwardCompat {
         // the message bound should be updated.
         c410.sendXExpectLastY(topic, subid, 50, 5);
 
-        // stop current server
-        scur.stop();
+        // stop 420 server
+        s420.stop();
 
         c410.close();
         ccur.close();
 
         // stop bookkeeper cluster
-        bkccur.stop();
+        bkc420.stop();
     }
 
     /**