You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2011/09/05 12:32:40 UTC

svn commit: r1165231 - in /zookeeper/bookkeeper/trunk: CHANGES.txt hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java

Author: ivank
Date: Mon Sep  5 10:32:39 2011
New Revision: 1165231

URL: http://svn.apache.org/viewvc?rev=1165231&view=rev
Log:
BOOKKEEPER-63: Hedwig PubSubServer must wait for its Zookeeper client to be connected upon startup (morel via ivank)

Added:
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1165231&r1=1165230&r2=1165231&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Sep  5 10:32:39 2011
@@ -44,6 +44,7 @@ BUGFIXES:
   
   BOOKKEEPER-51: NullPointException at FIFODeliveryManager#deliveryPtrs (xulei via ivank)
 
+  BOOKKEEPER-63: Hedwig PubSubServer must wait for its Zookeeper client to be connected upon startup (morel via ivank)
 
  hedwig-client/
  

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java?rev=1165231&r1=1165230&r2=1165231&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java Mon Sep  5 10:32:39 2011
@@ -24,9 +24,11 @@ import java.net.MalformedURLException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.commons.configuration.ConfigurationException;
@@ -140,13 +142,35 @@ public class PubSubServer {
         return new RegionManager(pm, conf, zk, scheduler, new HedwigHubClientFactory(conf, clientChannelFactory));
     }
 
-    protected void instantiateZookeeperClient() throws IOException {
+    /**
+     * Creates the ZooKeeper client and blocks until its session is connected.
+     * Does nothing when the server is configured as standalone.
+     *
+     * @throws Exception if the session is not connected within twice the
+     *             configured ZooKeeper timeout, or if the wait is interrupted
+     */
+    protected void instantiateZookeeperClient() throws Exception {
         if (!conf.isStandalone()) {
+            final CountDownLatch signalZkReady = new CountDownLatch(1);
+
             zk = new ZooKeeper(conf.getZkHost(), conf.getZkTimeout(), new Watcher() {
                 @Override
                 public void process(WatchedEvent event) {
+                    if (Event.KeeperState.SyncConnected.equals(event.getState())) {
+                        signalZkReady.countDown();
+                    }
                 }
             });
+            // wait until connection is effective
+            final int maxWaitMs = conf.getZkTimeout() * 2;
+            if (!signalZkReady.await(maxWaitMs, TimeUnit.MILLISECONDS)) {
+                final String msg = "Could not establish connection with ZooKeeper after zk_timeout*2 = "
+                        + maxWaitMs + " ms. (Default value for zk_timeout is 2000).";
+                logger.fatal(msg);
+                // close the client that never connected before failing startup, so it is not leaked
+                zk.close();
+                throw new Exception(msg);
+            }
         }
     }
 

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java?rev=1165231&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java Mon Sep  5 10:32:39 2011
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.netty.PubSubServer;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.Test;
+
+/**
+ * Regression test for BOOKKEEPER-63: the PubSubServer must start cleanly against a freshly started ZooKeeper.
+ */
+public class TestPubSubServerStartup {
+
+    private static final Logger logger = Logger.getLogger(TestPubSubServerStartup.class);
+
+    /** Port the embedded ZooKeeper server listens on. */
+    private static final int ZK_PORT = 2181;
+
+    /** Tick time (ms) handed to the embedded ZooKeeper server; value kept from the original test. */
+    private static final int ZK_TICK_TIME_MS = 2181;
+
+    /** Session timeout (ms) of the helper ZooKeeper client that seeds the test znodes. */
+    private static final int ZK_SESSION_TIMEOUT_MS = 2000;
+
+    /**
+     * Start-up zookeeper + pubsubserver reading from a config URL. Then stop
+     * and cleanup.
+     *
+     * Loop over that.
+     *
+     * If the pubsub server does not wait for its zookeeper client to be
+     * connected, the pubsub server will fail at startup.
+     *
+     */
+    @Test
+    public void testPubSubServerInstantiationWithConfig() throws Exception {
+        for (int i = 0; i < 10; i++) {
+            logger.info("iteration " + i);
+            instantiateAndDestroyPubSubServer();
+        }
+    }
+
+    /**
+     * Runs one cycle: starts an embedded ZooKeeper server, seeds the ledger
+     * znodes, instantiates a PubSubServer from a config file, then tears
+     * everything down. Teardown runs even when a step fails so that the
+     * ZooKeeper port is released for the next iteration.
+     */
+    private void instantiateAndDestroyPubSubServer() throws IOException, InterruptedException, ConfigurationException,
+            MalformedURLException, Exception {
+        String hedwigParams = "default_server_host=localhost:4080\n" + "zookeeper_connection_string=localhost:2181\n"
+                + "zk_timeout=2000\n";
+
+        File hedwigConfigFile = new File(System.getProperty("java.io.tmpdir") + "/hedwig.cfg");
+        writeStringToFile(hedwigParams, hedwigConfigFile);
+
+        ClientBase.setupTestEnv();
+        File zkTmpDir = File.createTempFile("zookeeper", "test");
+        zkTmpDir.delete();
+        zkTmpDir.mkdir();
+
+        // the third constructor argument is the server tick time, not a port
+        ZooKeeperServer zks = new ZooKeeperServer(zkTmpDir, zkTmpDir, ZK_TICK_TIME_MS);
+
+        NIOServerCnxnFactory serverFactory = new NIOServerCnxnFactory();
+        serverFactory.configure(new InetSocketAddress(ZK_PORT), 100);
+        serverFactory.startup(zks);
+
+        PubSubServer hedwigServer = null;
+        try {
+            Assert.assertTrue("zookeeper server did not come up",
+                    ClientBase.waitForServerUp("127.0.0.1:" + ZK_PORT, 5000));
+            ServerConfiguration serverConf = new ServerConfiguration();
+            serverConf.loadConf(hedwigConfigFile.toURI().toURL());
+
+            logger.info("Zookeeper server up and running!");
+
+            // ZooKeeper(connectString, sessionTimeout, watcher): the port belongs in the connect string
+            ZooKeeper zkc = new ZooKeeper("127.0.0.1:" + ZK_PORT, ZK_SESSION_TIMEOUT_MS, null);
+            try {
+                // initialize the zk client with (fake) values
+                zkc.create("/ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                zkc.create("/ledgers/available", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            } finally {
+                zkc.close();
+            }
+
+            // let a startup failure propagate so the test report shows the real cause
+            logger.info("starting hedwig broker!");
+            hedwigServer = new PubSubServer(serverConf);
+        } finally {
+            try {
+                if (hedwigServer != null) {
+                    hedwigServer.shutdown();
+                }
+            } finally {
+                serverFactory.shutdown();
+                zks.shutdown();
+                ClientBase.waitForServerDown("localhost:" + ZK_PORT, 10000);
+                // File.delete() cannot remove a non-empty directory
+                recursiveDelete(zkTmpDir);
+            }
+        }
+    }
+
+    /**
+     * Deletes f and, if it is a directory, everything below it. Failures are
+     * logged rather than thrown because this only runs during test cleanup.
+     */
+    private static void recursiveDelete(File f) {
+        File[] children = f.listFiles(); // null when f is not a directory
+        if (children != null) {
+            for (File child : children) {
+                recursiveDelete(child);
+            }
+        }
+        if (!f.delete()) {
+            logger.warn("could not delete " + f.getAbsolutePath());
+        }
+    }
+
+    /**
+     * Writes string to f, replacing any existing file.
+     *
+     * @throws IOException if the file cannot be written
+     * @throws RuntimeException if an existing file cannot be deleted or the new file cannot be created
+     */
+    public static void writeStringToFile(String string, File f) throws IOException {
+        if (f.exists()) {
+            if (!f.delete()) {
+                throw new RuntimeException("cannot create file " + f.getAbsolutePath());
+            }
+        }
+        if (!f.createNewFile()) {
+            throw new RuntimeException("cannot create new file " + f.getAbsolutePath());
+        }
+
+        FileWriter fw = new FileWriter(f);
+        try {
+            fw.write(string);
+        } finally {
+            // close even when write() fails so the file handle is not leaked
+            fw.close();
+        }
+    }
+}